Browse Source

feat: 为publishWithReply提供配置选项.

feat: 支持beforePublishHook, afterPublishHook回调.
feat: publishWithReply提供CompletableFuture支持.
pull/42/head
Vincent 1 year ago
parent
commit
9d392c5eab
  1. 79
      src/main/java/com/dji/sdk/common/JDKLockBarrierImpl.java
  2. 22
      src/main/java/com/dji/sdk/common/PublishBarrier.java
  3. 41
      src/main/java/com/dji/sdk/common/PublishBarrierResult.java
  4. 82
      src/main/java/com/dji/sdk/common/PublishConfiguration.java
  5. 59
      src/main/java/com/dji/sdk/common/PublishOption.java
  6. 38
      src/main/java/com/dji/sdk/config/DefaultBeanConfiguration.java
  7. 43
      src/main/java/com/dji/sdk/mqtt/ChanBarrierAdapter.java
  8. 52
      src/main/java/com/dji/sdk/mqtt/MqttGatewayPublish.java
  9. 16
      src/main/java/com/dji/sdk/mqtt/property/PropertySetReplyHandler.java
  10. 34
      src/main/java/com/dji/sdk/mqtt/services/ServicesPublish.java
  11. 16
      src/main/java/com/dji/sdk/mqtt/services/ServicesReplyHandler.java

79
src/main/java/com/dji/sdk/common/JDKLockBarrierImpl.java

@ -0,0 +1,79 @@ @@ -0,0 +1,79 @@
/*************************************************
* @copyright 2017 Flision Corporation Inc.
* @author: Vincent Chan @ Canton
* @date: 2023年09月22日
* @version: 1.0.0
* @description:
**************************************************/
package com.dji.sdk.common;
import com.dji.sdk.mqtt.Chan;
import com.dji.sdk.mqtt.CommonTopicRequest;
import com.dji.sdk.mqtt.CommonTopicResponse;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
public class JDKLockBarrierImpl implements PublishBarrier{
/**
* 在我的实现中是采用一个定期清理的TimedCache储存请求
*/
private final ConcurrentHashMap<String, JDKHolder> container = new ConcurrentHashMap<>();
@Override
public void put(String identity, CommonTopicResponse receiveData) {
if(hasIdentity(identity)){
container.get(identity).setData(receiveData);
}
}
@Override
public void registerRequest(String identity, CommonTopicRequest requestData) {
container.put(identity,new JDKHolder());
}
@Override
public PublishBarrierResult await(String identity, long timeout) {
JDKHolder jdkHolder = container.get(identity);
if(Objects.isNull(jdkHolder)){
throw new RuntimeException("等待指令返回前未注册指令到栅栏");
}
jdkHolder.await(timeout);
return jdkHolder.getResult();
}
@Override
public boolean hasIdentity(String identity) {
return container.containsKey(identity);
}
public static class JDKHolder{
volatile Object locker = new Object();
CommonTopicResponse response = null;
public void await(long timeout) {
synchronized (locker){
try {
locker.wait(timeout);
}catch (InterruptedException e){}
}
}
public void setData(CommonTopicResponse receiveData) {
this.response = receiveData;
synchronized (locker){
locker.notify();
}
}
public PublishBarrierResult getResult() {
if(Objects.nonNull(response)){
return PublishBarrierResult.ok(response);
}else{
return PublishBarrierResult.EMPTY;
}
}
}
}

22
src/main/java/com/dji/sdk/common/PublishBarrier.java

@ -0,0 +1,22 @@ @@ -0,0 +1,22 @@
/*************************************************
* @copyright 2017 Flision Corporation Inc.
* @author: Vincent Chan @ Canton
* @date: 2023年09月22日
* @version: 1.0.0
* @description:
**************************************************/
package com.dji.sdk.common;
import com.dji.sdk.mqtt.CommonTopicRequest;
import com.dji.sdk.mqtt.CommonTopicResponse;
public interface PublishBarrier {
void put(String identity, CommonTopicResponse receiveData);
void registerRequest(String identity, CommonTopicRequest requestData);
PublishBarrierResult await(String identity,long timeout);
boolean hasIdentity(String identity);
}

41
src/main/java/com/dji/sdk/common/PublishBarrierResult.java

@ -0,0 +1,41 @@ @@ -0,0 +1,41 @@
/*************************************************
* @copyright 2017 Flision Corporation Inc.
* @author: Vincent Chan @ Canton
* @date: 2023年09月22日
* @version: 1.0.0
* @description:
**************************************************/
package com.dji.sdk.common;
import com.dji.sdk.mqtt.CommonTopicResponse;
public class PublishBarrierResult {
public static PublishBarrierResult EMPTY = new PublishBarrierResult();
public static PublishBarrierResult ok(CommonTopicResponse data){
return new PublishBarrierResult().setData(data);
}
boolean timeout = true;
CommonTopicResponse data;
private PublishBarrierResult() {
}
private PublishBarrierResult setData(CommonTopicResponse data) {
this.data = data;
this.timeout = false;
return this;
}
public boolean isTimeout(){
return timeout;
}
public <T> CommonTopicResponse<T> getData(){
return data;
}
}

82
src/main/java/com/dji/sdk/common/PublishConfiguration.java

@ -0,0 +1,82 @@ @@ -0,0 +1,82 @@
/*************************************************
* @copyright 2017 Flision Corporation Inc.
* @author: Vincent Chan @ Canton
* @date: 2023年09月22日
* @version: 1.0.0
* @description:
**************************************************/
package com.dji.sdk.common;
import com.dji.sdk.mqtt.CommonTopicRequest;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public class PublishConfiguration {
String bid;
String tid;
//默认超时
int timeout = 3;
//请求发送前调用
Consumer<CommonTopicRequest> beforePublishHook = (e)->{};
//收到请求回信后调用
BiConsumer<CommonTopicRequest, PublishBarrierResult> afterPublishHook = (req,rsp) ->{};
public String getBid() {
return bid;
}
public String getTid() {
return tid;
}
public long getTimeout() {
return timeout * 1000;
}
public void setBizId(String bid) {
this.bid = bid;
}
public void setTransactionId(String tid) {
this.tid = tid;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
public void setBeforePublishHook(Consumer<CommonTopicRequest> callback) {
beforePublishHook = callback;
}
public void setAfterPublishReplyHook(BiConsumer<CommonTopicRequest, PublishBarrierResult> callback) {
afterPublishHook = callback;
}
public void invokeBeforePublishHook(CommonTopicRequest req){
if(Objects.nonNull(beforePublishHook)){
try {
beforePublishHook.accept(req);
}catch (Throwable ex){
//do nothing
//业务层的异常不理会
}
}
}
public void invokeAfterPublishReplyHook(CommonTopicRequest req, PublishBarrierResult result){
if(Objects.nonNull(afterPublishHook)){
try{
afterPublishHook.accept(req,result);
}catch (Throwable ex){
//do nothing
//业务层的异常不理会
}
}
}
}

59
src/main/java/com/dji/sdk/common/PublishOption.java

@ -0,0 +1,59 @@ @@ -0,0 +1,59 @@
/*************************************************
* @copyright 2017 Flision Corporation Inc.
* @author: Vincent Chan @ Canton
* @date: 2023年09月22日
* @version: 1.0.0
* @description:
**************************************************/
package com.dji.sdk.common;
import com.dji.sdk.mqtt.CommonTopicRequest;
import com.dji.sdk.mqtt.CommonTopicResponse;
import com.google.common.base.Strings;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public class PublishOption {
public static Consumer<PublishOption> DEFAULT = (cfg)->{};
final PublishConfiguration configuration;
public PublishOption(PublishConfiguration configuration) {
this.configuration = configuration;
}
public PublishOption withBizId(String bid){
if(!Strings.isNullOrEmpty(bid)){
configuration.setBizId(bid);
}
return this;
}
public PublishOption withTransactionId(String tid){
if(!Strings.isNullOrEmpty(tid)){
configuration.setTransactionId(tid);
}
return this;
}
public PublishOption timeout(int second){
configuration.setTimeout(second);
return this;
}
public PublishOption beforePublish(Consumer<CommonTopicRequest> callback){
if(Objects.nonNull(callback)){
configuration.setBeforePublishHook(callback);
}
return this;
}
public PublishOption afterPublishReply(BiConsumer<CommonTopicRequest, PublishBarrierResult> callback){
if(Objects.nonNull(callback)){
configuration.setAfterPublishReplyHook(callback);
}
return this;
}
}

38
src/main/java/com/dji/sdk/config/DefaultBeanConfiguration.java

@ -0,0 +1,38 @@ @@ -0,0 +1,38 @@
/*************************************************
* @copyright 2017 Flision Corporation Inc.
* @author: Vincent Chan @ Canton
* @date: 2023年09月22日
* @version: 1.0.0
* @description:
**************************************************/
package com.dji.sdk.config;
import com.dji.sdk.common.JDKLockBarrierImpl;
import com.dji.sdk.common.PublishBarrier;
import com.dji.sdk.mqtt.ChanBarrierAdapter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DefaultBeanConfiguration {
/**
* 使用者可以自定义PublishBarrier的实现默认采用Chan实现
*/
@Bean
@ConditionalOnMissingBean(PublishBarrier.class)
public PublishBarrier chanBarrier(){
/** 原Chan实现 */
return new ChanBarrierAdapter();
}
/**
* PublishBarrier 另一个实现, 采用同步锁
*/
// @Bean
// @ConditionalOnMissingBean(PublishBarrier.class)
// public PublishBarrier jdkBarrier(){
// return new JDKLockBarrierImpl();
// }
}

43
src/main/java/com/dji/sdk/mqtt/ChanBarrierAdapter.java

@ -0,0 +1,43 @@ @@ -0,0 +1,43 @@
/*************************************************
* @copyright 2017 Flision Corporation Inc.
* @author: Vincent Chan @ Canton
* @date: 2023年09月22日
* @version: 1.0.0
* @description:
**************************************************/
package com.dji.sdk.mqtt;
import com.dji.sdk.common.PublishBarrier;
import com.dji.sdk.common.PublishBarrierResult;
import java.util.Objects;
public class ChanBarrierAdapter implements PublishBarrier {
@Override
public void put(String identity, CommonTopicResponse receiveData) {
Chan instance = Chan.getInstance(identity, false);
if(Objects.nonNull(instance)){
instance.put(receiveData);
}
}
@Override
public void registerRequest(String identity, CommonTopicRequest requestData) {
Chan.getInstance(identity, true);
}
@Override
public PublishBarrierResult await(String identity,long timeout) {
Chan instance = Chan.getInstance(identity, false);
CommonTopicResponse response = instance.get(identity, timeout);
return Objects.nonNull(response) ? PublishBarrierResult.ok(response) : PublishBarrierResult.EMPTY;
}
@Override
public boolean hasIdentity(String identity) {
Chan instance = Chan.getInstance(identity, false);
return Objects.nonNull(instance);
}
}

52
src/main/java/com/dji/sdk/mqtt/MqttGatewayPublish.java

@ -1,9 +1,10 @@ @@ -1,9 +1,10 @@
package com.dji.sdk.mqtt;
import com.dji.sdk.common.Common;
import com.dji.sdk.common.*;
import com.dji.sdk.exception.CloudSDKErrorEnum;
import com.dji.sdk.exception.CloudSDKException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.TypeMismatchException;
import org.springframework.integration.mqtt.support.MqttHeaders;
@ -14,7 +15,10 @@ import org.springframework.util.StringUtils; @@ -14,7 +15,10 @@ import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
/**
* @author sean.zhou
@ -32,6 +36,9 @@ public class MqttGatewayPublish { @@ -32,6 +36,9 @@ public class MqttGatewayPublish {
@Resource
private IMqttMessageGateway messageGateway;
@Resource
private PublishBarrier publishBarrier;
public void publish(String topic, int qos, CommonTopicRequest request) {
try {
log.debug("send topic: {}, payload: {}", topic, request.toString());
@ -73,7 +80,9 @@ public class MqttGatewayPublish { @@ -73,7 +80,9 @@ public class MqttGatewayPublish {
AtomicInteger time = new AtomicInteger(0);
boolean hasBid = StringUtils.hasText(request.getBid());
request.setBid(hasBid ? request.getBid() : UUID.randomUUID().toString());
// Retry
// Is Retry necessary? why not use Spring @Retryable instead?
while (time.getAndIncrement() <= retryCount) {
this.publish(topic, request);
@ -97,5 +106,46 @@ public class MqttGatewayPublish { @@ -97,5 +106,46 @@ public class MqttGatewayPublish {
throw new CloudSDKException(CloudSDKErrorEnum.MQTT_PUBLISH_ABNORMAL, "No message reply received.");
}
public <T> CompletableFuture<CommonTopicResponse<T>> publishWithReply(Class<T> clazz, String topic, CommonTopicRequest request, Consumer<PublishOption> options){
PublishConfiguration config = prepareConfiguration(options);
request.setBid(config.getBid());
request.setTid(config.getTid());
//use to log request data or the last chance to change some data
config.invokeBeforePublishHook(request);
//注册barrier
String identity = request.getTid(); //提供栅栏标识
publishBarrier.registerRequest(identity, request);
return CompletableFuture.supplyAsync(()->{
this.publish(topic, request);
if(log.isDebugEnabled()){ log.debug("等待{}指令返回",identity); };
PublishBarrierResult result = publishBarrier.await(identity,config.getTimeout());
config.invokeAfterPublishReplyHook(request, result);
if(result.isTimeout()){
throw new CloudSDKException("Timeout"); //TODO: 换个更明确的异常更好
}
if(log.isDebugEnabled()){ log.debug("{}指令已返回",identity); }
return result.getData();
});
}
private PublishConfiguration prepareConfiguration(Consumer<PublishOption> options){
PublishConfiguration config = new PublishConfiguration();
PublishOption option = new PublishOption(config);
options.accept(option);
if(Strings.isNullOrEmpty(config.getBid())){
config.setBizId(UUID.randomUUID().toString());
}
if(Strings.isNullOrEmpty(config.getTid())){
config.setTransactionId(UUID.randomUUID().toString());
}
return config;
}
}

16
src/main/java/com/dji/sdk/mqtt/property/PropertySetReplyHandler.java

@ -1,6 +1,7 @@ @@ -1,6 +1,7 @@
package com.dji.sdk.mqtt.property;
import com.dji.sdk.common.Common;
import com.dji.sdk.common.PublishBarrier;
import com.dji.sdk.mqtt.Chan;
import com.dji.sdk.mqtt.ChannelName;
import com.fasterxml.jackson.core.type.TypeReference;
@ -9,6 +10,7 @@ import org.springframework.integration.annotation.ServiceActivator; @@ -9,6 +10,7 @@ import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.Objects;
@ -20,6 +22,9 @@ import java.util.Objects; @@ -20,6 +22,9 @@ import java.util.Objects;
@Component
public class PropertySetReplyHandler {
@Resource
PublishBarrier barrier;
private static final String RESULT_KEY = "result";
/**
@ -32,13 +37,18 @@ public class PropertySetReplyHandler { @@ -32,13 +37,18 @@ public class PropertySetReplyHandler {
byte[] payload = (byte[])message.getPayload();
TopicPropertySetResponse receiver = Common.getObjectMapper().readValue(payload, new TypeReference<TopicPropertySetResponse>() {});
Chan chan = Chan.getInstance(receiver.getTid(), false);
if (Objects.isNull(chan)) {
//fix: use Barrier instead witcom@2023.09.22
//Chan chan = Chan.getInstance(receiver.getTid(), false);
// if (Objects.isNull(chan)) {
// return;
// }
if(!barrier.hasIdentity(receiver.getTid())) {
return;
}
receiver.setData(PropertySetReplyResultEnum.find(
Common.getObjectMapper().convertValue(receiver.getData(), JsonNode.class).findValue(RESULT_KEY).intValue()));
// Put the message to the chan object.
chan.put(receiver);
//chan.put(receiver);
barrier.put(receiver.getTid(), receiver);
}
}

34
src/main/java/com/dji/sdk/mqtt/services/ServicesPublish.java

@ -1,6 +1,8 @@ @@ -1,6 +1,8 @@
package com.dji.sdk.mqtt.services;
import com.dji.sdk.common.Common;
import com.dji.sdk.common.PublishOption;
import com.dji.sdk.mqtt.CommonTopicResponse;
import com.dji.sdk.mqtt.MqttGatewayPublish;
import com.dji.sdk.mqtt.TopicConst;
import com.fasterxml.jackson.core.type.TypeReference;
@ -10,6 +12,8 @@ import org.springframework.stereotype.Component; @@ -10,6 +12,8 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
/**
* @author sean
@ -60,6 +64,8 @@ public class ServicesPublish { @@ -60,6 +64,8 @@ public class ServicesPublish {
public <T> TopicServicesResponse<ServicesReplyData<T>> publish(
TypeReference<T> clazz, String sn, String method, Object data, String bid, int retryCount, long timeout) {
return this.publish(clazz, sn, method,data,ops-> ops.withBizId(bid).timeout((int)(timeout / 1000))).join();
/*
String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + Objects.requireNonNull(sn) + TopicConst.SERVICES_SUF;
TopicServicesResponse response = (TopicServicesResponse) gatewayPublish.publishWithReply(
ServicesReplyReceiver.class, topic, new TopicServicesRequest<>()
@ -84,6 +90,34 @@ public class ServicesPublish { @@ -84,6 +90,34 @@ public class ServicesPublish {
reply.setOutput(mapper.convertValue(replyReceiver.getOutput(), clazz));
}
return response.setData(reply);
*/
}
public <T> CompletableFuture<TopicServicesResponse<ServicesReplyData<T>>> publish(TypeReference<T> clazz, String sn, String method, Object data, Consumer<PublishOption> options){
String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + Objects.requireNonNull(sn) + TopicConst.SERVICES_SUF;
return gatewayPublish.publishWithReply(ServicesReplyReceiver.class, topic, new TopicServicesRequest<>()
.setTimestamp(System.currentTimeMillis())
.setMethod(method)
.setData(Objects.requireNonNullElse(data, "")), options)
.thenApply(response->(TopicServicesResponse)response)
.thenApply(response->{
ServicesReplyReceiver replyReceiver = (ServicesReplyReceiver) response.getData();
ServicesReplyData<T> reply = new ServicesReplyData<T>().setResult(replyReceiver.getResult());
if (Objects.isNull(clazz)) {
reply.setOutput((T) Objects.requireNonNullElse(
replyReceiver.getOutput(), Objects.requireNonNullElse(replyReceiver.getInfo(), "")));
return response.setData(reply);
}
// put together in "output"
ObjectMapper mapper = Common.getObjectMapper();
if (Objects.nonNull(replyReceiver.getInfo())) {
reply.setOutput(mapper.convertValue(replyReceiver.getInfo(), clazz));
}
if (Objects.nonNull(replyReceiver.getOutput())) {
reply.setOutput(mapper.convertValue(replyReceiver.getOutput(), clazz));
}
return response.setData(reply);
});
}
}

16
src/main/java/com/dji/sdk/mqtt/services/ServicesReplyHandler.java

@ -3,6 +3,7 @@ package com.dji.sdk.mqtt.services; @@ -3,6 +3,7 @@ package com.dji.sdk.mqtt.services;
import com.dji.sdk.cloudapi.log.FileUploadListResponse;
import com.dji.sdk.cloudapi.log.LogMethodEnum;
import com.dji.sdk.common.Common;
import com.dji.sdk.common.PublishBarrier;
import com.dji.sdk.mqtt.Chan;
import com.dji.sdk.mqtt.ChannelName;
import com.fasterxml.jackson.core.type.TypeReference;
@ -10,6 +11,7 @@ import org.springframework.integration.annotation.ServiceActivator; @@ -10,6 +11,7 @@ import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.Objects;
@ -21,6 +23,9 @@ import java.util.Objects; @@ -21,6 +23,9 @@ import java.util.Objects;
@Component
public class ServicesReplyHandler {
@Resource
PublishBarrier barrier;
/**
* Handle the reply message from topic "/services_reply".
* @param message reply message
@ -32,14 +37,19 @@ public class ServicesReplyHandler { @@ -32,14 +37,19 @@ public class ServicesReplyHandler {
TopicServicesResponse<ServicesReplyReceiver> receiver = Common.getObjectMapper()
.readValue(payload, new TypeReference<TopicServicesResponse<ServicesReplyReceiver>>() {});
Chan chan = Chan.getInstance(receiver.getTid(), false);
if (Objects.isNull(chan)) {
//fix: use Barrier instead witcom@2023.09.22
// Chan chan = Chan.getInstance(receiver.getTid(), false);
// if (Objects.isNull(chan)) {
// return;
// }
if(!barrier.hasIdentity(receiver.getTid())){
return;
}
if (LogMethodEnum.FILE_UPLOAD_LIST.getMethod().equals(receiver.getMethod())) {
receiver.getData().setOutput(Common.getObjectMapper().convertValue(receiver.getData(),
new TypeReference<FileUploadListResponse>() {}));
}
chan.put(receiver);
barrier.put(receiver.getTid(), receiver);
//chan.put(receiver);
}
}

Loading…
Cancel
Save