Browse Source

fix: 修复flow.transform不能返回null导致产生大量日志的问题

pull/41/head
Vincent 1 year ago
parent
commit
89c34336e7
  1. 51
      src/main/java/com/dji/sdk/mqtt/FlowTransformWrapper.java
  2. 32
      src/main/java/com/dji/sdk/mqtt/state/StateRouter.java

51
src/main/java/com/dji/sdk/mqtt/FlowTransformWrapper.java

@ -0,0 +1,51 @@ @@ -0,0 +1,51 @@
/*************************************************
* @copyright 2017 Flision Corporation Inc.
* @author: Vincent Chan @ Canton
* @date: 2023年10月09日
* @version: 1.0.0
* @description:
**************************************************/
package com.dji.sdk.mqtt;
public class FlowTransformWrapper {
public final static String DEFAULT_ERROR_MSG = "null";
public static FlowTransformWrapper error(){
return new FlowTransformWrapper(DEFAULT_ERROR_MSG);
}
public static FlowTransformWrapper ok(CommonTopicRequest request){
return new FlowTransformWrapper(request);
}
CommonTopicRequest request;
boolean bError;
String errorMessage;
private FlowTransformWrapper(CommonTopicRequest request){
this.request = request;
this.bError = false;
}
private FlowTransformWrapper(String errorMessage){
this.bError = true;
this.errorMessage = errorMessage;
}
public CommonTopicRequest getRequest() {
return request;
}
public boolean hasError() {
return bError;
}
public boolean continuee(){
return !hasError();
}
public String getErrorMessage() {
return errorMessage;
}
}

32
src/main/java/com/dji/sdk/mqtt/state/StateRouter.java

@ -6,6 +6,7 @@ import com.dji.sdk.common.SDKManager; @@ -6,6 +6,7 @@ import com.dji.sdk.common.SDKManager;
import com.dji.sdk.exception.CloudSDKErrorEnum;
import com.dji.sdk.exception.CloudSDKException;
import com.dji.sdk.mqtt.ChannelName;
import com.dji.sdk.mqtt.FlowTransformWrapper;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
@ -36,29 +37,38 @@ public class StateRouter { @@ -36,29 +37,38 @@ public class StateRouter {
@Bean
public IntegrationFlow stateDataRouterFlow() {
ObjectMapper objectMapper = Common.getObjectMapper();
return IntegrationFlows
.from(ChannelName.INBOUND_STATE)
.transform(Message.class, source -> {
ObjectMapper objectMapper = Common.getObjectMapper();
try {
TopicStateRequest response = objectMapper.readValue((byte[]) source.getPayload(), new TypeReference<TopicStateRequest>() {
});
TopicStateRequest response = objectMapper.readValue(
(byte[]) source.getPayload(),
new TypeReference<TopicStateRequest>() {});
String topic = String.valueOf(source.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
String from = topic.substring((THING_MODEL_PRE + PRODUCT).length(), topic.indexOf(STATE_SUF));
return FlowTransformWrapper.ok(response.setFrom(from));
} catch (Exception ex) {
log.warn("[StateRouter]"+ex.getMessage());
return FlowTransformWrapper.error();
}
}, null)
.filter(FlowTransformWrapper::continuee)
.<FlowTransformWrapper>handle((wrapper, headers) -> {
TopicStateRequest response = (TopicStateRequest)wrapper.getRequest();
//fix: 修复设备未注册前设备推送state导致产生大量日志的问题 witcom@2023.10.08
try {
return getTypeReference(response.getGateway(), response.getData())
.map(clazz -> response.setFrom(from).setData(objectMapper.convertValue(response.getData(), clazz)))
.map(clazz -> response.setData(objectMapper.convertValue(response.getData(), clazz)))
.orElse(null);
// return response.setFrom(from)
// .setData(objectMapper.convertValue(response.getData(), typeReference));
} /*catch (IOException e) {
//witcom: Which part will throw IOException???
throw new CloudSDKException(e);
}*/ catch (Exception ex) {
}catch (CloudSDKException ex){
log.warn("[StateRouter]"+ex.getMessage());
return null;
}
}, null)
})
.filter(Objects::nonNull)
.<TopicStateRequest, StateDataKeyEnum>route(response -> StateDataKeyEnum.find(response.getData().getClass()),
mapping -> Arrays.stream(StateDataKeyEnum.values()).forEach(key -> mapping.channelMapping(key, key.getChannelName())))

Loading…
Cancel
Save