Compare commits

...

1 Commits

Author SHA1 Message Date
sean.zhou 860d5d4995 v1.3.0-beta1 2 years ago
  1. 33
      sql/cloud_sample.sql
  2. 4
      src/main/java/com/dji/sample/common/error/LiveErrorEnum.java
  3. 1
      src/main/java/com/dji/sample/component/GlobalScheduleService.java
  4. 5
      src/main/java/com/dji/sample/component/mqtt/config/MqttMessageChannel.java
  5. 1
      src/main/java/com/dji/sample/component/mqtt/handler/RequestsRouter.java
  6. 2
      src/main/java/com/dji/sample/component/mqtt/model/ChannelName.java
  7. 1
      src/main/java/com/dji/sample/component/mqtt/model/CommonTopicReceiver.java
  8. 6
      src/main/java/com/dji/sample/component/mqtt/model/EventsReceiver.java
  9. 5
      src/main/java/com/dji/sample/component/mqtt/model/MapKeyConst.java
  10. 2
      src/main/java/com/dji/sample/component/mqtt/model/RequestsMethodEnum.java
  11. 10
      src/main/java/com/dji/sample/component/mqtt/model/ServicesMethodEnum.java
  12. 2
      src/main/java/com/dji/sample/component/redis/RedisConst.java
  13. 54
      src/main/java/com/dji/sample/component/redis/RedisOpsUtils.java
  14. 1
      src/main/java/com/dji/sample/configuration/SpringBeanConfiguration.java
  15. 2
      src/main/java/com/dji/sample/control/service/impl/ControlServiceImpl.java
  16. 5
      src/main/java/com/dji/sample/manage/controller/LiveStreamController.java
  17. 4
      src/main/java/com/dji/sample/manage/model/dto/CapacityVideoDTO.java
  18. 2
      src/main/java/com/dji/sample/manage/model/dto/LiveTypeDTO.java
  19. 4
      src/main/java/com/dji/sample/manage/model/receiver/CapacityVideoReceiver.java
  20. 7
      src/main/java/com/dji/sample/manage/service/ILiveStreamService.java
  21. 3
      src/main/java/com/dji/sample/manage/service/impl/CameraVideoServiceImpl.java
  22. 2
      src/main/java/com/dji/sample/manage/service/impl/DeviceFirmwareServiceImpl.java
  23. 2
      src/main/java/com/dji/sample/manage/service/impl/DeviceLogsServiceImpl.java
  24. 46
      src/main/java/com/dji/sample/manage/service/impl/LiveStreamServiceImpl.java
  25. 2
      src/main/java/com/dji/sample/media/service/impl/MediaServiceImpl.java
  26. 15
      src/main/java/com/dji/sample/wayline/controller/WaylineJobController.java
  27. 6
      src/main/java/com/dji/sample/wayline/model/dto/FlightTaskCreateDTO.java
  28. 2
      src/main/java/com/dji/sample/wayline/model/dto/FlightTaskFileDTO.java
  29. 15
      src/main/java/com/dji/sample/wayline/model/dto/WaylineJobDTO.java
  30. 22
      src/main/java/com/dji/sample/wayline/model/entity/WaylineJobEntity.java
  31. 39
      src/main/java/com/dji/sample/wayline/model/enums/WaylineJobStatusEnum.java
  32. 22
      src/main/java/com/dji/sample/wayline/model/enums/WaylineTaskTypeEnum.java
  33. 26
      src/main/java/com/dji/sample/wayline/model/enums/WaylineTemplateTypeEnum.java
  34. 6
      src/main/java/com/dji/sample/wayline/model/param/CreateJobParam.java
  35. 39
      src/main/java/com/dji/sample/wayline/service/IWaylineJobService.java
  36. 78
      src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java
  37. 320
      src/main/java/com/dji/sample/wayline/service/impl/WaylineJobServiceImpl.java

33
sql/cloud_sample.sql

@ -127,7 +127,12 @@ VALUES @@ -127,7 +127,12 @@ VALUES
(15,1,90742,0,'L1',NULL),
(16,2,56,0,'DJI Smart Controller','Remote control for M300'),
(17,2,119,0,'DJI RC Plus','Remote control for M30'),
(18,3,1,0,'DJI Dock','');
(18,3,1,0,'DJI Dock',''),
(19,0,77,0,'Mavic 3E',NULL),
(20,0,77,1,'Mavic 3T',NULL),
(21,1,66,0,'Mavic 3E Camera',NULL),
(22,1,67,0,'Mavic 3T Camera',NULL),
(23,2,144,0,'DJI RC Pro','Remote control for Mavic 3E/T');
/*!40000 ALTER TABLE `manage_device_dictionary` ENABLE KEYS */;
UNLOCK TABLES;
@ -441,6 +446,32 @@ CREATE TABLE `wayline_job` ( @@ -441,6 +446,32 @@ CREATE TABLE `wayline_job` (
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='Wayline mission information of the dock.';
# wayline_job_new
# ------------------------------------------------------------
DROP TABLE IF EXISTS `wayline_job_new`;
CREATE TABLE `wayline_job_new` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`job_id` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT 'uuid',
`name` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT 'The name of the job.',
`file_id` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT 'The wayline file used for this job.',
`dock_sn` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT 'Which dock executes the job.',
`workspace_id` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT 'Which workspace the current job belongs to.',
`task_type` int NOT NULL,
`wayline_type` int NOT NULL COMMENT 'The template type of the wayline.',
`execute_time` bigint NOT NULL,
`username` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT 'The name of the creator.',
`end_time` bigint DEFAULT NULL COMMENT 'end time of the job.',
`error_code` int DEFAULT NULL,
`status` int NOT NULL COMMENT '1: pending; 2: in progress; 3: success; 4: cancel; 5: failed',
`create_time` bigint NOT NULL,
`update_time` bigint NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `job_id_UNIQUE` (`job_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='Wayline mission information of the dock.';
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;

4
src/main/java/com/dji/sample/common/error/LiveErrorEnum.java

@ -67,9 +67,9 @@ public enum LiveErrorEnum implements IErrorInfo { @@ -67,9 +67,9 @@ public enum LiveErrorEnum implements IErrorInfo {
* @return enumeration object
*/
public static LiveErrorEnum find(int code) {
final int MOD = 100_000;
for (LiveErrorEnum errorEnum : LiveErrorEnum.class.getEnumConstants()) {
if (errorEnum.code == code) {
if (errorEnum.code % MOD == code % MOD) {
return errorEnum;
}
}

1
src/main/java/com/dji/sample/component/GlobalScheduleService.java

@ -6,6 +6,7 @@ import com.dji.sample.component.redis.RedisOpsUtils; @@ -6,6 +6,7 @@ import com.dji.sample.component.redis.RedisOpsUtils;
import com.dji.sample.manage.model.dto.DeviceDTO;
import com.dji.sample.manage.model.enums.DeviceDomainEnum;
import com.dji.sample.manage.service.IDeviceService;
import com.dji.sample.wayline.service.IWaylineJobService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;

5
src/main/java/com/dji/sample/component/mqtt/config/MqttMessageChannel.java

@ -152,4 +152,9 @@ public class MqttMessageChannel { @@ -152,4 +152,9 @@ public class MqttMessageChannel {
return new DirectChannel();
}
@Bean(name = ChannelName.INBOUND_REQUESTS_FLIGHT_TASK_RESOURCE_GET)
public MessageChannel requestsFlightTaskResourceGet() {
return new DirectChannel();
}
}

1
src/main/java/com/dji/sample/component/mqtt/handler/RequestsRouter.java

@ -42,6 +42,7 @@ public class RequestsRouter { @@ -42,6 +42,7 @@ public class RequestsRouter {
mapping.channelMapping(RequestsMethodEnum.AIRPORT_BIND_STATUS, ChannelName.INBOUND_REQUESTS_AIRPORT_BIND_STATUS);
mapping.channelMapping(RequestsMethodEnum.AIRPORT_ORGANIZATION_GET, ChannelName.INBOUND_REQUESTS_AIRPORT_ORGANIZATION_GET);
mapping.channelMapping(RequestsMethodEnum.AIRPORT_ORGANIZATION_BIND, ChannelName.INBOUND_REQUESTS_AIRPORT_ORGANIZATION_BIND);
mapping.channelMapping(RequestsMethodEnum.FLIGHT_TASK_RESOURCE_GET, ChannelName.INBOUND_REQUESTS_FLIGHT_TASK_RESOURCE_GET);
mapping.channelMapping(RequestsMethodEnum.UNKNOWN, ChannelName.DEFAULT);
})
.get();

2
src/main/java/com/dji/sample/component/mqtt/model/ChannelName.java

@ -68,4 +68,6 @@ public class ChannelName { @@ -68,4 +68,6 @@ public class ChannelName {
public static final String INBOUND_EVENTS_OTA_PROGRESS = "inboundEventsOtaProgress";
public static final String INBOUND_EVENTS_FILE_UPLOAD_PROGRESS = "inboundEventsFileUploadProgress";
public static final String INBOUND_REQUESTS_FLIGHT_TASK_RESOURCE_GET = "inboundEventsFlightTaskResourceGet";
}

1
src/main/java/com/dji/sample/component/mqtt/model/CommonTopicReceiver.java

@ -31,5 +31,4 @@ public class CommonTopicReceiver<T> { @@ -31,5 +31,4 @@ public class CommonTopicReceiver<T> {
private Integer needReply;
private String from;
}

6
src/main/java/com/dji/sample/component/mqtt/model/EventsReceiver.java

@ -1,7 +1,10 @@ @@ -1,7 +1,10 @@
package com.dji.sample.component.mqtt.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author sean
@ -10,6 +13,9 @@ import lombok.Data; @@ -10,6 +13,9 @@ import lombok.Data;
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class EventsReceiver<T> {
private Integer result;

5
src/main/java/com/dji/sample/component/mqtt/model/MapKeyConst.java

@ -28,4 +28,9 @@ public final class MapKeyConst { @@ -28,4 +28,9 @@ public final class MapKeyConst {
public static final String LIST = "list";
public static final String MODULE_LIST = "module_list";
public static final String FLIGHT_ID = "flight_id";
public static final String FLIGHT_IDS = "flight_ids";
}

2
src/main/java/com/dji/sample/component/mqtt/model/RequestsMethodEnum.java

@ -17,6 +17,8 @@ public enum RequestsMethodEnum { @@ -17,6 +17,8 @@ public enum RequestsMethodEnum {
AIRPORT_ORGANIZATION_GET("airport_organization_get"),
FLIGHT_TASK_RESOURCE_GET("flighttask_resource_get"),
UNKNOWN("Unknown");
private String method;

10
src/main/java/com/dji/sample/component/mqtt/model/ServicesMethodEnum.java

@ -15,7 +15,13 @@ public enum ServicesMethodEnum { @@ -15,7 +15,13 @@ public enum ServicesMethodEnum {
LIVE_SET_QUALITY("live_set_quality", false),
FLIGHTTASK_CREATE("flighttask_create", false),
FLIGHT_TASK_CREATE("flighttask_create", false),
FLIGHT_TASK_PREPARE("flighttask_prepare", false),
FLIGHT_TASK_EXECUTE("flighttask_execute", false),
FLIGHT_TASK_CANCEL("flighttask_undo", false),
DEBUG_MODE_OPEN("debug_mode_open", false),
@ -61,6 +67,8 @@ public enum ServicesMethodEnum { @@ -61,6 +67,8 @@ public enum ServicesMethodEnum {
FILE_UPLOAD_UPDATE("fileupload_update", false),
LIVE_LENS_CHANGE("live_lens_change", false),
UNKNOWN("unknown", false);
private String method;

2
src/main/java/com/dji/sample/component/redis/RedisConst.java

@ -34,4 +34,6 @@ public final class RedisConst { @@ -34,4 +34,6 @@ public final class RedisConst {
public static final String STATE_PAYLOAD_PREFIX = "payload" + DELIMITER;
public static final String LOGS_FILE_PREFIX = "logs_file" + DELIMITER;
public static final String WAYLINE_JOB = "wayline_job";
}

54
src/main/java/com/dji/sample/component/redis/RedisOpsUtils.java

@ -3,6 +3,7 @@ package com.dji.sample.component.redis; @@ -3,6 +3,7 @@ package com.dji.sample.component.redis;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Set;
@ -184,4 +185,57 @@ public class RedisOpsUtils { @@ -184,4 +185,57 @@ public class RedisOpsUtils {
public Long listLen(String key) {
return redisTemplate.opsForList().size(key);
}
/**
* ZADD
* @param key
* @param value
* @param score
*/
public Boolean zAdd(String key, Object value, double score) {
return redisTemplate.opsForZSet().add(key, value, score);
}
/**
* ZREM
* @param key
* @param value
*/
public Boolean zRemove(String key, Object... value) {
return redisTemplate.opsForZSet().remove(key, value) > 0;
}
/**
* ZRANGE
* @param key
* @param start
* @param end
* @return
*/
public Set<Object> zRange(String key, long start, long end) {
return redisTemplate.opsForZSet().range(key, start, end);
}
/**
* ZRANGE
* @param key
* @return
*/
public Object zGetMin(String key) {
Set<Object> objects = zRange(key, 0, 0);
if (CollectionUtils.isEmpty(objects)) {
return null;
}
return objects.iterator().next();
}
/**
* ZSCORE
* @param key
* @param value
* @return
*/
public Double zScore(String key, Object value) {
return redisTemplate.opsForZSet().score(key, value);
}
}

1
src/main/java/com/dji/sample/configuration/SpringBeanConfiguration.java

@ -35,6 +35,7 @@ public class SpringBeanConfiguration { @@ -35,6 +35,7 @@ public class SpringBeanConfiguration {
objectMapper.disable(MapperFeature.IGNORE_DUPLICATE_MODULE_REGISTRATIONS);
objectMapper.registerModules(timeModule);
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
objectMapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
objectMapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
objectMapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, true);

2
src/main/java/com/dji/sample/control/service/impl/ControlServiceImpl.java

@ -126,7 +126,7 @@ public class ControlServiceImpl implements IControlService { @@ -126,7 +126,7 @@ public class ControlServiceImpl implements IControlService {
.bid(receiver.getBid())
.method(receiver.getMethod())
.timestamp(System.currentTimeMillis())
.data(ResponseResult.success())
.data(RequestsReply.success())
.build());
}
}

5
src/main/java/com/dji/sample/manage/controller/LiveStreamController.java

@ -91,4 +91,9 @@ public class LiveStreamController { @@ -91,4 +91,9 @@ public class LiveStreamController {
return liveStreamService.liveSetQuality(liveParam);
}
@PostMapping("/streams/switch")
public ResponseResult liveLensChange(@RequestBody LiveTypeDTO liveParam) {
return liveStreamService.liveLensChange(liveParam);
}
}

4
src/main/java/com/dji/sample/manage/model/dto/CapacityVideoDTO.java

@ -5,6 +5,8 @@ import lombok.Builder; @@ -5,6 +5,8 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
/**
* @author sean.zhou
* @date 2021/11/22
@ -21,4 +23,6 @@ public class CapacityVideoDTO { @@ -21,4 +23,6 @@ public class CapacityVideoDTO {
private String index;
private String type;
private List<String> switchVideoTypes;
}

2
src/main/java/com/dji/sample/manage/model/dto/LiveTypeDTO.java

@ -23,4 +23,6 @@ public class LiveTypeDTO { @@ -23,4 +23,6 @@ public class LiveTypeDTO {
@JsonProperty("video_quality")
private Integer videoQuality;
private String videoType;
}

4
src/main/java/com/dji/sample/manage/model/receiver/CapacityVideoReceiver.java

@ -4,6 +4,8 @@ import com.fasterxml.jackson.databind.PropertyNamingStrategy; @@ -4,6 +4,8 @@ import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.Data;
import java.util.List;
/**
* @author sean.zhou
* @date 2021/11/18
@ -16,4 +18,6 @@ public class CapacityVideoReceiver { @@ -16,4 +18,6 @@ public class CapacityVideoReceiver {
private String videoIndex;
private String videoType;
private List<String> switchableVideoTypes;
}

7
src/main/java/com/dji/sample/manage/service/ILiveStreamService.java

@ -48,4 +48,11 @@ public interface ILiveStreamService { @@ -48,4 +48,11 @@ public interface ILiveStreamService {
* @return
*/
ResponseResult liveSetQuality(LiveTypeDTO liveParam);
/**
* Switches the lens of the device during the live streaming.
* @param liveParam
* @return
*/
ResponseResult liveLensChange(LiveTypeDTO liveParam);
}

3
src/main/java/com/dji/sample/manage/service/impl/CameraVideoServiceImpl.java

@ -23,7 +23,8 @@ public class CameraVideoServiceImpl implements ICameraVideoService { @@ -23,7 +23,8 @@ public class CameraVideoServiceImpl implements ICameraVideoService {
if (receiver != null) {
builder.id(UUID.randomUUID().toString())
.index(receiver.getVideoIndex())
.type(receiver.getVideoType());
.type(receiver.getVideoType())
.switchVideoTypes(receiver.getSwitchableVideoTypes());
}
return builder.build();
}

2
src/main/java/com/dji/sample/manage/service/impl/DeviceFirmwareServiceImpl.java

@ -169,7 +169,7 @@ public class DeviceFirmwareServiceImpl implements IDeviceFirmwareService { @@ -169,7 +169,7 @@ public class DeviceFirmwareServiceImpl implements IDeviceFirmwareService {
.bid(receiver.getBid())
.method(receiver.getMethod())
.timestamp(System.currentTimeMillis())
.data(ResponseResult.success())
.data(RequestsReply.success())
.build());
}
}

2
src/main/java/com/dji/sample/manage/service/impl/DeviceLogsServiceImpl.java

@ -249,7 +249,7 @@ public class DeviceLogsServiceImpl implements IDeviceLogsService { @@ -249,7 +249,7 @@ public class DeviceLogsServiceImpl implements IDeviceLogsService {
.bid(receiver.getBid())
.method(receiver.getMethod())
.timestamp(System.currentTimeMillis())
.data(ResponseResult.success())
.data(RequestsReply.success())
.build());
}

46
src/main/java/com/dji/sample/manage/service/impl/LiveStreamServiceImpl.java

@ -70,7 +70,7 @@ public class LiveStreamServiceImpl implements ILiveStreamService { @@ -70,7 +70,7 @@ public class LiveStreamServiceImpl implements ILiveStreamService {
return devicesList.stream()
.filter(device -> redisOps.checkExist(RedisConst.DEVICE_ONLINE_PREFIX + device.getDeviceSn()))
.map(device -> CapacityDeviceDTO.builder()
.name(device.getDeviceName())
.name(Objects.requireNonNullElse(device.getNickname(), device.getDeviceName()))
.sn(device.getDeviceSn())
.camerasList(capacityCameraService.getCapacityCameraByDeviceSn(device.getDeviceSn()))
.build())
@ -96,7 +96,7 @@ public class LiveStreamServiceImpl implements ILiveStreamService { @@ -96,7 +96,7 @@ public class LiveStreamServiceImpl implements ILiveStreamService {
public ResponseResult liveStart(LiveTypeDTO liveParam) {
// Check if this lens is available live.
ResponseResult responseResult = this.checkBeforeLive(liveParam.getVideoId());
if (responseResult.getCode() != 0) {
if (ResponseResult.CODE_SUCCESS != responseResult.getCode()) {
return responseResult;
}
@ -109,7 +109,7 @@ public class LiveStreamServiceImpl implements ILiveStreamService { @@ -109,7 +109,7 @@ public class LiveStreamServiceImpl implements ILiveStreamService {
if (receiveReplyOpt.isEmpty()) {
return ResponseResult.error(LiveErrorEnum.NO_REPLY);
}
if (receiveReplyOpt.get().getResult() != 0) {
if (ResponseResult.CODE_SUCCESS != receiveReplyOpt.get().getResult()) {
return ResponseResult.error(LiveErrorEnum.find(receiveReplyOpt.get().getResult()));
}
@ -188,12 +188,52 @@ public class LiveStreamServiceImpl implements ILiveStreamService { @@ -188,12 +188,52 @@ public class LiveStreamServiceImpl implements ILiveStreamService {
return ResponseResult.success();
}
@Override
public ResponseResult liveLensChange(LiveTypeDTO liveParam) {
if (!StringUtils.hasText(liveParam.getVideoType())) {
return ResponseResult.error(LiveErrorEnum.ERROR_PARAMETERS);
}
ResponseResult<DeviceDTO> responseResult = this.checkBeforeLive(liveParam.getVideoId());
if (ResponseResult.CODE_SUCCESS != responseResult.getCode()) {
return responseResult;
}
if (DeviceDomainEnum.GATEWAY.getDesc().equals(responseResult.getData().getDomain())) {
return ResponseResult.error(LiveErrorEnum.FUNCTION_NOT_SUPPORT);
}
String respTopic = THING_MODEL_PRE + PRODUCT + responseResult.getData().getDeviceSn() + SERVICES_SUF;
Optional<ServiceReply> receiveReplyOpt = this.publishLiveLensChange(respTopic, liveParam);
if (receiveReplyOpt.isEmpty()) {
return ResponseResult.error(LiveErrorEnum.NO_REPLY);
}
if (receiveReplyOpt.get().getResult() != 0) {
return ResponseResult.error(LiveErrorEnum.find(receiveReplyOpt.get().getResult()));
}
return ResponseResult.success();
}
private Optional<ServiceReply> publishLiveLensChange(String respTopic, LiveTypeDTO liveParam) {
CommonTopicResponse<LiveTypeDTO> response = new CommonTopicResponse<>();
response.setTid(UUID.randomUUID().toString());
response.setBid(UUID.randomUUID().toString());
response.setMethod(ServicesMethodEnum.LIVE_LENS_CHANGE.getMethod());
response.setData(liveParam);
return messageSender.publishWithReply(respTopic, response);
}
/**
* Check if this lens is available live.
* @param videoId
* @return
*/
private ResponseResult<DeviceDTO> checkBeforeLive(String videoId) {
if (!StringUtils.hasText(videoId)) {
return ResponseResult.error(LiveErrorEnum.ERROR_PARAMETERS);
}
String[] videoIdArr = videoId.split("/");
// drone sn / enumeration value of the location where the payload is mounted / payload lens
if (videoIdArr.length != 3) {

2
src/main/java/com/dji/sample/media/service/impl/MediaServiceImpl.java

@ -77,7 +77,7 @@ public class MediaServiceImpl implements IMediaService { @@ -77,7 +77,7 @@ public class MediaServiceImpl implements IMediaService {
CommonTopicResponse<Object> data = CommonTopicResponse.builder()
.timestamp(System.currentTimeMillis())
.method(EventsMethodEnum.FILE_UPLOAD_CALLBACK.getMethod())
.data(ResponseResult.success())
.data(RequestsReply.success())
.tid(receiver.getTid())
.bid(receiver.getBid())
.build();

15
src/main/java/com/dji/sample/wayline/controller/WaylineJobController.java

@ -11,6 +11,7 @@ import org.springframework.web.bind.annotation.*; @@ -11,6 +11,7 @@ import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import java.sql.SQLException;
import java.util.List;
import static com.dji.sample.component.AuthInterceptor.TOKEN_CLAIM;
@ -39,8 +40,8 @@ public class WaylineJobController { @@ -39,8 +40,8 @@ public class WaylineJobController {
@PathVariable(name = "workspace_id") String workspaceId) throws SQLException {
CustomClaim customClaim = (CustomClaim)request.getAttribute(TOKEN_CLAIM);
customClaim.setWorkspaceId(workspaceId);
boolean isCreate = waylineJobService.createJob(param, customClaim);
return isCreate ? ResponseResult.success() : ResponseResult.error();
return waylineJobService.publishFlightTask(param, customClaim);
}
/**
@ -59,16 +60,16 @@ public class WaylineJobController { @@ -59,16 +60,16 @@ public class WaylineJobController {
}
/**
* Issue wayline mission to the dock for execution.
* @param jobId
* Send the command to cancel the jobs.
* @param jobIds
* @param workspaceId
* @return
* @throws SQLException
*/
@PostMapping("/{workspace_id}/jobs/{job_id}")
public ResponseResult publishJob(@PathVariable(name = "job_id") String jobId,
@DeleteMapping("/{workspace_id}/jobs")
public ResponseResult publishCancelJob(@RequestParam(name = "job_id") List<String> jobIds,
@PathVariable(name = "workspace_id") String workspaceId) throws SQLException {
waylineJobService.publishFlightTask(workspaceId, jobId);
waylineJobService.cancelFlightTask(workspaceId, jobIds);
return ResponseResult.success();
}
}

6
src/main/java/com/dji/sample/wayline/model/dto/FlightTaskCreateDTO.java

@ -18,7 +18,11 @@ public class FlightTaskCreateDTO { @@ -18,7 +18,11 @@ public class FlightTaskCreateDTO {
private String flightId;
private String type;
private Integer taskType;
private Integer waylineType;
private Long executeTime;
private FlightTaskFileDTO file;
}

2
src/main/java/com/dji/sample/wayline/model/dto/FlightTaskFileDTO.java

@ -18,5 +18,5 @@ public class FlightTaskFileDTO { @@ -18,5 +18,5 @@ public class FlightTaskFileDTO {
private String url;
private String sign;
private String fingerprint;
}

15
src/main/java/com/dji/sample/wayline/model/dto/WaylineJobDTO.java

@ -32,12 +32,19 @@ public class WaylineJobDTO { @@ -32,12 +32,19 @@ public class WaylineJobDTO {
private String workspaceId;
private String bid;
private Integer waylineType;
private String type;
private Integer taskType;
private String username;
private LocalDateTime executeTime;
private LocalDateTime endTime;
private Integer status;
private LocalDateTime updateTime;
private Integer progress;
private String username;
private Integer code;
}

22
src/main/java/com/dji/sample/wayline/model/entity/WaylineJobEntity.java

@ -17,7 +17,7 @@ import java.io.Serializable; @@ -17,7 +17,7 @@ import java.io.Serializable;
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("wayline_job")
@TableName("wayline_job_new")
public class WaylineJobEntity implements Serializable {
@TableId(type = IdType.AUTO)
@ -38,15 +38,27 @@ public class WaylineJobEntity implements Serializable { @@ -38,15 +38,27 @@ public class WaylineJobEntity implements Serializable {
@TableField("workspace_id")
private String workspaceId;
@TableField("bid")
private String bid;
@TableField("task_type")
private Integer taskType;
@TableField("type")
private String type;
@TableField("wayline_type")
private Integer waylineType;
@TableField("username")
private String username;
@TableField("execute_time")
private Long executeTime;
@TableField("end_time")
private Long endTime;
@TableField("error_code")
private Integer errorCode;
@TableField("status")
private Integer status;
@TableField(value = "create_time", fill = FieldFill.INSERT)
private Long createTime;

39
src/main/java/com/dji/sample/wayline/model/enums/WaylineJobStatusEnum.java

@ -0,0 +1,39 @@ @@ -0,0 +1,39 @@
package com.dji.sample.wayline.model.enums;
import lombok.Getter;
import java.util.Arrays;
/**
* @author sean
* @version 1.3
* @date 2022/9/26
*/
@Getter
public enum WaylineJobStatusEnum {
PENDING(1, false),
IN_PROGRESS(2, false),
SUCCESS(3, true),
CANCEL(4, true),
FAILED(5, true),
UNKNOWN(6, true);
int val;
Boolean end;
WaylineJobStatusEnum(int val, boolean end) {
this.end = end;
this.val = val;
}
public static WaylineJobStatusEnum find(int val) {
return Arrays.stream(WaylineJobStatusEnum.values()).filter(statue -> statue.val == val).findAny().orElse(UNKNOWN);
}
}

22
src/main/java/com/dji/sample/wayline/model/enums/WaylineTaskTypeEnum.java

@ -0,0 +1,22 @@ @@ -0,0 +1,22 @@
package com.dji.sample.wayline.model.enums;
import lombok.Getter;
/**
* @author sean
* @version 1.3
* @date 2022/9/26
*/
@Getter
public enum WaylineTaskTypeEnum {
IMMEDIATE(0),
TIMED(1);
int val;
WaylineTaskTypeEnum(int val) {
this.val = val;
}
}

26
src/main/java/com/dji/sample/wayline/model/enums/WaylineTemplateTypeEnum.java

@ -0,0 +1,26 @@ @@ -0,0 +1,26 @@
package com.dji.sample.wayline.model.enums;
import lombok.Getter;
/**
* @author sean
* @version 1.3
* @date 2022/9/26
*/
@Getter
public enum WaylineTemplateTypeEnum {
WAYPOINT(0),
MAPPING_2D(1),
MAPPING_3D(2),
MAPPING_STRIP(4);
int val;
WaylineTemplateTypeEnum(int val) {
this.val = val;
}
}

6
src/main/java/com/dji/sample/wayline/model/param/CreateJobParam.java

@ -16,7 +16,9 @@ public class CreateJobParam { @@ -16,7 +16,9 @@ public class CreateJobParam {
private String dockSn;
private String type;
private Integer waylineType;
private boolean immediate;
private Integer taskType;
private Long executeTime;
}

39
src/main/java/com/dji/sample/wayline/service/IWaylineJobService.java

@ -2,10 +2,14 @@ package com.dji.sample.wayline.service; @@ -2,10 +2,14 @@ package com.dji.sample.wayline.service;
import com.dji.sample.common.model.CustomClaim;
import com.dji.sample.common.model.PaginationData;
import com.dji.sample.common.model.ResponseResult;
import com.dji.sample.component.mqtt.model.CommonTopicReceiver;
import com.dji.sample.wayline.model.dto.WaylineJobDTO;
import com.dji.sample.wayline.model.param.CreateJobParam;
import org.springframework.messaging.MessageHeaders;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Optional;
/**
@ -16,20 +20,36 @@ import java.util.Optional; @@ -16,20 +20,36 @@ import java.util.Optional;
public interface IWaylineJobService {
/**
* Create a wayline mission for the dock.
* Create wayline job in the database.
* @param param
* @param customClaim user info
* @param customClaim user info
* @return
*/
Boolean createJob(CreateJobParam param, CustomClaim customClaim) throws SQLException;
Optional<WaylineJobDTO> createWaylineJob(CreateJobParam param, CustomClaim customClaim);
/**
* Issue wayline mission to the dock for execution.
* @param workspaceId
* Issue wayline mission to the dock.
* @param param
* @param customClaim user info
* @return
*/
ResponseResult publishFlightTask(CreateJobParam param, CustomClaim customClaim) throws SQLException;
/**
* Execute the task immediately.
* @param jobId
* @throws SQLException
* @return
*/
void publishFlightTask(String workspaceId, String jobId) throws SQLException;
Boolean executeFlightTask(String jobId);
/**
* Cancel the task Base on job Ids.
* @param workspaceId
* @param jobIds
* @throws SQLException
*/
void cancelFlightTask(String workspaceId, Collection<String> jobIds);
/**
* Query job information based on job id.
@ -53,4 +73,11 @@ public interface IWaylineJobService { @@ -53,4 +73,11 @@ public interface IWaylineJobService {
* @return
*/
PaginationData<WaylineJobDTO> getJobsByWorkspaceId(String workspaceId, long page, long pageSize);
/**
* Process to get interface data of flight mission resources.
* @param receiver
* @param headers
*/
void flightTaskResourceGet(CommonTopicReceiver receiver, MessageHeaders headers);
}

78
src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java

@ -12,16 +12,25 @@ import com.dji.sample.component.websocket.service.IWebSocketManageService; @@ -12,16 +12,25 @@ import com.dji.sample.component.websocket.service.IWebSocketManageService;
import com.dji.sample.manage.model.dto.DeviceDTO;
import com.dji.sample.manage.model.enums.UserTypeEnum;
import com.dji.sample.wayline.model.dto.FlightTaskProgressReceiver;
import com.dji.sample.wayline.model.dto.WaylineJobDTO;
import com.dji.sample.wayline.model.enums.WaylineJobStatusEnum;
import com.dji.sample.wayline.service.IFlightTaskService;
import com.dji.sample.wayline.service.IWaylineJobService;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageHeaders;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* @author sean
* @version 1.1
@ -46,18 +55,42 @@ public class FlightTaskServiceImpl implements IFlightTaskService { @@ -46,18 +55,42 @@ public class FlightTaskServiceImpl implements IFlightTaskService {
@Autowired
private RedisOpsUtils redisOps;
@Autowired
private IWaylineJobService waylineJobService;
@Override
@ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_PROGRESS, outputChannel = ChannelName.OUTBOUND)
public void handleProgress(CommonTopicReceiver receiver, MessageHeaders headers) {
EventsReceiver<FlightTaskProgressReceiver> eventsReceiver = mapper.convertValue(receiver.getData(),
new TypeReference<EventsReceiver<FlightTaskProgressReceiver>>(){});
eventsReceiver.setBid(receiver.getBid());
eventsReceiver.setSn(receiver.getGateway());
FlightTaskProgressReceiver output = eventsReceiver.getOutput();
log.info("Task progress: " + eventsReceiver.getOutput().getProgress().toString());
log.info("Task progress: {}", output.getProgress().toString());
if (eventsReceiver.getResult() != ResponseResult.CODE_SUCCESS) {
log.error("Error code: " + eventsReceiver.getResult());
log.error("Task progress ===> Error code: " + eventsReceiver.getResult());
}
EventsResultStatusEnum statusEnum = EventsResultStatusEnum.find(output.getStatus());
if (statusEnum.getEnd()) {
WaylineJobDTO job = WaylineJobDTO.builder()
.jobId(receiver.getBid())
.status(WaylineJobStatusEnum.SUCCESS.getVal())
.endTime(LocalDateTime.now())
.build();
if (EventsResultStatusEnum.OK != statusEnum) {
job.setCode(eventsReceiver.getResult());
job.setStatus(WaylineJobStatusEnum.FAILED.getVal());
}
waylineJobService.updateJob(job);
redisOps.del(receiver.getBid());
}
redisOps.setWithExpire(receiver.getBid(), eventsReceiver, RedisConst.DEVICE_ALIVE_SECOND * RedisConst.DEVICE_ALIVE_SECOND);
DeviceDTO device = (DeviceDTO) redisOps.get(RedisConst.DEVICE_ONLINE_PREFIX + receiver.getGateway());
websocketMessageService.sendBatch(
@ -77,8 +110,47 @@ public class FlightTaskServiceImpl implements IFlightTaskService { @@ -77,8 +110,47 @@ public class FlightTaskServiceImpl implements IFlightTaskService {
.bid(receiver.getBid())
.method(EventsMethodEnum.FLIGHT_TASK_PROGRESS.getMethod())
.timestamp(System.currentTimeMillis())
.data(ResponseResult.success())
.data(RequestsReply.success())
.build());
}
}
@Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS)
private void checkScheduledJob() {
Object jobIdValue = redisOps.zGetMin(RedisConst.WAYLINE_JOB);
log.info("Check the timed jobs of the wayline. {}", jobIdValue);
if (Objects.isNull(jobIdValue)) {
return;
}
String jobId = String.valueOf(jobIdValue);
double time = redisOps.zScore(RedisConst.WAYLINE_JOB, jobIdValue);
long now = System.currentTimeMillis();
int offset = 30_000;
// Expired tasks are deleted directly.
if (time < now - offset) {
redisOps.zRemove(RedisConst.WAYLINE_JOB, jobId);
waylineJobService.updateJob(WaylineJobDTO.builder()
.jobId(jobId)
.status(WaylineJobStatusEnum.FAILED.getVal())
.endTime(LocalDateTime.now())
.code(HttpStatus.SC_REQUEST_TIMEOUT).build());
return;
}
if (now <= time && time <= now + offset) {
try {
waylineJobService.executeFlightTask(jobId);
} catch (Exception e) {
log.info("The scheduled task delivery failed.");
waylineJobService.updateJob(WaylineJobDTO.builder()
.jobId(jobId)
.status(WaylineJobStatusEnum.FAILED.getVal())
.endTime(LocalDateTime.now())
.code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build());
} finally {
redisOps.zRemove(RedisConst.WAYLINE_JOB, jobId);
}
}
}
}

320
src/main/java/com/dji/sample/wayline/service/impl/WaylineJobServiceImpl.java

@ -3,40 +3,43 @@ package com.dji.sample.wayline.service.impl; @@ -3,40 +3,43 @@ package com.dji.sample.wayline.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.dji.sample.common.error.CommonErrorEnum;
import com.dji.sample.common.model.CustomClaim;
import com.dji.sample.common.model.Pagination;
import com.dji.sample.common.model.PaginationData;
import com.dji.sample.component.mqtt.model.CommonTopicResponse;
import com.dji.sample.component.mqtt.model.ServiceReply;
import com.dji.sample.component.mqtt.model.ServicesMethodEnum;
import com.dji.sample.component.mqtt.model.TopicConst;
import com.dji.sample.common.model.ResponseResult;
import com.dji.sample.component.mqtt.model.*;
import com.dji.sample.component.mqtt.service.IMessageSenderService;
import com.dji.sample.component.redis.RedisConst;
import com.dji.sample.component.redis.RedisOpsUtils;
import com.dji.sample.manage.model.dto.DeviceDTO;
import com.dji.sample.manage.service.IDeviceService;
import com.dji.sample.wayline.dao.IWaylineJobMapper;
import com.dji.sample.wayline.model.dto.FlightTaskCreateDTO;
import com.dji.sample.wayline.model.dto.FlightTaskFileDTO;
import com.dji.sample.wayline.model.dto.WaylineFileDTO;
import com.dji.sample.wayline.model.dto.WaylineJobDTO;
import com.dji.sample.wayline.model.dto.*;
import com.dji.sample.wayline.model.entity.WaylineJobEntity;
import com.dji.sample.wayline.model.enums.WaylineJobStatusEnum;
import com.dji.sample.wayline.model.enums.WaylineTaskTypeEnum;
import com.dji.sample.wayline.model.param.CreateJobParam;
import com.dji.sample.wayline.service.IWaylineFileService;
import com.dji.sample.wayline.service.IWaylineJobService;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.net.URL;
import java.sql.SQLException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.*;
import java.util.stream.Collectors;
/**
@ -64,10 +67,18 @@ public class WaylineJobServiceImpl implements IWaylineJobService { @@ -64,10 +67,18 @@ public class WaylineJobServiceImpl implements IWaylineJobService {
@Autowired
private RedisOpsUtils redisOps;
@Autowired
private ObjectMapper objectMapper;
@Override
public Boolean createJob(CreateJobParam param, CustomClaim customClaim) throws SQLException {
if (param == null) {
return false;
public Optional<WaylineJobDTO> createWaylineJob(CreateJobParam param, CustomClaim customClaim) {
if (Objects.isNull(param)) {
return Optional.empty();
}
// Immediate tasks, allocating time on the backend.
if (Objects.isNull(param.getExecuteTime())) {
param.setExecuteTime(System.currentTimeMillis());
}
WaylineJobEntity jobEntity = WaylineJobEntity.builder()
.name(param.getName())
@ -76,58 +87,119 @@ public class WaylineJobServiceImpl implements IWaylineJobService { @@ -76,58 +87,119 @@ public class WaylineJobServiceImpl implements IWaylineJobService {
.username(customClaim.getUsername())
.workspaceId(customClaim.getWorkspaceId())
.jobId(UUID.randomUUID().toString())
.type(param.getType())
.executeTime(param.getExecuteTime())
.status(WaylineJobStatusEnum.PENDING.getVal())
.taskType(param.getTaskType())
.waylineType(param.getWaylineType())
.build();
int id = mapper.insert(jobEntity);
if (id <= 0) {
return false;
}
if (param.isImmediate()) {
publishFlightTask(jobEntity.getWorkspaceId(), jobEntity.getJobId());
return Optional.empty();
}
return true;
return Optional.ofNullable(this.entity2Dto(jobEntity));
}
@Override
public void publishFlightTask(String workspaceId, String jobId) throws SQLException {
// get job
Optional<WaylineJobDTO> waylineJob = this.getJobByJobId(jobId);
if (waylineJob.isEmpty()) {
throw new IllegalArgumentException("Job doesn't exist.");
public ResponseResult publishFlightTask(CreateJobParam param, CustomClaim customClaim) throws SQLException {
Optional<WaylineJobDTO> waylineJobOpt = this.createWaylineJob(param, customClaim);
if (waylineJobOpt.isEmpty()) {
throw new SQLException("Failed to create wayline job.");
}
WaylineJobDTO waylineJob = waylineJobOpt.get();
long expire = redisOps.getExpire(RedisConst.DEVICE_ONLINE_PREFIX + waylineJob.get().getDockSn());
long expire = redisOps.getExpire(RedisConst.DEVICE_ONLINE_PREFIX + waylineJob.getDockSn());
if (expire < 0) {
throw new RuntimeException("Dock is offline.");
}
// get wayline file
Optional<WaylineFileDTO> waylineFile = waylineFileService.getWaylineByWaylineId(workspaceId, waylineJob.get().getFileId());
Optional<WaylineFileDTO> waylineFile = waylineFileService.getWaylineByWaylineId(waylineJob.getWorkspaceId(), waylineJob.getFileId());
if (waylineFile.isEmpty()) {
throw new IllegalArgumentException("Wayline file doesn't exist.");
throw new SQLException("Wayline file doesn't exist.");
}
// get file url
URL url = waylineFileService.getObjectUrl(workspaceId, waylineFile.get().getWaylineId());
URL url = waylineFileService.getObjectUrl(waylineJob.getWorkspaceId(), waylineFile.get().getWaylineId());
WaylineJobDTO job = waylineJob.get();
FlightTaskCreateDTO flightTask = FlightTaskCreateDTO.builder()
.flightId(jobId)
.type(job.getType())
.flightId(waylineJob.getJobId())
.executeTime(waylineJob.getExecuteTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli())
.taskType(waylineJob.getTaskType())
.waylineType(waylineJob.getWaylineType())
.file(FlightTaskFileDTO.builder()
.url(url.toString())
.sign(waylineFile.get().getSign())
.fingerprint(waylineFile.get().getSign())
.build())
.build();
String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT +
waylineJob.getDockSn() + TopicConst.SERVICES_SUF;
CommonTopicResponse<Object> response = CommonTopicResponse.builder()
.tid(UUID.randomUUID().toString())
.bid(waylineJob.getJobId())
.timestamp(System.currentTimeMillis())
.data(flightTask)
.method(ServicesMethodEnum.FLIGHT_TASK_PREPARE.getMethod())
.build();
Optional<ServiceReply> serviceReplyOpt = messageSender.publishWithReply(topic, response);
if (serviceReplyOpt.isEmpty()) {
log.info("Timeout to receive reply.");
throw new RuntimeException("Timeout to receive reply.");
}
if (serviceReplyOpt.get().getResult() != 0) {
log.info("Prepare task ====> Error code: {}", serviceReplyOpt.get().getResult());
this.updateJob(WaylineJobDTO.builder()
.workspaceId(waylineJob.getWorkspaceId())
.jobId(waylineJob.getJobId())
.status(WaylineJobStatusEnum.FAILED.getVal())
.endTime(LocalDateTime.now())
.code(serviceReplyOpt.get().getResult()).build());
return ResponseResult.error("Prepare task ====> Error code: " + serviceReplyOpt.get().getResult());
}
// Issue an immediate task execution command.
if (WaylineTaskTypeEnum.IMMEDIATE.getVal() == waylineJob.getTaskType()) {
if (!executeFlightTask(waylineJob.getJobId())) {
return ResponseResult.error("Failed to execute job.");
}
}
if (WaylineTaskTypeEnum.TIMED.getVal() == waylineJob.getTaskType()) {
boolean isAdd = redisOps.zAdd(RedisConst.WAYLINE_JOB, waylineJob.getJobId(),
waylineJob.getExecuteTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
if (!isAdd) {
return ResponseResult.error("Failed to create scheduled job.");
}
}
return ResponseResult.success();
}
@Override
public Boolean executeFlightTask(String jobId) {
// get job
Optional<WaylineJobDTO> waylineJob = this.getJobByJobId(jobId);
if (waylineJob.isEmpty()) {
throw new IllegalArgumentException("Job doesn't exist.");
}
long expire = redisOps.getExpire(RedisConst.DEVICE_ONLINE_PREFIX + waylineJob.get().getDockSn());
if (expire < 0) {
throw new RuntimeException("Dock is offline.");
}
WaylineJobDTO job = waylineJob.get();
FlightTaskCreateDTO flightTask = FlightTaskCreateDTO.builder().flightId(jobId).build();
String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT +
job.getDockSn() + TopicConst.SERVICES_SUF;
CommonTopicResponse<Object> response = CommonTopicResponse.builder()
.tid(UUID.randomUUID().toString())
.bid(UUID.randomUUID().toString())
.bid(jobId)
.timestamp(System.currentTimeMillis())
.data(flightTask)
.method(ServicesMethodEnum.FLIGHTTASK_CREATE.getMethod())
.method(ServicesMethodEnum.FLIGHT_TASK_EXECUTE.getMethod())
.build();
Optional<ServiceReply> serviceReplyOpt = messageSender.publishWithReply(topic, response);
@ -136,15 +208,90 @@ public class WaylineJobServiceImpl implements IWaylineJobService { @@ -136,15 +208,90 @@ public class WaylineJobServiceImpl implements IWaylineJobService {
throw new RuntimeException("Timeout to receive reply.");
}
if (serviceReplyOpt.get().getResult() != 0) {
log.info("Error code: {}", serviceReplyOpt.get().getResult());
throw new RuntimeException("Error code: " + serviceReplyOpt.get().getResult());
log.info("Execute job ====> Error code: {}", serviceReplyOpt.get().getResult());
this.updateJob(WaylineJobDTO.builder()
.jobId(jobId)
.status(WaylineJobStatusEnum.FAILED.getVal())
.endTime(LocalDateTime.now())
.code(serviceReplyOpt.get().getResult()).build());
return false;
}
job.setBid(response.getBid());
boolean isUpd = this.updateJob(job);
if (!isUpd) {
throw new SQLException("Failed to update data.");
this.updateJob(WaylineJobDTO.builder()
.jobId(jobId)
.status(WaylineJobStatusEnum.IN_PROGRESS.getVal())
.build());
redisOps.setWithExpire(jobId,
EventsReceiver.<FlightTaskProgressReceiver>builder().bid(jobId).sn(job.getDockSn()).build(),
RedisConst.DEVICE_ALIVE_SECOND * RedisConst.DEVICE_ALIVE_SECOND);
return true;
}
@Override
public void cancelFlightTask(String workspaceId, Collection<String> jobIds) {
List<WaylineJobEntity> waylineJobs = mapper.selectList(
new LambdaQueryWrapper<WaylineJobEntity>()
.or(wrapper -> jobIds.forEach(id -> wrapper.eq(WaylineJobEntity::getJobId, id))));
// Check if the job have ended.
List<String> endJobs = waylineJobs.stream()
.filter(job -> WaylineJobStatusEnum.find(job.getStatus()).getEnd())
.map(WaylineJobEntity::getName)
.collect(Collectors.toList());
if (!CollectionUtils.isEmpty(endJobs)) {
throw new IllegalArgumentException("There are jobs that have ended." + Arrays.toString(endJobs.toArray()));
}
Set<String> ids = waylineJobs.stream().map(WaylineJobEntity::getJobId).collect(Collectors.toSet());
for (String id : jobIds) {
if (!ids.contains(id)) {
throw new IllegalArgumentException("Job id " + id + " doesn't exist.");
}
}
// Group job id by dock sn.
Map<String, List<String>> dockJobs = waylineJobs.stream()
.collect(Collectors.groupingBy(WaylineJobEntity::getDockSn,
Collectors.mapping(WaylineJobEntity::getJobId, Collectors.toList())));
dockJobs.forEach((dockSn, idList) -> this.publishCancelTask(workspaceId, dockSn, idList));
}
private void publishCancelTask(String workspaceId, String dockSn, List<String> jobIds) {
long expire = redisOps.getExpire(RedisConst.DEVICE_ONLINE_PREFIX + dockSn);
if (expire < 0) {
throw new RuntimeException("Dock is offline.");
}
String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + dockSn + TopicConst.SERVICES_SUF;
CommonTopicResponse<Object> response = CommonTopicResponse.builder()
.tid(UUID.randomUUID().toString())
.bid(UUID.randomUUID().toString())
.timestamp(System.currentTimeMillis())
.data(Map.of(MapKeyConst.FLIGHT_IDS, jobIds))
.method(ServicesMethodEnum.FLIGHT_TASK_CANCEL.getMethod())
.build();
Optional<ServiceReply> serviceReplyOpt = messageSender.publishWithReply(topic, response);
if (serviceReplyOpt.isEmpty()) {
log.info("Timeout to receive reply.");
throw new RuntimeException("Timeout to receive reply.");
}
if (serviceReplyOpt.get().getResult() != 0) {
log.info("Cancel job ====> Error code: {}", serviceReplyOpt.get().getResult());
throw new RuntimeException("Failed to cancel the wayline job of " + dockSn);
}
for (String jobId : jobIds) {
this.updateJob(WaylineJobDTO.builder()
.workspaceId(workspaceId)
.jobId(jobId)
.status(WaylineJobStatusEnum.CANCEL.getVal())
.endTime(LocalDateTime.now())
.build());
redisOps.zRemove(RedisConst.WAYLINE_JOB, jobId);
}
}
@Override
@ -159,9 +306,7 @@ public class WaylineJobServiceImpl implements IWaylineJobService { @@ -159,9 +306,7 @@ public class WaylineJobServiceImpl implements IWaylineJobService {
public Boolean updateJob(WaylineJobDTO dto) {
return mapper.update(this.dto2Entity(dto),
new LambdaUpdateWrapper<WaylineJobEntity>()
.eq(WaylineJobEntity::getWorkspaceId, dto.getWorkspaceId())
.eq(WaylineJobEntity::getJobId, dto.getJobId()))
> 0;
.eq(WaylineJobEntity::getJobId, dto.getJobId())) > 0;
}
@Override
@ -178,14 +323,75 @@ public class WaylineJobServiceImpl implements IWaylineJobService { @@ -178,14 +323,75 @@ public class WaylineJobServiceImpl implements IWaylineJobService {
return new PaginationData<WaylineJobDTO>(records, new Pagination(pageData));
}
@Override
@ServiceActivator(inputChannel = ChannelName.INBOUND_REQUESTS_FLIGHT_TASK_RESOURCE_GET, outputChannel = ChannelName.OUTBOUND)
@Transactional(isolation = Isolation.READ_UNCOMMITTED)
public void flightTaskResourceGet(CommonTopicReceiver receiver, MessageHeaders headers) {
Map<String, String> jobIdMap = objectMapper.convertValue(receiver.getData(), new TypeReference<Map<String, String>>() {});
String jobId = jobIdMap.get(MapKeyConst.FLIGHT_ID);
CommonTopicResponse.CommonTopicResponseBuilder<RequestsReply> builder = CommonTopicResponse.<RequestsReply>builder()
.tid(receiver.getTid())
.bid(receiver.getBid())
.method(RequestsMethodEnum.FLIGHT_TASK_RESOURCE_GET.getMethod())
.timestamp(System.currentTimeMillis());
String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString() + TopicConst._REPLY_SUF;
Optional<WaylineJobDTO> waylineJobOpt = this.getJobByJobId(jobId);
if (waylineJobOpt.isEmpty()) {
builder.data(RequestsReply.error(CommonErrorEnum.ILLEGAL_ARGUMENT));
messageSender.publish(topic, builder.build());
return;
}
WaylineJobDTO waylineJob = waylineJobOpt.get();
// get wayline file
Optional<WaylineFileDTO> waylineFile = waylineFileService.getWaylineByWaylineId(waylineJob.getWorkspaceId(), waylineJob.getFileId());
if (waylineFile.isEmpty()) {
builder.data(RequestsReply.error(CommonErrorEnum.ILLEGAL_ARGUMENT));
messageSender.publish(topic, builder.build());
return;
}
// get file url
URL url = null;
try {
url = waylineFileService.getObjectUrl(waylineJob.getWorkspaceId(), waylineFile.get().getWaylineId());
builder.data(RequestsReply.success(FlightTaskCreateDTO.builder()
.file(FlightTaskFileDTO.builder()
.url(url.toString())
.fingerprint(waylineFile.get().getSign())
.build())
.build()));
} catch (SQLException | NullPointerException e) {
e.printStackTrace();
builder.data(RequestsReply.error(CommonErrorEnum.ILLEGAL_ARGUMENT));
messageSender.publish(topic, builder.build());
return;
}
messageSender.publish(topic, builder.build());
}
private WaylineJobEntity dto2Entity(WaylineJobDTO dto) {
WaylineJobEntity.WaylineJobEntityBuilder builder = WaylineJobEntity.builder();
if (dto == null) {
return builder.build();
}
return builder.type(dto.getType())
.bid(dto.getBid())
if (Objects.nonNull(dto.getEndTime())) {
builder.endTime(dto.getEndTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
}
if (Objects.nonNull(dto.getExecuteTime())) {
builder.executeTime(dto.getExecuteTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
}
return builder.status(dto.getStatus())
.name(dto.getJobName())
.errorCode(dto.getCode())
.build();
}
@ -193,10 +399,9 @@ public class WaylineJobServiceImpl implements IWaylineJobService { @@ -193,10 +399,9 @@ public class WaylineJobServiceImpl implements IWaylineJobService {
if (entity == null) {
return null;
}
return WaylineJobDTO.builder()
WaylineJobDTO.WaylineJobDTOBuilder builder = WaylineJobDTO.builder()
.jobId(entity.getJobId())
.bid(entity.getBid())
.updateTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(entity.getUpdateTime()), ZoneId.systemDefault()))
.jobName(entity.getName())
.fileId(entity.getFileId())
.fileName(waylineFileService.getWaylineByWaylineId(entity.getWorkspaceId(), entity.getFileId())
@ -206,7 +411,20 @@ public class WaylineJobServiceImpl implements IWaylineJobService { @@ -206,7 +411,20 @@ public class WaylineJobServiceImpl implements IWaylineJobService {
.orElse(DeviceDTO.builder().build()).getNickname())
.username(entity.getUsername())
.workspaceId(entity.getWorkspaceId())
.type(entity.getType())
.build();
.status(entity.getStatus())
.code(entity.getErrorCode())
.executeTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(entity.getExecuteTime()), ZoneId.systemDefault()))
.taskType(entity.getTaskType())
.waylineType(entity.getWaylineType());
if (Objects.nonNull(entity.getEndTime())) {
builder.endTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(entity.getEndTime()), ZoneId.systemDefault()));
}
if (WaylineJobStatusEnum.IN_PROGRESS.getVal() == entity.getStatus() && redisOps.getExpire(entity.getJobId()) > 0) {
EventsReceiver<FlightTaskProgressReceiver> taskProgress = (EventsReceiver<FlightTaskProgressReceiver>) redisOps.get(entity.getJobId());
if (Objects.nonNull(taskProgress.getOutput()) && Objects.nonNull(taskProgress.getOutput().getProgress())) {
builder.progress(taskProgress.getOutput().getProgress().getPercent());
}
}
return builder.build();
}
}

Loading…
Cancel
Save