diff --git a/src/main/java/com/dji/sample/component/ApplicationBootInitial.java b/src/main/java/com/dji/sample/component/ApplicationBootInitial.java index 16a691c..5b1573a 100644 --- a/src/main/java/com/dji/sample/component/ApplicationBootInitial.java +++ b/src/main/java/com/dji/sample/component/ApplicationBootInitial.java @@ -11,6 +11,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; +import javax.annotation.Resource; import java.util.Optional; /** @@ -27,6 +28,9 @@ public class ApplicationBootInitial implements CommandLineRunner { @Autowired private IDeviceRedisService deviceRedisService; + @Resource + SDKManager sdkManager; + /** * Subscribe to the devices that exist in the redis when the program starts, * to prevent the data from being different from the pilot side due to program interruptions. @@ -44,7 +48,7 @@ public class ApplicationBootInitial implements CommandLineRunner { .map(Optional::get) .filter(device -> DeviceDomainEnum.DRONE != device.getDomain()) .forEach(device -> deviceService.subDeviceOnlineSubscribeTopic( - SDKManager.registerDevice(device.getDeviceSn(), device.getChildDeviceSn(), device.getDomain(), + sdkManager.registerDevice(device.getDeviceSn(), device.getChildDeviceSn(), device.getDomain(), device.getType(), device.getSubType(), device.getThingVersion(), deviceRedisService.getDeviceOnline(device.getChildDeviceSn()).map(DeviceDTO::getThingVersion).orElse(null)))); diff --git a/src/main/java/com/dji/sample/component/oss/service/impl/AliyunOssServiceImpl.java b/src/main/java/com/dji/sample/component/oss/service/impl/AliyunOssServiceImpl.java index cf78637..a55cb85 100644 --- a/src/main/java/com/dji/sample/component/oss/service/impl/AliyunOssServiceImpl.java +++ b/src/main/java/com/dji/sample/component/oss/service/impl/AliyunOssServiceImpl.java @@ -86,6 +86,7 @@ public class AliyunOssServiceImpl implements IOssService { @Override public InputStream getObject(String bucket, String objectKey) { + //bug: ossClient.getObject -> OSSObject need to close manually. otherwise will hang up main thread to wait available connection. by witcom @2023.12.08 return ossClient.getObject(bucket, objectKey).getObjectContent(); } @@ -94,6 +95,7 @@ public class AliyunOssServiceImpl implements IOssService { if (ossClient.doesObjectExist(bucket, objectKey)) { throw new RuntimeException("The filename already exists."); } + //bug: PutObjectResult need to close manually. otherwise will hang up main thread to wait available connection. by witcom @2023.12.08 PutObjectResult objectResult = ossClient.putObject(new PutObjectRequest(bucket, objectKey, input, new ObjectMetadata())); log.info("Upload FlighttaskCreateFile: {}", objectResult.getETag()); } diff --git a/src/main/java/com/dji/sample/control/service/impl/ControlServiceImpl.java b/src/main/java/com/dji/sample/control/service/impl/ControlServiceImpl.java index 530729e..4bd5b0f 100644 --- a/src/main/java/com/dji/sample/control/service/impl/ControlServiceImpl.java +++ b/src/main/java/com/dji/sample/control/service/impl/ControlServiceImpl.java @@ -68,6 +68,9 @@ public class ControlServiceImpl implements IControlService { @Qualifier("SDKWaylineService") private AbstractWaylineService abstractWaylineService; + @Autowired + SDKManager SDKManager; + private RemoteDebugHandler checkDebugCondition(String sn, RemoteDebugParam param, RemoteDebugMethodEnum controlMethodEnum) { RemoteDebugHandler handler = Objects.nonNull(controlMethodEnum.getClazz()) ? mapper.convertValue(Objects.nonNull(param) ? param : new Object(), controlMethodEnum.getClazz()) diff --git a/src/main/java/com/dji/sample/control/service/impl/DrcServiceImpl.java b/src/main/java/com/dji/sample/control/service/impl/DrcServiceImpl.java index 7aa86f3..29a61ec 100644 --- a/src/main/java/com/dji/sample/control/service/impl/DrcServiceImpl.java +++ b/src/main/java/com/dji/sample/control/service/impl/DrcServiceImpl.java @@ -84,6 +84,9 @@ public class DrcServiceImpl implements IDrcService { @Autowired private AbstractControlService abstractControlService; + @Autowired + SDKManager SDKManager; + @Override public void setDrcModeInRedis(String dockSn, String clientId) { RedisOpsUtils.setWithExpire(RedisConst.DRC_PREFIX + dockSn, clientId, RedisConst.DRC_MODE_ALIVE_SECOND); diff --git a/src/main/java/com/dji/sample/manage/service/impl/DeviceLogsServiceImpl.java b/src/main/java/com/dji/sample/manage/service/impl/DeviceLogsServiceImpl.java index 799de27..8acb72e 100644 --- a/src/main/java/com/dji/sample/manage/service/impl/DeviceLogsServiceImpl.java +++ b/src/main/java/com/dji/sample/manage/service/impl/DeviceLogsServiceImpl.java @@ -85,6 +85,9 @@ public class DeviceLogsServiceImpl extends AbstractLogService implements IDevice @Autowired private AbstractLogService abstractLogService; + @Autowired + SDKManager SDKManager; + @Override public PaginationData<DeviceLogsDTO> getUploadedLogs(String deviceSn, DeviceLogsQueryParam param) { LambdaQueryWrapper<DeviceLogsEntity> queryWrapper = new LambdaQueryWrapper<DeviceLogsEntity>() diff --git a/src/main/java/com/dji/sample/manage/service/impl/DeviceServiceImpl.java b/src/main/java/com/dji/sample/manage/service/impl/DeviceServiceImpl.java index f5bbe95..018ab0b 100644 --- a/src/main/java/com/dji/sample/manage/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/dji/sample/manage/service/impl/DeviceServiceImpl.java @@ -124,6 +124,9 @@ public class DeviceServiceImpl implements IDeviceService { @Autowired private AbstractFirmwareService abstractFirmwareService; + @Autowired + SDKManager SDKManager; + @Override public void subDeviceOffline(String deviceSn) { // If no information about this device exists in the cache, the drone is considered to be offline. diff --git a/src/main/java/com/dji/sample/manage/service/impl/LiveStreamServiceImpl.java b/src/main/java/com/dji/sample/manage/service/impl/LiveStreamServiceImpl.java index 2b1f590..13d14ff 100644 --- a/src/main/java/com/dji/sample/manage/service/impl/LiveStreamServiceImpl.java +++ b/src/main/java/com/dji/sample/manage/service/impl/LiveStreamServiceImpl.java @@ -46,6 +46,9 @@ public class LiveStreamServiceImpl implements ILiveStreamService { @Autowired private AbstractLivestreamService abstractLivestreamService; + @Autowired + SDKManager SDKManager; + @Override public List<CapacityDeviceDTO> getLiveCapacity(String workspaceId) { diff --git a/src/main/java/com/dji/sample/manage/service/impl/SDKDeviceService.java b/src/main/java/com/dji/sample/manage/service/impl/SDKDeviceService.java index 81db422..541212d 100644 --- a/src/main/java/com/dji/sample/manage/service/impl/SDKDeviceService.java +++ b/src/main/java/com/dji/sample/manage/service/impl/SDKDeviceService.java @@ -57,6 +57,9 @@ public class SDKDeviceService extends AbstractDeviceService { @Autowired private IDevicePayloadService devicePayloadService; + @Autowired + SDKManager SDKManager; + @Override public TopicStatusResponse<MqttReply> updateTopoOnline(TopicStatusRequest<UpdateTopo> request, MessageHeaders headers) { UpdateTopoSubDevice updateTopoSubDevice = request.getData().getSubDevices().get(0); diff --git a/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java b/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java index 938cd80..a7589e6 100644 --- a/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java +++ b/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java @@ -62,6 +62,9 @@ public class FlightTaskServiceImpl extends AbstractWaylineService implements IFl @Autowired private ObjectMapper mapper; + @Autowired + SDKManager SDKManager; + @Autowired private IWebSocketMessageService websocketMessageService; diff --git a/src/main/java/com/dji/sdk/cloudapi/control/FlyToPointRequest.java b/src/main/java/com/dji/sdk/cloudapi/control/FlyToPointRequest.java index 8af4ca9..297d8a9 100644 --- a/src/main/java/com/dji/sdk/cloudapi/control/FlyToPointRequest.java +++ b/src/main/java/com/dji/sdk/cloudapi/control/FlyToPointRequest.java @@ -13,7 +13,7 @@ import java.util.List; */ public class FlyToPointRequest extends BaseModel { - @Pattern(regexp = "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$") + //@Pattern(regexp = "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$") @NotNull private String flyToId; diff --git a/src/main/java/com/dji/sdk/cloudapi/control/TakeoffToPointRequest.java b/src/main/java/com/dji/sdk/cloudapi/control/TakeoffToPointRequest.java index 2c5424e..823a83f 100644 --- a/src/main/java/com/dji/sdk/cloudapi/control/TakeoffToPointRequest.java +++ b/src/main/java/com/dji/sdk/cloudapi/control/TakeoffToPointRequest.java @@ -19,7 +19,7 @@ import javax.validation.constraints.Pattern; */ public class TakeoffToPointRequest extends BaseModel { - @Pattern(regexp = "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$") + //@Pattern(regexp = "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$") @NotNull private String flightId; diff --git a/src/main/java/com/dji/sdk/cloudapi/debug/RemoteDebugStepKeyEnum.java b/src/main/java/com/dji/sdk/cloudapi/debug/RemoteDebugStepKeyEnum.java index ddeca50..60f99b6 100644 --- a/src/main/java/com/dji/sdk/cloudapi/debug/RemoteDebugStepKeyEnum.java +++ b/src/main/java/com/dji/sdk/cloudapi/debug/RemoteDebugStepKeyEnum.java @@ -59,7 +59,9 @@ public enum RemoteDebugStepKeyEnum { FREE_PUTTER("free_putter", "Free Putter"), - STOP_CHARGE("stop_charge", "Stop charging"); + STOP_CHARGE("stop_charge", "Stop charging"), + + UNKNOWN("unknown","Unknown"); private final String stepKey; @@ -79,10 +81,11 @@ public enum RemoteDebugStepKeyEnum { return message; } + //fix: 提供unknown取代异常 witcom@2023.10.30 @JsonCreator public static RemoteDebugStepKeyEnum find(String stepKey) { return Arrays.stream(values()).filter(stepKeyEnum -> stepKeyEnum.stepKey.equals(stepKey)).findAny() - .orElseThrow(() -> new CloudSDKException(RemoteDebugStepKeyEnum.class,stepKey)); + .orElse(UNKNOWN); } } diff --git a/src/main/java/com/dji/sdk/cloudapi/device/CameraModeEnum.java b/src/main/java/com/dji/sdk/cloudapi/device/CameraModeEnum.java index 3f82d43..750279b 100644 --- a/src/main/java/com/dji/sdk/cloudapi/device/CameraModeEnum.java +++ b/src/main/java/com/dji/sdk/cloudapi/device/CameraModeEnum.java @@ -13,6 +13,8 @@ import java.util.Arrays; */ public enum CameraModeEnum { + //fix: Cannot construct instance of `com.dji.sdk.cloudapi.device.CameraModeEnum`, problem: com.dji.sdk.cloudapi.device.CameraModeEnum has unknown data: [-1] vincent @ 2023.12.07 + UNKNOWN(-1), PHOTO(0), VIDEO(1); diff --git a/src/main/java/com/dji/sdk/cloudapi/device/DockLiveErrorStatus.java b/src/main/java/com/dji/sdk/cloudapi/device/DockLiveErrorStatus.java index aa67276..1d1b0a2 100644 --- a/src/main/java/com/dji/sdk/cloudapi/device/DockLiveErrorStatus.java +++ b/src/main/java/com/dji/sdk/cloudapi/device/DockLiveErrorStatus.java @@ -40,11 +40,14 @@ public class DockLiveErrorStatus { } public String getMessage() { + if(success){ return "success";} return errorCode.getMessage(); } @JsonValue public Integer getCode() { + //witcom: errorCode.getCode() will cause npe 2023.10.30 + if(success){ return 0; } return source.getSource() * MOD + errorCode.getCode(); } diff --git a/src/main/java/com/dji/sdk/cloudapi/device/DroneModeCodeEnum.java b/src/main/java/com/dji/sdk/cloudapi/device/DroneModeCodeEnum.java index 526460b..fba4165 100644 --- a/src/main/java/com/dji/sdk/cloudapi/device/DroneModeCodeEnum.java +++ b/src/main/java/com/dji/sdk/cloudapi/device/DroneModeCodeEnum.java @@ -45,7 +45,10 @@ public enum DroneModeCodeEnum { APAS(15), - VIRTUAL_JOYSTICK(16); + VIRTUAL_JOYSTICK(16), + + //witcom: 飞机报这个号上来报异常 + UNKNOWN_1(17); private final int code; diff --git a/src/main/java/com/dji/sdk/cloudapi/device/api/AbstractDeviceService.java b/src/main/java/com/dji/sdk/cloudapi/device/api/AbstractDeviceService.java index b8b5352..5b3c7e3 100644 --- a/src/main/java/com/dji/sdk/cloudapi/device/api/AbstractDeviceService.java +++ b/src/main/java/com/dji/sdk/cloudapi/device/api/AbstractDeviceService.java @@ -169,7 +169,9 @@ public class AbstractDeviceService { * @param request data * @param headers The headers for a {@link Message}. */ - @ServiceActivator(inputChannel = ChannelName.INBOUND_STATE_DOCK_WPMZ_VERSION) + //witcom: wrong ChannelName. is INBOUND_STATE_DOCK_PAYLOAD?? +// @ServiceActivator(inputChannel = ChannelName.INBOUND_STATE_DOCK_WPMZ_VERSION) + @ServiceActivator(inputChannel = ChannelName.INBOUND_STATE_DOCK_PAYLOAD) public void dockPayload(TopicStateRequest<DockPayload> request, MessageHeaders headers) { throw new UnsupportedOperationException("dockPayload not implemented"); } diff --git a/src/main/java/com/dji/sdk/cloudapi/media/UploadFlighttaskMediaPrioritize.java b/src/main/java/com/dji/sdk/cloudapi/media/UploadFlighttaskMediaPrioritize.java index 59c197f..5dfe88f 100644 --- a/src/main/java/com/dji/sdk/cloudapi/media/UploadFlighttaskMediaPrioritize.java +++ b/src/main/java/com/dji/sdk/cloudapi/media/UploadFlighttaskMediaPrioritize.java @@ -12,8 +12,9 @@ import javax.validation.constraints.Pattern; */ public class UploadFlighttaskMediaPrioritize extends BaseModel { + /* fix: flightId是应用层提供的,应用层有自己的id构建规则,如果不是设备原因必须指定格式,最好就不要控制flightId的格式了 by witcom@2023.09.22 */ @NotNull - @Pattern(regexp = "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$") + //@Pattern(regexp = "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$") private String flightId; public UploadFlighttaskMediaPrioritize() { diff --git a/src/main/java/com/dji/sdk/cloudapi/wayline/ExecutionStepEnum.java b/src/main/java/com/dji/sdk/cloudapi/wayline/ExecutionStepEnum.java index f142ce6..8a4e287 100644 --- a/src/main/java/com/dji/sdk/cloudapi/wayline/ExecutionStepEnum.java +++ b/src/main/java/com/dji/sdk/cloudapi/wayline/ExecutionStepEnum.java @@ -52,15 +52,16 @@ public enum ExecutionStepEnum { return msg; } + //witcom: 这个函数跑不进来,跑了下面的函数,导致FlightTaskProgress报不上来 @JsonCreator public static ExecutionStepEnum find(int step) { return Arrays.stream(values()).filter(stepEnum -> stepEnum.min <= step && stepEnum.max >= step).findAny() .orElseThrow(() -> new CloudSDKException(ExecutionStepEnum.class, step)); } - @JsonCreator - public static ExecutionStepEnum find(String msg) { - return Arrays.stream(values()).filter(stepEnum -> stepEnum.msg.equals(msg)).findAny() - .orElseThrow(() -> new CloudSDKException(ExecutionStepEnum.class, msg)); - } +// @JsonCreator +// public static ExecutionStepEnum find(String msg) { +// return Arrays.stream(values()).filter(stepEnum -> stepEnum.msg.equals(msg)).findAny() +// .orElseThrow(() -> new CloudSDKException(ExecutionStepEnum.class, msg)); +// } } diff --git a/src/main/java/com/dji/sdk/cloudapi/wayline/FlighttaskBreakPoint.java b/src/main/java/com/dji/sdk/cloudapi/wayline/FlighttaskBreakPoint.java index 6d0c232..45f50fe 100755 --- a/src/main/java/com/dji/sdk/cloudapi/wayline/FlighttaskBreakPoint.java +++ b/src/main/java/com/dji/sdk/cloudapi/wayline/FlighttaskBreakPoint.java @@ -24,12 +24,10 @@ public class FlighttaskBreakPoint { @NotNull private BreakpointStateEnum state; - /** - * Current wayline segment process - */ - @NotNull - @Min(0) - @Max(1) + +// @NotNull +// @Min(0) +// @Max(1) private Float progress; /** @@ -37,10 +35,14 @@ public class FlighttaskBreakPoint { */ private Integer waylineID; + /** + * Current wayline segment process + * bug: Cannot deserialize value of type `com.dji.sdk.cloudapi.wayline.FlighttaskBreakReasonEnum` from number 1281: index value outside legal index range [0..-1] mark by witcom 2023.11.09 + */ /** * Break reason */ - private FlighttaskBreakReasonEnum breakReason; + private Integer breakReason; /** * Breakpoint latitude @@ -116,11 +118,11 @@ public class FlighttaskBreakPoint { return this; } - public FlighttaskBreakReasonEnum getBreakReason() { + public Integer getBreakReason() { return breakReason; } - public FlighttaskBreakPoint setBreakReason(FlighttaskBreakReasonEnum breakReason) { + public FlighttaskBreakPoint setBreakReason(Integer breakReason) { this.breakReason = breakReason; return this; } diff --git a/src/main/java/com/dji/sdk/cloudapi/wayline/FlighttaskExecuteRequest.java b/src/main/java/com/dji/sdk/cloudapi/wayline/FlighttaskExecuteRequest.java index 451b784..eddd18c 100644 --- a/src/main/java/com/dji/sdk/cloudapi/wayline/FlighttaskExecuteRequest.java +++ b/src/main/java/com/dji/sdk/cloudapi/wayline/FlighttaskExecuteRequest.java @@ -12,8 +12,11 @@ import javax.validation.constraints.Pattern; */ public class FlighttaskExecuteRequest extends BaseModel { + /** + * fix: flightId是应用层提供的,应用层有自己的id构建规则,如果不是设备原因必须指定格式,最好就不要控制flightId的格式了 by witcom@2023.09.22 + */ @NotNull - @Pattern(regexp = "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$") + //@Pattern(regexp = "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$") private String flightId; public FlighttaskExecuteRequest() { diff --git a/src/main/java/com/dji/sdk/cloudapi/wayline/FlighttaskPrepareRequest.java b/src/main/java/com/dji/sdk/cloudapi/wayline/FlighttaskPrepareRequest.java index 41a7934..b111338 100755 --- a/src/main/java/com/dji/sdk/cloudapi/wayline/FlighttaskPrepareRequest.java +++ b/src/main/java/com/dji/sdk/cloudapi/wayline/FlighttaskPrepareRequest.java @@ -20,9 +20,10 @@ public class FlighttaskPrepareRequest extends BaseModel { /** * Task ID + * fix: flightId是应用层提供的,应用层有自己的id构建规则,如果不是设备原因必须指定格式,最好就不要控制flightId的格式了 by witcom@2023.09.22 */ @NotNull - @Pattern(regexp = "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$") + //@Pattern(regexp = "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$") private String flightId; /** diff --git a/src/main/java/com/dji/sdk/cloudapi/wayline/FlighttaskUndoRequest.java b/src/main/java/com/dji/sdk/cloudapi/wayline/FlighttaskUndoRequest.java index a38af28..cf385df 100644 --- a/src/main/java/com/dji/sdk/cloudapi/wayline/FlighttaskUndoRequest.java +++ b/src/main/java/com/dji/sdk/cloudapi/wayline/FlighttaskUndoRequest.java @@ -14,9 +14,12 @@ import java.util.List; */ public class FlighttaskUndoRequest extends BaseModel { + /** + * fix: flightId是应用层提供的,应用层有自己的id构建规则,如果不是设备原因必须指定格式,最好就不要控制flightId的格式了 by witcom@2023.09.22 + */ @NotNull @Size(min = 1) - private List<@Pattern(regexp = "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$") String> flightIds; + private List</*@Pattern(regexp = "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$") */String> flightIds; public FlighttaskUndoRequest() { } diff --git a/src/main/java/com/dji/sdk/common/JDKLockBarrierImpl.java b/src/main/java/com/dji/sdk/common/JDKLockBarrierImpl.java new file mode 100644 index 0000000..e7d5ae8 --- /dev/null +++ b/src/main/java/com/dji/sdk/common/JDKLockBarrierImpl.java @@ -0,0 +1,89 @@ +/************************************************* + * @copyright 2017 Flision Corporation Inc. + * @author: Vincent Chan @ Canton + * @date: 2023年09月22日 + * @version: 1.0.0 + * @description: + **************************************************/ +package com.dji.sdk.common; + +import com.dji.sdk.mqtt.Chan; +import com.dji.sdk.mqtt.CommonTopicRequest; +import com.dji.sdk.mqtt.CommonTopicResponse; + +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +public class JDKLockBarrierImpl implements PublishBarrier{ + + /** + * 在我的实现中是采用一个定期清理的TimedCache储存请求 + */ + private final ConcurrentHashMap<String, JDKHolder> container = new ConcurrentHashMap<>(); + + @Override + public String generateIdentity(CommonTopicRequest requestData) { + return requestData.getTid(); + } + + @Override + public String generateIdentity(CommonTopicResponse receiveData) { + return receiveData.getTid(); + } + + @Override + public void put(String identity, CommonTopicResponse receiveData) { + if(hasIdentity(identity)){ + container.get(identity).setData(receiveData); + } + } + + @Override + public void registerRequest(String identity, CommonTopicRequest requestData) { + container.put(identity,new JDKHolder()); + } + + @Override + public PublishBarrierResult await(String identity, long timeout) { + JDKHolder jdkHolder = container.get(identity); + if(Objects.isNull(jdkHolder)){ + throw new RuntimeException("等待指令返回前未注册指令到栅栏"); + } + jdkHolder.await(timeout); + + return jdkHolder.getResult(); + } + + @Override + public boolean hasIdentity(String identity) { + return container.containsKey(identity); + } + + public static class JDKHolder{ + volatile Object locker = new Object(); + CommonTopicResponse response = null; + + public void await(long timeout) { + synchronized (locker){ + try { + locker.wait(timeout); + }catch (InterruptedException e){} + } + } + + public void setData(CommonTopicResponse receiveData) { + this.response = receiveData; + synchronized (locker){ + locker.notify(); + } + } + + public PublishBarrierResult getResult() { + if(Objects.nonNull(response)){ + return PublishBarrierResult.ok(response); + }else{ + return PublishBarrierResult.EMPTY; + } + } + } +} diff --git a/src/main/java/com/dji/sdk/common/LocalCacheSDKManager.java b/src/main/java/com/dji/sdk/common/LocalCacheSDKManager.java new file mode 100644 index 0000000..5bf9ff5 --- /dev/null +++ b/src/main/java/com/dji/sdk/common/LocalCacheSDKManager.java @@ -0,0 +1,65 @@ +/************************************************* + * @copyright 2017 Flision Corporation Inc. + * @author: Vincent Chan @ Canton + * @date: 2023年09月25日 + * @version: 1.0.0 + * @description: + **************************************************/ +package com.dji.sdk.common; + +import com.dji.sdk.cloudapi.device.DeviceDomainEnum; +import com.dji.sdk.cloudapi.device.DeviceEnum; +import com.dji.sdk.cloudapi.device.DeviceSubTypeEnum; +import com.dji.sdk.cloudapi.device.DeviceTypeEnum; +import com.dji.sdk.exception.CloudSDKErrorEnum; +import com.dji.sdk.exception.CloudSDKException; +import org.springframework.stereotype.Component; + +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +public class LocalCacheSDKManager implements SDKManager{ + + final ConcurrentHashMap<String, GatewayManager> SDK_MAP = new ConcurrentHashMap<>(16); + + + @Override + public GatewayManager getDeviceSDK(String gatewaySn) { + if (SDK_MAP.containsKey(gatewaySn)) { + return SDK_MAP.get(gatewaySn); + } + throw new CloudSDKException(CloudSDKErrorEnum.NOT_REGISTERED, + "The device has not been registered, please call the 'SDKManager.registerDevice()' method to register the device first."); + } + + @Override + public Optional<GatewayManager> findDeviceSDK(String gatewaySn) { + if(SDK_MAP.containsKey(gatewaySn)){ + return Optional.of(SDK_MAP.get(gatewaySn)); + }else { + return Optional.empty(); + } + } + + @Override + public GatewayManager registerDevice(String gatewaySn, String droneSn, DeviceDomainEnum domain, DeviceTypeEnum type, DeviceSubTypeEnum subType, String gatewayThingVersion, String droneThingVersion) { + return registerDevice(gatewaySn, droneSn, GatewayTypeEnum.find(DeviceEnum.find(domain, type, subType)), gatewayThingVersion, droneThingVersion); + } + + @Override + public GatewayManager registerDevice(String gatewaySn, String droneSn, GatewayTypeEnum type, String gatewayThingVersion, String droneThingVersion) { + return registerDevice(new GatewayManager(Objects.requireNonNull(gatewaySn), droneSn, type, gatewayThingVersion, droneThingVersion)); + } + + @Override + public GatewayManager registerDevice(GatewayManager gateway) { + SDK_MAP.put(gateway.getGatewaySn(), gateway); + return gateway; + } + + @Override + public void logoutDevice(String gatewaySn) { + SDK_MAP.remove(gatewaySn); + } +} diff --git a/src/main/java/com/dji/sdk/common/PublishBarrier.java b/src/main/java/com/dji/sdk/common/PublishBarrier.java new file mode 100644 index 0000000..e5b0071 --- /dev/null +++ b/src/main/java/com/dji/sdk/common/PublishBarrier.java @@ -0,0 +1,26 @@ +/************************************************* + * @copyright 2017 Flision Corporation Inc. + * @author: Vincent Chan @ Canton + * @date: 2023年09月22日 + * @version: 1.0.0 + * @description: + **************************************************/ +package com.dji.sdk.common; + +import com.dji.sdk.mqtt.CommonTopicRequest; +import com.dji.sdk.mqtt.CommonTopicResponse; + +public interface PublishBarrier { + + //构建栅栏标识方法 + String generateIdentity(CommonTopicRequest requestData); + String generateIdentity(CommonTopicResponse receiveData); + + void put(String identity, CommonTopicResponse receiveData); + + void registerRequest(String identity, CommonTopicRequest requestData); + + PublishBarrierResult await(String identity,long timeout); + + boolean hasIdentity(String identity); +} diff --git a/src/main/java/com/dji/sdk/common/PublishBarrierResult.java b/src/main/java/com/dji/sdk/common/PublishBarrierResult.java new file mode 100644 index 0000000..38314fd --- /dev/null +++ b/src/main/java/com/dji/sdk/common/PublishBarrierResult.java @@ -0,0 +1,41 @@ +/************************************************* + * @copyright 2017 Flision Corporation Inc. + * @author: Vincent Chan @ Canton + * @date: 2023年09月22日 + * @version: 1.0.0 + * @description: + **************************************************/ +package com.dji.sdk.common; + +import com.dji.sdk.mqtt.CommonTopicResponse; + +public class PublishBarrierResult implements PublishResult { + + public static PublishBarrierResult EMPTY = new PublishBarrierResult(); + + public static PublishBarrierResult ok(CommonTopicResponse data){ + return new PublishBarrierResult().setData(data); + } + + + boolean timeout = true; + + CommonTopicResponse data; + + private PublishBarrierResult() { + } + + private PublishBarrierResult setData(CommonTopicResponse data) { + this.data = data; + this.timeout = false; + return this; + } + + public boolean isTimeout(){ + return timeout; + } + + public CommonTopicResponse getData(){ + return data; + } +} diff --git a/src/main/java/com/dji/sdk/common/PublishConfiguration.java b/src/main/java/com/dji/sdk/common/PublishConfiguration.java new file mode 100644 index 0000000..bfd008c --- /dev/null +++ b/src/main/java/com/dji/sdk/common/PublishConfiguration.java @@ -0,0 +1,88 @@ +/************************************************* + * @copyright 2017 Flision Corporation Inc. + * @author: Vincent Chan @ Canton + * @date: 2023年09月22日 + * @version: 1.0.0 + * @description: + **************************************************/ +package com.dji.sdk.common; + +import java.util.Objects; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +public class PublishConfiguration implements ReadonlyPublishConfiguration { + + String bid; + String tid; + + //默认超时 + int timeout = 3; + //请求发送前调用 + Consumer<PublishRequest> beforePublishHook = null; + //收到请求回信后调用 + BiConsumer<PublishRequest, PublishBarrierResult> afterPublishHook = null; + + + public String getBid() { + return bid; + } + + public String getTid() { + return tid; + } + + public long getTimeout() { + return timeout * 1000; + } + + public void setBizId(String bid) { + this.bid = bid; + } + + public void setTransactionId(String tid) { + this.tid = tid; + } + + public void setTimeout(int timeout) { + this.timeout = timeout; + } + + public void setBeforePublishHook(Consumer<PublishRequest> callback) { + beforePublishHook = callback; + } + + public void setAfterPublishReplyHook(BiConsumer<PublishRequest, PublishBarrierResult> callback) { + afterPublishHook = callback; + } + + public void invokeBeforePublishHook(PublishRequest req){ + if(Objects.nonNull(beforePublishHook)){ + try { + beforePublishHook.accept(req); + }catch (Throwable ex){ + //do nothing + //业务层的异常不理会 + } + } + } + + public void invokeAfterPublishReplyHook(PublishRequest req, PublishBarrierResult result){ + if(Objects.nonNull(afterPublishHook)){ + try{ + afterPublishHook.accept(req,result); + }catch (Throwable ex){ + //do nothing + //业务层的异常不理会 + } + } + } + + public boolean noneBeforePublishHook() { + return Objects.isNull(beforePublishHook); + } + + public boolean noneAfterPublishHook() { + return Objects.isNull(afterPublishHook); + } +} diff --git a/src/main/java/com/dji/sdk/common/PublishOption.java b/src/main/java/com/dji/sdk/common/PublishOption.java new file mode 100644 index 0000000..5f33e49 --- /dev/null +++ b/src/main/java/com/dji/sdk/common/PublishOption.java @@ -0,0 +1,57 @@ +/************************************************* + * @copyright 2017 Flision Corporation Inc. + * @author: Vincent Chan @ Canton + * @date: 2023年09月22日 + * @version: 1.0.0 + * @description: + **************************************************/ +package com.dji.sdk.common; + +import com.google.common.base.Strings; + +import java.util.Objects; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +public class PublishOption { + + public static Consumer<PublishOption> DEFAULT = (cfg)->{}; + + final PublishConfiguration configuration; + + public PublishOption(PublishConfiguration configuration) { + this.configuration = configuration; + } + + public PublishOption withBizId(String bid){ + if(!Strings.isNullOrEmpty(bid)){ + configuration.setBizId(bid); + } + return this; + } + + public PublishOption withTransactionId(String tid){ + if(!Strings.isNullOrEmpty(tid)){ + configuration.setTransactionId(tid); + } + return this; + } + + public PublishOption timeout(int second){ + configuration.setTimeout(second); + return this; + } + + public PublishOption beforePublish(Consumer<PublishRequest> callback){ + if(Objects.nonNull(callback)){ + configuration.setBeforePublishHook(callback); + } + return this; + } + public PublishOption afterPublishReply(BiConsumer<PublishRequest, PublishBarrierResult> callback){ + if(Objects.nonNull(callback)){ + configuration.setAfterPublishReplyHook(callback); + } + return this; + } +} diff --git a/src/main/java/com/dji/sdk/common/PublishRequest.java b/src/main/java/com/dji/sdk/common/PublishRequest.java new file mode 100644 index 0000000..81c8674 --- /dev/null +++ b/src/main/java/com/dji/sdk/common/PublishRequest.java @@ -0,0 +1,20 @@ +/************************************************* + * @copyright 2017 Flision Corporation Inc. + * @author: Vincent Chan @ Canton + * @date: 2023年09月25日 + * @version: 1.0.0 + * @description: + **************************************************/ +package com.dji.sdk.common; + +import com.dji.sdk.common.ReadonlyPublishConfiguration; +import com.dji.sdk.mqtt.CommonTopicRequest; + +public interface PublishRequest { + + String getTopic(); + + CommonTopicRequest getOriginRequest(); + + ReadonlyPublishConfiguration getConfiguration(); +} diff --git a/src/main/java/com/dji/sdk/common/PublishResult.java b/src/main/java/com/dji/sdk/common/PublishResult.java new file mode 100644 index 0000000..8b23693 --- /dev/null +++ b/src/main/java/com/dji/sdk/common/PublishResult.java @@ -0,0 +1,17 @@ +/************************************************* + * @copyright 2017 Flision Corporation Inc. + * @author: Vincent Chan @ Canton + * @date: 2023年09月25日 + * @version: 1.0.0 + * @description: + **************************************************/ +package com.dji.sdk.common; + +import com.dji.sdk.mqtt.CommonTopicResponse; + +public interface PublishResult { + + boolean isTimeout(); + + CommonTopicResponse getData(); +} diff --git a/src/main/java/com/dji/sdk/common/ReadonlyPublishConfiguration.java b/src/main/java/com/dji/sdk/common/ReadonlyPublishConfiguration.java new file mode 100644 index 0000000..ff4e880 --- /dev/null +++ b/src/main/java/com/dji/sdk/common/ReadonlyPublishConfiguration.java @@ -0,0 +1,17 @@ +/************************************************* + * @copyright 2017 Flision Corporation Inc. + * @author: Vincent Chan @ Canton + * @date: 2023年09月25日 + * @version: 1.0.0 + * @description: + **************************************************/ +package com.dji.sdk.common; + +public interface ReadonlyPublishConfiguration { + + String getBid(); + + String getTid(); + + long getTimeout(); +} diff --git a/src/main/java/com/dji/sdk/common/SDKManager.java b/src/main/java/com/dji/sdk/common/SDKManager.java index ce07689..f67fc7d 100644 --- a/src/main/java/com/dji/sdk/common/SDKManager.java +++ b/src/main/java/com/dji/sdk/common/SDKManager.java @@ -8,15 +8,39 @@ import com.dji.sdk.exception.CloudSDKErrorEnum; import com.dji.sdk.exception.CloudSDKException; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; /** * @author sean * @version 1.7 * @date 2023/5/19 + * + * fix: 改接口,由客户决定Gateway管理策略 witcom@2023.09.25 */ -public class SDKManager { +public interface SDKManager { + default GatewayManager getDeviceSDK(String gatewaySn){ + return findDeviceSDK(gatewaySn) + .orElseThrow(()-> new CloudSDKException(CloudSDKErrorEnum.NOT_REGISTERED, + "The device has not been registered, please call the 'SDKManager.registerDevice()' method to register the device first.")); + } + + Optional<GatewayManager> findDeviceSDK(String gatewaySn); + default GatewayManager registerDevice(String gatewaySn, String droneSn, + DeviceDomainEnum domain, DeviceTypeEnum type, DeviceSubTypeEnum subType, String gatewayThingVersion, String droneThingVersion){ + return registerDevice(gatewaySn, droneSn, GatewayTypeEnum.find(DeviceEnum.find(domain, type, subType)), gatewayThingVersion, droneThingVersion); + } + + default GatewayManager registerDevice(String gatewaySn, String droneSn, GatewayTypeEnum type, String gatewayThingVersion, String droneThingVersion){ + return registerDevice(new GatewayManager(Objects.requireNonNull(gatewaySn), droneSn, type, gatewayThingVersion, droneThingVersion)); + } + + GatewayManager registerDevice(GatewayManager gateway); + + void logoutDevice(String gatewaySn); + +/* private SDKManager() { } @@ -30,6 +54,14 @@ public class SDKManager { "The device has not been registered, please call the 'SDKManager.registerDevice()' method to register the device first."); } + public static Optional<GatewayManager> findDeviceSDK(String gatewaySn) { + if(SDK_MAP.containsKey(gatewaySn)){ + return Optional.of(SDK_MAP.get(gatewaySn)); + }else { + return Optional.empty(); + } + } + public static GatewayManager registerDevice(String gatewaySn, String droneSn, DeviceDomainEnum domain, DeviceTypeEnum type, DeviceSubTypeEnum subType, String gatewayThingVersion, String droneThingVersion) { return registerDevice(gatewaySn, droneSn, GatewayTypeEnum.find(DeviceEnum.find(domain, type, subType)), gatewayThingVersion, droneThingVersion); @@ -47,4 +79,6 @@ public class SDKManager { public static void logoutDevice(String gatewaySn) { SDK_MAP.remove(gatewaySn); } + + */ } diff --git a/src/main/java/com/dji/sdk/config/DefaultBeanConfiguration.java b/src/main/java/com/dji/sdk/config/DefaultBeanConfiguration.java new file mode 100644 index 0000000..9829b4a --- /dev/null +++ b/src/main/java/com/dji/sdk/config/DefaultBeanConfiguration.java @@ -0,0 +1,80 @@ +/************************************************* + * @copyright 2017 Flision Corporation Inc. + * @author: Vincent Chan @ Canton + * @date: 2023年09月22日 + * @version: 1.0.0 + * @description: + **************************************************/ +package com.dji.sdk.config; + +import com.dji.sdk.common.*; +import com.dji.sdk.mqtt.ChanBarrierAdapter; +import com.dji.sdk.mqtt.GlobalPublishOption; +import com.dji.sdk.common.PublishRequest; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.UUID; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Supplier; + +@Configuration +public class DefaultBeanConfiguration { + + /** + * 全局发送默认设置 + * @return + */ + @Bean + @ConditionalOnMissingBean(GlobalPublishOption.class) + public GlobalPublishOption defaultPublishOption(){ + return new GlobalPublishOption() { + @Override + public Supplier<String> defaultTransactionId() { + return ()-> UUID.randomUUID().toString(); + } + + @Override + public Supplier<String> defaultBizId() { + return ()-> UUID.randomUUID().toString(); + } + + @Override + public Consumer<PublishRequest> defaultBeforePublishHook() { + return null; + } + + @Override + public BiConsumer<PublishRequest, PublishBarrierResult> defaultAfterPublishHook() { + return null; + } + }; + } + + @Bean + @ConditionalOnMissingBean(SDKManager.class) + public SDKManager localCacheSDKManager(){ + return new LocalCacheSDKManager(); + } + + /** + * 使用者可以自定义PublishBarrier的实现,默认采用Chan实现 + */ + @Bean + @ConditionalOnMissingBean(PublishBarrier.class) + public PublishBarrier chanBarrier(){ + /** 原Chan实现 */ + return new ChanBarrierAdapter(); + } + + /** + * PublishBarrier 另一个实现, 采用同步锁 + */ +// @Bean +// @ConditionalOnMissingBean(PublishBarrier.class) +// public PublishBarrier jdkBarrier(){ +// return new JDKLockBarrierImpl(); +// } +} diff --git a/src/main/java/com/dji/sdk/mqtt/ChanBarrierAdapter.java b/src/main/java/com/dji/sdk/mqtt/ChanBarrierAdapter.java new file mode 100644 index 0000000..368db35 --- /dev/null +++ b/src/main/java/com/dji/sdk/mqtt/ChanBarrierAdapter.java @@ -0,0 +1,53 @@ +/************************************************* + * @copyright 2017 Flision Corporation Inc. + * @author: Vincent Chan @ Canton + * @date: 2023年09月22日 + * @version: 1.0.0 + * @description: + **************************************************/ +package com.dji.sdk.mqtt; + +import com.dji.sdk.common.PublishBarrier; +import com.dji.sdk.common.PublishBarrierResult; + +import java.util.Objects; + +public class ChanBarrierAdapter implements PublishBarrier { + @Override + public String generateIdentity(CommonTopicRequest requestData) { + return requestData.getTid(); + } + + @Override + public String generateIdentity(CommonTopicResponse receiveData) { + return receiveData.getTid(); + } + + @Override + public void put(String identity, CommonTopicResponse receiveData) { + Chan instance = Chan.getInstance(identity, false); + if(Objects.nonNull(instance)){ + instance.put(receiveData); + } + } + + @Override + public void registerRequest(String identity, CommonTopicRequest requestData) { + Chan.getInstance(identity, true); + } + + @Override + public PublishBarrierResult await(String identity,long timeout) { + Chan instance = Chan.getInstance(identity, false); + + CommonTopicResponse response = instance.get(identity, timeout); + + return Objects.nonNull(response) ? PublishBarrierResult.ok(response) : PublishBarrierResult.EMPTY; + } + + @Override + public boolean hasIdentity(String identity) { + Chan instance = Chan.getInstance(identity, false); + return Objects.nonNull(instance); + } +} diff --git a/src/main/java/com/dji/sdk/mqtt/FlowTransformWrapper.java b/src/main/java/com/dji/sdk/mqtt/FlowTransformWrapper.java new file mode 100644 index 0000000..b0c87a8 --- /dev/null +++ b/src/main/java/com/dji/sdk/mqtt/FlowTransformWrapper.java @@ -0,0 +1,51 @@ +/************************************************* + * @copyright 2017 Flision Corporation Inc. + * @author: Vincent Chan @ Canton + * @date: 2023年10月09日 + * @version: 1.0.0 + * @description: + **************************************************/ +package com.dji.sdk.mqtt; + +public class FlowTransformWrapper { + + public final static String DEFAULT_ERROR_MSG = "null"; + + public static FlowTransformWrapper error(){ + return new FlowTransformWrapper(DEFAULT_ERROR_MSG); + } + + public static FlowTransformWrapper ok(CommonTopicRequest request){ + return new FlowTransformWrapper(request); + } + + CommonTopicRequest request; + boolean bError; + String errorMessage; + + private FlowTransformWrapper(CommonTopicRequest request){ + this.request = request; + this.bError = false; + } + + private FlowTransformWrapper(String errorMessage){ + this.bError = true; + this.errorMessage = errorMessage; + } + + public CommonTopicRequest getRequest() { + return request; + } + + public boolean hasError() { + return bError; + } + + public boolean continuee(){ + return !hasError(); + } + + public String getErrorMessage() { + return errorMessage; + } +} diff --git a/src/main/java/com/dji/sdk/mqtt/GlobalPublishOption.java b/src/main/java/com/dji/sdk/mqtt/GlobalPublishOption.java new file mode 100644 index 0000000..50872ca --- /dev/null +++ b/src/main/java/com/dji/sdk/mqtt/GlobalPublishOption.java @@ -0,0 +1,24 @@ +/************************************************* + * @copyright 2017 Flision Corporation Inc. + * @author: Vincent Chan @ Canton + * @date: 2023年09月25日 + * @version: 1.0.0 + * @description: 全局发送默认配置 + **************************************************/ +package com.dji.sdk.mqtt; + +import com.dji.sdk.common.PublishBarrierResult; +import com.dji.sdk.common.PublishRequest; + +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Supplier; + +public interface GlobalPublishOption { + Supplier<String> defaultTransactionId(); + Supplier<String> defaultBizId(); + + Consumer<PublishRequest> defaultBeforePublishHook(); + BiConsumer<PublishRequest, PublishBarrierResult> defaultAfterPublishHook(); + +} diff --git a/src/main/java/com/dji/sdk/mqtt/InboundMessageRouter.java b/src/main/java/com/dji/sdk/mqtt/InboundMessageRouter.java index 4d2a417..aae6d98 100644 --- a/src/main/java/com/dji/sdk/mqtt/InboundMessageRouter.java +++ b/src/main/java/com/dji/sdk/mqtt/InboundMessageRouter.java @@ -37,7 +37,10 @@ public class InboundMessageRouter extends AbstractMessageRouter { String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString(); byte[] payload = (byte[])message.getPayload(); - log.debug("received topic: {} \t payload =>{}", topic, new String(payload)); + //fix: 修复未启动debug时仍然需要构造debug参数的问题 by witcom@2023.09.22 + if(log.isDebugEnabled()) { + log.debug("received topic: {} \t payload =>{}", topic, new String(payload)); + } CloudApiTopicEnum topicEnum = CloudApiTopicEnum.find(topic); MessageChannel bean = (MessageChannel) SpringBeanUtils.getBean(topicEnum.getBeanName()); diff --git a/src/main/java/com/dji/sdk/mqtt/MqttGatewayPublish.java b/src/main/java/com/dji/sdk/mqtt/MqttGatewayPublish.java index 8d60425..f25155c 100644 --- a/src/main/java/com/dji/sdk/mqtt/MqttGatewayPublish.java +++ b/src/main/java/com/dji/sdk/mqtt/MqttGatewayPublish.java @@ -1,9 +1,10 @@ package com.dji.sdk.mqtt; -import com.dji.sdk.common.Common; +import com.dji.sdk.common.*; import com.dji.sdk.exception.CloudSDKErrorEnum; import com.dji.sdk.exception.CloudSDKException; import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.base.Strings; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.TypeMismatchException; import org.springframework.integration.mqtt.support.MqttHeaders; @@ -14,7 +15,9 @@ import org.springframework.util.StringUtils; import javax.annotation.Resource; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; /** * @author sean.zhou @@ -32,9 +35,17 @@ public class MqttGatewayPublish { @Resource private IMqttMessageGateway messageGateway; + @Resource + private PublishBarrier publishBarrier; + + @Resource + private GlobalPublishOption globalOptions; + public void publish(String topic, int qos, CommonTopicRequest request) { try { - log.debug("send topic: {}, payload: {}", topic, request.toString()); + if(log.isDebugEnabled()) { + log.debug("send topic: {}, payload: {}", topic, request.toString()); + } byte[] payload = Common.getObjectMapper().writeValueAsBytes(request); messageGateway.publish(topic, payload, qos); } catch (JsonProcessingException e) { @@ -45,7 +56,9 @@ public class MqttGatewayPublish { public void publish(String topic, int qos, CommonTopicResponse response) { try { - log.debug("send topic: {}, payload: {}", topic, response.toString()); + if(log.isDebugEnabled()) { + log.debug("send topic: {}, payload: {}", topic, response.toString()); + } byte[] payload = Common.getObjectMapper().writeValueAsBytes(response); messageGateway.publish(topic, payload, qos); } catch (JsonProcessingException e) { @@ -73,7 +86,9 @@ public class MqttGatewayPublish { AtomicInteger time = new AtomicInteger(0); boolean hasBid = StringUtils.hasText(request.getBid()); request.setBid(hasBid ? request.getBid() : UUID.randomUUID().toString()); + // Retry + // Is Retry necessary? why not use Spring @Retryable instead? while (time.getAndIncrement() <= retryCount) { this.publish(topic, request); @@ -97,5 +112,88 @@ public class MqttGatewayPublish { throw new CloudSDKException(CloudSDKErrorEnum.MQTT_PUBLISH_ABNORMAL, "No message reply received."); } + public <T> CompletableFuture<CommonTopicResponse<T>> publishWithReply(Class<T> clazz, String topic, CommonTopicRequest request, Consumer<PublishOption> options){ + PublishConfiguration config = prepareConfiguration(options); + request.setBid(config.getBid()); + request.setTid(config.getTid()); + + //use to log request data or the last chance to change some data + //CommonTopicRequest丢失了一些需要记录的内容,把这些内容封到PublishRequest交出去 + PublishRequest wrapRequest = new CommonTopicRequestWrapper<T>(clazz,topic, request, config); + config.invokeBeforePublishHook(wrapRequest); + + //注册barrier + String identity = publishBarrier.generateIdentity(request); //提供栅栏标识 + publishBarrier.registerRequest(identity, request); + + return CompletableFuture.supplyAsync(()->{ + this.publish(topic, request); + + if(log.isDebugEnabled()){ log.debug("等待{}指令返回",identity); }; + PublishBarrierResult result = publishBarrier.await(identity,config.getTimeout()); + config.invokeAfterPublishReplyHook(wrapRequest, result); + + if(result.isTimeout()){ + throw new CloudSDKException("Timeout"); //TODO: 换个更明确的异常更好 + } + + if(log.isDebugEnabled()){ log.debug("{}指令已返回",identity); } + return result.getData(); + }); + } + + private PublishConfiguration prepareConfiguration(Consumer<PublishOption> options){ + PublishConfiguration config = new PublishConfiguration(); + PublishOption option = new PublishOption(config); + options.accept(option); + + if(Strings.isNullOrEmpty(config.getBid())){ + config.setBizId(globalOptions.defaultBizId().get()); + } + + if(Strings.isNullOrEmpty(config.getTid())){ + config.setTransactionId(globalOptions.defaultTransactionId().get()); + } + + if(config.noneBeforePublishHook()){ + config.setBeforePublishHook(globalOptions.defaultBeforePublishHook()); + } + + if(config.noneAfterPublishHook()){ + config.setAfterPublishReplyHook(globalOptions.defaultAfterPublishHook()); + } + return config; + } + + static class CommonTopicRequestWrapper<T> implements PublishRequest{ + final CommonTopicRequest request; + final String topic; + final Class<T> clazz; + + final ReadonlyPublishConfiguration config; + + public CommonTopicRequestWrapper(Class<T> clazz, String topic,CommonTopicRequest request, PublishConfiguration config) { + this.clazz = clazz; + this.request = request; + this.topic = topic; + this.config = config; + } + + @Override + public String getTopic() { + return topic; + } + + @Override + public CommonTopicRequest getOriginRequest() { + return request; + } + @Override + public ReadonlyPublishConfiguration getConfiguration() { + return config; + } + + + } } \ No newline at end of file diff --git a/src/main/java/com/dji/sdk/mqtt/osd/OsdRouter.java b/src/main/java/com/dji/sdk/mqtt/osd/OsdRouter.java index 87f5282..814dfa2 100644 --- a/src/main/java/com/dji/sdk/mqtt/osd/OsdRouter.java +++ b/src/main/java/com/dji/sdk/mqtt/osd/OsdRouter.java @@ -14,11 +14,9 @@ import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.Message; +import javax.annotation.Resource; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; +import java.util.*; import static com.dji.sdk.mqtt.TopicConst.*; @@ -32,7 +30,7 @@ import static com.dji.sdk.mqtt.TopicConst.*; public class OsdRouter { @Bean - public IntegrationFlow osdRouterFlow() { + public IntegrationFlow osdRouterFlow(SDKManager sdkManager) { return IntegrationFlows .from(ChannelName.INBOUND_OSD) .transform(Message.class, source -> { @@ -45,19 +43,28 @@ public class OsdRouter { } }, null) .<TopicOsdRequest>handle((response, headers) -> { - GatewayManager gateway = SDKManager.getDeviceSDK(response.getGateway()); - OsdDeviceTypeEnum typeEnum = OsdDeviceTypeEnum.find(gateway.getType(), response.getFrom().equals(response.getGateway())); - Map<String, Object> data = (Map<String, Object>) response.getData(); - if (!typeEnum.isGateway()) { - List payloadData = (List) data.getOrDefault(PayloadModelEnum.PAYLOAD_KEY, new ArrayList<>()); - PayloadModelEnum.getAllIndexWithPosition().stream().filter(data::containsKey) - .map(data::get).forEach(payloadData::add); - data.put(PayloadModelEnum.PAYLOAD_KEY, payloadData); - } - return response.setData(Common.getObjectMapper().convertValue(data, typeEnum.getClassType())); + + // fix: getDeviceSDK抛出异常导致在设备未注册的情况下报osd时产生大量日志 witcom@2023.09.22 + //GatewayManager gateway = SDKManager.getDeviceSDK(response.getGateway()); + return sdkManager.findDeviceSDK(response.getGateway()) + .map(gateway-> { + + OsdDeviceTypeEnum typeEnum = OsdDeviceTypeEnum.find(gateway.getType(), response.getFrom().equals(response.getGateway())); + Map<String, Object> data = (Map<String, Object>) response.getData(); + if (!typeEnum.isGateway()) { + List payloadData = (List) data.getOrDefault(PayloadModelEnum.PAYLOAD_KEY, new ArrayList<>()); + PayloadModelEnum.getAllIndexWithPosition().stream().filter(data::containsKey) + .map(data::get).forEach(payloadData::add); + data.put(PayloadModelEnum.PAYLOAD_KEY, payloadData); + } + return response.setData(Common.getObjectMapper().convertValue(data, typeEnum.getClassType())); + }) + .orElse(null); }) + .<TopicOsdRequest>filter(Objects::nonNull) .<TopicOsdRequest, OsdDeviceTypeEnum>route(response -> OsdDeviceTypeEnum.find(response.getData().getClass()), - mapping -> Arrays.stream(OsdDeviceTypeEnum.values()).forEach(key -> mapping.channelMapping(key, key.getChannelName()))) + mapping -> Arrays.stream(OsdDeviceTypeEnum.values()) + .forEach(key -> mapping.channelMapping(key, key.getChannelName()))) .get(); } diff --git a/src/main/java/com/dji/sdk/mqtt/osd/OsdSubscribe.java b/src/main/java/com/dji/sdk/mqtt/osd/OsdSubscribe.java index bc359b2..ca433c6 100644 --- a/src/main/java/com/dji/sdk/mqtt/osd/OsdSubscribe.java +++ b/src/main/java/com/dji/sdk/mqtt/osd/OsdSubscribe.java @@ -22,8 +22,11 @@ public class OsdSubscribe { @Resource private IMqttTopicService topicService; + @Resource + SDKManager sdkManager; + public void subscribe(GatewayManager gateway, boolean unsubscribeSubDevice) { - SDKManager.registerDevice(gateway); + sdkManager.registerDevice(gateway); topicService.subscribe(String.format(TOPIC, gateway.getGatewaySn())); if (unsubscribeSubDevice) { topicService.unsubscribe(String.format(TOPIC, gateway.getDroneSn())); @@ -35,7 +38,7 @@ public class OsdSubscribe { } public void unsubscribe(GatewayManager gateway) { - SDKManager.logoutDevice(gateway.getGatewaySn()); + sdkManager.logoutDevice(gateway.getGatewaySn()); topicService.unsubscribe(String.format(TOPIC, gateway.getGatewaySn())); if (null != gateway.getDroneSn()) { topicService.unsubscribe(String.format(TOPIC, gateway.getDroneSn())); diff --git a/src/main/java/com/dji/sdk/mqtt/property/PropertySetPublish.java b/src/main/java/com/dji/sdk/mqtt/property/PropertySetPublish.java index 3426a85..2927c8b 100644 --- a/src/main/java/com/dji/sdk/mqtt/property/PropertySetPublish.java +++ b/src/main/java/com/dji/sdk/mqtt/property/PropertySetPublish.java @@ -1,5 +1,8 @@ package com.dji.sdk.mqtt.property; +import com.dji.sdk.common.PublishOption; +import com.dji.sdk.mqtt.CommonTopicRequest; +import com.dji.sdk.mqtt.CommonTopicResponse; import com.dji.sdk.mqtt.MqttGatewayPublish; import com.dji.sdk.mqtt.TopicConst; import org.springframework.stereotype.Component; @@ -7,6 +10,8 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; /** * @author sean @@ -29,12 +34,33 @@ public class PropertySetPublish { public PropertySetReplyResultEnum publish(String sn, Object data, int retryCount, long timeout) { String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + Objects.requireNonNull(sn) + TopicConst.PROPERTY_SUF + TopicConst.SET_SUF; + + return gatewayPublish.publishWithReply(PropertySetReplyResultEnum.class,topic, + new CommonTopicRequest<>() + .setTimestamp(System.currentTimeMillis()) + .setData(Objects.requireNonNull(data)), + ops->ops.timeout((int)(timeout/1000))) + .join() + .getData(); + +// return gatewayPublish.publishWithReply( +// PropertySetReplyResultEnum.class, topic, new TopicPropertySetRequest<>() +// .setTid(UUID.randomUUID().toString()) +// .setBid(null) +// .setTimestamp(System.currentTimeMillis()) +// .setData(Objects.requireNonNull(data)), retryCount, timeout).getData(); + } + + public CompletableFuture<CommonTopicResponse<PropertySetReplyResultEnum>> + publish(String sn, Object data, Consumer<PublishOption> options){ + String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + Objects.requireNonNull(sn) + TopicConst.PROPERTY_SUF + TopicConst.SET_SUF; return gatewayPublish.publishWithReply( - PropertySetReplyResultEnum.class, topic, new TopicPropertySetRequest<>() - .setTid(UUID.randomUUID().toString()) - .setBid(null) + PropertySetReplyResultEnum.class, + topic, + new CommonTopicRequest<>() .setTimestamp(System.currentTimeMillis()) - .setData(Objects.requireNonNull(data)), retryCount, timeout).getData(); + .setData(Objects.requireNonNull(data)), + options); } } diff --git a/src/main/java/com/dji/sdk/mqtt/property/PropertySetReplyHandler.java b/src/main/java/com/dji/sdk/mqtt/property/PropertySetReplyHandler.java index 1723e22..11c171e 100644 --- a/src/main/java/com/dji/sdk/mqtt/property/PropertySetReplyHandler.java +++ b/src/main/java/com/dji/sdk/mqtt/property/PropertySetReplyHandler.java @@ -1,6 +1,7 @@ package com.dji.sdk.mqtt.property; import com.dji.sdk.common.Common; +import com.dji.sdk.common.PublishBarrier; import com.dji.sdk.mqtt.Chan; import com.dji.sdk.mqtt.ChannelName; import com.fasterxml.jackson.core.type.TypeReference; @@ -9,6 +10,7 @@ import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; +import javax.annotation.Resource; import java.io.IOException; import java.util.Objects; @@ -20,6 +22,9 @@ import java.util.Objects; @Component public class PropertySetReplyHandler { + @Resource + PublishBarrier barrier; + private static final String RESULT_KEY = "result"; /** @@ -32,13 +37,19 @@ public class PropertySetReplyHandler { byte[] payload = (byte[])message.getPayload(); TopicPropertySetResponse receiver = Common.getObjectMapper().readValue(payload, new TypeReference<TopicPropertySetResponse>() {}); - Chan chan = Chan.getInstance(receiver.getTid(), false); - if (Objects.isNull(chan)) { + //fix: use Barrier instead witcom@2023.09.22 + //Chan chan = Chan.getInstance(receiver.getTid(), false); +// if (Objects.isNull(chan)) { +// return; +// } + String identity = barrier.generateIdentity(receiver); + if(!barrier.hasIdentity(identity)) { return; } receiver.setData(PropertySetReplyResultEnum.find( Common.getObjectMapper().convertValue(receiver.getData(), JsonNode.class).findValue(RESULT_KEY).intValue())); // Put the message to the chan object. - chan.put(receiver); + //chan.put(receiver); + barrier.put(identity, receiver); } } diff --git a/src/main/java/com/dji/sdk/mqtt/services/ServicesPublish.java b/src/main/java/com/dji/sdk/mqtt/services/ServicesPublish.java index 9ca29d2..d554365 100644 --- a/src/main/java/com/dji/sdk/mqtt/services/ServicesPublish.java +++ b/src/main/java/com/dji/sdk/mqtt/services/ServicesPublish.java @@ -1,6 +1,8 @@ package com.dji.sdk.mqtt.services; import com.dji.sdk.common.Common; +import com.dji.sdk.common.PublishOption; +import com.dji.sdk.mqtt.CommonTopicResponse; import com.dji.sdk.mqtt.MqttGatewayPublish; import com.dji.sdk.mqtt.TopicConst; import com.fasterxml.jackson.core.type.TypeReference; @@ -10,6 +12,8 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; /** * @author sean @@ -60,6 +64,8 @@ public class ServicesPublish { public <T> TopicServicesResponse<ServicesReplyData<T>> publish( TypeReference<T> clazz, String sn, String method, Object data, String bid, int retryCount, long timeout) { + return this.publish(clazz, sn, method,data,ops-> ops.withBizId(bid).timeout((int)(timeout / 1000))).join(); + /* String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + Objects.requireNonNull(sn) + TopicConst.SERVICES_SUF; TopicServicesResponse response = (TopicServicesResponse) gatewayPublish.publishWithReply( ServicesReplyReceiver.class, topic, new TopicServicesRequest<>() @@ -84,6 +90,44 @@ public class ServicesPublish { reply.setOutput(mapper.convertValue(replyReceiver.getOutput(), clazz)); } return response.setData(reply); + + */ } + /** + * Remark by witcom@2023.09.22 + * TODO: 在贡献的版本到这里就不再往下修改了,需要修改所有AbstractXXXService和AOP,改动量很大 + * 主要思想是 + * 1.提供异步支持 + * 2.Chan的实现可由用户自定义 + * 3.提供发送选项参数 + * 4.提供发送前,接收后钩子用于记录请求和返回接近最原始的记录 + */ + + public <T> CompletableFuture<TopicServicesResponse<ServicesReplyData<T>>> publish(TypeReference<T> clazz, String sn, String method, Object data, Consumer<PublishOption> options){ + String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + Objects.requireNonNull(sn) + TopicConst.SERVICES_SUF; + return gatewayPublish.publishWithReply(ServicesReplyReceiver.class, topic, new TopicServicesRequest<>() + .setTimestamp(System.currentTimeMillis()) + .setMethod(method) + .setData(Objects.requireNonNullElse(data, "")), options) + .thenApply(response->(TopicServicesResponse)response) + .thenApply(response->{ + ServicesReplyReceiver replyReceiver = (ServicesReplyReceiver) response.getData(); + ServicesReplyData<T> reply = new ServicesReplyData<T>().setResult(replyReceiver.getResult()); + if (Objects.isNull(clazz)) { + reply.setOutput((T) Objects.requireNonNullElse( + replyReceiver.getOutput(), Objects.requireNonNullElse(replyReceiver.getInfo(), ""))); + return response.setData(reply); + } + // put together in "output" + ObjectMapper mapper = Common.getObjectMapper(); + if (Objects.nonNull(replyReceiver.getInfo())) { + reply.setOutput(mapper.convertValue(replyReceiver.getInfo(), clazz)); + } + if (Objects.nonNull(replyReceiver.getOutput())) { + reply.setOutput(mapper.convertValue(replyReceiver.getOutput(), clazz)); + } + return response.setData(reply); + }); + } } diff --git a/src/main/java/com/dji/sdk/mqtt/services/ServicesReplyHandler.java b/src/main/java/com/dji/sdk/mqtt/services/ServicesReplyHandler.java index 01ee174..1ea2af3 100644 --- a/src/main/java/com/dji/sdk/mqtt/services/ServicesReplyHandler.java +++ b/src/main/java/com/dji/sdk/mqtt/services/ServicesReplyHandler.java @@ -3,6 +3,7 @@ package com.dji.sdk.mqtt.services; import com.dji.sdk.cloudapi.log.FileUploadListResponse; import com.dji.sdk.cloudapi.log.LogMethodEnum; import com.dji.sdk.common.Common; +import com.dji.sdk.common.PublishBarrier; import com.dji.sdk.mqtt.Chan; import com.dji.sdk.mqtt.ChannelName; import com.fasterxml.jackson.core.type.TypeReference; @@ -10,6 +11,7 @@ import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; +import javax.annotation.Resource; import java.io.IOException; import java.util.Objects; @@ -21,6 +23,9 @@ import java.util.Objects; @Component public class ServicesReplyHandler { + @Resource + PublishBarrier barrier; + /** * Handle the reply message from topic "/services_reply". * @param message reply message @@ -32,14 +37,20 @@ public class ServicesReplyHandler { TopicServicesResponse<ServicesReplyReceiver> receiver = Common.getObjectMapper() .readValue(payload, new TypeReference<TopicServicesResponse<ServicesReplyReceiver>>() {}); - Chan chan = Chan.getInstance(receiver.getTid(), false); - if (Objects.isNull(chan)) { + //fix: use Barrier instead witcom@2023.09.22 +// Chan chan = Chan.getInstance(receiver.getTid(), false); +// if (Objects.isNull(chan)) { +// return; +// } + String identity = barrier.generateIdentity(receiver); + if(!barrier.hasIdentity(identity)){ return; } if (LogMethodEnum.FILE_UPLOAD_LIST.getMethod().equals(receiver.getMethod())) { receiver.getData().setOutput(Common.getObjectMapper().convertValue(receiver.getData(), new TypeReference<FileUploadListResponse>() {})); } - chan.put(receiver); + barrier.put(identity, receiver); + //chan.put(receiver); } } diff --git a/src/main/java/com/dji/sdk/mqtt/state/DockStateDataKeyEnum.java b/src/main/java/com/dji/sdk/mqtt/state/DockStateDataKeyEnum.java index f657eb0..f19879a 100644 --- a/src/main/java/com/dji/sdk/mqtt/state/DockStateDataKeyEnum.java +++ b/src/main/java/com/dji/sdk/mqtt/state/DockStateDataKeyEnum.java @@ -6,6 +6,7 @@ import com.dji.sdk.exception.CloudSDKException; import java.util.Arrays; import java.util.Collections; +import java.util.Optional; import java.util.Set; /** @@ -47,9 +48,10 @@ public enum DockStateDataKeyEnum { return keys; } - public static DockStateDataKeyEnum find(Set<String> keys) { - return Arrays.stream(values()).filter(keyEnum -> !Collections.disjoint(keys, keyEnum.keys)).findAny() - .orElseThrow(() -> new CloudSDKException(DockStateDataKeyEnum.class, keys)); + public static Optional<DockStateDataKeyEnum> find(Set<String> keys) { + // com.dji.sdk.mqtt.state.DockStateDataKeyEnum has unknown data: [[mode_code_reason]] + return Arrays.stream(values()).filter(keyEnum -> !Collections.disjoint(keys, keyEnum.keys)).findAny(); + // .orElseThrow(() -> new CloudSDKException(DockStateDataKeyEnum.class, keys)); } } diff --git a/src/main/java/com/dji/sdk/mqtt/state/RcStateDataKeyEnum.java b/src/main/java/com/dji/sdk/mqtt/state/RcStateDataKeyEnum.java index b0dfb1d..9b31ed2 100644 --- a/src/main/java/com/dji/sdk/mqtt/state/RcStateDataKeyEnum.java +++ b/src/main/java/com/dji/sdk/mqtt/state/RcStateDataKeyEnum.java @@ -6,6 +6,7 @@ import com.dji.sdk.exception.CloudSDKException; import java.util.Arrays; import java.util.Collections; +import java.util.Optional; import java.util.Set; /** @@ -45,9 +46,9 @@ public enum RcStateDataKeyEnum { return keys; } - public static RcStateDataKeyEnum find(Set<String> keys) { - return Arrays.stream(values()).filter(keyEnum -> !Collections.disjoint(keys, keyEnum.keys)).findAny() - .orElseThrow(() -> new CloudSDKException(RcStateDataKeyEnum.class, keys)); + public static Optional<RcStateDataKeyEnum> find(Set<String> keys) { + return Arrays.stream(values()).filter(keyEnum -> !Collections.disjoint(keys, keyEnum.keys)).findAny(); + //.orElseThrow(() -> new CloudSDKException(RcStateDataKeyEnum.class, keys)); } } diff --git a/src/main/java/com/dji/sdk/mqtt/state/StateRouter.java b/src/main/java/com/dji/sdk/mqtt/state/StateRouter.java index b26c00e..7ba7886 100644 --- a/src/main/java/com/dji/sdk/mqtt/state/StateRouter.java +++ b/src/main/java/com/dji/sdk/mqtt/state/StateRouter.java @@ -1,11 +1,15 @@ package com.dji.sdk.mqtt.state; import com.dji.sdk.common.Common; +import com.dji.sdk.common.GatewayTypeEnum; import com.dji.sdk.common.SDKManager; import com.dji.sdk.exception.CloudSDKErrorEnum; import com.dji.sdk.exception.CloudSDKException; import com.dji.sdk.mqtt.ChannelName; +import com.dji.sdk.mqtt.FlowTransformWrapper; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.dsl.IntegrationFlow; @@ -13,51 +17,79 @@ import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.Message; +import javax.annotation.Resource; import java.io.IOException; -import java.util.Arrays; -import java.util.Map; -import java.util.Set; +import java.util.*; import static com.dji.sdk.mqtt.TopicConst.*; /** - * * @author sean.zhou - * @date 2021/11/17 * @version 0.1 + * @date 2021/11/17 */ +@Slf4j @Configuration public class StateRouter { + @Resource + SDKManager sdkManager; + @Bean public IntegrationFlow stateDataRouterFlow() { + ObjectMapper objectMapper = Common.getObjectMapper(); return IntegrationFlows .from(ChannelName.INBOUND_STATE) .transform(Message.class, source -> { try { - TopicStateRequest response = Common.getObjectMapper().readValue((byte[]) source.getPayload(), new TypeReference<TopicStateRequest>() {}); + TopicStateRequest response = objectMapper.readValue( + (byte[]) source.getPayload(), + new TypeReference<TopicStateRequest>() {}); String topic = String.valueOf(source.getHeaders().get(MqttHeaders.RECEIVED_TOPIC)); String from = topic.substring((THING_MODEL_PRE + PRODUCT).length(), topic.indexOf(STATE_SUF)); - return response.setFrom(from) - .setData(Common.getObjectMapper().convertValue(response.getData(), getTypeReference(response.getGateway(), response.getData()))); - } catch (IOException e) { - throw new CloudSDKException(e); + + return FlowTransformWrapper.ok(response.setFrom(from)); + } catch (Exception ex) { + log.warn("[StateRouter]"+ex.getMessage()); + return FlowTransformWrapper.error(); } }, null) + .filter(FlowTransformWrapper::continuee) + .<FlowTransformWrapper>handle((wrapper, headers) -> { + + TopicStateRequest response = (TopicStateRequest)wrapper.getRequest(); + + //fix: 修复设备未注册前设备推送state导致产生大量日志的问题 witcom@2023.10.08 + try { + return getTypeReference(response.getGateway(), response.getData()) + .map(clazz -> response.setData(objectMapper.convertValue(response.getData(), clazz))) + .orElse(null); + }catch (CloudSDKException ex){ + log.warn("[StateRouter]"+ex.getMessage()); + return null; + } + }) + .filter(Objects::nonNull) .<TopicStateRequest, StateDataKeyEnum>route(response -> StateDataKeyEnum.find(response.getData().getClass()), mapping -> Arrays.stream(StateDataKeyEnum.values()).forEach(key -> mapping.channelMapping(key, key.getChannelName()))) .get(); } - private Class getTypeReference(String gatewaySn, Object data) { + private Optional<Class> getTypeReference(String gatewaySn, Object data) { Set<String> keys = ((Map<String, Object>) data).keySet(); - switch (SDKManager.getDeviceSDK(gatewaySn).getType()) { - case RC: - return RcStateDataKeyEnum.find(keys).getClassType(); - case DOCK: - return DockStateDataKeyEnum.find(keys).getClassType(); - default: - throw new CloudSDKException(CloudSDKErrorEnum.WRONG_DATA, "Unexpected value: " + SDKManager.getDeviceSDK(gatewaySn).getType()); - } + //fix: 捕捉数据流发现在注册前设备可能会推送state主题导致产生大量日志 witcom@2023.10.08 + //GatewayTypeEnum type = sdkManager.getDeviceSDK(gatewaySn).getType(); + return sdkManager.findDeviceSDK(gatewaySn) + .flatMap(gw -> { + GatewayTypeEnum type = gw.getType(); + switch (type) { + case RC: + return RcStateDataKeyEnum.find(keys).map(v->v.getClassType()); + case DOCK: + return DockStateDataKeyEnum.find(keys).map(v->v.getClassType()); + default: + throw new CloudSDKException(CloudSDKErrorEnum.WRONG_DATA, "Unexpected value: " + type); + } + }); } } \ No newline at end of file diff --git a/src/main/java/com/dji/sdk/mqtt/state/StateSubscribe.java b/src/main/java/com/dji/sdk/mqtt/state/StateSubscribe.java index 7a1a746..cf8ec17 100644 --- a/src/main/java/com/dji/sdk/mqtt/state/StateSubscribe.java +++ b/src/main/java/com/dji/sdk/mqtt/state/StateSubscribe.java @@ -20,10 +20,13 @@ public class StateSubscribe { @Resource private IMqttTopicService topicService; + @Resource + SDKManager sdkManager; + public static final String TOPIC = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + "%s" + TopicConst.STATE_SUF; public void subscribe(GatewayManager gateway, boolean unsubscribeSubDevice) { - SDKManager.registerDevice(gateway); + sdkManager.registerDevice(gateway); topicService.subscribe(String.format(TOPIC, gateway.getGatewaySn())); if (unsubscribeSubDevice) { topicService.unsubscribe(String.format(TOPIC, gateway.getDroneSn())); @@ -35,7 +38,7 @@ public class StateSubscribe { } public void unsubscribe(GatewayManager gateway) { - SDKManager.logoutDevice(gateway.getGatewaySn()); + sdkManager.logoutDevice(gateway.getGatewaySn()); topicService.unsubscribe(String.format(TOPIC, gateway.getGatewaySn())); if (null != gateway.getDroneSn()) { topicService.unsubscribe(String.format(TOPIC, gateway.getDroneSn())); diff --git a/src/main/java/com/dji/sdk/mqtt/status/StatusSubscribe.java b/src/main/java/com/dji/sdk/mqtt/status/StatusSubscribe.java index 2412067..178e70d 100644 --- a/src/main/java/com/dji/sdk/mqtt/status/StatusSubscribe.java +++ b/src/main/java/com/dji/sdk/mqtt/status/StatusSubscribe.java @@ -22,8 +22,11 @@ public class StatusSubscribe { @Resource private IMqttTopicService topicService; + @Resource + SDKManager sdkManager; + public void subscribe(GatewayManager gateway) { - SDKManager.registerDevice(gateway); + sdkManager.registerDevice(gateway); topicService.subscribe(String.format(TOPIC, gateway.getGatewaySn())); } @@ -32,7 +35,7 @@ public class StatusSubscribe { } public void unsubscribe(GatewayManager gateway) { - SDKManager.logoutDevice(gateway.getGatewaySn()); + sdkManager.logoutDevice(gateway.getGatewaySn()); topicService.unsubscribe(String.format(TOPIC, gateway.getGatewaySn())); }