diff --git a/src/main/java/com/dji/sdk/mqtt/FlowTransformWrapper.java b/src/main/java/com/dji/sdk/mqtt/FlowTransformWrapper.java new file mode 100644 index 0000000..b0c87a8 --- /dev/null +++ b/src/main/java/com/dji/sdk/mqtt/FlowTransformWrapper.java @@ -0,0 +1,51 @@ +/************************************************* + * @copyright 2017 Flision Corporation Inc. + * @author: Vincent Chan @ Canton + * @date: 2023年10月09日 + * @version: 1.0.0 + * @description: + **************************************************/ +package com.dji.sdk.mqtt; + +public class FlowTransformWrapper { + + public final static String DEFAULT_ERROR_MSG = "null"; + + public static FlowTransformWrapper error(){ + return new FlowTransformWrapper(DEFAULT_ERROR_MSG); + } + + public static FlowTransformWrapper ok(CommonTopicRequest request){ + return new FlowTransformWrapper(request); + } + + CommonTopicRequest request; + boolean bError; + String errorMessage; + + private FlowTransformWrapper(CommonTopicRequest request){ + this.request = request; + this.bError = false; + } + + private FlowTransformWrapper(String errorMessage){ + this.bError = true; + this.errorMessage = errorMessage; + } + + public CommonTopicRequest getRequest() { + return request; + } + + public boolean hasError() { + return bError; + } + + public boolean continuee(){ + return !hasError(); + } + + public String getErrorMessage() { + return errorMessage; + } +} diff --git a/src/main/java/com/dji/sdk/mqtt/state/StateRouter.java b/src/main/java/com/dji/sdk/mqtt/state/StateRouter.java index 2dc37ed..dbc1440 100644 --- a/src/main/java/com/dji/sdk/mqtt/state/StateRouter.java +++ b/src/main/java/com/dji/sdk/mqtt/state/StateRouter.java @@ -6,6 +6,7 @@ import com.dji.sdk.common.SDKManager; import com.dji.sdk.exception.CloudSDKErrorEnum; import com.dji.sdk.exception.CloudSDKException; import com.dji.sdk.mqtt.ChannelName; +import com.dji.sdk.mqtt.FlowTransformWrapper; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; @@ -36,29 +37,38 @@ public class StateRouter { @Bean public IntegrationFlow stateDataRouterFlow() { + ObjectMapper objectMapper = Common.getObjectMapper(); return IntegrationFlows .from(ChannelName.INBOUND_STATE) .transform(Message.class, source -> { - ObjectMapper objectMapper = Common.getObjectMapper(); try { - TopicStateRequest response = objectMapper.readValue((byte[]) source.getPayload(), new TypeReference() { - }); + TopicStateRequest response = objectMapper.readValue( + (byte[]) source.getPayload(), + new TypeReference() {}); String topic = String.valueOf(source.getHeaders().get(MqttHeaders.RECEIVED_TOPIC)); String from = topic.substring((THING_MODEL_PRE + PRODUCT).length(), topic.indexOf(STATE_SUF)); - //fix: 修复设备未注册前设备推送state导致产生大量日志的问题 witcom@2023.10.08 + + return FlowTransformWrapper.ok(response.setFrom(from)); + } catch (Exception ex) { + log.warn("[StateRouter]"+ex.getMessage()); + return FlowTransformWrapper.error(); + } + }, null) + .filter(FlowTransformWrapper::continuee) + .handle((wrapper, headers) -> { + + TopicStateRequest response = (TopicStateRequest)wrapper.getRequest(); + + //fix: 修复设备未注册前设备推送state导致产生大量日志的问题 witcom@2023.10.08 + try { return getTypeReference(response.getGateway(), response.getData()) - .map(clazz -> response.setFrom(from).setData(objectMapper.convertValue(response.getData(), clazz))) + .map(clazz -> response.setData(objectMapper.convertValue(response.getData(), clazz))) .orElse(null); -// return response.setFrom(from) -// .setData(objectMapper.convertValue(response.getData(), typeReference)); - } /*catch (IOException e) { - //witcom: Which part will throw IOException??? - throw new CloudSDKException(e); - }*/ catch (Exception ex) { + }catch (CloudSDKException ex){ log.warn("[StateRouter]"+ex.getMessage()); return null; } - }, null) + }) .filter(Objects::nonNull) .route(response -> StateDataKeyEnum.find(response.getData().getClass()), mapping -> Arrays.stream(StateDataKeyEnum.values()).forEach(key -> mapping.channelMapping(key, key.getChannelName())))