|
|
@ -12,10 +12,11 @@ import com.dji.sample.manage.model.dto.DeviceDTO; |
|
|
|
import com.dji.sample.manage.model.enums.UserTypeEnum; |
|
|
|
import com.dji.sample.manage.model.enums.UserTypeEnum; |
|
|
|
import com.dji.sample.manage.service.IDeviceRedisService; |
|
|
|
import com.dji.sample.manage.service.IDeviceRedisService; |
|
|
|
import com.dji.sample.media.model.MediaFileCountDTO; |
|
|
|
import com.dji.sample.media.model.MediaFileCountDTO; |
|
|
|
import com.dji.sample.wayline.model.dto.ConditionalWaylineJobKey; |
|
|
|
|
|
|
|
import com.dji.sample.wayline.model.dto.WaylineJobDTO; |
|
|
|
import com.dji.sample.wayline.model.dto.WaylineJobDTO; |
|
|
|
|
|
|
|
import com.dji.sample.wayline.model.dto.WaylineJobKey; |
|
|
|
import com.dji.sample.wayline.model.dto.WaylineTaskProgressReceiver; |
|
|
|
import com.dji.sample.wayline.model.dto.WaylineTaskProgressReceiver; |
|
|
|
import com.dji.sample.wayline.model.enums.WaylineJobStatusEnum; |
|
|
|
import com.dji.sample.wayline.model.enums.WaylineJobStatusEnum; |
|
|
|
|
|
|
|
import com.dji.sample.wayline.model.enums.WaylineTaskTypeEnum; |
|
|
|
import com.dji.sample.wayline.service.IFlightTaskService; |
|
|
|
import com.dji.sample.wayline.service.IFlightTaskService; |
|
|
|
import com.dji.sample.wayline.service.IWaylineJobService; |
|
|
|
import com.dji.sample.wayline.service.IWaylineJobService; |
|
|
|
import com.dji.sample.wayline.service.IWaylineRedisService; |
|
|
|
import com.dji.sample.wayline.service.IWaylineRedisService; |
|
|
@ -25,7 +26,6 @@ import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.apache.http.HttpStatus; |
|
|
|
import org.apache.http.HttpStatus; |
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
import org.springframework.integration.annotation.ServiceActivator; |
|
|
|
import org.springframework.integration.annotation.ServiceActivator; |
|
|
|
import org.springframework.integration.mqtt.support.MqttHeaders; |
|
|
|
|
|
|
|
import org.springframework.messaging.MessageHeaders; |
|
|
|
import org.springframework.messaging.MessageHeaders; |
|
|
|
import org.springframework.scheduling.annotation.Scheduled; |
|
|
|
import org.springframework.scheduling.annotation.Scheduled; |
|
|
|
import org.springframework.stereotype.Service; |
|
|
|
import org.springframework.stereotype.Service; |
|
|
@ -70,9 +70,6 @@ public class FlightTaskServiceImpl implements IFlightTaskService { |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
@ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_PROGRESS, outputChannel = ChannelName.OUTBOUND_EVENTS) |
|
|
|
@ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_PROGRESS, outputChannel = ChannelName.OUTBOUND_EVENTS) |
|
|
|
public CommonTopicReceiver handleProgress(CommonTopicReceiver receiver, MessageHeaders headers) { |
|
|
|
public CommonTopicReceiver handleProgress(CommonTopicReceiver receiver, MessageHeaders headers) { |
|
|
|
String receivedTopic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC)); |
|
|
|
|
|
|
|
String dockSn = receivedTopic.substring((TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT).length(), |
|
|
|
|
|
|
|
receivedTopic.indexOf(TopicConst.EVENTS_SUF)); |
|
|
|
|
|
|
|
EventsReceiver<WaylineTaskProgressReceiver> eventsReceiver = mapper.convertValue(receiver.getData(), |
|
|
|
EventsReceiver<WaylineTaskProgressReceiver> eventsReceiver = mapper.convertValue(receiver.getData(), |
|
|
|
new TypeReference<EventsReceiver<WaylineTaskProgressReceiver>>(){}); |
|
|
|
new TypeReference<EventsReceiver<WaylineTaskProgressReceiver>>(){}); |
|
|
|
eventsReceiver.setBid(receiver.getBid()); |
|
|
|
eventsReceiver.setBid(receiver.getBid()); |
|
|
@ -87,14 +84,30 @@ public class FlightTaskServiceImpl implements IFlightTaskService { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
EventsResultStatusEnum statusEnum = EventsResultStatusEnum.find(output.getStatus()); |
|
|
|
EventsResultStatusEnum statusEnum = EventsResultStatusEnum.find(output.getStatus()); |
|
|
|
waylineRedisService.setRunningWaylineJob(dockSn, eventsReceiver); |
|
|
|
waylineRedisService.setRunningWaylineJob(receiver.getGateway(), eventsReceiver); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(receiver.getGateway()); |
|
|
|
|
|
|
|
if (deviceOpt.isEmpty()) { |
|
|
|
|
|
|
|
return null; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (statusEnum.getEnd()) { |
|
|
|
if (statusEnum.getEnd()) { |
|
|
|
|
|
|
|
handleEndStatus(receiver, statusEnum, output.getExt().getMediaCount(), eventsReceiver.getResult(), deviceOpt.get()); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
websocketMessageService.sendBatch(deviceOpt.get().getWorkspaceId(), UserTypeEnum.WEB.getVal(), |
|
|
|
|
|
|
|
BizCodeEnum.FLIGHT_TASK_PROGRESS.getCode(), eventsReceiver); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return receiver; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void handleEndStatus(CommonTopicReceiver receiver, EventsResultStatusEnum statusEnum, int mediaCount, int code, DeviceDTO dock) { |
|
|
|
|
|
|
|
|
|
|
|
WaylineJobDTO job = WaylineJobDTO.builder() |
|
|
|
WaylineJobDTO job = WaylineJobDTO.builder() |
|
|
|
.jobId(receiver.getBid()) |
|
|
|
.jobId(receiver.getBid()) |
|
|
|
.status(WaylineJobStatusEnum.SUCCESS.getVal()) |
|
|
|
.status(WaylineJobStatusEnum.SUCCESS.getVal()) |
|
|
|
.completedTime(LocalDateTime.now()) |
|
|
|
.completedTime(LocalDateTime.now()) |
|
|
|
.mediaCount(output.getExt().getMediaCount()) |
|
|
|
.mediaCount(mediaCount) |
|
|
|
.build(); |
|
|
|
.build(); |
|
|
|
|
|
|
|
|
|
|
|
// record the update of the media count.
|
|
|
|
// record the update of the media count.
|
|
|
@ -104,23 +117,17 @@ public class FlightTaskServiceImpl implements IFlightTaskService { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (EventsResultStatusEnum.OK != statusEnum) { |
|
|
|
if (EventsResultStatusEnum.OK != statusEnum) { |
|
|
|
job.setCode(eventsReceiver.getResult()); |
|
|
|
job.setCode(code); |
|
|
|
job.setStatus(WaylineJobStatusEnum.FAILED.getVal()); |
|
|
|
job.setStatus(WaylineJobStatusEnum.FAILED.getVal()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
waylineRedisService.getConditionalWaylineJob(receiver.getBid()).ifPresent(waylineJob -> |
|
|
|
|
|
|
|
retryPrepareConditionJob(new WaylineJobKey(dock.getWorkspaceId(), dock.getDeviceSn(), receiver.getBid()), waylineJob)); |
|
|
|
waylineJobService.updateJob(job); |
|
|
|
waylineJobService.updateJob(job); |
|
|
|
waylineRedisService.delRunningWaylineJob(dockSn); |
|
|
|
waylineRedisService.delRunningWaylineJob(receiver.getGateway()); |
|
|
|
waylineRedisService.delPausedWaylineJob(receiver.getBid()); |
|
|
|
waylineRedisService.delPausedWaylineJob(receiver.getBid()); |
|
|
|
} |
|
|
|
waylineRedisService.delBlockedWaylineJobId(receiver.getGateway()); |
|
|
|
|
|
|
|
|
|
|
|
Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(receiver.getGateway()); |
|
|
|
|
|
|
|
if (deviceOpt.isEmpty()) { |
|
|
|
|
|
|
|
return null; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
websocketMessageService.sendBatch(deviceOpt.get().getWorkspaceId(), UserTypeEnum.WEB.getVal(), |
|
|
|
|
|
|
|
BizCodeEnum.FLIGHT_TASK_PROGRESS.getCode(), eventsReceiver); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return receiver; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
@ -130,147 +137,131 @@ public class FlightTaskServiceImpl implements IFlightTaskService { |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
@ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_READY, outputChannel = ChannelName.OUTBOUND_EVENTS) |
|
|
|
@ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_READY, outputChannel = ChannelName.OUTBOUND_EVENTS) |
|
|
|
public CommonTopicReceiver handleTaskNotifications(CommonTopicReceiver receiver, MessageHeaders headers) { |
|
|
|
public CommonTopicReceiver handleTaskNotifications(CommonTopicReceiver receiver, MessageHeaders headers) { |
|
|
|
String receivedTopic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC)); |
|
|
|
String dockSn = receiver.getGateway(); |
|
|
|
String dockSn = receivedTopic.substring((TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT).length(), |
|
|
|
Set<String> flightIds = mapper.convertValue(receiver.getData(), |
|
|
|
receivedTopic.indexOf(TopicConst.EVENTS_SUF)); |
|
|
|
new TypeReference<Map<String, Set<String>>>(){}).get(MapKeyConst.FLIGHT_IDS); |
|
|
|
List<String> flightIds = mapper.convertValue(receiver.getData(), |
|
|
|
|
|
|
|
new TypeReference<Map<String, List<String>>>(){}).get(MapKeyConst.FLIGHT_IDS); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
log.info("ready task list:{}", Arrays.toString(flightIds.toArray()) ); |
|
|
|
log.info("ready task list:{}", Arrays.toString(flightIds.toArray())); |
|
|
|
// Check conditional task blocking status.
|
|
|
|
// Check conditional task blocking status.
|
|
|
|
String blockedId = waylineRedisService.getBlockedWaylineJobId(dockSn); |
|
|
|
String blockedId = waylineRedisService.getBlockedWaylineJobId(dockSn); |
|
|
|
if (!StringUtils.hasText(blockedId)) { |
|
|
|
if (StringUtils.hasText(blockedId)) { |
|
|
|
|
|
|
|
log.info("The dock is in a state of wayline congestion, and the task will not be executed."); |
|
|
|
return null; |
|
|
|
return null; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(dockSn); |
|
|
|
Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(dockSn); |
|
|
|
if (deviceOpt.isEmpty()) { |
|
|
|
if (deviceOpt.isEmpty()) { |
|
|
|
|
|
|
|
log.info("The dock is offline."); |
|
|
|
return null; |
|
|
|
return null; |
|
|
|
} |
|
|
|
} |
|
|
|
DeviceDTO device = deviceOpt.get(); |
|
|
|
DeviceDTO device = deviceOpt.get(); |
|
|
|
|
|
|
|
Optional<WaylineJobDTO> jobOpt = waylineJobService.getJobsByConditions(device.getWorkspaceId(), flightIds, WaylineJobStatusEnum.PENDING) |
|
|
|
|
|
|
|
.stream().filter(job -> flightIds.contains(job.getJobId())) |
|
|
|
|
|
|
|
.sorted(Comparator.comparingInt(a -> a.getTaskType().getVal())) |
|
|
|
|
|
|
|
.min(Comparator.comparing(WaylineJobDTO::getBeginTime)); |
|
|
|
|
|
|
|
if (jobOpt.isEmpty()) { |
|
|
|
|
|
|
|
return receiver; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
executeReadyTask(jobOpt.get()); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return receiver; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void executeReadyTask(WaylineJobDTO waylineJob) { |
|
|
|
try { |
|
|
|
try { |
|
|
|
for (String jobId : flightIds) { |
|
|
|
boolean isExecute = waylineJobService.executeFlightTask(waylineJob.getWorkspaceId(), waylineJob.getJobId()); |
|
|
|
boolean isExecute = waylineJobService.executeFlightTask(device.getWorkspaceId(), jobId); |
|
|
|
if (isExecute || WaylineTaskTypeEnum.CONDITION != waylineJob.getTaskType()) { |
|
|
|
if (!isExecute) { |
|
|
|
return; |
|
|
|
return null; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(jobId); |
|
|
|
Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(waylineJob.getJobId()); |
|
|
|
if (waylineJobOpt.isEmpty()) { |
|
|
|
if (waylineJobOpt.isEmpty()) { |
|
|
|
log.info("The conditional job has expired and will no longer be executed."); |
|
|
|
log.info("The conditional job has expired and will no longer be executed."); |
|
|
|
return receiver; |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
WaylineJobDTO waylineJob = waylineJobOpt.get(); |
|
|
|
|
|
|
|
this.retryPrepareJob(new ConditionalWaylineJobKey(device.getWorkspaceId(), dockSn, jobId), waylineJob); |
|
|
|
|
|
|
|
return receiver; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
waylineJob = waylineJobOpt.get(); |
|
|
|
|
|
|
|
this.retryPrepareConditionJob(new WaylineJobKey(waylineJob.getWorkspaceId(), waylineJob.getDockSn(), waylineJob.getJobId()), waylineJob); |
|
|
|
} catch (Exception e) { |
|
|
|
} catch (Exception e) { |
|
|
|
log.error("Failed to execute conditional task."); |
|
|
|
log.error("Failed to execute task. ID: {}, Name:{}", waylineJob.getJobId(), waylineJob.getJobName()); |
|
|
|
|
|
|
|
this.retryPrepareConditionJob(new WaylineJobKey(waylineJob.getWorkspaceId(), waylineJob.getDockSn(), waylineJob.getJobId()), waylineJob); |
|
|
|
e.printStackTrace(); |
|
|
|
e.printStackTrace(); |
|
|
|
} |
|
|
|
} |
|
|
|
return receiver; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS) |
|
|
|
@Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS) |
|
|
|
private void checkScheduledJob() { |
|
|
|
private void prepareWaylineJob() { |
|
|
|
Object jobIdValue = RedisOpsUtils.zGetMin(RedisConst.WAYLINE_JOB_TIMED_EXECUTE); |
|
|
|
Optional<WaylineJobKey> jobKeyOpt = waylineRedisService.getNearestPreparedWaylineJob(); |
|
|
|
if (Objects.isNull(jobIdValue)) { |
|
|
|
if (jobKeyOpt.isEmpty()) { |
|
|
|
return; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
log.info("Check the timed tasks of the wayline. {}", jobIdValue); |
|
|
|
|
|
|
|
// format: {workspace_id}:{dock_sn}:{job_id}
|
|
|
|
// format: {workspace_id}:{dock_sn}:{job_id}
|
|
|
|
String[] jobArr = String.valueOf(jobIdValue).split(RedisConst.DELIMITER); |
|
|
|
WaylineJobKey jobKey = jobKeyOpt.get(); |
|
|
|
double time = RedisOpsUtils.zScore(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue); |
|
|
|
log.info("Check the prepared tasks of the wayline. {}", jobKey.toString()); |
|
|
|
long now = System.currentTimeMillis(); |
|
|
|
|
|
|
|
int offset = 30_000; |
|
|
|
WaylineJobDTO job = WaylineJobDTO.builder() |
|
|
|
|
|
|
|
.jobId(jobKey.getJobId()) |
|
|
|
// Expired tasks are deleted directly.
|
|
|
|
|
|
|
|
if (time < now - offset) { |
|
|
|
|
|
|
|
RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue); |
|
|
|
|
|
|
|
waylineJobService.updateJob(WaylineJobDTO.builder() |
|
|
|
|
|
|
|
.jobId(jobArr[2]) |
|
|
|
|
|
|
|
.status(WaylineJobStatusEnum.FAILED.getVal()) |
|
|
|
.status(WaylineJobStatusEnum.FAILED.getVal()) |
|
|
|
.executeTime(LocalDateTime.now()) |
|
|
|
.executeTime(LocalDateTime.now()) |
|
|
|
.completedTime(LocalDateTime.now()) |
|
|
|
.completedTime(LocalDateTime.now()) |
|
|
|
.code(HttpStatus.SC_REQUEST_TIMEOUT).build()); |
|
|
|
.code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build(); |
|
|
|
|
|
|
|
Optional<WaylineJobDTO> waylineJobOpt = getPreparedJob(jobKey, job); |
|
|
|
|
|
|
|
if (waylineJobOpt.isEmpty()) { |
|
|
|
return; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (now <= time && time <= now + offset) { |
|
|
|
WaylineJobDTO waylineJob = waylineJobOpt.get(); |
|
|
|
try { |
|
|
|
try { |
|
|
|
waylineJobService.executeFlightTask(jobArr[0], jobArr[2]); |
|
|
|
ResponseResult result = waylineJobService.publishOneFlightTask(waylineJob); |
|
|
|
} catch (Exception e) { |
|
|
|
if (ResponseResult.CODE_SUCCESS == result.getCode()) { |
|
|
|
log.info("The scheduled task delivery failed."); |
|
|
|
return; |
|
|
|
waylineJobService.updateJob(WaylineJobDTO.builder() |
|
|
|
|
|
|
|
.jobId(jobArr[2]) |
|
|
|
|
|
|
|
.status(WaylineJobStatusEnum.FAILED.getVal()) |
|
|
|
|
|
|
|
.executeTime(LocalDateTime.now()) |
|
|
|
|
|
|
|
.completedTime(LocalDateTime.now()) |
|
|
|
|
|
|
|
.code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build()); |
|
|
|
|
|
|
|
} finally { |
|
|
|
|
|
|
|
RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
log.info("Failed to prepare the task. {}", result.getMessage()); |
|
|
|
|
|
|
|
job.setCode(result.getCode()); |
|
|
|
|
|
|
|
waylineJobService.updateJob(job); |
|
|
|
|
|
|
|
// Retry if the end time has not been exceeded.
|
|
|
|
|
|
|
|
this.retryPrepareConditionJob(jobKey, waylineJob); |
|
|
|
|
|
|
|
} catch (Exception e) { |
|
|
|
|
|
|
|
log.info("Failed to prepare the task. {}", e.getLocalizedMessage()); |
|
|
|
|
|
|
|
waylineJobService.updateJob(job); |
|
|
|
|
|
|
|
this.retryPrepareConditionJob(jobKey, waylineJob); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS) |
|
|
|
private boolean checkTime(long time) { |
|
|
|
private void prepareConditionJob() { |
|
|
|
|
|
|
|
Optional<ConditionalWaylineJobKey> jobKeyOpt = waylineRedisService.getNearestConditionalWaylineJob(); |
|
|
|
|
|
|
|
if (jobKeyOpt.isEmpty()) { |
|
|
|
|
|
|
|
return; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
ConditionalWaylineJobKey jobKey = jobKeyOpt.get(); |
|
|
|
|
|
|
|
log.info("Check the conditional tasks of the wayline. {}", jobKey.toString()); |
|
|
|
|
|
|
|
// format: {workspace_id}:{dock_sn}:{job_id}
|
|
|
|
|
|
|
|
double time = waylineRedisService.getConditionalWaylineJobTime(jobKey); |
|
|
|
|
|
|
|
long now = System.currentTimeMillis(); |
|
|
|
|
|
|
|
// prepare the task one day in advance.
|
|
|
|
// prepare the task one day in advance.
|
|
|
|
int offset = 86_400_000; |
|
|
|
int offset = 86_400_000; |
|
|
|
|
|
|
|
return System.currentTimeMillis() + offset >= time; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (now + offset < time) { |
|
|
|
private Optional<WaylineJobDTO> getPreparedJob(WaylineJobKey jobKey, WaylineJobDTO job) { |
|
|
|
return; |
|
|
|
long time = waylineRedisService.getPreparedWaylineJobTime(jobKey).longValue(); |
|
|
|
|
|
|
|
if (!checkTime(time)) { |
|
|
|
|
|
|
|
return Optional.empty(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
WaylineJobDTO job = WaylineJobDTO.builder() |
|
|
|
|
|
|
|
.jobId(jobKey.getJobId()) |
|
|
|
|
|
|
|
.status(WaylineJobStatusEnum.FAILED.getVal()) |
|
|
|
|
|
|
|
.executeTime(LocalDateTime.now()) |
|
|
|
|
|
|
|
.completedTime(LocalDateTime.now()) |
|
|
|
|
|
|
|
.code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build(); |
|
|
|
|
|
|
|
try { |
|
|
|
|
|
|
|
Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(jobKey.getJobId()); |
|
|
|
Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(jobKey.getJobId()); |
|
|
|
|
|
|
|
// Determine whether the conditional task or the scheduled task has expired.
|
|
|
|
if (waylineJobOpt.isEmpty()) { |
|
|
|
if (waylineJobOpt.isEmpty()) { |
|
|
|
|
|
|
|
waylineJobOpt = waylineJobService.getJobByJobId(jobKey.getWorkspaceId(), jobKey.getJobId()); |
|
|
|
|
|
|
|
if (waylineJobOpt.isEmpty() || waylineJobOpt.get().getEndTime().isBefore(LocalDateTime.now())) { |
|
|
|
job.setCode(CommonErrorEnum.REDIS_DATA_NOT_FOUND.getErrorCode()); |
|
|
|
job.setCode(CommonErrorEnum.REDIS_DATA_NOT_FOUND.getErrorCode()); |
|
|
|
waylineJobService.updateJob(job); |
|
|
|
waylineJobService.updateJob(job); |
|
|
|
waylineRedisService.removePrepareConditionalWaylineJob(jobKey); |
|
|
|
return Optional.empty(); |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
waylineRedisService.removePreparedWaylineJob(jobKey); |
|
|
|
|
|
|
|
return waylineJobOpt; |
|
|
|
} |
|
|
|
} |
|
|
|
WaylineJobDTO waylineJob = waylineJobOpt.get(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ResponseResult result = waylineJobService.publishOneFlightTask(waylineJob); |
|
|
|
|
|
|
|
waylineRedisService.removePrepareConditionalWaylineJob(jobKey); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (ResponseResult.CODE_SUCCESS == result.getCode()) { |
|
|
|
private void retryPrepareConditionJob(WaylineJobKey jobKey, WaylineJobDTO waylineJob) { |
|
|
|
|
|
|
|
if (WaylineTaskTypeEnum.CONDITION != waylineJob.getTaskType()) { |
|
|
|
return; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// If the end time is exceeded, no more retries will be made.
|
|
|
|
// If the end time is exceeded, no more retries will be made.
|
|
|
|
waylineRedisService.delConditionalWaylineJob(jobKey.getJobId()); |
|
|
|
waylineRedisService.delConditionalWaylineJob(jobKey.getJobId()); |
|
|
|
if (waylineJob.getEndTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - RedisConst.WAYLINE_JOB_BLOCK_TIME * 1000 < now) { |
|
|
|
if (waylineJob.getEndTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() < System.currentTimeMillis()) { |
|
|
|
return; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Retry if the end time has not been exceeded.
|
|
|
|
|
|
|
|
this.retryPrepareJob(jobKey, waylineJob); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} catch (Exception e) { |
|
|
|
|
|
|
|
log.info("Failed to prepare the conditional task."); |
|
|
|
|
|
|
|
waylineJobService.updateJob(job); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void retryPrepareJob(ConditionalWaylineJobKey jobKey, WaylineJobDTO waylineJob) { |
|
|
|
|
|
|
|
Optional<WaylineJobDTO> childJobOpt = waylineJobService.createWaylineJobByParent(jobKey.getWorkspaceId(), jobKey.getJobId()); |
|
|
|
Optional<WaylineJobDTO> childJobOpt = waylineJobService.createWaylineJobByParent(jobKey.getWorkspaceId(), jobKey.getJobId()); |
|
|
|
if (childJobOpt.isEmpty()) { |
|
|
|
if (childJobOpt.isEmpty()) { |
|
|
|
log.error("Failed to create wayline job."); |
|
|
|
log.error("Failed to create wayline job."); |
|
|
@ -279,7 +270,7 @@ public class FlightTaskServiceImpl implements IFlightTaskService { |
|
|
|
|
|
|
|
|
|
|
|
WaylineJobDTO newJob = childJobOpt.get(); |
|
|
|
WaylineJobDTO newJob = childJobOpt.get(); |
|
|
|
newJob.setBeginTime(LocalDateTime.now().plusSeconds(RedisConst.WAYLINE_JOB_BLOCK_TIME)); |
|
|
|
newJob.setBeginTime(LocalDateTime.now().plusSeconds(RedisConst.WAYLINE_JOB_BLOCK_TIME)); |
|
|
|
boolean isAdd = waylineRedisService.addPrepareConditionalWaylineJob(newJob); |
|
|
|
boolean isAdd = waylineRedisService.addPreparedWaylineJob(newJob); |
|
|
|
if (!isAdd) { |
|
|
|
if (!isAdd) { |
|
|
|
log.error("Failed to create wayline job. {}", newJob.getJobId()); |
|
|
|
log.error("Failed to create wayline job. {}", newJob.getJobId()); |
|
|
|
return; |
|
|
|
return; |
|
|
|