@ -12,10 +12,11 @@ import com.dji.sample.manage.model.dto.DeviceDTO;
@@ -12,10 +12,11 @@ import com.dji.sample.manage.model.dto.DeviceDTO;
import com.dji.sample.manage.model.enums.UserTypeEnum ;
import com.dji.sample.manage.service.IDeviceRedisService ;
import com.dji.sample.media.model.MediaFileCountDTO ;
import com.dji.sample.wayline.model.dto.ConditionalWaylineJobKey ;
import com.dji.sample.wayline.model.dto.WaylineJobDTO ;
import com.dji.sample.wayline.model.dto.WaylineJobKey ;
import com.dji.sample.wayline.model.dto.WaylineTaskProgressReceiver ;
import com.dji.sample.wayline.model.enums.WaylineJobStatusEnum ;
import com.dji.sample.wayline.model.enums.WaylineTaskTypeEnum ;
import com.dji.sample.wayline.service.IFlightTaskService ;
import com.dji.sample.wayline.service.IWaylineJobService ;
import com.dji.sample.wayline.service.IWaylineRedisService ;
@ -25,7 +26,6 @@ import lombok.extern.slf4j.Slf4j;
@@ -25,7 +26,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpStatus ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.integration.annotation.ServiceActivator ;
import org.springframework.integration.mqtt.support.MqttHeaders ;
import org.springframework.messaging.MessageHeaders ;
import org.springframework.scheduling.annotation.Scheduled ;
import org.springframework.stereotype.Service ;
@ -70,9 +70,6 @@ public class FlightTaskServiceImpl implements IFlightTaskService {
@@ -70,9 +70,6 @@ public class FlightTaskServiceImpl implements IFlightTaskService {
* /
@ServiceActivator ( inputChannel = ChannelName . INBOUND_EVENTS_FLIGHT_TASK_PROGRESS , outputChannel = ChannelName . OUTBOUND_EVENTS )
public CommonTopicReceiver handleProgress ( CommonTopicReceiver receiver , MessageHeaders headers ) {
String receivedTopic = String . valueOf ( headers . get ( MqttHeaders . RECEIVED_TOPIC ) ) ;
String dockSn = receivedTopic . substring ( ( TopicConst . THING_MODEL_PRE + TopicConst . PRODUCT ) . length ( ) ,
receivedTopic . indexOf ( TopicConst . EVENTS_SUF ) ) ;
EventsReceiver < WaylineTaskProgressReceiver > eventsReceiver = mapper . convertValue ( receiver . getData ( ) ,
new TypeReference < EventsReceiver < WaylineTaskProgressReceiver > > ( ) { } ) ;
eventsReceiver . setBid ( receiver . getBid ( ) ) ;
@ -87,42 +84,52 @@ public class FlightTaskServiceImpl implements IFlightTaskService {
@@ -87,42 +84,52 @@ public class FlightTaskServiceImpl implements IFlightTaskService {
}
EventsResultStatusEnum statusEnum = EventsResultStatusEnum . find ( output . getStatus ( ) ) ;
waylineRedisService . setRunningWaylineJob ( dockSn , eventsReceiver ) ;
if ( statusEnum . getEnd ( ) ) {
WaylineJobDTO job = WaylineJobDTO . builder ( )
. jobId ( receiver . getBid ( ) )
. status ( WaylineJobStatusEnum . SUCCESS . getVal ( ) )
. completedTime ( LocalDateTime . now ( ) )
. mediaCount ( output . getExt ( ) . getMediaCount ( ) )
. build ( ) ;
// record the update of the media count.
if ( Objects . nonNull ( job . getMediaCount ( ) ) & & job . getMediaCount ( ) ! = 0 ) {
RedisOpsUtils . hashSet ( RedisConst . MEDIA_FILE_PREFIX + receiver . getGateway ( ) , job . getJobId ( ) ,
MediaFileCountDTO . builder ( ) . jobId ( receiver . getBid ( ) ) . mediaCount ( job . getMediaCount ( ) ) . uploadedCount ( 0 ) . build ( ) ) ;
}
if ( EventsResultStatusEnum . OK ! = statusEnum ) {
job . setCode ( eventsReceiver . getResult ( ) ) ;
job . setStatus ( WaylineJobStatusEnum . FAILED . getVal ( ) ) ;
}
waylineJobService . updateJob ( job ) ;
waylineRedisService . delRunningWaylineJob ( dockSn ) ;
waylineRedisService . delPausedWaylineJob ( receiver . getBid ( ) ) ;
}
waylineRedisService . setRunningWaylineJob ( receiver . getGateway ( ) , eventsReceiver ) ;
Optional < DeviceDTO > deviceOpt = deviceRedisService . getDeviceOnline ( receiver . getGateway ( ) ) ;
if ( deviceOpt . isEmpty ( ) ) {
return null ;
}
if ( statusEnum . getEnd ( ) ) {
handleEndStatus ( receiver , statusEnum , output . getExt ( ) . getMediaCount ( ) , eventsReceiver . getResult ( ) , deviceOpt . get ( ) ) ;
}
websocketMessageService . sendBatch ( deviceOpt . get ( ) . getWorkspaceId ( ) , UserTypeEnum . WEB . getVal ( ) ,
BizCodeEnum . FLIGHT_TASK_PROGRESS . getCode ( ) , eventsReceiver ) ;
return receiver ;
}
private void handleEndStatus ( CommonTopicReceiver receiver , EventsResultStatusEnum statusEnum , int mediaCount , int code , DeviceDTO dock ) {
WaylineJobDTO job = WaylineJobDTO . builder ( )
. jobId ( receiver . getBid ( ) )
. status ( WaylineJobStatusEnum . SUCCESS . getVal ( ) )
. completedTime ( LocalDateTime . now ( ) )
. mediaCount ( mediaCount )
. build ( ) ;
// record the update of the media count.
if ( Objects . nonNull ( job . getMediaCount ( ) ) & & job . getMediaCount ( ) ! = 0 ) {
RedisOpsUtils . hashSet ( RedisConst . MEDIA_FILE_PREFIX + receiver . getGateway ( ) , job . getJobId ( ) ,
MediaFileCountDTO . builder ( ) . jobId ( receiver . getBid ( ) ) . mediaCount ( job . getMediaCount ( ) ) . uploadedCount ( 0 ) . build ( ) ) ;
}
if ( EventsResultStatusEnum . OK ! = statusEnum ) {
job . setCode ( code ) ;
job . setStatus ( WaylineJobStatusEnum . FAILED . getVal ( ) ) ;
}
waylineRedisService . getConditionalWaylineJob ( receiver . getBid ( ) ) . ifPresent ( waylineJob - >
retryPrepareConditionJob ( new WaylineJobKey ( dock . getWorkspaceId ( ) , dock . getDeviceSn ( ) , receiver . getBid ( ) ) , waylineJob ) ) ;
waylineJobService . updateJob ( job ) ;
waylineRedisService . delRunningWaylineJob ( receiver . getGateway ( ) ) ;
waylineRedisService . delPausedWaylineJob ( receiver . getBid ( ) ) ;
waylineRedisService . delBlockedWaylineJobId ( receiver . getGateway ( ) ) ;
}
/ * *
* Notifications will be received through this interface when tasks are ready on the device .
* @param receiver
@ -130,106 +137,65 @@ public class FlightTaskServiceImpl implements IFlightTaskService {
@@ -130,106 +137,65 @@ public class FlightTaskServiceImpl implements IFlightTaskService {
* /
@ServiceActivator ( inputChannel = ChannelName . INBOUND_EVENTS_FLIGHT_TASK_READY , outputChannel = ChannelName . OUTBOUND_EVENTS )
public CommonTopicReceiver handleTaskNotifications ( CommonTopicReceiver receiver , MessageHeaders headers ) {
String receivedTopic = String . valueOf ( headers . get ( MqttHeaders . RECEIVED_TOPIC ) ) ;
String dockSn = receivedTopic . substring ( ( TopicConst . THING_MODEL_PRE + TopicConst . PRODUCT ) . length ( ) ,
receivedTopic . indexOf ( TopicConst . EVENTS_SUF ) ) ;
List < String > flightIds = mapper . convertValue ( receiver . getData ( ) ,
new TypeReference < Map < String , List < String > > > ( ) { } ) . get ( MapKeyConst . FLIGHT_IDS ) ;
String dockSn = receiver . getGateway ( ) ;
Set < String > flightIds = mapper . convertValue ( receiver . getData ( ) ,
new TypeReference < Map < String , Set < String > > > ( ) { } ) . get ( MapKeyConst . FLIGHT_IDS ) ;
log . info ( "ready task list:{}" , Arrays . toString ( flightIds . toArray ( ) ) ) ;
log . info ( "ready task list:{}" , Arrays . toString ( flightIds . toArray ( ) ) ) ;
// Check conditional task blocking status.
String blockedId = waylineRedisService . getBlockedWaylineJobId ( dockSn ) ;
if ( ! StringUtils . hasText ( blockedId ) ) {
if ( StringUtils . hasText ( blockedId ) ) {
log . info ( "The dock is in a state of wayline congestion, and the task will not be executed." ) ;
return null ;
}
Optional < DeviceDTO > deviceOpt = deviceRedisService . getDeviceOnline ( dockSn ) ;
if ( deviceOpt . isEmpty ( ) ) {
log . info ( "The dock is offline." ) ;
return null ;
}
DeviceDTO device = deviceOpt . get ( ) ;
try {
for ( String jobId : flightIds ) {
boolean isExecute = waylineJobService . executeFlightTask ( device . getWorkspaceId ( ) , jobId ) ;
if ( ! isExecute ) {
return null ;
}
Optional < WaylineJobDTO > waylineJobOpt = waylineRedisService . getConditionalWaylineJob ( jobId ) ;
if ( waylineJobOpt . isEmpty ( ) ) {
log . info ( "The conditional job has expired and will no longer be executed." ) ;
return receiver ;
}
WaylineJobDTO waylineJob = waylineJobOpt . get ( ) ;
this . retryPrepareJob ( new ConditionalWaylineJobKey ( device . getWorkspaceId ( ) , dockSn , jobId ) , waylineJob ) ;
return receiver ;
}
} catch ( Exception e ) {
log . error ( "Failed to execute conditional task." ) ;
e . printStackTrace ( ) ;
Optional < WaylineJobDTO > jobOpt = waylineJobService . getJobsByConditions ( device . getWorkspaceId ( ) , flightIds , WaylineJobStatusEnum . PENDING )
. stream ( ) . filter ( job - > flightIds . contains ( job . getJobId ( ) ) )
. sorted ( Comparator . comparingInt ( a - > a . getTaskType ( ) . getVal ( ) ) )
. min ( Comparator . comparing ( WaylineJobDTO : : getBeginTime ) ) ;
if ( jobOpt . isEmpty ( ) ) {
return receiver ;
}
executeReadyTask ( jobOpt . get ( ) ) ;
return receiver ;
}
@Scheduled ( initialDelay = 10 , fixedRate = 5 , timeUnit = TimeUnit . SECONDS )
private void checkScheduledJob ( ) {
Object jobIdValue = RedisOpsUtils . zGetMin ( RedisConst . WAYLINE_JOB_TIMED_EXECUTE ) ;
if ( Objects . isNull ( jobIdValue ) ) {
return ;
}
log . info ( "Check the timed tasks of the wayline. {}" , jobIdValue ) ;
// format: {workspace_id}:{dock_sn}:{job_id}
String [ ] jobArr = String . valueOf ( jobIdValue ) . split ( RedisConst . DELIMITER ) ;
double time = RedisOpsUtils . zScore ( RedisConst . WAYLINE_JOB_TIMED_EXECUTE , jobIdValue ) ;
long now = System . currentTimeMillis ( ) ;
int offset = 30_000 ;
// Expired tasks are deleted directly.
if ( time < now - offset ) {
RedisOpsUtils . zRemove ( RedisConst . WAYLINE_JOB_TIMED_EXECUTE , jobIdValue ) ;
waylineJobService . updateJob ( WaylineJobDTO . builder ( )
. jobId ( jobArr [ 2 ] )
. status ( WaylineJobStatusEnum . FAILED . getVal ( ) )
. executeTime ( LocalDateTime . now ( ) )
. completedTime ( LocalDateTime . now ( ) )
. code ( HttpStatus . SC_REQUEST_TIMEOUT ) . build ( ) ) ;
return ;
}
if ( now < = time & & time < = now + offset ) {
try {
waylineJobService . executeFlightTask ( jobArr [ 0 ] , jobArr [ 2 ] ) ;
} catch ( Exception e ) {
log . info ( "The scheduled task delivery failed." ) ;
waylineJobService . updateJob ( WaylineJobDTO . builder ( )
. jobId ( jobArr [ 2 ] )
. status ( WaylineJobStatusEnum . FAILED . getVal ( ) )
. executeTime ( LocalDateTime . now ( ) )
. completedTime ( LocalDateTime . now ( ) )
. code ( HttpStatus . SC_INTERNAL_SERVER_ERROR ) . build ( ) ) ;
} finally {
RedisOpsUtils . zRemove ( RedisConst . WAYLINE_JOB_TIMED_EXECUTE , jobIdValue ) ;
private void executeReadyTask ( WaylineJobDTO waylineJob ) {
try {
boolean isExecute = waylineJobService . executeFlightTask ( waylineJob . getWorkspaceId ( ) , waylineJob . getJobId ( ) ) ;
if ( isExecute | | WaylineTaskTypeEnum . CONDITION ! = waylineJob . getTaskType ( ) ) {
return ;
}
Optional < WaylineJobDTO > waylineJobOpt = waylineRedisService . getConditionalWaylineJob ( waylineJob . getJobId ( ) ) ;
if ( waylineJobOpt . isEmpty ( ) ) {
log . info ( "The conditional job has expired and will no longer be executed." ) ;
return ;
}
waylineJob = waylineJobOpt . get ( ) ;
this . retryPrepareConditionJob ( new WaylineJobKey ( waylineJob . getWorkspaceId ( ) , waylineJob . getDockSn ( ) , waylineJob . getJobId ( ) ) , waylineJob ) ;
} catch ( Exception e ) {
log . error ( "Failed to execute task. ID: {}, Name:{}" , waylineJob . getJobId ( ) , waylineJob . getJobName ( ) ) ;
this . retryPrepareConditionJob ( new WaylineJobKey ( waylineJob . getWorkspaceId ( ) , waylineJob . getDockSn ( ) , waylineJob . getJobId ( ) ) , waylineJob ) ;
e . printStackTrace ( ) ;
}
}
@Scheduled ( initialDelay = 10 , fixedRate = 5 , timeUnit = TimeUnit . SECONDS )
private void prepareConditionJob ( ) {
Optional < ConditionalWaylineJobKey > jobKeyOpt = waylineRedisService . getNearestConditionalWaylineJob ( ) ;
private void prepareWaylineJob ( ) {
Optional < WaylineJobKey > jobKeyOpt = waylineRedisService . getNearestPreparedWaylineJob ( ) ;
if ( jobKeyOpt . isEmpty ( ) ) {
return ;
}
ConditionalWaylineJobKey jobKey = jobKeyOpt . get ( ) ;
log . info ( "Check the conditional tasks of the wayline. {}" , jobKey . toString ( ) ) ;
// format: {workspace_id}:{dock_sn}:{job_id}
double time = waylineRedisService . getConditionalWaylineJobTime ( jobKey ) ;
long now = System . currentTimeMillis ( ) ;
// prepare the task one day in advance.
int offset = 86_400_000 ;
if ( now + offset < time ) {
return ;
}
WaylineJobKey jobKey = jobKeyOpt . get ( ) ;
log . info ( "Check the prepared tasks of the wayline. {}" , jobKey . toString ( ) ) ;
WaylineJobDTO job = WaylineJobDTO . builder ( )
. jobId ( jobKey . getJobId ( ) )
@ -237,40 +203,65 @@ public class FlightTaskServiceImpl implements IFlightTaskService {
@@ -237,40 +203,65 @@ public class FlightTaskServiceImpl implements IFlightTaskService {
. executeTime ( LocalDateTime . now ( ) )
. completedTime ( LocalDateTime . now ( ) )
. code ( HttpStatus . SC_INTERNAL_SERVER_ERROR ) . build ( ) ;
try {
Optional < WaylineJobDTO > waylineJobOpt = waylineRedisService . getConditionalWaylineJob ( jobKey . getJobId ( ) ) ;
if ( waylineJobOpt . isEmpty ( ) ) {
job . setCode ( CommonErrorEnum . REDIS_DATA_NOT_FOUND . getErrorCode ( ) ) ;
waylineJobService . updateJob ( job ) ;
waylineRedisService . removePrepareConditionalWaylineJob ( jobKey ) ;
return ;
}
WaylineJobDTO waylineJob = waylineJobOpt . get ( ) ;
Optional < WaylineJobDTO > waylineJobOpt = getPreparedJob ( jobKey , job ) ;
if ( waylineJobOpt . isEmpty ( ) ) {
return ;
}
WaylineJobDTO waylineJob = waylineJobOpt . get ( ) ;
try {
ResponseResult result = waylineJobService . publishOneFlightTask ( waylineJob ) ;
waylineRedisService . removePrepareConditionalWaylineJob ( jobKey ) ;
if ( ResponseResult . CODE_SUCCESS = = result . getCode ( ) ) {
return ;
}
// If the end time is exceeded, no more retries will be made.
waylineRedisService . delConditionalWaylineJob ( jobKey . getJobId ( ) ) ;
if ( waylineJob . getEndTime ( ) . atZone ( ZoneId . systemDefault ( ) ) . toInstant ( ) . toEpochMilli ( ) - RedisConst . WAYLINE_JOB_BLOCK_TIME * 1000 < now ) {
return ;
}
log . info ( "Failed to prepare the task. {}" , result . getMessage ( ) ) ;
job . setCode ( result . getCode ( ) ) ;
waylineJobService . updateJob ( job ) ;
// Retry if the end time has not been exceeded.
this . retryPrepareJob ( jobKey , waylineJob ) ;
this . retryPrepareConditionJob ( jobKey , waylineJob ) ;
} catch ( Exception e ) {
log . info ( "Failed to prepare the conditional task." ) ;
log . info ( "Failed to prepare the task. {}" , e . getLocalizedMessage ( ) ) ;
waylineJobService . updateJob ( job ) ;
this . retryPrepareConditionJob ( jobKey , waylineJob ) ;
}
}
private boolean checkTime ( long time ) {
// prepare the task one day in advance.
int offset = 86_400_000 ;
return System . currentTimeMillis ( ) + offset > = time ;
}
private Optional < WaylineJobDTO > getPreparedJob ( WaylineJobKey jobKey , WaylineJobDTO job ) {
long time = waylineRedisService . getPreparedWaylineJobTime ( jobKey ) . longValue ( ) ;
if ( ! checkTime ( time ) ) {
return Optional . empty ( ) ;
}
Optional < WaylineJobDTO > waylineJobOpt = waylineRedisService . getConditionalWaylineJob ( jobKey . getJobId ( ) ) ;
// Determine whether the conditional task or the scheduled task has expired.
if ( waylineJobOpt . isEmpty ( ) ) {
waylineJobOpt = waylineJobService . getJobByJobId ( jobKey . getWorkspaceId ( ) , jobKey . getJobId ( ) ) ;
if ( waylineJobOpt . isEmpty ( ) | | waylineJobOpt . get ( ) . getEndTime ( ) . isBefore ( LocalDateTime . now ( ) ) ) {
job . setCode ( CommonErrorEnum . REDIS_DATA_NOT_FOUND . getErrorCode ( ) ) ;
waylineJobService . updateJob ( job ) ;
return Optional . empty ( ) ;
}
}
waylineRedisService . removePreparedWaylineJob ( jobKey ) ;
return waylineJobOpt ;
}
private void retryPrepareJob ( ConditionalWaylineJobKey jobKey , WaylineJobDTO waylineJob ) {
private void retryPrepareConditionJob ( WaylineJobKey jobKey , WaylineJobDTO waylineJob ) {
if ( WaylineTaskTypeEnum . CONDITION ! = waylineJob . getTaskType ( ) ) {
return ;
}
// If the end time is exceeded, no more retries will be made.
waylineRedisService . delConditionalWaylineJob ( jobKey . getJobId ( ) ) ;
if ( waylineJob . getEndTime ( ) . atZone ( ZoneId . systemDefault ( ) ) . toInstant ( ) . toEpochMilli ( ) < System . currentTimeMillis ( ) ) {
return ;
}
Optional < WaylineJobDTO > childJobOpt = waylineJobService . createWaylineJobByParent ( jobKey . getWorkspaceId ( ) , jobKey . getJobId ( ) ) ;
if ( childJobOpt . isEmpty ( ) ) {
log . error ( "Failed to create wayline job." ) ;
@ -279,7 +270,7 @@ public class FlightTaskServiceImpl implements IFlightTaskService {
@@ -279,7 +270,7 @@ public class FlightTaskServiceImpl implements IFlightTaskService {
WaylineJobDTO newJob = childJobOpt . get ( ) ;
newJob . setBeginTime ( LocalDateTime . now ( ) . plusSeconds ( RedisConst . WAYLINE_JOB_BLOCK_TIME ) ) ;
boolean isAdd = waylineRedisService . addPrepareCon ditional WaylineJob ( newJob ) ;
boolean isAdd = waylineRedisService . addPreparedWaylineJob ( newJob ) ;
if ( ! isAdd ) {
log . error ( "Failed to create wayline job. {}" , newJob . getJobId ( ) ) ;
return ;