From 860d5d4995d0ae0f258080e5b60b34b242b6bdb7 Mon Sep 17 00:00:00 2001 From: "sean.zhou" Date: Fri, 21 Oct 2022 18:11:46 +0800 Subject: [PATCH] v1.3.0-beta1 --- sql/cloud_sample.sql | 33 +- .../sample/common/error/LiveErrorEnum.java | 4 +- .../component/GlobalScheduleService.java | 1 + .../mqtt/config/MqttMessageChannel.java | 5 + .../mqtt/handler/RequestsRouter.java | 1 + .../component/mqtt/model/ChannelName.java | 2 + .../mqtt/model/CommonTopicReceiver.java | 1 - .../component/mqtt/model/EventsReceiver.java | 6 + .../component/mqtt/model/MapKeyConst.java | 5 + .../mqtt/model/RequestsMethodEnum.java | 2 + .../mqtt/model/ServicesMethodEnum.java | 10 +- .../sample/component/redis/RedisConst.java | 2 + .../sample/component/redis/RedisOpsUtils.java | 54 +++ .../SpringBeanConfiguration.java | 1 + .../service/impl/ControlServiceImpl.java | 2 +- .../controller/LiveStreamController.java | 5 + .../manage/model/dto/CapacityVideoDTO.java | 4 + .../sample/manage/model/dto/LiveTypeDTO.java | 2 + .../model/receiver/CapacityVideoReceiver.java | 4 + .../manage/service/ILiveStreamService.java | 7 + .../service/impl/CameraVideoServiceImpl.java | 3 +- .../impl/DeviceFirmwareServiceImpl.java | 2 +- .../service/impl/DeviceLogsServiceImpl.java | 2 +- .../service/impl/LiveStreamServiceImpl.java | 46 ++- .../media/service/impl/MediaServiceImpl.java | 2 +- .../controller/WaylineJobController.java | 15 +- .../model/dto/FlightTaskCreateDTO.java | 6 +- .../wayline/model/dto/FlightTaskFileDTO.java | 2 +- .../wayline/model/dto/WaylineJobDTO.java | 15 +- .../model/entity/WaylineJobEntity.java | 22 +- .../model/enums/WaylineJobStatusEnum.java | 39 +++ .../model/enums/WaylineTaskTypeEnum.java | 22 ++ .../model/enums/WaylineTemplateTypeEnum.java | 26 ++ .../wayline/model/param/CreateJobParam.java | 6 +- .../wayline/service/IWaylineJobService.java | 39 ++- .../service/impl/FlightTaskServiceImpl.java | 78 ++++- .../service/impl/WaylineJobServiceImpl.java | 320 +++++++++++++++--- 37 files changed, 703 insertions(+), 93 deletions(-) create mode 100644 src/main/java/com/dji/sample/wayline/model/enums/WaylineJobStatusEnum.java create mode 100644 src/main/java/com/dji/sample/wayline/model/enums/WaylineTaskTypeEnum.java create mode 100644 src/main/java/com/dji/sample/wayline/model/enums/WaylineTemplateTypeEnum.java diff --git a/sql/cloud_sample.sql b/sql/cloud_sample.sql index 993aabd..749d8a1 100644 --- a/sql/cloud_sample.sql +++ b/sql/cloud_sample.sql @@ -127,7 +127,12 @@ VALUES (15,1,90742,0,'L1',NULL), (16,2,56,0,'DJI Smart Controller','Remote control for M300'), (17,2,119,0,'DJI RC Plus','Remote control for M30'), - (18,3,1,0,'DJI Dock',''); + (18,3,1,0,'DJI Dock',''), + (19,0,77,0,'Mavic 3E',NULL), + (20,0,77,1,'Mavic 3T',NULL), + (21,1,66,0,'Mavic 3E Camera',NULL), + (22,1,67,0,'Mavic 3T Camera',NULL), + (23,2,144,0,'DJI RC Pro','Remote control for Mavic 3E/T'); /*!40000 ALTER TABLE `manage_device_dictionary` ENABLE KEYS */; UNLOCK TABLES; @@ -441,6 +446,32 @@ CREATE TABLE `wayline_job` ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='Wayline mission information of the dock.'; +# wayline_job_new +# ------------------------------------------------------------ + +DROP TABLE IF EXISTS `wayline_job_new`; + +CREATE TABLE `wayline_job_new` ( + `id` int unsigned NOT NULL AUTO_INCREMENT, + `job_id` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT 'uuid', + `name` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT 'The name of the job.', + `file_id` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT 'The wayline file used for this job.', + `dock_sn` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT 'Which dock executes the job.', + `workspace_id` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT 'Which workspace the current job belongs to.', + `task_type` int NOT NULL, + `wayline_type` int NOT NULL COMMENT 'The template type of the wayline.', + `execute_time` bigint NOT NULL, + `username` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT 'The name of the creator.', + `end_time` bigint DEFAULT NULL COMMENT 'end time of the job.', + `error_code` int DEFAULT NULL, + `status` int NOT NULL COMMENT '1: pending; 2: in progress; 3: success; 4: cancel; 5: failed', + `create_time` bigint NOT NULL, + `update_time` bigint NOT NULL, + PRIMARY KEY (`id`), + UNIQUE KEY `job_id_UNIQUE` (`job_id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='Wayline mission information of the dock.'; + + /*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */; /*!40101 SET SQL_MODE=@OLD_SQL_MODE */; diff --git a/src/main/java/com/dji/sample/common/error/LiveErrorEnum.java b/src/main/java/com/dji/sample/common/error/LiveErrorEnum.java index 0a65eed..6ba3f95 100644 --- a/src/main/java/com/dji/sample/common/error/LiveErrorEnum.java +++ b/src/main/java/com/dji/sample/common/error/LiveErrorEnum.java @@ -67,9 +67,9 @@ public enum LiveErrorEnum implements IErrorInfo { * @return enumeration object */ public static LiveErrorEnum find(int code) { - + final int MOD = 100_000; for (LiveErrorEnum errorEnum : LiveErrorEnum.class.getEnumConstants()) { - if (errorEnum.code == code) { + if (errorEnum.code % MOD == code % MOD) { return errorEnum; } } diff --git a/src/main/java/com/dji/sample/component/GlobalScheduleService.java b/src/main/java/com/dji/sample/component/GlobalScheduleService.java index 2813388..0f8a11b 100644 --- a/src/main/java/com/dji/sample/component/GlobalScheduleService.java +++ b/src/main/java/com/dji/sample/component/GlobalScheduleService.java @@ -6,6 +6,7 @@ import com.dji.sample.component.redis.RedisOpsUtils; import com.dji.sample.manage.model.dto.DeviceDTO; import com.dji.sample.manage.model.enums.DeviceDomainEnum; import com.dji.sample.manage.service.IDeviceService; +import com.dji.sample.wayline.service.IWaylineJobService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; diff --git a/src/main/java/com/dji/sample/component/mqtt/config/MqttMessageChannel.java b/src/main/java/com/dji/sample/component/mqtt/config/MqttMessageChannel.java index 4b278a0..cbf27e0 100644 --- a/src/main/java/com/dji/sample/component/mqtt/config/MqttMessageChannel.java +++ b/src/main/java/com/dji/sample/component/mqtt/config/MqttMessageChannel.java @@ -152,4 +152,9 @@ public class MqttMessageChannel { return new DirectChannel(); } + @Bean(name = ChannelName.INBOUND_REQUESTS_FLIGHT_TASK_RESOURCE_GET) + public MessageChannel requestsFlightTaskResourceGet() { + return new DirectChannel(); + } + } diff --git a/src/main/java/com/dji/sample/component/mqtt/handler/RequestsRouter.java b/src/main/java/com/dji/sample/component/mqtt/handler/RequestsRouter.java index 753bad2..8320737 100644 --- a/src/main/java/com/dji/sample/component/mqtt/handler/RequestsRouter.java +++ b/src/main/java/com/dji/sample/component/mqtt/handler/RequestsRouter.java @@ -42,6 +42,7 @@ public class RequestsRouter { mapping.channelMapping(RequestsMethodEnum.AIRPORT_BIND_STATUS, ChannelName.INBOUND_REQUESTS_AIRPORT_BIND_STATUS); mapping.channelMapping(RequestsMethodEnum.AIRPORT_ORGANIZATION_GET, ChannelName.INBOUND_REQUESTS_AIRPORT_ORGANIZATION_GET); mapping.channelMapping(RequestsMethodEnum.AIRPORT_ORGANIZATION_BIND, ChannelName.INBOUND_REQUESTS_AIRPORT_ORGANIZATION_BIND); + mapping.channelMapping(RequestsMethodEnum.FLIGHT_TASK_RESOURCE_GET, ChannelName.INBOUND_REQUESTS_FLIGHT_TASK_RESOURCE_GET); mapping.channelMapping(RequestsMethodEnum.UNKNOWN, ChannelName.DEFAULT); }) .get(); diff --git a/src/main/java/com/dji/sample/component/mqtt/model/ChannelName.java b/src/main/java/com/dji/sample/component/mqtt/model/ChannelName.java index 83913c4..670ff33 100644 --- a/src/main/java/com/dji/sample/component/mqtt/model/ChannelName.java +++ b/src/main/java/com/dji/sample/component/mqtt/model/ChannelName.java @@ -68,4 +68,6 @@ public class ChannelName { public static final String INBOUND_EVENTS_OTA_PROGRESS = "inboundEventsOtaProgress"; public static final String INBOUND_EVENTS_FILE_UPLOAD_PROGRESS = "inboundEventsFileUploadProgress"; + + public static final String INBOUND_REQUESTS_FLIGHT_TASK_RESOURCE_GET = "inboundEventsFlightTaskResourceGet"; } diff --git a/src/main/java/com/dji/sample/component/mqtt/model/CommonTopicReceiver.java b/src/main/java/com/dji/sample/component/mqtt/model/CommonTopicReceiver.java index 8770bfc..6090338 100644 --- a/src/main/java/com/dji/sample/component/mqtt/model/CommonTopicReceiver.java +++ b/src/main/java/com/dji/sample/component/mqtt/model/CommonTopicReceiver.java @@ -31,5 +31,4 @@ public class CommonTopicReceiver { private Integer needReply; - private String from; } \ No newline at end of file diff --git a/src/main/java/com/dji/sample/component/mqtt/model/EventsReceiver.java b/src/main/java/com/dji/sample/component/mqtt/model/EventsReceiver.java index c4d26aa..1597e21 100644 --- a/src/main/java/com/dji/sample/component/mqtt/model/EventsReceiver.java +++ b/src/main/java/com/dji/sample/component/mqtt/model/EventsReceiver.java @@ -1,7 +1,10 @@ package com.dji.sample.component.mqtt.model; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; /** * @author sean @@ -10,6 +13,9 @@ import lombok.Data; */ @Data @JsonIgnoreProperties(ignoreUnknown = true) +@Builder +@NoArgsConstructor +@AllArgsConstructor public class EventsReceiver { private Integer result; diff --git a/src/main/java/com/dji/sample/component/mqtt/model/MapKeyConst.java b/src/main/java/com/dji/sample/component/mqtt/model/MapKeyConst.java index f13ec92..242fd3d 100644 --- a/src/main/java/com/dji/sample/component/mqtt/model/MapKeyConst.java +++ b/src/main/java/com/dji/sample/component/mqtt/model/MapKeyConst.java @@ -28,4 +28,9 @@ public final class MapKeyConst { public static final String LIST = "list"; public static final String MODULE_LIST = "module_list"; + + public static final String FLIGHT_ID = "flight_id"; + + public static final String FLIGHT_IDS = "flight_ids"; + } diff --git a/src/main/java/com/dji/sample/component/mqtt/model/RequestsMethodEnum.java b/src/main/java/com/dji/sample/component/mqtt/model/RequestsMethodEnum.java index ddfa599..7d80f12 100644 --- a/src/main/java/com/dji/sample/component/mqtt/model/RequestsMethodEnum.java +++ b/src/main/java/com/dji/sample/component/mqtt/model/RequestsMethodEnum.java @@ -17,6 +17,8 @@ public enum RequestsMethodEnum { AIRPORT_ORGANIZATION_GET("airport_organization_get"), + FLIGHT_TASK_RESOURCE_GET("flighttask_resource_get"), + UNKNOWN("Unknown"); private String method; diff --git a/src/main/java/com/dji/sample/component/mqtt/model/ServicesMethodEnum.java b/src/main/java/com/dji/sample/component/mqtt/model/ServicesMethodEnum.java index 40ca0fc..60a79b6 100644 --- a/src/main/java/com/dji/sample/component/mqtt/model/ServicesMethodEnum.java +++ b/src/main/java/com/dji/sample/component/mqtt/model/ServicesMethodEnum.java @@ -15,7 +15,13 @@ public enum ServicesMethodEnum { LIVE_SET_QUALITY("live_set_quality", false), - FLIGHTTASK_CREATE("flighttask_create", false), + FLIGHT_TASK_CREATE("flighttask_create", false), + + FLIGHT_TASK_PREPARE("flighttask_prepare", false), + + FLIGHT_TASK_EXECUTE("flighttask_execute", false), + + FLIGHT_TASK_CANCEL("flighttask_undo", false), DEBUG_MODE_OPEN("debug_mode_open", false), @@ -61,6 +67,8 @@ public enum ServicesMethodEnum { FILE_UPLOAD_UPDATE("fileupload_update", false), + LIVE_LENS_CHANGE("live_lens_change", false), + UNKNOWN("unknown", false); private String method; diff --git a/src/main/java/com/dji/sample/component/redis/RedisConst.java b/src/main/java/com/dji/sample/component/redis/RedisConst.java index 718ebee..dfbe44f 100644 --- a/src/main/java/com/dji/sample/component/redis/RedisConst.java +++ b/src/main/java/com/dji/sample/component/redis/RedisConst.java @@ -34,4 +34,6 @@ public final class RedisConst { public static final String STATE_PAYLOAD_PREFIX = "payload" + DELIMITER; public static final String LOGS_FILE_PREFIX = "logs_file" + DELIMITER; + + public static final String WAYLINE_JOB = "wayline_job"; } diff --git a/src/main/java/com/dji/sample/component/redis/RedisOpsUtils.java b/src/main/java/com/dji/sample/component/redis/RedisOpsUtils.java index f5b1066..5e329c9 100644 --- a/src/main/java/com/dji/sample/component/redis/RedisOpsUtils.java +++ b/src/main/java/com/dji/sample/component/redis/RedisOpsUtils.java @@ -3,6 +3,7 @@ package com.dji.sample.component.redis; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import java.util.List; import java.util.Set; @@ -184,4 +185,57 @@ public class RedisOpsUtils { public Long listLen(String key) { return redisTemplate.opsForList().size(key); } + + /** + * ZADD + * @param key + * @param value + * @param score + */ + public Boolean zAdd(String key, Object value, double score) { + return redisTemplate.opsForZSet().add(key, value, score); + } + + /** + * ZREM + * @param key + * @param value + */ + public Boolean zRemove(String key, Object... value) { + return redisTemplate.opsForZSet().remove(key, value) > 0; + } + /** + * ZRANGE + * @param key + * @param start + * @param end + * @return + */ + public Set zRange(String key, long start, long end) { + return redisTemplate.opsForZSet().range(key, start, end); + } + + /** + * ZRANGE + * @param key + * @return + */ + public Object zGetMin(String key) { + Set objects = zRange(key, 0, 0); + if (CollectionUtils.isEmpty(objects)) { + return null; + } + return objects.iterator().next(); + } + + /** + * ZSCORE + * @param key + * @param value + * @return + */ + public Double zScore(String key, Object value) { + return redisTemplate.opsForZSet().score(key, value); + } + } diff --git a/src/main/java/com/dji/sample/configuration/SpringBeanConfiguration.java b/src/main/java/com/dji/sample/configuration/SpringBeanConfiguration.java index 9887d1e..eee6f48 100644 --- a/src/main/java/com/dji/sample/configuration/SpringBeanConfiguration.java +++ b/src/main/java/com/dji/sample/configuration/SpringBeanConfiguration.java @@ -35,6 +35,7 @@ public class SpringBeanConfiguration { objectMapper.disable(MapperFeature.IGNORE_DUPLICATE_MODULE_REGISTRATIONS); objectMapper.registerModules(timeModule); objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); objectMapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true); objectMapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true); objectMapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, true); diff --git a/src/main/java/com/dji/sample/control/service/impl/ControlServiceImpl.java b/src/main/java/com/dji/sample/control/service/impl/ControlServiceImpl.java index 2b0bbf4..182620c 100644 --- a/src/main/java/com/dji/sample/control/service/impl/ControlServiceImpl.java +++ b/src/main/java/com/dji/sample/control/service/impl/ControlServiceImpl.java @@ -126,7 +126,7 @@ public class ControlServiceImpl implements IControlService { .bid(receiver.getBid()) .method(receiver.getMethod()) .timestamp(System.currentTimeMillis()) - .data(ResponseResult.success()) + .data(RequestsReply.success()) .build()); } } diff --git a/src/main/java/com/dji/sample/manage/controller/LiveStreamController.java b/src/main/java/com/dji/sample/manage/controller/LiveStreamController.java index 6154e94..c8bd144 100644 --- a/src/main/java/com/dji/sample/manage/controller/LiveStreamController.java +++ b/src/main/java/com/dji/sample/manage/controller/LiveStreamController.java @@ -91,4 +91,9 @@ public class LiveStreamController { return liveStreamService.liveSetQuality(liveParam); } + @PostMapping("/streams/switch") + public ResponseResult liveLensChange(@RequestBody LiveTypeDTO liveParam) { + return liveStreamService.liveLensChange(liveParam); + } + } \ No newline at end of file diff --git a/src/main/java/com/dji/sample/manage/model/dto/CapacityVideoDTO.java b/src/main/java/com/dji/sample/manage/model/dto/CapacityVideoDTO.java index 9366472..531e7ec 100644 --- a/src/main/java/com/dji/sample/manage/model/dto/CapacityVideoDTO.java +++ b/src/main/java/com/dji/sample/manage/model/dto/CapacityVideoDTO.java @@ -5,6 +5,8 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; +import java.util.List; + /** * @author sean.zhou * @date 2021/11/22 @@ -21,4 +23,6 @@ public class CapacityVideoDTO { private String index; private String type; + + private List switchVideoTypes; } \ No newline at end of file diff --git a/src/main/java/com/dji/sample/manage/model/dto/LiveTypeDTO.java b/src/main/java/com/dji/sample/manage/model/dto/LiveTypeDTO.java index ffe341e..4a385ec 100644 --- a/src/main/java/com/dji/sample/manage/model/dto/LiveTypeDTO.java +++ b/src/main/java/com/dji/sample/manage/model/dto/LiveTypeDTO.java @@ -23,4 +23,6 @@ public class LiveTypeDTO { @JsonProperty("video_quality") private Integer videoQuality; + private String videoType; + } \ No newline at end of file diff --git a/src/main/java/com/dji/sample/manage/model/receiver/CapacityVideoReceiver.java b/src/main/java/com/dji/sample/manage/model/receiver/CapacityVideoReceiver.java index 616b5a9..6c17c15 100644 --- a/src/main/java/com/dji/sample/manage/model/receiver/CapacityVideoReceiver.java +++ b/src/main/java/com/dji/sample/manage/model/receiver/CapacityVideoReceiver.java @@ -4,6 +4,8 @@ import com.fasterxml.jackson.databind.PropertyNamingStrategy; import com.fasterxml.jackson.databind.annotation.JsonNaming; import lombok.Data; +import java.util.List; + /** * @author sean.zhou * @date 2021/11/18 @@ -16,4 +18,6 @@ public class CapacityVideoReceiver { private String videoIndex; private String videoType; + + private List switchableVideoTypes; } \ No newline at end of file diff --git a/src/main/java/com/dji/sample/manage/service/ILiveStreamService.java b/src/main/java/com/dji/sample/manage/service/ILiveStreamService.java index f2a2470..2fb7a4f 100644 --- a/src/main/java/com/dji/sample/manage/service/ILiveStreamService.java +++ b/src/main/java/com/dji/sample/manage/service/ILiveStreamService.java @@ -48,4 +48,11 @@ public interface ILiveStreamService { * @return */ ResponseResult liveSetQuality(LiveTypeDTO liveParam); + + /** + * Switches the lens of the device during the live streaming. + * @param liveParam + * @return + */ + ResponseResult liveLensChange(LiveTypeDTO liveParam); } diff --git a/src/main/java/com/dji/sample/manage/service/impl/CameraVideoServiceImpl.java b/src/main/java/com/dji/sample/manage/service/impl/CameraVideoServiceImpl.java index ec769b3..bfe6de9 100644 --- a/src/main/java/com/dji/sample/manage/service/impl/CameraVideoServiceImpl.java +++ b/src/main/java/com/dji/sample/manage/service/impl/CameraVideoServiceImpl.java @@ -23,7 +23,8 @@ public class CameraVideoServiceImpl implements ICameraVideoService { if (receiver != null) { builder.id(UUID.randomUUID().toString()) .index(receiver.getVideoIndex()) - .type(receiver.getVideoType()); + .type(receiver.getVideoType()) + .switchVideoTypes(receiver.getSwitchableVideoTypes()); } return builder.build(); } diff --git a/src/main/java/com/dji/sample/manage/service/impl/DeviceFirmwareServiceImpl.java b/src/main/java/com/dji/sample/manage/service/impl/DeviceFirmwareServiceImpl.java index 3cf9777..61e2fff 100644 --- a/src/main/java/com/dji/sample/manage/service/impl/DeviceFirmwareServiceImpl.java +++ b/src/main/java/com/dji/sample/manage/service/impl/DeviceFirmwareServiceImpl.java @@ -169,7 +169,7 @@ public class DeviceFirmwareServiceImpl implements IDeviceFirmwareService { .bid(receiver.getBid()) .method(receiver.getMethod()) .timestamp(System.currentTimeMillis()) - .data(ResponseResult.success()) + .data(RequestsReply.success()) .build()); } } diff --git a/src/main/java/com/dji/sample/manage/service/impl/DeviceLogsServiceImpl.java b/src/main/java/com/dji/sample/manage/service/impl/DeviceLogsServiceImpl.java index eec4b41..9e238cb 100644 --- a/src/main/java/com/dji/sample/manage/service/impl/DeviceLogsServiceImpl.java +++ b/src/main/java/com/dji/sample/manage/service/impl/DeviceLogsServiceImpl.java @@ -249,7 +249,7 @@ public class DeviceLogsServiceImpl implements IDeviceLogsService { .bid(receiver.getBid()) .method(receiver.getMethod()) .timestamp(System.currentTimeMillis()) - .data(ResponseResult.success()) + .data(RequestsReply.success()) .build()); } diff --git a/src/main/java/com/dji/sample/manage/service/impl/LiveStreamServiceImpl.java b/src/main/java/com/dji/sample/manage/service/impl/LiveStreamServiceImpl.java index e5b6c8f..aee86ca 100644 --- a/src/main/java/com/dji/sample/manage/service/impl/LiveStreamServiceImpl.java +++ b/src/main/java/com/dji/sample/manage/service/impl/LiveStreamServiceImpl.java @@ -70,7 +70,7 @@ public class LiveStreamServiceImpl implements ILiveStreamService { return devicesList.stream() .filter(device -> redisOps.checkExist(RedisConst.DEVICE_ONLINE_PREFIX + device.getDeviceSn())) .map(device -> CapacityDeviceDTO.builder() - .name(device.getDeviceName()) + .name(Objects.requireNonNullElse(device.getNickname(), device.getDeviceName())) .sn(device.getDeviceSn()) .camerasList(capacityCameraService.getCapacityCameraByDeviceSn(device.getDeviceSn())) .build()) @@ -96,7 +96,7 @@ public class LiveStreamServiceImpl implements ILiveStreamService { public ResponseResult liveStart(LiveTypeDTO liveParam) { // Check if this lens is available live. ResponseResult responseResult = this.checkBeforeLive(liveParam.getVideoId()); - if (responseResult.getCode() != 0) { + if (ResponseResult.CODE_SUCCESS != responseResult.getCode()) { return responseResult; } @@ -109,7 +109,7 @@ public class LiveStreamServiceImpl implements ILiveStreamService { if (receiveReplyOpt.isEmpty()) { return ResponseResult.error(LiveErrorEnum.NO_REPLY); } - if (receiveReplyOpt.get().getResult() != 0) { + if (ResponseResult.CODE_SUCCESS != receiveReplyOpt.get().getResult()) { return ResponseResult.error(LiveErrorEnum.find(receiveReplyOpt.get().getResult())); } @@ -188,12 +188,52 @@ public class LiveStreamServiceImpl implements ILiveStreamService { return ResponseResult.success(); } + @Override + public ResponseResult liveLensChange(LiveTypeDTO liveParam) { + if (!StringUtils.hasText(liveParam.getVideoType())) { + return ResponseResult.error(LiveErrorEnum.ERROR_PARAMETERS); + } + + ResponseResult responseResult = this.checkBeforeLive(liveParam.getVideoId()); + if (ResponseResult.CODE_SUCCESS != responseResult.getCode()) { + return responseResult; + } + if (DeviceDomainEnum.GATEWAY.getDesc().equals(responseResult.getData().getDomain())) { + return ResponseResult.error(LiveErrorEnum.FUNCTION_NOT_SUPPORT); + } + + String respTopic = THING_MODEL_PRE + PRODUCT + responseResult.getData().getDeviceSn() + SERVICES_SUF; + + Optional receiveReplyOpt = this.publishLiveLensChange(respTopic, liveParam); + if (receiveReplyOpt.isEmpty()) { + return ResponseResult.error(LiveErrorEnum.NO_REPLY); + } + if (receiveReplyOpt.get().getResult() != 0) { + return ResponseResult.error(LiveErrorEnum.find(receiveReplyOpt.get().getResult())); + } + + return ResponseResult.success(); + } + + private Optional publishLiveLensChange(String respTopic, LiveTypeDTO liveParam) { + CommonTopicResponse response = new CommonTopicResponse<>(); + response.setTid(UUID.randomUUID().toString()); + response.setBid(UUID.randomUUID().toString()); + response.setMethod(ServicesMethodEnum.LIVE_LENS_CHANGE.getMethod()); + response.setData(liveParam); + + return messageSender.publishWithReply(respTopic, response); + } + /** * Check if this lens is available live. * @param videoId * @return */ private ResponseResult checkBeforeLive(String videoId) { + if (!StringUtils.hasText(videoId)) { + return ResponseResult.error(LiveErrorEnum.ERROR_PARAMETERS); + } String[] videoIdArr = videoId.split("/"); // drone sn / enumeration value of the location where the payload is mounted / payload lens if (videoIdArr.length != 3) { diff --git a/src/main/java/com/dji/sample/media/service/impl/MediaServiceImpl.java b/src/main/java/com/dji/sample/media/service/impl/MediaServiceImpl.java index 0e92b0b..f632b36 100644 --- a/src/main/java/com/dji/sample/media/service/impl/MediaServiceImpl.java +++ b/src/main/java/com/dji/sample/media/service/impl/MediaServiceImpl.java @@ -77,7 +77,7 @@ public class MediaServiceImpl implements IMediaService { CommonTopicResponse data = CommonTopicResponse.builder() .timestamp(System.currentTimeMillis()) .method(EventsMethodEnum.FILE_UPLOAD_CALLBACK.getMethod()) - .data(ResponseResult.success()) + .data(RequestsReply.success()) .tid(receiver.getTid()) .bid(receiver.getBid()) .build(); diff --git a/src/main/java/com/dji/sample/wayline/controller/WaylineJobController.java b/src/main/java/com/dji/sample/wayline/controller/WaylineJobController.java index f066f54..5cf84fa 100644 --- a/src/main/java/com/dji/sample/wayline/controller/WaylineJobController.java +++ b/src/main/java/com/dji/sample/wayline/controller/WaylineJobController.java @@ -11,6 +11,7 @@ import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletRequest; import java.sql.SQLException; +import java.util.List; import static com.dji.sample.component.AuthInterceptor.TOKEN_CLAIM; @@ -39,8 +40,8 @@ public class WaylineJobController { @PathVariable(name = "workspace_id") String workspaceId) throws SQLException { CustomClaim customClaim = (CustomClaim)request.getAttribute(TOKEN_CLAIM); customClaim.setWorkspaceId(workspaceId); - boolean isCreate = waylineJobService.createJob(param, customClaim); - return isCreate ? ResponseResult.success() : ResponseResult.error(); + + return waylineJobService.publishFlightTask(param, customClaim); } /** @@ -59,16 +60,16 @@ public class WaylineJobController { } /** - * Issue wayline mission to the dock for execution. - * @param jobId + * Send the command to cancel the jobs. + * @param jobIds * @param workspaceId * @return * @throws SQLException */ - @PostMapping("/{workspace_id}/jobs/{job_id}") - public ResponseResult publishJob(@PathVariable(name = "job_id") String jobId, + @DeleteMapping("/{workspace_id}/jobs") + public ResponseResult publishCancelJob(@RequestParam(name = "job_id") List jobIds, @PathVariable(name = "workspace_id") String workspaceId) throws SQLException { - waylineJobService.publishFlightTask(workspaceId, jobId); + waylineJobService.cancelFlightTask(workspaceId, jobIds); return ResponseResult.success(); } } diff --git a/src/main/java/com/dji/sample/wayline/model/dto/FlightTaskCreateDTO.java b/src/main/java/com/dji/sample/wayline/model/dto/FlightTaskCreateDTO.java index 736a8c5..dcffa8f 100644 --- a/src/main/java/com/dji/sample/wayline/model/dto/FlightTaskCreateDTO.java +++ b/src/main/java/com/dji/sample/wayline/model/dto/FlightTaskCreateDTO.java @@ -18,7 +18,11 @@ public class FlightTaskCreateDTO { private String flightId; - private String type; + private Integer taskType; + + private Integer waylineType; + + private Long executeTime; private FlightTaskFileDTO file; } diff --git a/src/main/java/com/dji/sample/wayline/model/dto/FlightTaskFileDTO.java b/src/main/java/com/dji/sample/wayline/model/dto/FlightTaskFileDTO.java index ca0accb..e5858ee 100644 --- a/src/main/java/com/dji/sample/wayline/model/dto/FlightTaskFileDTO.java +++ b/src/main/java/com/dji/sample/wayline/model/dto/FlightTaskFileDTO.java @@ -18,5 +18,5 @@ public class FlightTaskFileDTO { private String url; - private String sign; + private String fingerprint; } diff --git a/src/main/java/com/dji/sample/wayline/model/dto/WaylineJobDTO.java b/src/main/java/com/dji/sample/wayline/model/dto/WaylineJobDTO.java index a8820a7..0d7bdda 100644 --- a/src/main/java/com/dji/sample/wayline/model/dto/WaylineJobDTO.java +++ b/src/main/java/com/dji/sample/wayline/model/dto/WaylineJobDTO.java @@ -32,12 +32,19 @@ public class WaylineJobDTO { private String workspaceId; - private String bid; + private Integer waylineType; - private String type; + private Integer taskType; - private String username; + private LocalDateTime executeTime; + + private LocalDateTime endTime; + + private Integer status; - private LocalDateTime updateTime; + private Integer progress; + + private String username; + private Integer code; } diff --git a/src/main/java/com/dji/sample/wayline/model/entity/WaylineJobEntity.java b/src/main/java/com/dji/sample/wayline/model/entity/WaylineJobEntity.java index f2106f6..a757e76 100644 --- a/src/main/java/com/dji/sample/wayline/model/entity/WaylineJobEntity.java +++ b/src/main/java/com/dji/sample/wayline/model/entity/WaylineJobEntity.java @@ -17,7 +17,7 @@ import java.io.Serializable; @Builder @NoArgsConstructor @AllArgsConstructor -@TableName("wayline_job") +@TableName("wayline_job_new") public class WaylineJobEntity implements Serializable { @TableId(type = IdType.AUTO) @@ -38,15 +38,27 @@ public class WaylineJobEntity implements Serializable { @TableField("workspace_id") private String workspaceId; - @TableField("bid") - private String bid; + @TableField("task_type") + private Integer taskType; - @TableField("type") - private String type; + @TableField("wayline_type") + private Integer waylineType; @TableField("username") private String username; + @TableField("execute_time") + private Long executeTime; + + @TableField("end_time") + private Long endTime; + + @TableField("error_code") + private Integer errorCode; + + @TableField("status") + private Integer status; + @TableField(value = "create_time", fill = FieldFill.INSERT) private Long createTime; diff --git a/src/main/java/com/dji/sample/wayline/model/enums/WaylineJobStatusEnum.java b/src/main/java/com/dji/sample/wayline/model/enums/WaylineJobStatusEnum.java new file mode 100644 index 0000000..b33150b --- /dev/null +++ b/src/main/java/com/dji/sample/wayline/model/enums/WaylineJobStatusEnum.java @@ -0,0 +1,39 @@ +package com.dji.sample.wayline.model.enums; + +import lombok.Getter; + +import java.util.Arrays; + +/** + * @author sean + * @version 1.3 + * @date 2022/9/26 + */ +@Getter +public enum WaylineJobStatusEnum { + + PENDING(1, false), + + IN_PROGRESS(2, false), + + SUCCESS(3, true), + + CANCEL(4, true), + + FAILED(5, true), + + UNKNOWN(6, true); + + int val; + + Boolean end; + + WaylineJobStatusEnum(int val, boolean end) { + this.end = end; + this.val = val; + } + + public static WaylineJobStatusEnum find(int val) { + return Arrays.stream(WaylineJobStatusEnum.values()).filter(statue -> statue.val == val).findAny().orElse(UNKNOWN); + } +} diff --git a/src/main/java/com/dji/sample/wayline/model/enums/WaylineTaskTypeEnum.java b/src/main/java/com/dji/sample/wayline/model/enums/WaylineTaskTypeEnum.java new file mode 100644 index 0000000..89e556a --- /dev/null +++ b/src/main/java/com/dji/sample/wayline/model/enums/WaylineTaskTypeEnum.java @@ -0,0 +1,22 @@ +package com.dji.sample.wayline.model.enums; + +import lombok.Getter; + +/** + * @author sean + * @version 1.3 + * @date 2022/9/26 + */ +@Getter +public enum WaylineTaskTypeEnum { + + IMMEDIATE(0), + + TIMED(1); + + int val; + + WaylineTaskTypeEnum(int val) { + this.val = val; + } +} diff --git a/src/main/java/com/dji/sample/wayline/model/enums/WaylineTemplateTypeEnum.java b/src/main/java/com/dji/sample/wayline/model/enums/WaylineTemplateTypeEnum.java new file mode 100644 index 0000000..2388e32 --- /dev/null +++ b/src/main/java/com/dji/sample/wayline/model/enums/WaylineTemplateTypeEnum.java @@ -0,0 +1,26 @@ +package com.dji.sample.wayline.model.enums; + +import lombok.Getter; + +/** + * @author sean + * @version 1.3 + * @date 2022/9/26 + */ +@Getter +public enum WaylineTemplateTypeEnum { + + WAYPOINT(0), + + MAPPING_2D(1), + + MAPPING_3D(2), + + MAPPING_STRIP(4); + + int val; + + WaylineTemplateTypeEnum(int val) { + this.val = val; + } +} diff --git a/src/main/java/com/dji/sample/wayline/model/param/CreateJobParam.java b/src/main/java/com/dji/sample/wayline/model/param/CreateJobParam.java index a186838..e227460 100644 --- a/src/main/java/com/dji/sample/wayline/model/param/CreateJobParam.java +++ b/src/main/java/com/dji/sample/wayline/model/param/CreateJobParam.java @@ -16,7 +16,9 @@ public class CreateJobParam { private String dockSn; - private String type; + private Integer waylineType; - private boolean immediate; + private Integer taskType; + + private Long executeTime; } diff --git a/src/main/java/com/dji/sample/wayline/service/IWaylineJobService.java b/src/main/java/com/dji/sample/wayline/service/IWaylineJobService.java index 4dc78d9..d20c53d 100644 --- a/src/main/java/com/dji/sample/wayline/service/IWaylineJobService.java +++ b/src/main/java/com/dji/sample/wayline/service/IWaylineJobService.java @@ -2,10 +2,14 @@ package com.dji.sample.wayline.service; import com.dji.sample.common.model.CustomClaim; import com.dji.sample.common.model.PaginationData; +import com.dji.sample.common.model.ResponseResult; +import com.dji.sample.component.mqtt.model.CommonTopicReceiver; import com.dji.sample.wayline.model.dto.WaylineJobDTO; import com.dji.sample.wayline.model.param.CreateJobParam; +import org.springframework.messaging.MessageHeaders; import java.sql.SQLException; +import java.util.Collection; import java.util.Optional; /** @@ -16,20 +20,36 @@ import java.util.Optional; public interface IWaylineJobService { /** - * Create a wayline mission for the dock. + * Create wayline job in the database. * @param param - * @param customClaim user info + * @param customClaim user info * @return */ - Boolean createJob(CreateJobParam param, CustomClaim customClaim) throws SQLException; + Optional createWaylineJob(CreateJobParam param, CustomClaim customClaim); /** - * Issue wayline mission to the dock for execution. - * @param workspaceId + * Issue wayline mission to the dock. + * @param param + * @param customClaim user info + * @return + */ + ResponseResult publishFlightTask(CreateJobParam param, CustomClaim customClaim) throws SQLException; + + /** + * Execute the task immediately. * @param jobId + * @throws SQLException * @return */ - void publishFlightTask(String workspaceId, String jobId) throws SQLException; + Boolean executeFlightTask(String jobId); + + /** + * Cancel the task Base on job Ids. + * @param workspaceId + * @param jobIds + * @throws SQLException + */ + void cancelFlightTask(String workspaceId, Collection jobIds); /** * Query job information based on job id. @@ -53,4 +73,11 @@ public interface IWaylineJobService { * @return */ PaginationData getJobsByWorkspaceId(String workspaceId, long page, long pageSize); + + /** + * Process to get interface data of flight mission resources. + * @param receiver + * @param headers + */ + void flightTaskResourceGet(CommonTopicReceiver receiver, MessageHeaders headers); } diff --git a/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java b/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java index 46774bb..021e537 100644 --- a/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java +++ b/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java @@ -12,16 +12,25 @@ import com.dji.sample.component.websocket.service.IWebSocketManageService; import com.dji.sample.manage.model.dto.DeviceDTO; import com.dji.sample.manage.model.enums.UserTypeEnum; import com.dji.sample.wayline.model.dto.FlightTaskProgressReceiver; +import com.dji.sample.wayline.model.dto.WaylineJobDTO; +import com.dji.sample.wayline.model.enums.WaylineJobStatusEnum; import com.dji.sample.wayline.service.IFlightTaskService; +import com.dji.sample.wayline.service.IWaylineJobService; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpStatus; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.MessageHeaders; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; +import java.time.LocalDateTime; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + /** * @author sean * @version 1.1 @@ -46,18 +55,42 @@ public class FlightTaskServiceImpl implements IFlightTaskService { @Autowired private RedisOpsUtils redisOps; + @Autowired + private IWaylineJobService waylineJobService; + @Override @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_PROGRESS, outputChannel = ChannelName.OUTBOUND) public void handleProgress(CommonTopicReceiver receiver, MessageHeaders headers) { EventsReceiver eventsReceiver = mapper.convertValue(receiver.getData(), new TypeReference>(){}); eventsReceiver.setBid(receiver.getBid()); + eventsReceiver.setSn(receiver.getGateway()); + + FlightTaskProgressReceiver output = eventsReceiver.getOutput(); - log.info("Task progress: " + eventsReceiver.getOutput().getProgress().toString()); + log.info("Task progress: {}", output.getProgress().toString()); if (eventsReceiver.getResult() != ResponseResult.CODE_SUCCESS) { - log.error("Error code: " + eventsReceiver.getResult()); + log.error("Task progress ===> Error code: " + eventsReceiver.getResult()); + } + + EventsResultStatusEnum statusEnum = EventsResultStatusEnum.find(output.getStatus()); + if (statusEnum.getEnd()) { + WaylineJobDTO job = WaylineJobDTO.builder() + .jobId(receiver.getBid()) + .status(WaylineJobStatusEnum.SUCCESS.getVal()) + .endTime(LocalDateTime.now()) + .build(); + + if (EventsResultStatusEnum.OK != statusEnum) { + job.setCode(eventsReceiver.getResult()); + job.setStatus(WaylineJobStatusEnum.FAILED.getVal()); + } + + waylineJobService.updateJob(job); + redisOps.del(receiver.getBid()); } + redisOps.setWithExpire(receiver.getBid(), eventsReceiver, RedisConst.DEVICE_ALIVE_SECOND * RedisConst.DEVICE_ALIVE_SECOND); DeviceDTO device = (DeviceDTO) redisOps.get(RedisConst.DEVICE_ONLINE_PREFIX + receiver.getGateway()); websocketMessageService.sendBatch( @@ -77,8 +110,47 @@ public class FlightTaskServiceImpl implements IFlightTaskService { .bid(receiver.getBid()) .method(EventsMethodEnum.FLIGHT_TASK_PROGRESS.getMethod()) .timestamp(System.currentTimeMillis()) - .data(ResponseResult.success()) + .data(RequestsReply.success()) .build()); } } + + @Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS) + private void checkScheduledJob() { + Object jobIdValue = redisOps.zGetMin(RedisConst.WAYLINE_JOB); + log.info("Check the timed jobs of the wayline. {}", jobIdValue); + if (Objects.isNull(jobIdValue)) { + return; + } + String jobId = String.valueOf(jobIdValue); + double time = redisOps.zScore(RedisConst.WAYLINE_JOB, jobIdValue); + long now = System.currentTimeMillis(); + int offset = 30_000; + + // Expired tasks are deleted directly. + if (time < now - offset) { + redisOps.zRemove(RedisConst.WAYLINE_JOB, jobId); + waylineJobService.updateJob(WaylineJobDTO.builder() + .jobId(jobId) + .status(WaylineJobStatusEnum.FAILED.getVal()) + .endTime(LocalDateTime.now()) + .code(HttpStatus.SC_REQUEST_TIMEOUT).build()); + return; + } + + if (now <= time && time <= now + offset) { + try { + waylineJobService.executeFlightTask(jobId); + } catch (Exception e) { + log.info("The scheduled task delivery failed."); + waylineJobService.updateJob(WaylineJobDTO.builder() + .jobId(jobId) + .status(WaylineJobStatusEnum.FAILED.getVal()) + .endTime(LocalDateTime.now()) + .code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build()); + } finally { + redisOps.zRemove(RedisConst.WAYLINE_JOB, jobId); + } + } + } } diff --git a/src/main/java/com/dji/sample/wayline/service/impl/WaylineJobServiceImpl.java b/src/main/java/com/dji/sample/wayline/service/impl/WaylineJobServiceImpl.java index 9a2336c..76148db 100644 --- a/src/main/java/com/dji/sample/wayline/service/impl/WaylineJobServiceImpl.java +++ b/src/main/java/com/dji/sample/wayline/service/impl/WaylineJobServiceImpl.java @@ -3,40 +3,43 @@ package com.dji.sample.wayline.service.impl; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.dji.sample.common.error.CommonErrorEnum; import com.dji.sample.common.model.CustomClaim; import com.dji.sample.common.model.Pagination; import com.dji.sample.common.model.PaginationData; -import com.dji.sample.component.mqtt.model.CommonTopicResponse; -import com.dji.sample.component.mqtt.model.ServiceReply; -import com.dji.sample.component.mqtt.model.ServicesMethodEnum; -import com.dji.sample.component.mqtt.model.TopicConst; +import com.dji.sample.common.model.ResponseResult; +import com.dji.sample.component.mqtt.model.*; import com.dji.sample.component.mqtt.service.IMessageSenderService; import com.dji.sample.component.redis.RedisConst; import com.dji.sample.component.redis.RedisOpsUtils; import com.dji.sample.manage.model.dto.DeviceDTO; import com.dji.sample.manage.service.IDeviceService; import com.dji.sample.wayline.dao.IWaylineJobMapper; -import com.dji.sample.wayline.model.dto.FlightTaskCreateDTO; -import com.dji.sample.wayline.model.dto.FlightTaskFileDTO; -import com.dji.sample.wayline.model.dto.WaylineFileDTO; -import com.dji.sample.wayline.model.dto.WaylineJobDTO; +import com.dji.sample.wayline.model.dto.*; import com.dji.sample.wayline.model.entity.WaylineJobEntity; +import com.dji.sample.wayline.model.enums.WaylineJobStatusEnum; +import com.dji.sample.wayline.model.enums.WaylineTaskTypeEnum; import com.dji.sample.wayline.model.param.CreateJobParam; import com.dji.sample.wayline.service.IWaylineFileService; import com.dji.sample.wayline.service.IWaylineJobService; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.MessageHeaders; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Isolation; import org.springframework.transaction.annotation.Transactional; +import org.springframework.util.CollectionUtils; import java.net.URL; import java.sql.SQLException; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; -import java.util.List; -import java.util.Optional; -import java.util.UUID; +import java.util.*; import java.util.stream.Collectors; /** @@ -64,10 +67,18 @@ public class WaylineJobServiceImpl implements IWaylineJobService { @Autowired private RedisOpsUtils redisOps; + @Autowired + private ObjectMapper objectMapper; + + @Override - public Boolean createJob(CreateJobParam param, CustomClaim customClaim) throws SQLException { - if (param == null) { - return false; + public Optional createWaylineJob(CreateJobParam param, CustomClaim customClaim) { + if (Objects.isNull(param)) { + return Optional.empty(); + } + // Immediate tasks, allocating time on the backend. + if (Objects.isNull(param.getExecuteTime())) { + param.setExecuteTime(System.currentTimeMillis()); } WaylineJobEntity jobEntity = WaylineJobEntity.builder() .name(param.getName()) @@ -76,58 +87,119 @@ public class WaylineJobServiceImpl implements IWaylineJobService { .username(customClaim.getUsername()) .workspaceId(customClaim.getWorkspaceId()) .jobId(UUID.randomUUID().toString()) - .type(param.getType()) + .executeTime(param.getExecuteTime()) + .status(WaylineJobStatusEnum.PENDING.getVal()) + .taskType(param.getTaskType()) + .waylineType(param.getWaylineType()) .build(); int id = mapper.insert(jobEntity); if (id <= 0) { - return false; - } - if (param.isImmediate()) { - publishFlightTask(jobEntity.getWorkspaceId(), jobEntity.getJobId()); + return Optional.empty(); } - return true; + return Optional.ofNullable(this.entity2Dto(jobEntity)); } @Override - public void publishFlightTask(String workspaceId, String jobId) throws SQLException { - // get job - Optional waylineJob = this.getJobByJobId(jobId); - if (waylineJob.isEmpty()) { - throw new IllegalArgumentException("Job doesn't exist."); + public ResponseResult publishFlightTask(CreateJobParam param, CustomClaim customClaim) throws SQLException { + Optional waylineJobOpt = this.createWaylineJob(param, customClaim); + if (waylineJobOpt.isEmpty()) { + throw new SQLException("Failed to create wayline job."); } + WaylineJobDTO waylineJob = waylineJobOpt.get(); - long expire = redisOps.getExpire(RedisConst.DEVICE_ONLINE_PREFIX + waylineJob.get().getDockSn()); + long expire = redisOps.getExpire(RedisConst.DEVICE_ONLINE_PREFIX + waylineJob.getDockSn()); if (expire < 0) { throw new RuntimeException("Dock is offline."); } // get wayline file - Optional waylineFile = waylineFileService.getWaylineByWaylineId(workspaceId, waylineJob.get().getFileId()); + Optional waylineFile = waylineFileService.getWaylineByWaylineId(waylineJob.getWorkspaceId(), waylineJob.getFileId()); if (waylineFile.isEmpty()) { - throw new IllegalArgumentException("Wayline file doesn't exist."); + throw new SQLException("Wayline file doesn't exist."); } // get file url - URL url = waylineFileService.getObjectUrl(workspaceId, waylineFile.get().getWaylineId()); + URL url = waylineFileService.getObjectUrl(waylineJob.getWorkspaceId(), waylineFile.get().getWaylineId()); - WaylineJobDTO job = waylineJob.get(); FlightTaskCreateDTO flightTask = FlightTaskCreateDTO.builder() - .flightId(jobId) - .type(job.getType()) + .flightId(waylineJob.getJobId()) + .executeTime(waylineJob.getExecuteTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) + .taskType(waylineJob.getTaskType()) + .waylineType(waylineJob.getWaylineType()) .file(FlightTaskFileDTO.builder() .url(url.toString()) - .sign(waylineFile.get().getSign()) + .fingerprint(waylineFile.get().getSign()) .build()) .build(); + String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + + waylineJob.getDockSn() + TopicConst.SERVICES_SUF; + CommonTopicResponse response = CommonTopicResponse.builder() + .tid(UUID.randomUUID().toString()) + .bid(waylineJob.getJobId()) + .timestamp(System.currentTimeMillis()) + .data(flightTask) + .method(ServicesMethodEnum.FLIGHT_TASK_PREPARE.getMethod()) + .build(); + + Optional serviceReplyOpt = messageSender.publishWithReply(topic, response); + if (serviceReplyOpt.isEmpty()) { + log.info("Timeout to receive reply."); + throw new RuntimeException("Timeout to receive reply."); + } + if (serviceReplyOpt.get().getResult() != 0) { + log.info("Prepare task ====> Error code: {}", serviceReplyOpt.get().getResult()); + this.updateJob(WaylineJobDTO.builder() + .workspaceId(waylineJob.getWorkspaceId()) + .jobId(waylineJob.getJobId()) + .status(WaylineJobStatusEnum.FAILED.getVal()) + .endTime(LocalDateTime.now()) + .code(serviceReplyOpt.get().getResult()).build()); + return ResponseResult.error("Prepare task ====> Error code: " + serviceReplyOpt.get().getResult()); + } + + // Issue an immediate task execution command. + if (WaylineTaskTypeEnum.IMMEDIATE.getVal() == waylineJob.getTaskType()) { + if (!executeFlightTask(waylineJob.getJobId())) { + return ResponseResult.error("Failed to execute job."); + } + } + + if (WaylineTaskTypeEnum.TIMED.getVal() == waylineJob.getTaskType()) { + boolean isAdd = redisOps.zAdd(RedisConst.WAYLINE_JOB, waylineJob.getJobId(), + waylineJob.getExecuteTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); + if (!isAdd) { + return ResponseResult.error("Failed to create scheduled job."); + } + } + + return ResponseResult.success(); + } + + @Override + public Boolean executeFlightTask(String jobId) { + // get job + Optional waylineJob = this.getJobByJobId(jobId); + if (waylineJob.isEmpty()) { + throw new IllegalArgumentException("Job doesn't exist."); + } + + long expire = redisOps.getExpire(RedisConst.DEVICE_ONLINE_PREFIX + waylineJob.get().getDockSn()); + if (expire < 0) { + throw new RuntimeException("Dock is offline."); + } + + WaylineJobDTO job = waylineJob.get(); + FlightTaskCreateDTO flightTask = FlightTaskCreateDTO.builder().flightId(jobId).build(); + String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + job.getDockSn() + TopicConst.SERVICES_SUF; CommonTopicResponse response = CommonTopicResponse.builder() .tid(UUID.randomUUID().toString()) - .bid(UUID.randomUUID().toString()) + .bid(jobId) .timestamp(System.currentTimeMillis()) .data(flightTask) - .method(ServicesMethodEnum.FLIGHTTASK_CREATE.getMethod()) + .method(ServicesMethodEnum.FLIGHT_TASK_EXECUTE.getMethod()) .build(); Optional serviceReplyOpt = messageSender.publishWithReply(topic, response); @@ -136,15 +208,90 @@ public class WaylineJobServiceImpl implements IWaylineJobService { throw new RuntimeException("Timeout to receive reply."); } if (serviceReplyOpt.get().getResult() != 0) { - log.info("Error code: {}", serviceReplyOpt.get().getResult()); - throw new RuntimeException("Error code: " + serviceReplyOpt.get().getResult()); + log.info("Execute job ====> Error code: {}", serviceReplyOpt.get().getResult()); + this.updateJob(WaylineJobDTO.builder() + .jobId(jobId) + .status(WaylineJobStatusEnum.FAILED.getVal()) + .endTime(LocalDateTime.now()) + .code(serviceReplyOpt.get().getResult()).build()); + return false; } - job.setBid(response.getBid()); - boolean isUpd = this.updateJob(job); - if (!isUpd) { - throw new SQLException("Failed to update data."); + this.updateJob(WaylineJobDTO.builder() + .jobId(jobId) + .status(WaylineJobStatusEnum.IN_PROGRESS.getVal()) + .build()); + redisOps.setWithExpire(jobId, + EventsReceiver.builder().bid(jobId).sn(job.getDockSn()).build(), + RedisConst.DEVICE_ALIVE_SECOND * RedisConst.DEVICE_ALIVE_SECOND); + return true; + } + + @Override + public void cancelFlightTask(String workspaceId, Collection jobIds) { + List waylineJobs = mapper.selectList( + new LambdaQueryWrapper() + .or(wrapper -> jobIds.forEach(id -> wrapper.eq(WaylineJobEntity::getJobId, id)))); + + // Check if the job have ended. + List endJobs = waylineJobs.stream() + .filter(job -> WaylineJobStatusEnum.find(job.getStatus()).getEnd()) + .map(WaylineJobEntity::getName) + .collect(Collectors.toList()); + if (!CollectionUtils.isEmpty(endJobs)) { + throw new IllegalArgumentException("There are jobs that have ended." + Arrays.toString(endJobs.toArray())); + } + + Set ids = waylineJobs.stream().map(WaylineJobEntity::getJobId).collect(Collectors.toSet()); + for (String id : jobIds) { + if (!ids.contains(id)) { + throw new IllegalArgumentException("Job id " + id + " doesn't exist."); + } + } + + // Group job id by dock sn. + Map> dockJobs = waylineJobs.stream() + .collect(Collectors.groupingBy(WaylineJobEntity::getDockSn, + Collectors.mapping(WaylineJobEntity::getJobId, Collectors.toList()))); + dockJobs.forEach((dockSn, idList) -> this.publishCancelTask(workspaceId, dockSn, idList)); + + } + + private void publishCancelTask(String workspaceId, String dockSn, List jobIds) { + long expire = redisOps.getExpire(RedisConst.DEVICE_ONLINE_PREFIX + dockSn); + if (expire < 0) { + throw new RuntimeException("Dock is offline."); + } + String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + dockSn + TopicConst.SERVICES_SUF; + + CommonTopicResponse response = CommonTopicResponse.builder() + .tid(UUID.randomUUID().toString()) + .bid(UUID.randomUUID().toString()) + .timestamp(System.currentTimeMillis()) + .data(Map.of(MapKeyConst.FLIGHT_IDS, jobIds)) + .method(ServicesMethodEnum.FLIGHT_TASK_CANCEL.getMethod()) + .build(); + + Optional serviceReplyOpt = messageSender.publishWithReply(topic, response); + if (serviceReplyOpt.isEmpty()) { + log.info("Timeout to receive reply."); + throw new RuntimeException("Timeout to receive reply."); + } + if (serviceReplyOpt.get().getResult() != 0) { + log.info("Cancel job ====> Error code: {}", serviceReplyOpt.get().getResult()); + throw new RuntimeException("Failed to cancel the wayline job of " + dockSn); + } + + for (String jobId : jobIds) { + this.updateJob(WaylineJobDTO.builder() + .workspaceId(workspaceId) + .jobId(jobId) + .status(WaylineJobStatusEnum.CANCEL.getVal()) + .endTime(LocalDateTime.now()) + .build()); + redisOps.zRemove(RedisConst.WAYLINE_JOB, jobId); } + } @Override @@ -159,9 +306,7 @@ public class WaylineJobServiceImpl implements IWaylineJobService { public Boolean updateJob(WaylineJobDTO dto) { return mapper.update(this.dto2Entity(dto), new LambdaUpdateWrapper() - .eq(WaylineJobEntity::getWorkspaceId, dto.getWorkspaceId()) - .eq(WaylineJobEntity::getJobId, dto.getJobId())) - > 0; + .eq(WaylineJobEntity::getJobId, dto.getJobId())) > 0; } @Override @@ -178,14 +323,75 @@ public class WaylineJobServiceImpl implements IWaylineJobService { return new PaginationData(records, new Pagination(pageData)); } + + @Override + @ServiceActivator(inputChannel = ChannelName.INBOUND_REQUESTS_FLIGHT_TASK_RESOURCE_GET, outputChannel = ChannelName.OUTBOUND) + @Transactional(isolation = Isolation.READ_UNCOMMITTED) + public void flightTaskResourceGet(CommonTopicReceiver receiver, MessageHeaders headers) { + Map jobIdMap = objectMapper.convertValue(receiver.getData(), new TypeReference>() {}); + String jobId = jobIdMap.get(MapKeyConst.FLIGHT_ID); + + CommonTopicResponse.CommonTopicResponseBuilder builder = CommonTopicResponse.builder() + .tid(receiver.getTid()) + .bid(receiver.getBid()) + .method(RequestsMethodEnum.FLIGHT_TASK_RESOURCE_GET.getMethod()) + .timestamp(System.currentTimeMillis()); + + String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString() + TopicConst._REPLY_SUF; + + Optional waylineJobOpt = this.getJobByJobId(jobId); + if (waylineJobOpt.isEmpty()) { + builder.data(RequestsReply.error(CommonErrorEnum.ILLEGAL_ARGUMENT)); + messageSender.publish(topic, builder.build()); + return; + } + + WaylineJobDTO waylineJob = waylineJobOpt.get(); + + // get wayline file + Optional waylineFile = waylineFileService.getWaylineByWaylineId(waylineJob.getWorkspaceId(), waylineJob.getFileId()); + if (waylineFile.isEmpty()) { + builder.data(RequestsReply.error(CommonErrorEnum.ILLEGAL_ARGUMENT)); + messageSender.publish(topic, builder.build()); + return; + } + + // get file url + URL url = null; + try { + url = waylineFileService.getObjectUrl(waylineJob.getWorkspaceId(), waylineFile.get().getWaylineId()); + builder.data(RequestsReply.success(FlightTaskCreateDTO.builder() + .file(FlightTaskFileDTO.builder() + .url(url.toString()) + .fingerprint(waylineFile.get().getSign()) + .build()) + .build())); + + } catch (SQLException | NullPointerException e) { + e.printStackTrace(); + builder.data(RequestsReply.error(CommonErrorEnum.ILLEGAL_ARGUMENT)); + messageSender.publish(topic, builder.build()); + return; + } + + messageSender.publish(topic, builder.build()); + + } + private WaylineJobEntity dto2Entity(WaylineJobDTO dto) { WaylineJobEntity.WaylineJobEntityBuilder builder = WaylineJobEntity.builder(); if (dto == null) { return builder.build(); } - return builder.type(dto.getType()) - .bid(dto.getBid()) + if (Objects.nonNull(dto.getEndTime())) { + builder.endTime(dto.getEndTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); + } + if (Objects.nonNull(dto.getExecuteTime())) { + builder.executeTime(dto.getExecuteTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); + } + return builder.status(dto.getStatus()) .name(dto.getJobName()) + .errorCode(dto.getCode()) .build(); } @@ -193,10 +399,9 @@ public class WaylineJobServiceImpl implements IWaylineJobService { if (entity == null) { return null; } - return WaylineJobDTO.builder() + + WaylineJobDTO.WaylineJobDTOBuilder builder = WaylineJobDTO.builder() .jobId(entity.getJobId()) - .bid(entity.getBid()) - .updateTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(entity.getUpdateTime()), ZoneId.systemDefault())) .jobName(entity.getName()) .fileId(entity.getFileId()) .fileName(waylineFileService.getWaylineByWaylineId(entity.getWorkspaceId(), entity.getFileId()) @@ -206,7 +411,20 @@ public class WaylineJobServiceImpl implements IWaylineJobService { .orElse(DeviceDTO.builder().build()).getNickname()) .username(entity.getUsername()) .workspaceId(entity.getWorkspaceId()) - .type(entity.getType()) - .build(); + .status(entity.getStatus()) + .code(entity.getErrorCode()) + .executeTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(entity.getExecuteTime()), ZoneId.systemDefault())) + .taskType(entity.getTaskType()) + .waylineType(entity.getWaylineType()); + if (Objects.nonNull(entity.getEndTime())) { + builder.endTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(entity.getEndTime()), ZoneId.systemDefault())); + } + if (WaylineJobStatusEnum.IN_PROGRESS.getVal() == entity.getStatus() && redisOps.getExpire(entity.getJobId()) > 0) { + EventsReceiver taskProgress = (EventsReceiver) redisOps.get(entity.getJobId()); + if (Objects.nonNull(taskProgress.getOutput()) && Objects.nonNull(taskProgress.getOutput().getProgress())) { + builder.progress(taskProgress.getOutput().getProgress().getPercent()); + } + } + return builder.build(); } }