diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..b53c3c1
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,38 @@
+HELP.md
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
+/.mvn/
+/mvnw
+/mvnw.cmd
+/logs/
+/src/test/
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..5cbe4c6
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,148 @@
+
+
+ 4.0.0
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.4.11
+
+
+
+ com.dji
+ cloud-api-sample
+ 1.0.0
+ cloud-api-sample
+
+
+ 11
+ 3.4.2
+ 1.2.6
+ 3.12.1
+ 5.5.5
+ 8.3.7
+ 4.9.1
+ 3.1.0
+ 3.12.0
+ 1.1.1
+ 2.3.3
+ 2.15.0
+ 2.3.0
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+
+ mysql
+ mysql-connector-java
+
+
+
+ org.projectlombok
+ lombok
+ true
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+
+ com.baomidou
+ mybatis-plus-boot-starter
+ ${mybatis-plus.version}
+
+
+
+ com.alibaba
+ druid-spring-boot-starter
+ ${druid.version}
+
+
+
+ com.auth0
+ java-jwt
+ ${jwt.version}
+
+
+
+ org.springframework.integration
+ spring-integration-mqtt
+ ${mqtt.version}
+
+
+
+ org.springframework.boot
+ spring-boot-starter-websocket
+
+
+ org.jetbrains
+ annotations
+ RELEASE
+ compile
+
+
+
+ io.minio
+ minio
+ ${minio.version}
+
+
+
+ com.squareup.okhttp3
+ okhttp
+ ${okhttp3.version}
+
+
+
+ com.aliyun
+ aliyun-java-sdk-sts
+ ${aliyun-sdk-sts.version}
+
+
+
+ com.aliyun.oss
+ aliyun-sdk-oss
+ ${aliyun-oss.version}
+
+
+
+ javax.xml.bind
+ jaxb-api
+
+
+ javax.activation
+ activation
+ ${javax-activation.version}
+
+
+
+ org.glassfish.jaxb
+ jaxb-runtime
+ ${glassfish-jaxb.version}
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+ org.projectlombok
+ lombok
+
+
+
+
+
+
+
diff --git a/sql/cloud_sample.sql b/sql/cloud_sample.sql
new file mode 100644
index 0000000..4cfe7fd
--- /dev/null
+++ b/sql/cloud_sample.sql
@@ -0,0 +1,324 @@
+CREATE DATABASE IF NOT EXISTS `cloud_sample` /*!40100 DEFAULT CHARACTER SET utf8 */;
+USE `cloud_sample`;
+
+/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
+/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
+/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
+SET NAMES utf8mb4;
+/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
+/*!40101 SET @OLD_SQL_MODE='NO_AUTO_VALUE_ON_ZERO', SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
+/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
+
+
+# manage_camera_video
+# ------------------------------------------------------------
+
+DROP TABLE IF EXISTS `manage_camera_video`;
+
+CREATE TABLE `manage_camera_video` (
+ `id` int unsigned NOT NULL AUTO_INCREMENT,
+ `camera_id` int NOT NULL,
+ `video_index` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `video_type` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ PRIMARY KEY (`id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
+
+
+
+# manage_capacity_camera
+# ------------------------------------------------------------
+
+DROP TABLE IF EXISTS `manage_capacity_camera`;
+
+CREATE TABLE `manage_capacity_camera` (
+ `id` int unsigned NOT NULL AUTO_INCREMENT,
+ `device_sn` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'undefined',
+ `description` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
+ `camera_index` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `coexist_video_number_max` int NOT NULL,
+ `available_video_number` int NOT NULL,
+ PRIMARY KEY (`id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
+
+
+
+# manage_device
+# ------------------------------------------------------------
+
+DROP TABLE IF EXISTS `manage_device`;
+
+CREATE TABLE `manage_device` (
+ `id` int unsigned NOT NULL AUTO_INCREMENT,
+ `device_sn` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `device_name` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'undefined',
+ `workspace_id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `device_type` smallint NOT NULL,
+ `sub_type` smallint NOT NULL,
+ `domain` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `version` smallint NOT NULL,
+ `device_index` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
+ `child_sn` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
+ `create_time` bigint NOT NULL,
+ `update_time` bigint NOT NULL,
+ `device_desc` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
+ `url_normal` varchar(200) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
+ `url_select` varchar(200) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `product_sn_UNIQUE` (`device_sn`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
+
+
+
+# manage_device_dictionary
+# ------------------------------------------------------------
+
+DROP TABLE IF EXISTS `manage_device_dictionary`;
+
+CREATE TABLE `manage_device_dictionary` (
+ `id` int unsigned NOT NULL AUTO_INCREMENT,
+ `domain` int NOT NULL,
+ `device_type` int NOT NULL,
+ `sub_type` int NOT NULL,
+ `device_name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `device_desc` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
+ PRIMARY KEY (`id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
+
+LOCK TABLES `manage_device_dictionary` WRITE;
+/*!40000 ALTER TABLE `manage_device_dictionary` DISABLE KEYS */;
+
+INSERT INTO `manage_device_dictionary` (`id`, `domain`, `device_type`, `sub_type`, `device_name`, `device_desc`)
+VALUES
+ (1,0,60,0,'Matrice 300 RTK',NULL),
+ (16,2,56,0,'DJI Smart Controller','Remote control for M300'),
+ (17,1,20,0,'Z30',NULL),
+ (18,1,26,0,'XT2',NULL),
+ (19,1,39,0,'FPV',NULL),
+ (20,1,41,0,'XTS',NULL),
+ (21,1,42,0,'H20',NULL),
+ (22,1,43,0,'H20T',NULL),
+ (24,1,90742,0,'L1',NULL),
+ (27,1,50,0,'P1 24mm lens',NULL),
+ (28,1,50,1,'P1 35mm lens',NULL),
+ (29,1,50,2,'P1 50mm lens',NULL),
+ (30,0,67,0,'Matrice 30',NULL),
+ (31,0,67,1,'Matrice 30T',NULL),
+ (32,2,119,0,'DJI RC Plus','Remote control for M30'),
+ (33,1,52,0,'M30',NULL),
+ (34,1,53,1,'M30T',NULL);
+
+/*!40000 ALTER TABLE `manage_device_dictionary` ENABLE KEYS */;
+UNLOCK TABLES;
+
+
+# manage_device_payload
+# ------------------------------------------------------------
+
+DROP TABLE IF EXISTS `manage_device_payload`;
+
+CREATE TABLE `manage_device_payload` (
+ `id` int unsigned NOT NULL AUTO_INCREMENT,
+ `payload_sn` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `payload_name` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'undefined',
+ `payload_type` smallint NOT NULL,
+ `sub_type` smallint NOT NULL,
+ `version` smallint DEFAULT NULL,
+ `payload_index` smallint NOT NULL,
+ `device_sn` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `payload_desc` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
+ `create_time` bigint NOT NULL,
+ `update_time` bigint NOT NULL,
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `payload_sn_UNIQUE` (`payload_sn`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
+
+
+
+# manage_user
+# ------------------------------------------------------------
+
+DROP TABLE IF EXISTS `manage_user`;
+
+CREATE TABLE `manage_user` (
+ `id` int unsigned NOT NULL AUTO_INCREMENT,
+ `user_id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `username` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `password` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `workspace_id` int NOT NULL,
+ `user_type` smallint NOT NULL,
+ `mqtt_username` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `mqtt_password` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `create_time` bigint NOT NULL,
+ `update_time` bigint NOT NULL,
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `user_id_UNIQUE` (`user_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
+
+LOCK TABLES `manage_user` WRITE;
+/*!40000 ALTER TABLE `manage_user` DISABLE KEYS */;
+
+INSERT INTO `manage_user` (`id`, `user_id`, `username`, `password`, `workspace_id`, `user_type`, `mqtt_username`, `mqtt_password`, `create_time`, `update_time`)
+VALUES
+ (1,'a1559e7c-8dd8-4780-b952-100cc4797da2','adminPC','adminPC',1,1,'admin','admin',1634898410751,1634898410751),
+ (2,'be7c6c3d-afe9-4be4-b9eb-c55066c0914e','pilot','pilot123',1,2,'pilot','pilot123',1634898410751,1634898410751);
+
+/*!40000 ALTER TABLE `manage_user` ENABLE KEYS */;
+UNLOCK TABLES;
+
+
+# manage_workspace
+# ------------------------------------------------------------
+
+DROP TABLE IF EXISTS `manage_workspace`;
+
+CREATE TABLE `manage_workspace` (
+ `id` int unsigned NOT NULL AUTO_INCREMENT,
+ `workspace_id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `workspace_name` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `workspace_desc` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `platform_name` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `create_time` bigint NOT NULL,
+ `update_time` bigint NOT NULL,
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `workspace_id_UNIQUE` (`workspace_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
+
+LOCK TABLES `manage_workspace` WRITE;
+/*!40000 ALTER TABLE `manage_workspace` DISABLE KEYS */;
+
+INSERT INTO `manage_workspace` (`id`, `workspace_id`, `workspace_name`, `workspace_desc`, `platform_name`, `create_time`, `update_time`)
+VALUES
+ (1,'e3dea0f5-37f2-4d79-ae58-490af3228069','Test Group One','Cloud Sample Test Platform','Cloud Api Platform',1634898410751,1634898410751);
+
+/*!40000 ALTER TABLE `manage_workspace` ENABLE KEYS */;
+UNLOCK TABLES;
+
+
+# map_element_coordinate
+# ------------------------------------------------------------
+
+DROP TABLE IF EXISTS `map_element_coordinate`;
+
+CREATE TABLE `map_element_coordinate` (
+ `id` int unsigned NOT NULL AUTO_INCREMENT,
+ `element_id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `longitude` decimal(18,14) NOT NULL,
+ `latitude` decimal(17,14) NOT NULL,
+ `altitude` decimal(17,14) DEFAULT NULL,
+ PRIMARY KEY (`id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
+
+
+
+# map_group
+# ------------------------------------------------------------
+
+DROP TABLE IF EXISTS `map_group`;
+
+CREATE TABLE `map_group` (
+ `id` int unsigned NOT NULL AUTO_INCREMENT,
+ `group_id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
+ `group_name` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
+ `group_type` int NOT NULL,
+ `workspace_id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `is_distributed` tinyint(1) NOT NULL DEFAULT '1',
+ `is_lock` tinyint(1) NOT NULL DEFAULT '0',
+ `create_time` bigint NOT NULL,
+ `update_time` bigint NOT NULL,
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `group_id_UNIQUE` (`group_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COLLATE=utf8_bin;
+
+LOCK TABLES `map_group` WRITE;
+/*!40000 ALTER TABLE `map_group` DISABLE KEYS */;
+
+INSERT INTO `map_group` (`id`, `group_id`, `group_name`, `group_type`, `workspace_id`, `is_distributed`, `is_lock`, `create_time`, `update_time`)
+VALUES
+ (1,'e3dea0f5-37f2-4d79-ae58-490af3228060','Pilot Share Layer',2,'e3dea0f5-37f2-4d79-ae58-490af3228069',1,0,1638330077356,1638330077356),
+ (2,'e3dea0f5-37f2-4d79-ae58-490af3228011','Default Layer',1,'e3dea0f5-37f2-4d79-ae58-490af3228069',1,0,1638330077356,1638330077356);
+
+/*!40000 ALTER TABLE `map_group` ENABLE KEYS */;
+UNLOCK TABLES;
+
+
+# map_group_element
+# ------------------------------------------------------------
+
+DROP TABLE IF EXISTS `map_group_element`;
+
+CREATE TABLE `map_group_element` (
+ `id` int unsigned NOT NULL AUTO_INCREMENT,
+ `element_id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
+ `element_name` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
+ `display` smallint NOT NULL DEFAULT '1',
+ `group_id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `element_type` smallint NOT NULL,
+ `username` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `color` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `clamp_to_ground` tinyint(1) NOT NULL DEFAULT '0',
+ `create_time` bigint NOT NULL,
+ `update_time` bigint NOT NULL,
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `element_id_UNIQUE` (`element_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COLLATE=utf8_bin;
+
+
+
+# media_file
+# ------------------------------------------------------------
+
+DROP TABLE IF EXISTS `media_file`;
+
+CREATE TABLE `media_file` (
+ `id` int unsigned NOT NULL AUTO_INCREMENT,
+ `file_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `file_path` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `workspace_id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `fingerprint` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `tinny_fingerprint` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `object_key` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `sub_file_type` int NOT NULL,
+ `is_original` tinyint(1) NOT NULL,
+ `drone` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'undefined',
+ `payload` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'undefined',
+ `create_time` bigint NOT NULL,
+ `update_time` bigint NOT NULL,
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `fingerprint_UNIQUE` (`fingerprint`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
+
+
+
+# wayline_file
+# ------------------------------------------------------------
+
+DROP TABLE IF EXISTS `wayline_file`;
+
+CREATE TABLE `wayline_file` (
+ `id` int unsigned NOT NULL AUTO_INCREMENT,
+ `name` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
+ `wayline_id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
+ `drone_model_key` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
+ `payload_model_keys` varchar(200) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
+ `workspace_id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `favorited` tinyint(1) NOT NULL DEFAULT '0',
+ `template_types` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
+ `object_key` varchar(200) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
+ `user_name` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
+ `create_time` bigint NOT NULL,
+ `update_time` bigint NOT NULL COMMENT 'required, can not modify.',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `wayline_id_UNIQUE` (`wayline_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
+
+
+
+
+/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
+/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
+/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
+/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;
+/*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS */;
+/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;
+
diff --git a/src/main/java/com/dji/sample/CloudApiSampleApplication.java b/src/main/java/com/dji/sample/CloudApiSampleApplication.java
new file mode 100644
index 0000000..14cca32
--- /dev/null
+++ b/src/main/java/com/dji/sample/CloudApiSampleApplication.java
@@ -0,0 +1,17 @@
+package com.dji.sample;
+
+import org.mybatis.spring.annotation.MapperScan;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+@MapperScan("com.dji.sample.*.dao")
+@SpringBootApplication
+@EnableScheduling
+public class CloudApiSampleApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(CloudApiSampleApplication.class, args);
+ }
+
+}
diff --git a/src/main/java/com/dji/sample/common/error/CommonErrorEnum.java b/src/main/java/com/dji/sample/common/error/CommonErrorEnum.java
new file mode 100644
index 0000000..1dab9d1
--- /dev/null
+++ b/src/main/java/com/dji/sample/common/error/CommonErrorEnum.java
@@ -0,0 +1,40 @@
+package com.dji.sample.common.error;
+
+/**
+ * @author sean.zhou
+ * @version 0.1
+ * @date 2021/11/25
+ */
+public enum CommonErrorEnum implements IErrorInfo {
+
+ SYSTEM_ERROR(600500, "system error"),
+
+ SECRET_INVALID(600100, "secret invalid"),
+
+ NO_TOKEN(600101, "accss_token is null"),
+
+ TOKEN_EXPIRED(600102, "token is expired"),
+
+ TOKEN_INVALID(600103, "token invalid"),
+
+ SIGN_INVALID(600104, "sign invalid");
+
+ private String msg;
+
+ private int code;
+
+ CommonErrorEnum(int code, String msg) {
+ this.code = code;
+ this.msg = msg;
+ }
+
+ @Override
+ public String getErrorMsg() {
+ return this.msg;
+ }
+
+ @Override
+ public Integer getErrorCode() {
+ return this.code;
+ }
+}
diff --git a/src/main/java/com/dji/sample/common/error/IErrorInfo.java b/src/main/java/com/dji/sample/common/error/IErrorInfo.java
new file mode 100644
index 0000000..3332dd3
--- /dev/null
+++ b/src/main/java/com/dji/sample/common/error/IErrorInfo.java
@@ -0,0 +1,22 @@
+package com.dji.sample.common.error;
+
+/**
+ * @author sean.zhou
+ * @version 0.1
+ * @date 2021/11/25
+ */
+public interface IErrorInfo {
+
+ /**
+ * Get error message.
+ * @return error message
+ */
+ String getErrorMsg();
+
+ /**
+ * Get error code.
+ * @return error code
+ */
+ Integer getErrorCode();
+
+}
diff --git a/src/main/java/com/dji/sample/common/error/LiveErrorEnum.java b/src/main/java/com/dji/sample/common/error/LiveErrorEnum.java
new file mode 100644
index 0000000..0a65eed
--- /dev/null
+++ b/src/main/java/com/dji/sample/common/error/LiveErrorEnum.java
@@ -0,0 +1,78 @@
+package com.dji.sample.common.error;
+
+/**
+ * Live streaming related error codes. When on-demand via mqtt,
+ * it can be matched with the error code information replied by the pilot.
+ * @author sean.zhou
+ * @version 0.1
+ * @date 2021/11/25
+ */
+public enum LiveErrorEnum implements IErrorInfo {
+
+ NO_AIRCRAFT(613001, "No aircraft."),
+
+ NO_CAMERA(613002, "No camera."),
+
+ LIVE_STREAM_ALREADY_STARTED(613003, "The camera has started live streaming."),
+
+ FUNCTION_NOT_SUPPORT(613004, "The function is not supported."),
+
+ STRATEGY_NOT_SUPPORT(613005, "The strategy is not supported."),
+
+ NOT_IN_CAMERA_INTERFACE(613006, "The current app is not in the camera interface."),
+
+ NO_FLIGHT_CONTROL(613007, "The remote control has no flight control rights and cannot respond to control commands"),
+
+ NO_STREAM_DATA(613008, "The current app has no stream data."),
+
+ TOO_FREQUENT(613009, "The operation is too frequent."),
+
+ ENABLE_FAILED(613010, "Please check whether the live stream service is normal."),
+
+ NO_LIVE_STREAM(613011, "There are no live stream currently."),
+
+ SWITCH_NOT_SUPPORT(613012, "There is already another camera in the live stream. It's not support to switch the stream directly."),
+
+ URL_TYPE_NOT_SUPPORTED(613013, "This url type is not supported."),
+
+ ERROR_PARAMETERS(613014, "The live stream parameters are abnormal or incomplete."),
+
+ NO_REPLY(613098, "No live reply received."),
+
+ UNKNOWN(613099, "UNKNOWN");
+
+
+ private String msg;
+
+ private int code;
+
+ LiveErrorEnum(int code, String msg) {
+ this.code = code;
+ this.msg = msg;
+ }
+
+ @Override
+ public String getErrorMsg() {
+ return this.msg;
+ }
+
+ @Override
+ public Integer getErrorCode() {
+ return this.code;
+ }
+
+ /**
+ * Get the corresponding enumeration object based on the error code.
+ * @param code error code
+ * @return enumeration object
+ */
+ public static LiveErrorEnum find(int code) {
+
+ for (LiveErrorEnum errorEnum : LiveErrorEnum.class.getEnumConstants()) {
+ if (errorEnum.code == code) {
+ return errorEnum;
+ }
+ }
+ return UNKNOWN;
+ }
+}
diff --git a/src/main/java/com/dji/sample/common/model/CustomClaim.java b/src/main/java/com/dji/sample/common/model/CustomClaim.java
new file mode 100644
index 0000000..85c16a9
--- /dev/null
+++ b/src/main/java/com/dji/sample/common/model/CustomClaim.java
@@ -0,0 +1,88 @@
+package com.dji.sample.common.model;
+
+import com.auth0.jwt.interfaces.Claim;
+import com.fasterxml.jackson.annotation.JsonAlias;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A custom claim for storing custom information in the token.
+ * @author sean.zhou
+ * @date 2021/11/16
+ * @version 0.1
+ */
+@AllArgsConstructor
+@NoArgsConstructor
+@Data
+@Slf4j
+public class CustomClaim {
+
+ /**
+ * The id of the account.
+ */
+ private String id;
+
+ private String username;
+
+ @JsonAlias("user_type")
+ private Integer userType;
+
+ @JsonAlias("workspace_id")
+ private String workspaceId;
+
+ /**
+ * Convert the custom claim data type to the Map type.
+ * @return map
+ */
+ public ConcurrentHashMap convertToMap() {
+ ConcurrentHashMap map = new ConcurrentHashMap<>(4);
+ try {
+ Field[] declaredFields = this.getClass().getDeclaredFields();
+ for (Field field : declaredFields) {
+ JsonAlias annotation = field.getAnnotation(JsonAlias.class);
+ field.setAccessible(true);
+ // The value of key is named underscore.
+ map.put(annotation != null ? annotation.value()[0] : field.getName(),
+ field.get(this).toString());
+ }
+ } catch (IllegalAccessException e) {
+ log.info("CustomClaim converts failed. {}", this.toString());
+ e.printStackTrace();
+ }
+ return map;
+ }
+
+ /**
+ * Convert the data in Map into a custom claim object.
+ * @param claimMap
+ */
+ public CustomClaim (Map claimMap) {
+ Field[] declaredFields = this.getClass().getDeclaredFields();
+ for (Field field : declaredFields) {
+ field.setAccessible(true);
+ JsonAlias annotation = field.getAnnotation(JsonAlias.class);
+
+ Claim value = claimMap.get(annotation == null ? field.getName() : annotation.value()[0]);
+ try {
+ Class> type = field.getType();
+ if (Integer.class.equals(type)) {
+ field.set(this, Integer.valueOf(value.asString()));
+ continue;
+ }
+ if (String.class.equals(type)) {
+ field.set(this, value.asString());
+ continue;
+ }
+ } catch (IllegalAccessException e) {
+ log.info("Claim parses failed. {}", claimMap.toString());
+ e.printStackTrace();
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/dji/sample/common/model/Pagination.java b/src/main/java/com/dji/sample/common/model/Pagination.java
new file mode 100644
index 0000000..74d1e8a
--- /dev/null
+++ b/src/main/java/com/dji/sample/common/model/Pagination.java
@@ -0,0 +1,36 @@
+package com.dji.sample.common.model;
+
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import lombok.Data;
+
+/**
+ * Used for paging display in the wayline. These field names cannot be changed.
+ * Because they need to be the same as the pilot.
+ * @author sean
+ * @version 0.3
+ * @date 2021/12/22
+ */
+@Data
+public class Pagination {
+
+ /**
+ * The current page number.
+ */
+ private long page;
+
+ /**
+ * The amount of data displayed per page.
+ */
+ private long pageSize;
+
+ /**
+ * The total amount of all data.
+ */
+ private long total;
+
+ public Pagination(Page page) {
+ this.page = page.getCurrent();
+ this.pageSize = page.getSize();
+ this.total = page.getTotal();
+ }
+}
diff --git a/src/main/java/com/dji/sample/common/model/PaginationData.java b/src/main/java/com/dji/sample/common/model/PaginationData.java
new file mode 100644
index 0000000..95e2442
--- /dev/null
+++ b/src/main/java/com/dji/sample/common/model/PaginationData.java
@@ -0,0 +1,27 @@
+package com.dji.sample.common.model;
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * The format of the data response when a paginated display is required.
+ * @author sean
+ * @version 0.3
+ * @date 2021/12/22
+ */
+@Data
+public class PaginationData {
+
+ /**
+ * The collection in which the data list is stored.
+ */
+ private List list;
+
+ private Pagination pagination;
+
+ public PaginationData(List list, Pagination pagination) {
+ this.list = list;
+ this.pagination = pagination;
+ }
+}
diff --git a/src/main/java/com/dji/sample/common/model/ResponseResult.java b/src/main/java/com/dji/sample/common/model/ResponseResult.java
new file mode 100644
index 0000000..8ad8e1b
--- /dev/null
+++ b/src/main/java/com/dji/sample/common/model/ResponseResult.java
@@ -0,0 +1,69 @@
+package com.dji.sample.common.model;
+
+import com.dji.sample.common.error.IErrorInfo;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.springframework.http.HttpStatus;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@Builder
+@JsonInclude
+public class ResponseResult {
+
+ public static final int CODE_SUCCESS = 0;
+ public static final String MESSAGE_SUCCESS = "success";
+
+ private int code;
+
+ private String message;
+
+ private T data;
+
+ public static ResponseResult success(T data) {
+ return ResponseResult.builder()
+ .code(CODE_SUCCESS)
+ .message(MESSAGE_SUCCESS)
+ .data(data)
+ .build();
+ }
+
+ public static ResponseResult success() {
+ return ResponseResult.builder()
+ .code(0)
+ .message(MESSAGE_SUCCESS)
+ .build();
+ }
+
+ public static ResponseResult error() {
+ return ResponseResult.builder()
+ .code(HttpStatus.INTERNAL_SERVER_ERROR.value())
+ .message(HttpStatus.INTERNAL_SERVER_ERROR.getReasonPhrase())
+ .build();
+ }
+
+ public static ResponseResult error(String message) {
+ return ResponseResult.builder()
+ .code(HttpStatus.INTERNAL_SERVER_ERROR.value())
+ .message(message)
+ .build();
+ }
+
+ public static ResponseResult error(int code, String message) {
+ return ResponseResult.builder()
+ .code(code)
+ .message(message)
+ .build();
+ }
+
+ public static ResponseResult error(IErrorInfo errorInfo) {
+ return ResponseResult.builder()
+ .code(errorInfo.getErrorCode())
+ .message(errorInfo.getErrorMsg())
+ .build();
+ }
+}
diff --git a/src/main/java/com/dji/sample/common/util/JwtUtil.java b/src/main/java/com/dji/sample/common/util/JwtUtil.java
new file mode 100644
index 0000000..9c2cb9a
--- /dev/null
+++ b/src/main/java/com/dji/sample/common/util/JwtUtil.java
@@ -0,0 +1,103 @@
+package com.dji.sample.common.util;
+
+import com.auth0.jwt.JWT;
+import com.auth0.jwt.JWTCreator;
+import com.auth0.jwt.JWTVerifier;
+import com.auth0.jwt.algorithms.Algorithm;
+import com.auth0.jwt.exceptions.TokenExpiredException;
+import com.auth0.jwt.interfaces.DecodedJWT;
+import com.dji.sample.common.model.CustomClaim;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.Optional;
+
+@Slf4j
+@Component
+public class JwtUtil {
+
+ private static String issuer;
+
+ private static String subject;
+
+ private static long age;
+
+ private static String secret;
+
+ private static Algorithm algorithm;
+
+ @Value("${jwt.issuer: DJI}")
+ private void setIssuer(String issuer) {
+ JwtUtil.issuer = issuer;
+ }
+
+ @Value("${jwt.subject: CloudApiSample}")
+ private void setSubject(String subject) {
+ JwtUtil.subject = subject;
+ }
+
+ @Value("${jwt.age: 86400}")
+ private void setAge(long age) {
+ JwtUtil.age = age * 1000;
+ }
+
+ @Value("${jwt.secret: CloudApiSample}")
+ private void setSecret(String secret) {
+ JwtUtil.secret = secret;
+ setAlgorithm();
+ }
+
+ private void setAlgorithm() {
+ JwtUtil.algorithm = Algorithm.HMAC256(secret);
+ }
+
+ /**
+ * Create a token based on custom information.
+ * @param claims custom information
+ * @return token
+ */
+ public static String createToken(Map claims) {
+ Date now = new Date();
+ JWTCreator.Builder builder = JWT.create();
+ // Add custom information to the token's payload segment.
+ claims.forEach(builder::withClaim);
+ String token = builder.withIssuer(issuer)
+ .withSubject(subject)
+ .withIssuedAt(now)
+ .withExpiresAt(new Date(now.getTime() + age))
+ .withNotBefore(now)
+ .sign(algorithm);
+ log.debug("token created. " + token);
+ return token;
+ }
+
+ /**
+ * Verify that the token is valid.
+ * @param token
+ * @return
+ * @throws TokenExpiredException
+ */
+ public static DecodedJWT verifyToken(String token) {
+ try {
+ JWTVerifier verifier = JWT.require(algorithm).build();
+ return verifier.verify(token);
+ } catch (Exception e) {
+ log.error(e.getMessage());
+ e.printStackTrace();
+ return null;
+ }
+ }
+
+ /**
+ * Parses the custom information in the token into a CustomClaim object.
+ * @param token
+ * @return custom claim
+ */
+ public static Optional parseToken(String token) {
+ DecodedJWT jwt = verifyToken(token);
+ return jwt == null ? Optional.empty() : Optional.of(new CustomClaim(jwt.getClaims()));
+ }
+}
diff --git a/src/main/java/com/dji/sample/component/ApplicationBootInitial.java b/src/main/java/com/dji/sample/component/ApplicationBootInitial.java
new file mode 100644
index 0000000..226fe1a
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/ApplicationBootInitial.java
@@ -0,0 +1,40 @@
+package com.dji.sample.component;
+
+import com.dji.sample.manage.model.DeviceStatusManager;
+import com.dji.sample.manage.model.enums.DeviceDomainEnum;
+import com.dji.sample.manage.model.param.DeviceQueryParam;
+import com.dji.sample.manage.service.IDeviceService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.stereotype.Component;
+
+import java.time.LocalDateTime;
+
+/**
+ * @author sean.zhou
+ * @date 2021/11/24
+ * @version 0.1
+ */
+@Component
+public class ApplicationBootInitial implements CommandLineRunner {
+
+ @Autowired
+ private IDeviceService deviceService;
+
+ /**
+ * Subscribe to the devices that exist in the database when the program starts,
+ * to prevent the data from being different from the pilot side due to program interruptions.
+ * @param args
+ * @throws Exception
+ */
+ @Override
+ public void run(String... args) throws Exception {
+ deviceService.getDevicesByParams(DeviceQueryParam.builder().build())
+ .forEach(device -> {
+ deviceService.subscribeTopicOnline(device.getDeviceSn());
+ DeviceStatusManager.STATUS_MANAGER.put(
+ DeviceDomainEnum.getVal(device.getDomain()) + "/"
+ + device.getDeviceSn(), LocalDateTime.now());
+ });
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/dji/sample/component/AuthInterceptor.java b/src/main/java/com/dji/sample/component/AuthInterceptor.java
new file mode 100644
index 0000000..0d3b8f7
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/AuthInterceptor.java
@@ -0,0 +1,60 @@
+package com.dji.sample.component;
+
+import com.dji.sample.common.error.CommonErrorEnum;
+import com.dji.sample.common.model.CustomClaim;
+import com.dji.sample.common.util.JwtUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.HttpStatus;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+import org.springframework.web.servlet.HandlerInterceptor;
+import org.springframework.web.servlet.ModelAndView;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.util.Optional;
+
+@Slf4j
+@Component
+public class AuthInterceptor implements HandlerInterceptor {
+
+ public static final String PARAM_TOKEN = "x-auth-token";
+
+ public static final String TOKEN_CLAIM = "customClaim";
+
+ @Override
+ public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
+ String uri = request.getRequestURI();
+ log.debug("request uri: {}", uri);
+ // The options method is passed directly.
+ if (HttpMethod.OPTIONS.matches(request.getMethod())) {
+ response.setStatus(HttpStatus.OK.value());
+ return false;
+ }
+ String token = request.getHeader(PARAM_TOKEN);
+ // Check if the token exists.
+ if (!StringUtils.hasText(token)) {
+ response.setStatus(HttpStatus.UNAUTHORIZED.value());
+ log.error(CommonErrorEnum.NO_TOKEN.getErrorMsg());
+ return false;
+ }
+
+ // Check if the current token is valid.
+ Optional customClaimOpt = JwtUtil.parseToken(token);
+ if (customClaimOpt.isEmpty()) {
+ response.setStatus(HttpStatus.UNAUTHORIZED.value());
+ return false;
+ }
+
+ // Put the custom data from the token into the request.
+ request.setAttribute(TOKEN_CLAIM, customClaimOpt.get());
+ return true;
+ }
+
+ @Override
+ public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
+ // Delete the custom data in the request after the request ends.
+ request.removeAttribute(TOKEN_CLAIM);
+ }
+}
diff --git a/src/main/java/com/dji/sample/component/CorsFilter.java b/src/main/java/com/dji/sample/component/CorsFilter.java
new file mode 100644
index 0000000..a4c245f
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/CorsFilter.java
@@ -0,0 +1,35 @@
+package com.dji.sample.component;
+
+import org.springframework.stereotype.Component;
+
+import javax.servlet.*;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+
+import static com.dji.sample.component.AuthInterceptor.PARAM_TOKEN;
+
+/**
+ * @author sean.zhou
+ * @version 0.1
+ * @date 2021/11/22
+ */
+@Component
+public class CorsFilter implements Filter {
+
+ @Override
+ public void doFilter(ServletRequest request, ServletResponse response, FilterChain filterChain) throws IOException, ServletException {
+ HttpServletResponse res = (HttpServletResponse) response;
+ res.addHeader("Access-Control-Allow-Credentials", "true");
+ res.addHeader("Access-Control-Allow-Origin", "*");
+ res.addHeader("Access-Control-Allow-Methods", "GET, POST, DELETE, PUT");
+ res.addHeader("Access-Control-Allow-Headers", "Access-Control-Allow-Headers," +
+ "Authorization, Content-Length, X-CSRF-Token, Token,session,X_Requested_With,Accept, "+
+ "Origin, Host, Connection, Accept-Encoding, Accept-Language,DNT, X-CustomHeader, Keep-Alive," +
+ " User-Agent, X-Requested-With, If-Modified-Since, Cache-Control, Content-Type, Pragma," + PARAM_TOKEN);
+ if (((HttpServletRequest) request).getMethod().equals("OPTIONS")) {
+ return;
+ }
+ filterChain.doFilter(request, response);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/dji/sample/component/GlobalExceptionHandler.java b/src/main/java/com/dji/sample/component/GlobalExceptionHandler.java
new file mode 100644
index 0000000..1d5eb14
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/GlobalExceptionHandler.java
@@ -0,0 +1,33 @@
+package com.dji.sample.component;
+
+import com.dji.sample.common.model.ResponseResult;
+import org.springframework.web.bind.annotation.ControllerAdvice;
+import org.springframework.web.bind.annotation.ExceptionHandler;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+/**
+ * @author sean
+ * @version 0.2
+ * @date 2021/12/1
+ */
+@ControllerAdvice
+@ResponseBody
+public class GlobalExceptionHandler {
+
+ /**
+ * Please do not return directly like this, there is a risk.
+ * @param e
+ * @return
+ */
+ @ExceptionHandler(Exception.class)
+ public ResponseResult exceptionHandler(Exception e) {
+ e.printStackTrace();
+ return ResponseResult.error(e.getLocalizedMessage());
+ }
+
+ @ExceptionHandler(NullPointerException.class)
+ public ResponseResult nullPointerExceptionHandler(NullPointerException e) {
+ e.printStackTrace();
+ return ResponseResult.error("A null object appeared.");
+ }
+}
diff --git a/src/main/java/com/dji/sample/component/GlobalScheduleService.java b/src/main/java/com/dji/sample/component/GlobalScheduleService.java
new file mode 100644
index 0000000..ea44a62
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/GlobalScheduleService.java
@@ -0,0 +1,58 @@
+package com.dji.sample.component;
+
+import com.dji.sample.manage.model.enums.DeviceDomainEnum;
+import com.dji.sample.manage.service.IDeviceService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.time.LocalDateTime;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static com.dji.sample.manage.model.DeviceStatusManager.DEFAULT_ALIVE_SECOND;
+import static com.dji.sample.manage.model.DeviceStatusManager.STATUS_MANAGER;
+
+/**
+ * @author sean.zhou
+ * @date 2021/11/24
+ * @version 0.1
+ */
+@Component
+@Slf4j
+public class GlobalScheduleService {
+
+ @Autowired
+ private IDeviceService deviceService;
+
+ /**
+ * Check the status of the devices every 30 seconds. It is recommended to use cache.
+ */
+ @Scheduled(fixedRate = 30, timeUnit = TimeUnit.SECONDS)
+ private void deviceStatusListen() {
+ for (Map.Entry entry : STATUS_MANAGER.entrySet()) {
+ if (entry.getValue().isAfter(
+ LocalDateTime.now().minusSeconds(DEFAULT_ALIVE_SECOND))) {
+ continue;
+ }
+
+ String device = entry.getKey();
+ int index = device.indexOf("/");
+
+ STATUS_MANAGER.remove(device);
+
+ int type = Integer.parseInt(device.substring(0, index));
+ String sn = device.substring(index + 1);
+ // Determine whether it is a gateway device.
+ if (DeviceDomainEnum.GATEWAY.getVal() == type) {
+ deviceService.deviceOffline(sn);
+ deviceService.unsubscribeTopicOffline(sn);
+ continue;
+ }
+
+ deviceService.subDeviceOffline(sn);
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/dji/sample/component/mqtt/config/InboundMessageRouter.java b/src/main/java/com/dji/sample/component/mqtt/config/InboundMessageRouter.java
new file mode 100644
index 0000000..1b2bdf2
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/mqtt/config/InboundMessageRouter.java
@@ -0,0 +1,95 @@
+package com.dji.sample.component.mqtt.config;
+
+import com.dji.sample.component.mqtt.model.ChannelName;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.integration.annotation.Router;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.integration.router.AbstractMessageRouter;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.regex.Pattern;
+
+import static com.dji.sample.component.mqtt.model.TopicConst.*;
+
+/**
+ *
+ * @author sean.zhou
+ * @date 2021/11/10
+ * @version 0.1
+ */
+@Component
+@Slf4j
+public class InboundMessageRouter extends AbstractMessageRouter {
+
+ @Resource(name = ChannelName.INBOUND)
+ private MessageChannel inboundChannel;
+
+ @Resource(name = ChannelName.INBOUND_STATUS)
+ private MessageChannel statusChannel;
+
+ @Resource(name = ChannelName.INBOUND_STATE)
+ private MessageChannel stateChannel;
+
+ @Resource(name = ChannelName.DEFAULT)
+ private MessageChannel defaultChannel;
+
+ @Resource(name = ChannelName.INBOUND_SERVICE_REPLY)
+ private MessageChannel serviceReplyChannel;
+
+ @Resource(name = ChannelName.INBOUND_OSD)
+ private MessageChannel osdChannel;
+
+ private static final Pattern PATTERN_TOPIC_STATUS =
+ Pattern.compile("^" + BASIC_PRE + PRODUCT + REGEX_SN + STATUS_SUF + "$");
+
+ private static final Pattern PATTERN_TOPIC_STATE =
+ Pattern.compile("^" + THING_MODEL_PRE + PRODUCT + REGEX_SN + STATE_SUF + "$");
+
+ private static final Pattern PATTERN_TOPIC_SERVICE_REPLY =
+ Pattern.compile("^" + THING_MODEL_PRE + PRODUCT + REGEX_SN + SERVICES_SUF + _REPLY_SUF + "$");
+
+ private static final Pattern PATTERN_TOPIC_OSD =
+ Pattern.compile("^" + THING_MODEL_PRE + PRODUCT + REGEX_SN + OSD_SUF);
+
+ /**
+ * All mqtt broker messages will arrive here before distributing them to different channels.
+ * @param message message from mqtt broker
+ * @return channel
+ */
+ @Override
+ @Router(inputChannel = ChannelName.INBOUND)
+ protected Collection determineTargetChannels(Message> message) {
+ MessageHeaders headers = message.getHeaders();
+ String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString();
+ byte[] payload = (byte[])message.getPayload();
+
+ // osd
+ if (PATTERN_TOPIC_OSD.matcher(topic).matches()) {
+ return Collections.singleton(osdChannel);
+ }
+
+ log.debug("received topic :{} \t payload :{}", topic, new String(payload));
+
+ // status
+ if (PATTERN_TOPIC_STATUS.matcher(topic).matches()) {
+ return Collections.singleton(statusChannel);
+ }
+
+ // state
+ if (PATTERN_TOPIC_STATE.matcher(topic).matches()) {
+ return Collections.singleton(stateChannel);
+ }
+
+ // services_reply
+ if (PATTERN_TOPIC_SERVICE_REPLY.matcher(topic).matches()) {
+ return Collections.singleton(serviceReplyChannel);
+ }
+ return Collections.singleton(defaultChannel);
+ }
+}
diff --git a/src/main/java/com/dji/sample/component/mqtt/config/MqttConfiguration.java b/src/main/java/com/dji/sample/component/mqtt/config/MqttConfiguration.java
new file mode 100644
index 0000000..750ba58
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/mqtt/config/MqttConfiguration.java
@@ -0,0 +1,63 @@
+package com.dji.sample.component.mqtt.config;
+
+import lombok.Data;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+
+/**
+ *
+ * @author sean.zhou
+ * @date 2021/11/10
+ * @version 0.1
+ */
+@Configuration
+@Data
+@ConfigurationProperties(prefix = "mqtt")
+public class MqttConfiguration {
+
+ private String protocol;
+
+ private String host;
+
+ private Integer port;
+
+ private String username;
+
+ private String password;
+
+ private String clientId;
+
+ /**
+ * The topic to subscribe to immediately when client connects.
+ */
+ private String inboundTopic;
+
+ @Bean
+ public MqttConnectOptions mqttConnectOptions() {
+ MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
+ mqttConnectOptions.setServerURIs(new String[]{
+ new StringBuilder()
+ .append(protocol.trim())
+ .append("://")
+ .append(host.trim())
+ .append(":")
+ .append(port)
+ .toString()});
+ mqttConnectOptions.setUserName(username);
+ mqttConnectOptions.setPassword(password.toCharArray());
+ mqttConnectOptions.setAutomaticReconnect(true);
+ mqttConnectOptions.setKeepAliveInterval(10);
+ return mqttConnectOptions;
+ }
+
+ @Bean
+ public MqttPahoClientFactory mqttClientFactory() {
+ DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+ factory.setConnectionOptions(mqttConnectOptions());
+ return factory;
+ }
+}
diff --git a/src/main/java/com/dji/sample/component/mqtt/config/MqttInboundConfiguration.java b/src/main/java/com/dji/sample/component/mqtt/config/MqttInboundConfiguration.java
new file mode 100644
index 0000000..df6f04f
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/mqtt/config/MqttInboundConfiguration.java
@@ -0,0 +1,69 @@
+package com.dji.sample.component.mqtt.config;
+
+import com.dji.sample.component.mqtt.model.ChannelName;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.IntegrationComponentScan;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.endpoint.MessageProducerSupport;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+
+import javax.annotation.Resource;
+
+/**
+ * Client configuration for inbound messages.
+ * @author sean.zhou
+ * @date 2021/11/10
+ * @version 0.1
+ */
+@Slf4j
+@Configuration
+@IntegrationComponentScan
+public class MqttInboundConfiguration {
+
+ @Autowired
+ private MqttConfiguration mqttConfiguration;
+
+ @Autowired
+ private MqttPahoClientFactory mqttClientFactory;
+
+ @Resource(name = ChannelName.INBOUND)
+ private MessageChannel inboundChannel;
+
+ /**
+ * Clients of inbound message channels.
+ * @return
+ */
+ @Bean(name = "adapter")
+ public MessageProducerSupport mqttInbound() {
+ MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
+ mqttConfiguration.getClientId() + "_consumer_" + System.currentTimeMillis(),
+ mqttClientFactory, mqttConfiguration.getInboundTopic().split(","));
+ DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
+ // use byte types uniformly
+ converter.setPayloadAsBytes(true);
+ adapter.setConverter(converter);
+ adapter.setQos(1);
+ adapter.setOutputChannel(inboundChannel);
+ return adapter;
+ }
+
+ /**
+ * Define a default channel to handle messages that have no effect.
+ * @return
+ */
+ @Bean
+ @ServiceActivator(inputChannel = ChannelName.DEFAULT)
+ public MessageHandler defaultInboundHandler() {
+ return message -> {
+ log.info("The default channel does not handle messages.");
+ };
+ }
+
+}
diff --git a/src/main/java/com/dji/sample/component/mqtt/config/MqttMessageChannel.java b/src/main/java/com/dji/sample/component/mqtt/config/MqttMessageChannel.java
new file mode 100644
index 0000000..2f06938
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/mqtt/config/MqttMessageChannel.java
@@ -0,0 +1,90 @@
+package com.dji.sample.component.mqtt.config;
+
+import com.dji.sample.component.mqtt.model.ChannelName;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.channel.ExecutorChannel;
+import org.springframework.messaging.MessageChannel;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Definition classes for all channels
+ * @author sean.zhou
+ * @date 2021/11/10
+ * @version 0.1
+ */
+@Configuration
+public class MqttMessageChannel {
+
+ @Autowired
+ private Executor threadPool;
+
+ @Bean(name = ChannelName.INBOUND)
+ public MessageChannel inboundChannel() {
+ return new ExecutorChannel(threadPool);
+ }
+
+ @Bean(name = ChannelName.INBOUND_STATUS)
+ public MessageChannel statusChannel() {
+ return new DirectChannel();
+ }
+
+ @Bean(name = ChannelName.INBOUND_STATUS_ONLINE)
+ public MessageChannel statusOnlineChannel() {
+ return new DirectChannel();
+ }
+
+ @Bean(name = ChannelName.INBOUND_STATUS_OFFLINE)
+ public MessageChannel statusOffChannel() {
+ return new DirectChannel();
+ }
+
+ @Bean(name = ChannelName.INBOUND_STATE)
+ public MessageChannel stateChannel() {
+ return new DirectChannel();
+ }
+
+ @Bean(name = ChannelName.INBOUND_STATE_BASIC)
+ public MessageChannel stateBasicChannel() {
+ return new DirectChannel();
+ }
+
+ @Bean(name = ChannelName.INBOUND_STATE_PAYLOAD)
+ public MessageChannel statePayloadChannel() {
+ return new DirectChannel();
+ }
+
+ @Bean(name = ChannelName.INBOUND_SERVICE_REPLY)
+ public MessageChannel serviceReplyChannel() {
+ return new DirectChannel();
+ }
+
+ @Bean(name = ChannelName.INBOUND_STATE_CAPACITY)
+ public MessageChannel stateCapacityChannel() {
+ return new DirectChannel();
+ }
+
+ @Bean(name = ChannelName.INBOUND_STATE_PAYLOAD_UPDATE)
+ public MessageChannel statePayloadUpdateChannel() {
+ return new DirectChannel();
+ }
+
+ @Bean(name = ChannelName.INBOUND_OSD)
+ public MessageChannel osdChannel() {
+ return new DirectChannel();
+ }
+
+ @Bean(name = ChannelName.DEFAULT)
+ public MessageChannel defaultChannel() {
+ return new DirectChannel();
+ }
+
+ @Bean(name = ChannelName.OUTBOUND)
+ public MessageChannel outboundChannel() {
+ return new DirectChannel();
+ }
+
+}
diff --git a/src/main/java/com/dji/sample/component/mqtt/config/MqttOutboundConfiguration.java b/src/main/java/com/dji/sample/component/mqtt/config/MqttOutboundConfiguration.java
new file mode 100644
index 0000000..69b919b
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/mqtt/config/MqttOutboundConfiguration.java
@@ -0,0 +1,47 @@
+package com.dji.sample.component.mqtt.config;
+
+import com.dji.sample.component.mqtt.model.ChannelName;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+import org.springframework.messaging.MessageHandler;
+
+/**
+ * Client configuration for outbound messages.
+ * @author sean.zhou
+ * @date 2021/11/10
+ * @version 0.1
+ */
+@Configuration
+public class MqttOutboundConfiguration {
+
+ @Autowired
+ private MqttConfiguration mqttConfiguration;
+
+ @Autowired
+ private MqttPahoClientFactory mqttClientFactory;
+
+ /**
+ * Clients of outbound message channels.
+ * @return
+ */
+ @Bean
+ @ServiceActivator(inputChannel = ChannelName.OUTBOUND)
+ public MessageHandler mqttOutbound() {
+ MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
+ mqttConfiguration.getClientId() + "_producer_" + System.currentTimeMillis(),
+ mqttClientFactory);
+ DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
+ // use byte types uniformly
+ converter.setPayloadAsBytes(true);
+
+ messageHandler.setAsync(true);
+ messageHandler.setDefaultQos(0);
+ messageHandler.setConverter(converter);
+ return messageHandler;
+ }
+}
diff --git a/src/main/java/com/dji/sample/component/mqtt/model/ChannelName.java b/src/main/java/com/dji/sample/component/mqtt/model/ChannelName.java
new file mode 100644
index 0000000..c8f5f01
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/mqtt/model/ChannelName.java
@@ -0,0 +1,46 @@
+package com.dji.sample.component.mqtt.model;
+
+/**
+ * The name of all channels.
+ *
+ * @author sean.zhou
+ * @date 2021/11/10
+ * @version 0.1
+ */
+public class ChannelName {
+
+ public static final String INBOUND = "inbound";
+
+ public static final String INBOUND_STATUS = "inboundStatus";
+
+ public static final String INBOUND_STATUS_ROUTER = "inboundStatusRouter";
+
+ public static final String INBOUND_STATUS_ONLINE = "inboundStatusOnline";
+
+ public static final String INBOUND_STATUS_OFFLINE = "inboundStatusOffline";
+
+ public static final String INBOUND_STATE = "inboundState";
+
+ public static final String INBOUND_STATE_SPLITTER = "inboundStateSplitter";
+
+ public static final String INBOUND_STATE_ROUTER = "inboundStateRouter";
+
+ public static final String INBOUND_STATE_BASIC = "inboundStateBasic";
+
+ public static final String INBOUND_STATE_PAYLOAD = "inboundStatePayload";
+
+ public static final String INBOUND_STATE_PAYLOAD_UPDATE = "inboundStatePayloadUpdate";
+
+ public static final String INBOUND_STATE_CAPACITY = "inboundStateCapacity";
+
+ public static final String INBOUND_STATE_LIST = "inboundStateList";
+
+ public static final String INBOUND_SERVICE_REPLY = "inboundStateServiceReply";
+
+ public static final String INBOUND_OSD = "inboundOsd";
+
+ public static final String DEFAULT = "default";
+
+ public static final String OUTBOUND = "outbound";
+
+}
diff --git a/src/main/java/com/dji/sample/component/mqtt/model/CommonTopicReceiver.java b/src/main/java/com/dji/sample/component/mqtt/model/CommonTopicReceiver.java
new file mode 100644
index 0000000..5a6fe45
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/mqtt/model/CommonTopicReceiver.java
@@ -0,0 +1,29 @@
+package com.dji.sample.component.mqtt.model;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+/**
+ * Unified topic receiving format.
+ * @author sean.zhou
+ * @date 2021/11/10
+ * @version 0.1
+ */
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CommonTopicReceiver {
+
+ /**
+ * The command is sent and the response is matched by the tid and bid fields in the message,
+ * and the reply should keep the tid and bid the same.
+ */
+ private String tid;
+
+ private String bid;
+
+ private String method;
+
+ private Long timestamp;
+
+ private T data;
+}
\ No newline at end of file
diff --git a/src/main/java/com/dji/sample/component/mqtt/model/CommonTopicResponse.java b/src/main/java/com/dji/sample/component/mqtt/model/CommonTopicResponse.java
new file mode 100644
index 0000000..effd5c1
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/mqtt/model/CommonTopicResponse.java
@@ -0,0 +1,33 @@
+package com.dji.sample.component.mqtt.model;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Unified Topic response format
+ * @author sean.zhou
+ * @date 2021/11/15
+ * @version 0.1
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@Builder
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CommonTopicResponse {
+
+ /**
+ * The command is sent and the response is matched by the tid and bid fields in the message,
+ * and the reply should keep the tid and bid the same.
+ */
+ private String tid;
+
+ private String bid;
+
+ private String method;
+
+ private T data;
+}
\ No newline at end of file
diff --git a/src/main/java/com/dji/sample/component/mqtt/model/TopicConst.java b/src/main/java/com/dji/sample/component/mqtt/model/TopicConst.java
new file mode 100644
index 0000000..667c969
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/mqtt/model/TopicConst.java
@@ -0,0 +1,29 @@
+package com.dji.sample.component.mqtt.model;
+
+/**
+ * All the topics that need to be used in the project.
+ * @author sean.zhou
+ * @date 2021/11/10
+ * @version 0.1
+ */
+public class TopicConst {
+
+ public static final String BASIC_PRE = "sys/";
+
+ public static final String THING_MODEL_PRE = "thing/";
+
+ public static final String PRODUCT = "product/";
+
+ public static final String STATUS_SUF = "/status";
+
+ public static final String _REPLY_SUF = "_reply";
+
+ public static final String STATE_SUF = "/state";
+
+ public static final String SERVICES_SUF = "/services";
+
+ public static final String OSD_SUF = "/osd";
+
+ public static final String REGEX_SN = "[A-Za-z0-9]+";
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/dji/sample/component/mqtt/model/TopicStateReceiver.java b/src/main/java/com/dji/sample/component/mqtt/model/TopicStateReceiver.java
new file mode 100644
index 0000000..522a4f8
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/mqtt/model/TopicStateReceiver.java
@@ -0,0 +1,29 @@
+package com.dji.sample.component.mqtt.model;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+/**
+ * The data format of the state topic.
+ * @author sean.zhou
+ * @date 2021/11/17
+ * @version 0.1
+ */
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class TopicStateReceiver {
+
+ private String tid;
+
+ private String bid;
+
+ private Long timestamp;
+
+ /**
+ * The sn of the gateway device.
+ */
+ private String gateway;
+
+ private T data;
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/dji/sample/component/mqtt/service/IMessageSenderService.java b/src/main/java/com/dji/sample/component/mqtt/service/IMessageSenderService.java
new file mode 100644
index 0000000..7930d41
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/mqtt/service/IMessageSenderService.java
@@ -0,0 +1,27 @@
+package com.dji.sample.component.mqtt.service;
+
+import com.dji.sample.component.mqtt.model.CommonTopicResponse;
+
+/**
+ * @author sean.zhou
+ * @version 0.1
+ * @date 2021/11/25
+ */
+public interface IMessageSenderService {
+
+ /**
+ * Publish a message to a specific topic.
+ * @param topic target
+ * @param response message
+ */
+ void publish(String topic, CommonTopicResponse response);
+
+ /**
+ * Use a specific qos to push messages to a specific topic.
+ * @param topic target
+ * @param qos qos
+ * @param response message
+ */
+ void publish(String topic, int qos, CommonTopicResponse response);
+
+}
diff --git a/src/main/java/com/dji/sample/component/mqtt/service/IMqttMessageGateway.java b/src/main/java/com/dji/sample/component/mqtt/service/IMqttMessageGateway.java
new file mode 100644
index 0000000..cb50b12
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/mqtt/service/IMqttMessageGateway.java
@@ -0,0 +1,33 @@
+package com.dji.sample.component.mqtt.service;
+
+import com.dji.sample.component.mqtt.model.ChannelName;
+import org.springframework.integration.annotation.MessagingGateway;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.stereotype.Component;
+
+/**
+ *
+ * @author sean.zhou
+ * @date 2021/11/10
+ * @version 0.1
+ */
+@Component
+@MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND)
+public interface IMqttMessageGateway {
+
+ /**
+ * Publish a message to a specific topic.
+ * @param topic target
+ * @param payload message
+ */
+ void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
+
+ /**
+ * Use a specific qos to push messages to a specific topic.
+ * @param topic target
+ * @param payload message
+ * @param qos qos
+ */
+ void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload, @Header(MqttHeaders.QOS) int qos);
+}
diff --git a/src/main/java/com/dji/sample/component/mqtt/service/IMqttTopicService.java b/src/main/java/com/dji/sample/component/mqtt/service/IMqttTopicService.java
new file mode 100644
index 0000000..6983109
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/mqtt/service/IMqttTopicService.java
@@ -0,0 +1,38 @@
+package com.dji.sample.component.mqtt.service;
+
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+
+/**
+ *
+ * @author sean.zhou
+ * @date 2021/11/10
+ * @version 0.1
+ */
+public interface IMqttTopicService {
+
+ /**
+ * Subscribe to a specific topic.
+ * @param topic target
+ */
+ void subscribe(@Header(MqttHeaders.TOPIC) String topic);
+
+ /**
+ * Subscribe to a specific topic using a specific qos.
+ * @param topic target
+ * @param qos qos
+ */
+ void subscribe(@Header(MqttHeaders.TOPIC) String topic, int qos);
+
+ /**
+ * Unsubscribe from a specific topic.
+ * @param topic target
+ */
+ void unsubscribe(@Header(MqttHeaders.TOPIC) String topic);
+
+ /**
+ * Get all the subscribed topics.
+ * @return topics
+ */
+ String[] getSubscribedTopic();
+}
diff --git a/src/main/java/com/dji/sample/component/mqtt/service/impl/MessageSenderServiceImpl.java b/src/main/java/com/dji/sample/component/mqtt/service/impl/MessageSenderServiceImpl.java
new file mode 100644
index 0000000..2510ead
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/mqtt/service/impl/MessageSenderServiceImpl.java
@@ -0,0 +1,50 @@
+package com.dji.sample.component.mqtt.service.impl;
+
+import com.dji.sample.component.mqtt.model.CommonTopicResponse;
+import com.dji.sample.component.mqtt.service.IMessageSenderService;
+import com.dji.sample.component.mqtt.service.IMqttMessageGateway;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author sean.zhou
+ * @date 2021/11/16
+ * @version 0.1
+ */
+@Service
+@Slf4j
+public class MessageSenderServiceImpl implements IMessageSenderService {
+
+ @Autowired
+ private IMqttMessageGateway messageGateway;
+
+ public void publish(String topic, CommonTopicResponse response) {
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ // Only parameters whose value is not null will be serialised.
+ mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+
+ messageGateway.publish(topic, mapper.writeValueAsBytes(response));
+ } catch (JsonProcessingException e) {
+ log.info("Failed to publish the message. {}", response.toString());
+ e.printStackTrace();
+ }
+ }
+
+ public void publish(String topic, int qos, CommonTopicResponse response) {
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ // Only parameters whose value is not null will be serialised.
+ mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+
+ messageGateway.publish(topic, mapper.writeValueAsBytes(response), qos);
+ } catch (JsonProcessingException e) {
+ log.info("Failed to publish the message. {}", response.toString());
+ e.printStackTrace();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/dji/sample/component/mqtt/service/impl/MqttTopicServiceImpl.java b/src/main/java/com/dji/sample/component/mqtt/service/impl/MqttTopicServiceImpl.java
new file mode 100644
index 0000000..6880ecf
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/mqtt/service/impl/MqttTopicServiceImpl.java
@@ -0,0 +1,44 @@
+package com.dji.sample.component.mqtt.service.impl;
+
+import com.dji.sample.component.mqtt.service.IMqttTopicService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+
+/**
+ *
+ * @author sean.zhou
+ * @date 2021/11/10
+ * @version 0.1
+ */
+@Component
+@Slf4j
+public class MqttTopicServiceImpl implements IMqttTopicService {
+
+ @Resource
+ private MqttPahoMessageDrivenChannelAdapter adapter;
+
+ @Override
+ public void subscribe(String topic) {
+ log.debug("subscribe topic: {}", topic);
+ adapter.addTopic(topic);
+ }
+
+ @Override
+ public void subscribe(String topic, int qos) {
+ log.debug("subscribe topic: {}", topic);
+ adapter.addTopic(topic, qos);
+ }
+
+ @Override
+ public void unsubscribe(String topic) {
+ log.debug("unsubscribe topic: {}", topic);
+ adapter.removeTopic(topic);
+ }
+
+ public String[] getSubscribedTopic() {
+ return adapter.getTopic();
+ }
+}
diff --git a/src/main/java/com/dji/sample/component/mybatis/MybatisPlusConfiguration.java b/src/main/java/com/dji/sample/component/mybatis/MybatisPlusConfiguration.java
new file mode 100644
index 0000000..2992641
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/mybatis/MybatisPlusConfiguration.java
@@ -0,0 +1,24 @@
+package com.dji.sample.component.mybatis;
+
+import com.baomidou.mybatisplus.annotation.DbType;
+import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
+import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @author sean
+ * @version 0.3
+ * @date 2021/12/22
+ */
+@Configuration
+public class MybatisPlusConfiguration {
+
+ @Bean
+ public MybatisPlusInterceptor mybatisPlusInterceptor() {
+ MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
+ // select database
+ interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
+ return interceptor;
+ }
+}
diff --git a/src/main/java/com/dji/sample/component/mybatis/MybatisPlusMetaObjectHandler.java b/src/main/java/com/dji/sample/component/mybatis/MybatisPlusMetaObjectHandler.java
new file mode 100644
index 0000000..88170ed
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/mybatis/MybatisPlusMetaObjectHandler.java
@@ -0,0 +1,37 @@
+package com.dji.sample.component.mybatis;
+
+import com.baomidou.mybatisplus.core.handlers.MetaObjectHandler;
+import org.apache.ibatis.reflection.MetaObject;
+import org.springframework.stereotype.Component;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+
+/**
+ * Automatic filling for set values
+ */
+@Component
+public class MybatisPlusMetaObjectHandler implements MetaObjectHandler {
+
+ /**
+ * Automatic filling when inserting into the database.
+ * @param metaObject
+ */
+ @Override
+ public void insertFill(MetaObject metaObject) {
+ this.strictInsertFill(metaObject, "createTime", Long.class,
+ LocalDateTime.now().toInstant(ZoneOffset.ofHours(8)).toEpochMilli());
+ this.strictInsertFill(metaObject, "updateTime", Long.class,
+ LocalDateTime.now().toInstant(ZoneOffset.ofHours(8)).toEpochMilli());
+ }
+
+ /**
+ * Automatic filling when updating the data.
+ * @param metaObject
+ */
+ @Override
+ public void updateFill(MetaObject metaObject) {
+ this.strictUpdateFill(metaObject, "updateTime", Long.class,
+ LocalDateTime.now().toInstant(ZoneOffset.ofHours(8)).toEpochMilli());
+ }
+}
diff --git a/src/main/java/com/dji/sample/component/oss/model/AliyunOSSConfiguration.java b/src/main/java/com/dji/sample/component/oss/model/AliyunOSSConfiguration.java
new file mode 100644
index 0000000..32624ea
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/oss/model/AliyunOSSConfiguration.java
@@ -0,0 +1,104 @@
+package com.dji.sample.component.oss.model;
+
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.OSSClientBuilder;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Lazy;
+
+/**
+ * @author sean
+ * @version 0.2
+ * @date 2021/12/9
+ */
+@Configuration
+public class AliyunOSSConfiguration {
+
+ /**
+ * default
+ */
+ public static final String PROVIDER = "ali";
+
+ /**
+ * Whether to use the current storage service.
+ */
+ public static boolean enable;
+
+ /**
+ * The protocol needs to be included at the beginning of the address.
+ */
+ public static String endpoint;
+
+ public static String accessKey;
+
+ public static String secretKey;
+
+ public static String region;
+
+ public static Long expire;
+
+ public static String roleSessionName;
+
+ public static String roleArn;
+
+ public static String bucket;
+
+ public static String objectDirPrefix;
+
+ @Value("${aliyun.oss.endpoint}")
+ private void setEndpoint(String endpoint) {
+ AliyunOSSConfiguration.endpoint = endpoint;
+ }
+
+ @Value("${aliyun.oss.access-key}")
+ private void setAccessKey(String accessKey) {
+ AliyunOSSConfiguration.accessKey = accessKey;
+ }
+
+ @Value("${aliyun.oss.secret-key}")
+ private void setSecretKey(String secretKey) {
+ AliyunOSSConfiguration.secretKey = secretKey;
+ }
+
+ @Value("${aliyun.oss.region}")
+ private void setRegion(String region) {
+ AliyunOSSConfiguration.region = region;
+ }
+
+ @Value("${aliyun.oss.expire: 3600}")
+ private void setExpire(Long expire) {
+ AliyunOSSConfiguration.expire = expire;
+ }
+
+ @Value("${aliyun.oss.enable: false}")
+ private void setEnable(boolean enable) {
+ AliyunOSSConfiguration.enable = enable;
+ }
+
+ @Value("${aliyun.oss.role-session-name}")
+ private void setRoleSessionName(String roleSessionName) {
+ AliyunOSSConfiguration.roleSessionName = roleSessionName;
+ }
+
+ @Value("${aliyun.oss.role-arn}")
+ private void setRoleArn(String roleArn) {
+ AliyunOSSConfiguration.roleArn = roleArn;
+ }
+
+ @Value("${aliyun.oss.bucket}")
+ private void setBucket(String bucket) {
+ AliyunOSSConfiguration.bucket = bucket;
+ }
+
+ @Value("${aliyun.oss.object-dir-prefix: wayline}")
+ private void setObjectDir(String objectDirPrefix) {
+ AliyunOSSConfiguration.objectDirPrefix = objectDirPrefix;
+ }
+
+ @Bean
+ @Lazy
+ public OSS ossClient() {
+ return new OSSClientBuilder().build(endpoint, accessKey, secretKey);
+ }
+}
diff --git a/src/main/java/com/dji/sample/component/oss/model/MinIOConfiguration.java b/src/main/java/com/dji/sample/component/oss/model/MinIOConfiguration.java
new file mode 100644
index 0000000..5768023
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/oss/model/MinIOConfiguration.java
@@ -0,0 +1,93 @@
+package com.dji.sample.component.oss.model;
+
+import io.minio.MinioClient;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Lazy;
+import org.springframework.util.StringUtils;
+
+/**
+ * @author sean
+ * @version 0.2
+ * @date 2021/12/7
+ */
+@Configuration
+public class MinIOConfiguration {
+
+ /**
+ * default
+ */
+ public static final String PROVIDER = "aws";
+
+ /**
+ * Whether to use the current storage service.
+ */
+ public static boolean enable;
+
+ public static String endpoint;
+
+ public static String accessKey;
+
+ public static String secretKey;
+
+ public static String region;
+
+ public static String bucket;
+
+ public static Integer expire;
+
+ public static String objectDirPrefix;
+
+ @Value("${minio.endpoint: http://localhost:9000/}")
+ private void setEndpoint(String endpoint) {
+ MinIOConfiguration.endpoint = endpoint;
+ }
+
+ @Value("${minio.access-key: minioadmin}")
+ private void setAccessKey(String accessKey) {
+ MinIOConfiguration.accessKey = accessKey;
+ }
+
+ @Value("${minio.secret-key: minioadmin}")
+ private void setSecretKey(String secretKey) {
+ MinIOConfiguration.secretKey = secretKey;
+ }
+
+ @Value("${minio.region: }")
+ private void setRegion(String region) {
+ MinIOConfiguration.region = region;
+ }
+
+ @Value("${minio.bucket: test}")
+ private void setBucket(String bucket) {
+ MinIOConfiguration.bucket = bucket;
+ }
+
+ @Value("${minio.expire: 3600}")
+ private void setExpire(Integer expire) {
+ MinIOConfiguration.expire = expire;
+ }
+
+ @Value("${minio.enable: false}")
+ private void setEnable(boolean enable) {
+ MinIOConfiguration.enable = enable;
+ }
+
+ @Value("${minio.object-dir-prefix: wayline}")
+ private void setObjectDir(String objectDirPrefix) {
+ MinIOConfiguration.objectDirPrefix = objectDirPrefix;
+ }
+
+ @Bean
+ @Lazy
+ public MinioClient minioClient() {
+ MinioClient.Builder builder = MinioClient.builder()
+ .endpoint(endpoint)
+ .credentials(accessKey, secretKey);
+ if (StringUtils.hasText(region)) {
+ builder.region(MinIOConfiguration.region);
+ }
+ return builder.build();
+ }
+}
diff --git a/src/main/java/com/dji/sample/component/oss/service/IOssService.java b/src/main/java/com/dji/sample/component/oss/service/IOssService.java
new file mode 100644
index 0000000..beb9db6
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/oss/service/IOssService.java
@@ -0,0 +1,27 @@
+package com.dji.sample.component.oss.service;
+
+import com.dji.sample.media.model.CredentialsDTO;
+
+import java.net.URL;
+
+/**
+ * @author sean
+ * @version 0.3
+ * @date 2021/12/23
+ */
+public interface IOssService {
+
+ /**
+ * Get temporary credentials.
+ * @return
+ */
+ CredentialsDTO getCredentials();
+
+ /**
+ * Get the address of the object based on the bucket name and the object name.
+ * @param bucket bucket name
+ * @param objectKey object name
+ * @return download link
+ */
+ URL getObjectUrl(String bucket, String objectKey);
+}
diff --git a/src/main/java/com/dji/sample/component/oss/service/impl/AliyunOssServiceImpl.java b/src/main/java/com/dji/sample/component/oss/service/impl/AliyunOssServiceImpl.java
new file mode 100644
index 0000000..f9f92d3
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/oss/service/impl/AliyunOssServiceImpl.java
@@ -0,0 +1,68 @@
+package com.dji.sample.component.oss.service.impl;
+
+import com.aliyun.oss.OSS;
+import com.aliyuncs.DefaultAcsClient;
+import com.aliyuncs.IAcsClient;
+import com.aliyuncs.exceptions.ClientException;
+import com.aliyuncs.profile.DefaultProfile;
+import com.aliyuncs.sts.model.v20150401.AssumeRoleRequest;
+import com.aliyuncs.sts.model.v20150401.AssumeRoleResponse;
+import com.dji.sample.component.oss.model.AliyunOSSConfiguration;
+import com.dji.sample.component.oss.service.IOssService;
+import com.dji.sample.media.model.CredentialsDTO;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.util.StringUtils;
+
+import java.net.URL;
+import java.util.Date;
+
+/**
+ * @author sean
+ * @version 0.3
+ * @date 2021/12/23
+ */
+@Service
+@Slf4j
+public class AliyunOssServiceImpl implements IOssService {
+
+ @Autowired
+ private OSS ossClient;
+
+ @Override
+ public CredentialsDTO getCredentials() {
+
+ try {
+ DefaultProfile profile = DefaultProfile.getProfile(
+ AliyunOSSConfiguration.region, AliyunOSSConfiguration.accessKey, AliyunOSSConfiguration.secretKey);
+ IAcsClient client = new DefaultAcsClient(profile);
+
+ AssumeRoleRequest request = new AssumeRoleRequest();
+ request.setDurationSeconds(AliyunOSSConfiguration.expire);
+ request.setRoleArn(AliyunOSSConfiguration.roleArn);
+ request.setRoleSessionName(AliyunOSSConfiguration.roleSessionName);
+
+ AssumeRoleResponse response = client.getAcsResponse(request);
+ return new CredentialsDTO(response.getCredentials(), AliyunOSSConfiguration.expire);
+
+ } catch (ClientException e) {
+ log.debug("Failed to obtain sts.");
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ @Override
+ public URL getObjectUrl(String bucket, String objectKey) {
+ if (!StringUtils.hasText(bucket) || !StringUtils.hasText(objectKey)) {
+ return null;
+ }
+ // First check if the object can be fetched.
+ ossClient.getObject(bucket, objectKey);
+
+ return ossClient.generatePresignedUrl(bucket, objectKey,
+ new Date(System.currentTimeMillis() + AliyunOSSConfiguration.expire * 1000));
+ }
+
+}
diff --git a/src/main/java/com/dji/sample/component/oss/service/impl/MinIOServiceImpl.java b/src/main/java/com/dji/sample/component/oss/service/impl/MinIOServiceImpl.java
new file mode 100644
index 0000000..1b5e74e
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/oss/service/impl/MinIOServiceImpl.java
@@ -0,0 +1,66 @@
+package com.dji.sample.component.oss.service.impl;
+
+import com.dji.sample.component.oss.model.MinIOConfiguration;
+import com.dji.sample.component.oss.service.IOssService;
+import com.dji.sample.media.model.CredentialsDTO;
+import io.minio.GetPresignedObjectUrlArgs;
+import io.minio.MinioClient;
+import io.minio.credentials.AssumeRoleProvider;
+import io.minio.errors.*;
+import io.minio.http.Method;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.io.IOException;
+import java.net.URL;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * @author sean
+ * @version 0.3
+ * @date 2021/12/23
+ */
+@Service
+@Slf4j
+public class MinIOServiceImpl implements IOssService {
+
+ @Autowired
+ private MinioClient client;
+
+ @Override
+ public CredentialsDTO getCredentials() {
+ try {
+ AssumeRoleProvider provider = new AssumeRoleProvider(MinIOConfiguration.endpoint, MinIOConfiguration.accessKey,
+ MinIOConfiguration.secretKey, MinIOConfiguration.expire,
+ null, null, null, null, null, null);
+ return new CredentialsDTO(provider.fetch(), MinIOConfiguration.expire);
+ } catch (NoSuchAlgorithmException e) {
+ log.debug("Failed to obtain sts.");
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ @Override
+ public URL getObjectUrl(String bucket, String objectKey) {
+ try {
+ return new URL(
+ client.getPresignedObjectUrl(
+ GetPresignedObjectUrlArgs.builder()
+ .method(Method.GET)
+ .bucket(bucket)
+ .object(objectKey)
+ .expiry(MinIOConfiguration.expire)
+ .build()));
+ } catch (ErrorResponseException | InsufficientDataException | InternalException |
+ InvalidKeyException | InvalidResponseException | IOException |
+ NoSuchAlgorithmException | XmlParserException | ServerException e) {
+ log.error("The file does not exist on the oss.");
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+}
diff --git a/src/main/java/com/dji/sample/component/websocket/config/AuthPrincipalHandler.java b/src/main/java/com/dji/sample/component/websocket/config/AuthPrincipalHandler.java
new file mode 100644
index 0000000..68f070e
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/websocket/config/AuthPrincipalHandler.java
@@ -0,0 +1,70 @@
+package com.dji.sample.component.websocket.config;
+
+import com.dji.sample.common.model.CustomClaim;
+import com.dji.sample.common.util.JwtUtil;
+import com.dji.sample.component.AuthInterceptor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.server.ServerHttpRequest;
+import org.springframework.http.server.ServletServerHttpRequest;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.server.support.DefaultHandshakeHandler;
+
+import javax.servlet.http.HttpServletRequest;
+import java.security.Principal;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * @author sean.zhou
+ * @date 2021/11/16
+ * @version 0.1
+ */
+@Slf4j
+@Component
+public class AuthPrincipalHandler extends DefaultHandshakeHandler {
+
+ @Override
+ protected boolean isValidOrigin(ServerHttpRequest request) {
+
+ if (request instanceof ServletServerHttpRequest) {
+ HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
+ String token = servletRequest.getParameter(AuthInterceptor.PARAM_TOKEN);
+
+ if (!StringUtils.hasText(token)) {
+ return false;
+ }
+
+ Optional customClaim = JwtUtil.parseToken(token);
+ if (customClaim.isEmpty()) {
+ return false;
+ }
+
+ servletRequest.setAttribute(AuthInterceptor.TOKEN_CLAIM, customClaim.get());
+ return true;
+ }
+ return false;
+
+ }
+
+ /**
+ * The principal's name: {workspaceId}/{userType}/{userId}
+ * @param request
+ * @param wsHandler
+ * @param attributes
+ * @return
+ */
+ @Override
+ protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map attributes) {
+ if (request instanceof ServletServerHttpRequest) {
+
+ // get the custom claim
+ CustomClaim claim = (CustomClaim) ((ServletServerHttpRequest) request).getServletRequest()
+ .getAttribute(AuthInterceptor.TOKEN_CLAIM);
+
+ return () -> claim.getWorkspaceId() + "/" + claim.getUserType() + "/" + claim.getId();
+ }
+ return () -> null;
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/dji/sample/component/websocket/config/ConcurrentWebSocketSession.java b/src/main/java/com/dji/sample/component/websocket/config/ConcurrentWebSocketSession.java
new file mode 100644
index 0000000..cd13365
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/websocket/config/ConcurrentWebSocketSession.java
@@ -0,0 +1,25 @@
+package com.dji.sample.component.websocket.config;
+
+import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;
+
+/**
+ * @author sean.zhou
+ * @version 0.1
+ * @date 2021/11/24
+ */
+public class ConcurrentWebSocketSession extends ConcurrentWebSocketSessionDecorator {
+
+ private static final int SEND_BUFFER_SIZE_LIMIT = 1024 * 1024;
+
+ private static final int SEND_TIME_LIMIT = 1000;
+
+ private ConcurrentWebSocketSession(WebSocketSession delegate, int sendTimeLimit, int bufferSizeLimit) {
+ super(delegate, sendTimeLimit, bufferSizeLimit);
+ }
+
+ ConcurrentWebSocketSession(WebSocketSession delegate) {
+ this(delegate, SEND_TIME_LIMIT, SEND_BUFFER_SIZE_LIMIT);
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/dji/sample/component/websocket/config/WebSocketDefaultFactory.java b/src/main/java/com/dji/sample/component/websocket/config/WebSocketDefaultFactory.java
new file mode 100644
index 0000000..4fdd774
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/websocket/config/WebSocketDefaultFactory.java
@@ -0,0 +1,20 @@
+package com.dji.sample.component.websocket.config;
+
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.handler.WebSocketHandlerDecoratorFactory;
+
+/**
+ *
+ * @author sean.zhou
+ * @date 2021/11/16
+ * @version 0.1
+ */
+@Component
+public class WebSocketDefaultFactory implements WebSocketHandlerDecoratorFactory {
+
+ @Override
+ public WebSocketHandler decorate(WebSocketHandler handler) {
+ return new WebSocketDefaultHandler(handler);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/dji/sample/component/websocket/config/WebSocketDefaultHandler.java b/src/main/java/com/dji/sample/component/websocket/config/WebSocketDefaultHandler.java
new file mode 100644
index 0000000..d4c7e7a
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/websocket/config/WebSocketDefaultHandler.java
@@ -0,0 +1,60 @@
+package com.dji.sample.component.websocket.config;
+
+import com.dji.sample.component.websocket.model.WebSocketManager;
+import com.dji.sample.component.websocket.service.ISendMessageService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.util.StringUtils;
+import org.springframework.web.socket.CloseStatus;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.WebSocketMessage;
+import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.handler.WebSocketHandlerDecorator;
+
+import java.security.Principal;
+
+/**
+ *
+ * @author sean.zhou
+ * @date 2021/11/16
+ * @version 0.1
+ */
+@Slf4j
+public class WebSocketDefaultHandler extends WebSocketHandlerDecorator {
+
+ @Autowired
+ private ISendMessageService sendMessageService;
+
+ WebSocketDefaultHandler(WebSocketHandler delegate) {
+ super(delegate);
+ }
+
+ @Override
+ public void afterConnectionEstablished(WebSocketSession session) throws Exception {
+ Principal principal = session.getPrincipal();
+ if (StringUtils.hasText(principal.getName())) {
+ WebSocketManager.put(principal.getName(), new ConcurrentWebSocketSession(session));
+ log.debug("{} is connected. ID: {}. WebSocketSession[current count: {}]",
+ principal.getName(), session.getId(), WebSocketManager.getConnectedCount());
+ return;
+ }
+ session.close();
+ }
+
+ @Override
+ public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
+ Principal principal = session.getPrincipal();
+ if (StringUtils.hasText(principal.getName())) {
+ WebSocketManager.remove(principal.getName(), session.getId());
+ log.debug("{} is disconnected. ID: {}. WebSocketSession[current count: {}]",
+ principal.getName(), session.getId(), WebSocketManager.getConnectedCount());
+ }
+
+ }
+
+ @Override
+ public void handleMessage(WebSocketSession session, WebSocketMessage> message) throws Exception {
+ log.debug("received message: {}", message.getPayload());
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/dji/sample/component/websocket/config/WebSocketMessageConfiguration.java b/src/main/java/com/dji/sample/component/websocket/config/WebSocketMessageConfiguration.java
new file mode 100644
index 0000000..85a696c
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/websocket/config/WebSocketMessageConfiguration.java
@@ -0,0 +1,39 @@
+package com.dji.sample.component.websocket.config;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
+import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
+import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
+import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;
+
+/**
+ *
+ * @author sean.zhou
+ * @date 2021/11/17
+ * @version 0.1
+ */
+@EnableWebSocketMessageBroker
+@Configuration
+public class WebSocketMessageConfiguration implements WebSocketMessageBrokerConfigurer {
+
+ @Autowired
+ private AuthPrincipalHandler authPrincipalHandler;
+
+ @Autowired
+ private WebSocketDefaultFactory webSocketDefaultFactory;
+
+ @Override
+ public void registerStompEndpoints(StompEndpointRegistry registry) {
+ // Set the WebSocket connection address
+ registry.addEndpoint("/api/v1/ws").setAllowedOriginPatterns("*")
+ .setHandshakeHandler(authPrincipalHandler);
+ }
+
+ @Override
+ public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
+ registry.addDecoratorFactory(webSocketDefaultFactory);
+ registry.setTimeToFirstMessage(60000 * 60 * 24 * 10);
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/dji/sample/component/websocket/model/BizCodeEnum.java b/src/main/java/com/dji/sample/component/websocket/model/BizCodeEnum.java
new file mode 100644
index 0000000..6ac35c2
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/websocket/model/BizCodeEnum.java
@@ -0,0 +1,37 @@
+package com.dji.sample.component.websocket.model;
+
+/**
+ * @author sean
+ * @version 0.1
+ * @date 2021/11/26
+ */
+public enum BizCodeEnum {
+
+ DEVICE_ONLINE("device_online"),
+
+ DEVICE_OFFLINE("device_offline"),
+
+ DEVICE_UPDATE_TOPO("device_update_topo"),
+
+ DEVICE_OSD("device_osd"),
+
+ GATEWAY_OSD("gateway_osd"),
+
+ MAP_ELEMENT_CREATE("map_element_create"),
+
+ MAP_ELEMENT_UPDATE("map_element_update"),
+
+ MAP_ELEMENT_DELETE("map_element_delete"),
+
+ MAP_GROUP_REFRESH("map_group_refresh");
+
+ private String code;
+
+ BizCodeEnum(String code) {
+ this.code = code;
+ }
+
+ public String getCode() {
+ return code;
+ }
+}
diff --git a/src/main/java/com/dji/sample/component/websocket/model/CustomWebSocketMessage.java b/src/main/java/com/dji/sample/component/websocket/model/CustomWebSocketMessage.java
new file mode 100644
index 0000000..c1d162d
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/websocket/model/CustomWebSocketMessage.java
@@ -0,0 +1,30 @@
+package com.dji.sample.component.websocket.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * The format of WebSocket messages that the pilot can receive.
+ * @author sean.zhou
+ * @date 2021/11/17
+ * @version 0.1
+ */
+@Data
+@Builder
+public class CustomWebSocketMessage {
+
+ /**
+ * @see BizCodeEnum
+ * specific value
+ */
+ @JsonProperty("biz_code")
+ private String bizCode;
+
+ @Builder.Default
+ private String version = "1.0";
+
+ private Long timestamp;
+
+ private T data;
+}
\ No newline at end of file
diff --git a/src/main/java/com/dji/sample/component/websocket/model/WebSocketManager.java b/src/main/java/com/dji/sample/component/websocket/model/WebSocketManager.java
new file mode 100644
index 0000000..da48ae1
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/websocket/model/WebSocketManager.java
@@ -0,0 +1,112 @@
+package com.dji.sample.component.websocket.model;
+
+import com.dji.sample.component.websocket.config.ConcurrentWebSocketSession;
+import com.dji.sample.manage.model.enums.UserTypeEnum;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Manage all WebSocket connection objects.
+ * @author sean.zhou
+ * @date 2021/11/16
+ * @version 0.1
+ */
+@Slf4j
+public class WebSocketManager {
+
+ private static final ConcurrentHashMap>> MANAGER = new ConcurrentHashMap<>(16);
+
+ /**
+ * WebSocket connection from the pilot.
+ */
+ private static final Set PILOT_SESSION = ConcurrentHashMap.newKeySet(16);
+
+ /**
+ * WebSocket connection from the web.
+ */
+ private static final Set WEB_SESSION = ConcurrentHashMap.newKeySet(16);
+
+ public static void put(String key, ConcurrentWebSocketSession val) {
+
+ String[] name = key.split("/");
+ if (name.length != 3) {
+ log.debug("The key is out of format. [{workspaceId}/{userType}/{userId}]");
+ return;
+ }
+
+ ConcurrentHashMap> workspaceSessions =
+ MANAGER.getOrDefault(name[0], new ConcurrentHashMap<>(16));
+
+ ConcurrentHashMap userSessions = workspaceSessions.getOrDefault(
+ name[2], new ConcurrentHashMap<>(16));
+ userSessions.put(val.getId(), val);
+ workspaceSessions.put(name[2], userSessions);
+ MANAGER.put(name[0], workspaceSessions);
+
+ getSetByUserType(Integer.valueOf(name[1])).add(val);
+
+ }
+
+ public static void remove(String key, String sessionId) {
+ String[] name = key.split("/");
+ if (name.length != 3) {
+ log.debug("The key is out of format. [{workspaceId}/{userType}/{userId}]");
+ return;
+ }
+ ConcurrentHashMap userSession = MANAGER.get(name[0]).get(name[2]);
+
+ Set typeSession = getSetByUserType(Integer.valueOf(name[1]));
+
+ ConcurrentWebSocketSession session = userSession.get(sessionId);
+ typeSession.remove(session);
+ userSession.remove(sessionId);
+ }
+
+ public static int getConnectedCount() {
+ return PILOT_SESSION.size() + WEB_SESSION.size();
+ }
+
+ public static Collection getValueWithWorkspace(String workspaceId) {
+ Set sessions = ConcurrentHashMap.newKeySet();
+
+ MANAGER.get(workspaceId)
+ .forEach((userId, userSessions) -> {
+ sessions.addAll(userSessions.values());
+ });
+ return sessions;
+ }
+
+ public static Collection getValueWithWorkspaceAndUserType(String workspaceId, Integer userType) {
+ Set sessions = ConcurrentHashMap.newKeySet();
+ Set typeSessions = getSetByUserType(userType);
+
+ MANAGER.getOrDefault(workspaceId, new ConcurrentHashMap<>())
+ .forEach((userId, userSessions) -> {
+ Collection sessionList = userSessions.values();
+ if (!sessionList.isEmpty()) {
+ ConcurrentWebSocketSession session = sessionList.iterator().next();
+ if (typeSessions.contains(session)) {
+ sessions.addAll(sessionList);
+ }
+ }
+ });
+ return sessions;
+ }
+
+ private static Set getSetByUserType(Integer userType) {
+ if (UserTypeEnum.PILOT.getVal() == userType) {
+ return PILOT_SESSION;
+ }
+
+ if (UserTypeEnum.WEB.getVal() == userType) {
+ return WEB_SESSION;
+ }
+ return new HashSet<>();
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/dji/sample/component/websocket/service/ISendMessageService.java b/src/main/java/com/dji/sample/component/websocket/service/ISendMessageService.java
new file mode 100644
index 0000000..e1d67e7
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/websocket/service/ISendMessageService.java
@@ -0,0 +1,28 @@
+package com.dji.sample.component.websocket.service;
+
+import com.dji.sample.component.websocket.config.ConcurrentWebSocketSession;
+import com.dji.sample.component.websocket.model.CustomWebSocketMessage;
+
+import java.util.Collection;
+
+/**
+ * @author sean.zhou
+ * @date 2021/11/24
+ * @version 0.1
+ */
+public interface ISendMessageService {
+
+ /**
+ * Send a message to the specific connection.
+ * @param session A WebSocket connection object
+ * @param message message
+ */
+ void sendMessage(ConcurrentWebSocketSession session, CustomWebSocketMessage message);
+
+ /**
+ * Send the same message to specific connection.
+ * @param sessions A collection of WebSocket connection objects.
+ * @param message message
+ */
+ void sendBatch(Collection sessions, CustomWebSocketMessage message);
+}
diff --git a/src/main/java/com/dji/sample/component/websocket/service/impl/SendMessageServiceImpl.java b/src/main/java/com/dji/sample/component/websocket/service/impl/SendMessageServiceImpl.java
new file mode 100644
index 0000000..8ee6131
--- /dev/null
+++ b/src/main/java/com/dji/sample/component/websocket/service/impl/SendMessageServiceImpl.java
@@ -0,0 +1,72 @@
+package com.dji.sample.component.websocket.service.impl;
+
+import com.dji.sample.component.websocket.config.ConcurrentWebSocketSession;
+import com.dji.sample.component.websocket.model.CustomWebSocketMessage;
+import com.dji.sample.component.websocket.service.ISendMessageService;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.springframework.web.socket.TextMessage;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * @author sean.zhou
+ * @version 0.1
+ * @date 2021/11/24
+ */
+@Service
+@Slf4j
+public class SendMessageServiceImpl implements ISendMessageService {
+
+ @Override
+ public void sendMessage(ConcurrentWebSocketSession session, CustomWebSocketMessage message) {
+ if (session == null) {
+ return;
+ }
+
+ try {
+ if (!session.isOpen()) {
+ session.close();
+ log.debug("This session is closed.");
+ return;
+ }
+
+ ObjectMapper mapper = new ObjectMapper();
+
+ session.sendMessage(new TextMessage(mapper.writeValueAsBytes(message)));
+ } catch (IOException e) {
+ log.info("Failed to publish the message. {}", message.toString());
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void sendBatch(Collection sessions, CustomWebSocketMessage message) {
+ if (sessions.isEmpty()) {
+ return;
+ }
+
+ try {
+
+ ObjectMapper mapper = new ObjectMapper();
+ TextMessage data = new TextMessage(mapper.writeValueAsBytes(message));
+
+ for (ConcurrentWebSocketSession session : sessions) {
+ if (!session.isOpen()) {
+ session.close();
+ log.debug("This session is closed.");
+ return;
+ }
+ session.sendMessage(data);
+
+ }
+
+ } catch (IOException e) {
+ log.info("Failed to publish the message. {}", message.toString());
+
+ e.printStackTrace();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/dji/sample/configuration/GlobalMVCConfigurer.java b/src/main/java/com/dji/sample/configuration/GlobalMVCConfigurer.java
new file mode 100644
index 0000000..0dfc05c
--- /dev/null
+++ b/src/main/java/com/dji/sample/configuration/GlobalMVCConfigurer.java
@@ -0,0 +1,35 @@
+package com.dji.sample.configuration;
+
+import com.dji.sample.component.AuthInterceptor;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
+import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Configuration
+public class GlobalMVCConfigurer implements WebMvcConfigurer {
+
+ @Autowired
+ private AuthInterceptor authInterceptor;
+
+ private static List excludePaths = new ArrayList<>();
+
+ @Value("${url.manage.prefix}")
+ private String managePrefix;
+
+ @Value("${url.manage.version}")
+ private String manageVersion;
+
+ @Override
+ public void addInterceptors(InterceptorRegistry registry) {
+ // Exclude the login interface.
+ excludePaths.add(managePrefix + manageVersion + "/login");
+ excludePaths.add(managePrefix + manageVersion + "/token/refresh");
+ // Intercept for all request interfaces.
+ registry.addInterceptor(authInterceptor).addPathPatterns("/**").excludePathPatterns(excludePaths);
+ }
+}
diff --git a/src/main/java/com/dji/sample/configuration/GlobalThreadPoolConfiguration.java b/src/main/java/com/dji/sample/configuration/GlobalThreadPoolConfiguration.java
new file mode 100644
index 0000000..af7a92d
--- /dev/null
+++ b/src/main/java/com/dji/sample/configuration/GlobalThreadPoolConfiguration.java
@@ -0,0 +1,42 @@
+package com.dji.sample.configuration;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.concurrent.*;
+
+/**
+ *
+ * @author sean.zhou
+ * @date 2021/11/10
+ * @version 0.1
+ */
+@Configuration
+public class GlobalThreadPoolConfiguration {
+
+ @Value("${thread.pool.core-pool-size: 10}")
+ private int corePoolSize;
+
+ @Value("${thread.pool.maximum-pool-size: 20}")
+ private int maximumPoolSize;
+
+ @Value("${thread.pool.keep-alive-time: 60}")
+ private long keepAliveTime;
+
+ @Value("${thread.pool.queue.capacity: 1000}")
+ private int capacity;
+
+ /**
+ * A custom thread pool.
+ * @return
+ */
+ @Bean
+ public Executor threadPool() {
+ return new ThreadPoolExecutor(corePoolSize,
+ maximumPoolSize, keepAliveTime,
+ TimeUnit.SECONDS, new LinkedBlockingQueue<>(capacity),
+ Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy());
+ }
+
+}
diff --git a/src/main/java/com/dji/sample/manage/controller/DeviceController.java b/src/main/java/com/dji/sample/manage/controller/DeviceController.java
new file mode 100644
index 0000000..08a245d
--- /dev/null
+++ b/src/main/java/com/dji/sample/manage/controller/DeviceController.java
@@ -0,0 +1,138 @@
+package com.dji.sample.manage.controller;
+
+import com.dji.sample.common.model.CustomClaim;
+import com.dji.sample.common.model.ResponseResult;
+import com.dji.sample.component.AuthInterceptor;
+import com.dji.sample.component.mqtt.model.ChannelName;
+import com.dji.sample.component.mqtt.model.CommonTopicReceiver;
+import com.dji.sample.component.mqtt.model.CommonTopicResponse;
+import com.dji.sample.component.websocket.model.BizCodeEnum;
+import com.dji.sample.component.websocket.model.CustomWebSocketMessage;
+import com.dji.sample.component.websocket.model.WebSocketManager;
+import com.dji.sample.component.websocket.service.ISendMessageService;
+import com.dji.sample.manage.model.dto.DeviceDTO;
+import com.dji.sample.manage.model.dto.WorkspaceDTO;
+import com.dji.sample.manage.model.enums.UserTypeEnum;
+import com.dji.sample.manage.model.param.DeviceQueryParam;
+import com.dji.sample.manage.model.receiver.StatusGatewayReceiver;
+import com.dji.sample.manage.service.IDeviceService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.Message;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.servlet.http.HttpServletRequest;
+import java.util.List;
+
+/**
+ * @author sean.zhou
+ * @version 0.1
+ * @date 2021/11/15
+ */
+@RestController
+@Slf4j
+@RequestMapping("${url.manage.prefix}${url.manage.version}/devices")
+public class DeviceController {
+
+ @Autowired
+ private IDeviceService deviceService;
+
+ @Autowired
+ private ISendMessageService sendMessageService;
+
+ /**
+ * Handles the message that the drone goes online.
+ * @param receiver The drone information is not empty.
+ */
+ @ServiceActivator(inputChannel = ChannelName.INBOUND_STATUS_ONLINE, outputChannel = ChannelName.OUTBOUND)
+ public void deviceOnline(CommonTopicReceiver receiver) {
+ boolean online = deviceService.deviceOnline(receiver.getData());
+ if (online) {
+ // Notify pilot that the drone is online successfully.
+ deviceService.publishStatusReply(receiver.getData().getSn(),
+ CommonTopicResponse.builder()
+ .tid(receiver.getTid())
+ .bid(receiver.getBid())
+ .build());
+
+ // Publish the latest device topology information in the current workspace to the pilot.
+ deviceService.pushDeviceOnlineTopo(WorkspaceDTO.DEFAULT_WORKSPACE_ID,
+ receiver.getData().getSn(), receiver.getData().getSubDevices().get(0).getSn());
+ }
+ }
+
+ /**
+ * Handles the message that the drone goes offline.
+ * @param receiver The drone information is empty.
+ */
+ @ServiceActivator(inputChannel = ChannelName.INBOUND_STATUS_OFFLINE, outputChannel = ChannelName.OUTBOUND)
+ public void deviceOffline(CommonTopicReceiver receiver) {
+
+ boolean offline = deviceService.deviceOffline(receiver.getData().getSn());
+ if (offline) {
+ // Notify pilot that the device is offline successfully.
+ deviceService.publishStatusReply(receiver.getData().getSn(),
+ CommonTopicResponse.builder()
+ .tid(receiver.getTid())
+ .bid(receiver.getBid())
+ .build());
+
+ // Publish the latest device topology information in the current workspace to the pilot.
+ deviceService.pushDeviceOfflineTopo(WorkspaceDTO.DEFAULT_WORKSPACE_ID, receiver.getData().getSn());
+ }
+ }
+
+ /**
+ * Get the topology list of all devices in the current user workspace.
+ * @param request
+ * @return
+ */
+ @GetMapping("/devices")
+ public ResponseResult> getDevices(HttpServletRequest request) {
+ // Get information about the current user.
+ CustomClaim claim = (CustomClaim)request.getAttribute(AuthInterceptor.TOKEN_CLAIM);
+ String workspaceId = claim.getWorkspaceId();
+ // Get information about the devices in the current user's workspace.
+ List devicesList = deviceService.getDevicesTopoForWeb(workspaceId);
+
+ return ResponseResult.success(devicesList);
+ }
+
+ @ServiceActivator(inputChannel = ChannelName.INBOUND_OSD)
+ public void osdRealTime(Message> message) {
+ String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
+ byte[] payload = (byte[])message.getPayload();
+ deviceService.handleOSD(topic, payload);
+ }
+
+ /**
+ * Handles the payloads data of the drone.
+ * @param deviceSn drone's sn
+ */
+ @ServiceActivator(inputChannel = ChannelName.INBOUND_STATE_PAYLOAD_UPDATE)
+ public void pushWebSocketDevices(String deviceSn) {
+ List devicesList = deviceService.getDevicesByParams(
+ DeviceQueryParam.builder()
+ .deviceSn(deviceSn)
+ .build());
+ // Get drone information based on the sn of the drone. The sn of the drone is unique.
+ DeviceDTO device = devicesList.get(0);
+ // Set the remote controller and payloads information of the drone.
+ deviceService.spliceDeviceTopo(device);
+
+ CustomWebSocketMessage wsMessage = CustomWebSocketMessage.builder()
+ .timestamp(System.currentTimeMillis())
+ .bizCode(BizCodeEnum.DEVICE_UPDATE_TOPO.getCode())
+ .data(device)
+ .build();
+ // Update the topology of the drone via WebSocket notifications to the web side.
+ sendMessageService.sendBatch(WebSocketManager
+ .getValueWithWorkspaceAndUserType(
+ device.getWorkspaceId(), UserTypeEnum.WEB.getVal()),
+ wsMessage);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/dji/sample/manage/controller/DevicePayloadController.java b/src/main/java/com/dji/sample/manage/controller/DevicePayloadController.java
new file mode 100644
index 0000000..0c0ba2c
--- /dev/null
+++ b/src/main/java/com/dji/sample/manage/controller/DevicePayloadController.java
@@ -0,0 +1,66 @@
+package com.dji.sample.manage.controller;
+
+import com.dji.sample.component.mqtt.model.ChannelName;
+import com.dji.sample.manage.model.receiver.DeviceBasicReceiver;
+import com.dji.sample.manage.model.receiver.DevicePayloadReceiver;
+import com.dji.sample.manage.service.IDevicePayloadService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.List;
+
+/**
+ * @author sean.zhou
+ * @date 2021/11/19
+ * @version 0.1
+ */
+@RestController
+@Slf4j
+public class DevicePayloadController {
+
+ @Autowired
+ private IDevicePayloadService devicePayloadService;
+
+ /**
+ * Handles the data for the payload messages in the state topic.
+ * @param payloadsList List of payload information.
+ * @return drone's sn
+ */
+ @ServiceActivator(inputChannel = ChannelName.INBOUND_STATE_PAYLOAD,
+ outputChannel = ChannelName.INBOUND_STATE_PAYLOAD_UPDATE)
+ public String statePayload(List payloadsList) {
+ // Delete all payload information for the drone based on the drone's sn.
+ devicePayloadService.deletePayloadsByDeviceSn(List.of(payloadsList.get(0).getDeviceSn()));
+
+ // Save the new payload information.
+ devicePayloadService.savePayloadDTOs(payloadsList);
+
+ log.debug("The result of saving the payload is successful.");
+
+ return payloadsList.get(0).getDeviceSn();
+ }
+
+ /**
+ * Handles messages in the state topic about basic drone data.
+ *
+ * Note: Only the data of the drone payload is handled here. You can handle other data from the drone
+ * according to your business needs.
+ * @param deviceBasic basic drone data
+ * @return drone's sn
+ */
+ @ServiceActivator(inputChannel = ChannelName.INBOUND_STATE_BASIC,
+ outputChannel = ChannelName.INBOUND_STATE_PAYLOAD_UPDATE)
+ public String stateBasic(DeviceBasicReceiver deviceBasic) {
+ // Delete all payload information for the drone based on the drone's sn.
+ devicePayloadService.deletePayloadsByDeviceSn(List.of(deviceBasic.getDeviceSn()));
+
+ // Save the new payload information.
+ boolean isSave = devicePayloadService.savePayloadDTOs(deviceBasic.getPayloads());
+
+ log.debug("The result of saving the payloads is {}.", isSave);
+
+ return deviceBasic.getDeviceSn();
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/dji/sample/manage/controller/LiveStreamController.java b/src/main/java/com/dji/sample/manage/controller/LiveStreamController.java
new file mode 100644
index 0000000..ef89f34
--- /dev/null
+++ b/src/main/java/com/dji/sample/manage/controller/LiveStreamController.java
@@ -0,0 +1,113 @@
+package com.dji.sample.manage.controller;
+
+import com.dji.sample.common.model.CustomClaim;
+import com.dji.sample.common.model.ResponseResult;
+import com.dji.sample.component.mqtt.model.ChannelName;
+import com.dji.sample.component.mqtt.model.CommonTopicReceiver;
+import com.dji.sample.manage.model.Chan;
+import com.dji.sample.manage.model.dto.CapacityDeviceDTO;
+import com.dji.sample.manage.model.dto.LiveTypeDTO;
+import com.dji.sample.manage.model.receiver.CapacityDeviceReceiver;
+import com.dji.sample.manage.model.receiver.ServiceReplyReceiver;
+import com.dji.sample.manage.service.ILiveStreamService;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.messaging.Message;
+import org.springframework.web.bind.annotation.*;
+
+import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+import java.util.List;
+
+import static com.dji.sample.component.AuthInterceptor.TOKEN_CLAIM;
+
+/**
+ * @author sean.zhou
+ * @version 0.1
+ * @date 2021/11/19
+ */
+
+@RestController
+@Slf4j
+@RequestMapping("${url.manage.prefix}${url.manage.version}/live")
+public class LiveStreamController {
+
+ @Autowired
+ private ILiveStreamService liveStreamService;
+
+ /**
+ * Analyze the live streaming capabilities of drones.
+ * This data is necessary if drones are required for live streaming.
+ * @param device the capacity of drone
+ */
+ @ServiceActivator(inputChannel = ChannelName.INBOUND_STATE_CAPACITY)
+ public void stateCapacity(CapacityDeviceReceiver device) {
+ boolean parseCapacity = liveStreamService.saveLiveCapacity(device);
+ log.debug("The result of parsing the live capacity is {}.", parseCapacity);
+ }
+
+ /**
+ * Get live capability data of all drones in the current user's workspace from the database.
+ * @param request
+ * @return live capability
+ */
+ @GetMapping("/capacity")
+ public ResponseResult> getLiveCapacity(HttpServletRequest request) {
+ // Get information about the current user.
+ CustomClaim customClaim = (CustomClaim)request.getAttribute(TOKEN_CLAIM);
+
+ List liveCapacity = liveStreamService.getLiveCapacity(customClaim.getWorkspaceId());
+
+ return ResponseResult.success(liveCapacity);
+ }
+
+ /**
+ * Live streaming according to the parameters passed in from the web side.
+ * @param liveParam Live streaming parameters.
+ * @return
+ */
+ @PostMapping("/streams/start")
+ public ResponseResult liveStart(@RequestBody LiveTypeDTO liveParam) {
+ return liveStreamService.liveStart(liveParam);
+ }
+
+ /**
+ * Stop live streaming according to the parameters passed in from the web side.
+ * @param liveParam Live streaming parameters.
+ * @return
+ */
+ @PostMapping("/streams/stop")
+ public ResponseResult liveStop(@RequestBody LiveTypeDTO liveParam) {
+ return liveStreamService.liveStop(liveParam.getVideoId());
+ }
+
+ /**
+ * Set the quality of the live streaming according to the parameters passed in from the web side.
+ * @param liveParam Live streaming parameters.
+ * @return
+ */
+ @PostMapping("/streams/update")
+ public ResponseResult liveSetQuality(@RequestBody LiveTypeDTO liveParam) {
+ return liveStreamService.liveSetQuality(liveParam);
+ }
+
+ /**
+ * Handle the reply message from the pilot side to the on-demand video.
+ * @param message reply message
+ * @throws IOException
+ */
+ @ServiceActivator(inputChannel = ChannelName.INBOUND_SERVICE_REPLY)
+ public void serviceReply(Message> message) throws IOException {
+ byte[] payload = (byte[])message.getPayload();
+ ObjectMapper mapper = new ObjectMapper();
+ CommonTopicReceiver receiver = mapper.readValue(payload,
+ new TypeReference>() {
+ });
+ Chan chan = Chan.getInstance();
+ // Put the message to the chan object.
+ chan.put(receiver);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/dji/sample/manage/controller/LoginController.java b/src/main/java/com/dji/sample/manage/controller/LoginController.java
new file mode 100644
index 0000000..b0cacb4
--- /dev/null
+++ b/src/main/java/com/dji/sample/manage/controller/LoginController.java
@@ -0,0 +1,47 @@
+package com.dji.sample.manage.controller;
+
+import com.dji.sample.common.error.CommonErrorEnum;
+import com.dji.sample.common.model.ResponseResult;
+import com.dji.sample.manage.model.dto.UserDTO;
+import com.dji.sample.manage.model.dto.UserLoginDTO;
+import com.dji.sample.manage.service.IUserService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.util.Optional;
+
+import static com.dji.sample.component.AuthInterceptor.PARAM_TOKEN;
+
+@RestController
+@RequestMapping("${url.manage.prefix}${url.manage.version}")
+public class LoginController {
+
+ @Autowired
+ private IUserService userService;
+
+ @PostMapping("/login")
+ public ResponseResult login(@RequestBody UserLoginDTO loginDTO) {
+ String username = loginDTO.getUsername();
+ String password = loginDTO.getPassword();
+ return userService.userLogin(username, password);
+ }
+
+ @PostMapping("/token/refresh")
+ public ResponseResult refreshToken(HttpServletRequest request, HttpServletResponse response) {
+ String token = request.getHeader(PARAM_TOKEN);
+ Optional user = userService.refreshToken(token);
+
+ if (user.isEmpty()) {
+ response.setStatus(HttpStatus.UNAUTHORIZED.value());
+ return ResponseResult.error(CommonErrorEnum.NO_TOKEN.getErrorMsg());
+ }
+
+ return ResponseResult.success(user);
+ }
+}
diff --git a/src/main/java/com/dji/sample/manage/controller/TopologyController.java b/src/main/java/com/dji/sample/manage/controller/TopologyController.java
new file mode 100644
index 0000000..3f2b80d
--- /dev/null
+++ b/src/main/java/com/dji/sample/manage/controller/TopologyController.java
@@ -0,0 +1,40 @@
+package com.dji.sample.manage.controller;
+
+import com.dji.sample.common.model.ResponseResult;
+import com.dji.sample.manage.model.dto.TopologyDTO;
+import com.dji.sample.manage.service.ITopologyService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author sean
+ * @version 0.2
+ * @date 2021/12/8
+ */
+@RestController
+@RequestMapping("${url.manage.prefix}${url.manage.version}/workspaces")
+public class TopologyController {
+
+ @Autowired
+ private ITopologyService topologyService;
+
+ /**
+ * Get the topology list of all devices in the current user workspace for pilot display.
+ * @param workspaceId
+ * @return
+ */
+ @GetMapping("/{workspace_id}/devices/topologies")
+ public ResponseResult