Commit 5aed9206 by ping.wu

Merge branch 'feature/20221021_小程序外卖单取消订单同步取消配送单_wuping' of…

Merge branch 'feature/20221021_小程序外卖单取消订单同步取消配送单_wuping' of https://gitlab.freemud.com/order-group-application/order-group
parents 1e210723 5894c2c8
...@@ -374,16 +374,16 @@ ...@@ -374,16 +374,16 @@
<!-- <artifactId>assortment-payment-sdk</artifactId>--> <!-- <artifactId>assortment-payment-sdk</artifactId>-->
<!-- <version>2.6.10.RELEASE</version>--> <!-- <version>2.6.10.RELEASE</version>-->
<!-- </dependency>--> <!-- </dependency>-->
<dependency> <!-- <dependency>-->
<groupId>org.apache.rocketmq</groupId> <!-- <groupId>org.apache.rocketmq</groupId>-->
<artifactId>rocketmq-spring-boot-starter</artifactId> <!-- <artifactId>rocketmq-spring-boot-starter</artifactId>-->
<version>2.0.4</version> <!-- <version>2.0.4</version>-->
</dependency> <!-- </dependency>-->
<dependency> <!-- <dependency>-->
<groupId>com.alibaba</groupId> <!-- <groupId>com.alibaba</groupId>-->
<artifactId>fastjson</artifactId> <!-- <artifactId>fastjson</artifactId>-->
<version>1.2.51</version> <!-- <version>1.2.51</version>-->
</dependency> <!-- </dependency>-->
<dependency> <dependency>
<groupId>tk.mybatis</groupId> <groupId>tk.mybatis</groupId>
<artifactId>mapper-spring-boot-starter</artifactId> <artifactId>mapper-spring-boot-starter</artifactId>
......
package cn.freemud.amp.config; //package cn.freemud.amp.config;
//
import cn.freemud.constant.RocketMQConst; //import cn.freemud.constant.RocketMQConst;
import com.alibaba.fastjson.support.spring.messaging.MappingFastJsonMessageConverter; //import com.alibaba.fastjson.support.spring.messaging.MappingFastJsonMessageConverter;
import org.apache.rocketmq.acl.common.AclClientRPCHook; //import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials; //import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel; //import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.producer.DefaultMQProducer; //import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash; //import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
import org.apache.rocketmq.spring.core.RocketMQTemplate; //import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.context.annotation.Bean; //import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; //import org.springframework.context.annotation.Configuration;
import org.springframework.util.Assert; //import org.springframework.util.Assert;
import org.springframework.util.StringUtils; //import org.springframework.util.StringUtils;
//
import javax.annotation.Resource; //import javax.annotation.Resource;
//
@Configuration //@Configuration
public class RocketMQConfig { //public class RocketMQConfig {
//
@Resource // @Resource
private RocketMQProperties rocketMQProperties; // private RocketMQProperties rocketMQProperties;
//
@Bean(name = "deliveryRocketMqTemplate", destroyMethod = "destroy") // @Bean(name = "deliveryRocketMqTemplate", destroyMethod = "destroy")
public RocketMQTemplate rocketMQTemplate() { // public RocketMQTemplate rocketMQTemplate() {
DefaultMQProducer defaultMQProducer = createMQProducer(RocketMQConst.ORDER_THIRD_PUSH_PRODUCER_GROUP); // DefaultMQProducer defaultMQProducer = createMQProducer(RocketMQConst.ORDER_THIRD_PUSH_PRODUCER_GROUP);
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate(); // RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
rocketMQTemplate.setProducer(defaultMQProducer); // rocketMQTemplate.setProducer(defaultMQProducer);
rocketMQTemplate.setMessageConverter(new MappingFastJsonMessageConverter()); // rocketMQTemplate.setMessageConverter(new MappingFastJsonMessageConverter());
rocketMQTemplate.setMessageQueueSelector(new SelectMessageQueueByHash()); // MessageQueueSelector, default SelectMessageQueueByHash // rocketMQTemplate.setMessageQueueSelector(new SelectMessageQueueByHash()); // MessageQueueSelector, default SelectMessageQueueByHash
return rocketMQTemplate; // return rocketMQTemplate;
} // }
//
/** // /**
* @return // * @return
*/ // */
public DefaultMQProducer createMQProducer(String producerGroupName) { // public DefaultMQProducer createMQProducer(String producerGroupName) {
RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer(); // RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
String nameServer = rocketMQProperties.getNameServer(); // String nameServer = rocketMQProperties.getNameServer();
String groupName = StringUtils.isEmpty(producerGroupName) ? producerConfig.getGroup() : producerGroupName; // String groupName = StringUtils.isEmpty(producerGroupName) ? producerConfig.getGroup() : producerGroupName;
Assert.hasText(nameServer, "rocketmq.name-server must not be null"); // Assert.hasText(nameServer, "rocketmq.name-server must not be null");
Assert.hasText(groupName, "rocketmq.producer.group must not be null"); // Assert.hasText(groupName, "rocketmq.producer.group must not be null");
//
String accessChannel = rocketMQProperties.getAccessChannel(); // String accessChannel = rocketMQProperties.getAccessChannel();
String accessKey = rocketMQProperties.getProducer().getAccessKey(); // String accessKey = rocketMQProperties.getProducer().getAccessKey();
String secretKey = rocketMQProperties.getProducer().getSecretKey(); // String secretKey = rocketMQProperties.getProducer().getSecretKey();
//
DefaultMQProducer producer = null; // DefaultMQProducer producer = null;
// 密码模式 // // 密码模式
if (!StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey)) { // if (!StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey)) {
producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))
, rocketMQProperties.getProducer().isEnableMsgTrace() // , rocketMQProperties.getProducer().isEnableMsgTrace()
, rocketMQProperties.getProducer().getCustomizedTraceTopic()); // , rocketMQProperties.getProducer().getCustomizedTraceTopic());
producer.setVipChannelEnabled(false); // producer.setVipChannelEnabled(false);
} else { // } else {
producer = new DefaultMQProducer(groupName // producer = new DefaultMQProducer(groupName
, rocketMQProperties.getProducer().isEnableMsgTrace() // , rocketMQProperties.getProducer().isEnableMsgTrace()
, rocketMQProperties.getProducer().getCustomizedTraceTopic()); // , rocketMQProperties.getProducer().getCustomizedTraceTopic());
} // }
//
producer.setNamesrvAddr(nameServer); // producer.setNamesrvAddr(nameServer);
//
if (!StringUtils.isEmpty(accessChannel)) { // if (!StringUtils.isEmpty(accessChannel)) {
producer.setAccessChannel(AccessChannel.valueOf(accessChannel)); // producer.setAccessChannel(AccessChannel.valueOf(accessChannel));
} // }
//
producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout()); // producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed()); // producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed()); // producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
producer.setMaxMessageSize(producerConfig.getMaxMessageSize()); // producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold()); // producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());
producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer()); // producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());
producer.setPollNameServerInterval(rocketMQProperties.getPollNameServerInteval()); // producer.setPollNameServerInterval(rocketMQProperties.getPollNameServerInteval());
producer.setHeartbeatBrokerInterval(rocketMQProperties.getHeartbeatBrokerInterval()); // producer.setHeartbeatBrokerInterval(rocketMQProperties.getHeartbeatBrokerInterval());
producer.setPersistConsumerOffsetInterval(rocketMQProperties.getPersistConsumerOffsetInterval()); // producer.setPersistConsumerOffsetInterval(rocketMQProperties.getPersistConsumerOffsetInterval());
//
return producer; // return producer;
} // }
//
} //}
package cn.freemud.amp.config; //package cn.freemud.amp.config;
//
import org.apache.rocketmq.common.MixAll; //import org.apache.rocketmq.common.MixAll;
import org.springframework.boot.context.properties.ConfigurationProperties; //import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Primary; //import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
import java.util.HashMap; //import java.util.HashMap;
import java.util.Map; //import java.util.Map;
//
/** ///**
* {@link org.apache.rocketmq.spring.autoconfigure.RocketMQProperties} // * {@link org.apache.rocketmq.spring.autoconfigure.RocketMQProperties}
*/ // */
@Primary //@Primary
@Component //@Component
@ConfigurationProperties(prefix = "rocketmq") //@ConfigurationProperties(prefix = "rocketmq")
public class RocketMQProperties { //public class RocketMQProperties {
//
/** // /**
* The name server for rocketMQ, formats: `host:port;host:port`. // * The name server for rocketMQ, formats: `host:port;host:port`.
*/ // */
private String nameServer; // private String nameServer;
//
/** // /**
* Enum type for accesChannel, values: LOCAL, CLOUD // * Enum type for accesChannel, values: LOCAL, CLOUD
* {@link org.apache.rocketmq.client.AccessChannel} // * {@link org.apache.rocketmq.client.AccessChannel}
*/ // */
private String accessChannel; // private String accessChannel;
//
/** // /**
* 轮询nameServer间隔时间,默认30S // * 轮询nameServer间隔时间,默认30S
*/ // */
private int pollNameServerInteval = 30 * 1000; // private int pollNameServerInteval = 30 * 1000;
//
/** // /**
* 向broker发送心跳间隔时间,默认30S // * 向broker发送心跳间隔时间,默认30S
*/ // */
private int heartbeatBrokerInterval = 30 * 1000; // private int heartbeatBrokerInterval = 30 * 1000;
/** // /**
* 持久化Consumer进度间隔时间,默认5S // * 持久化Consumer进度间隔时间,默认5S
*/ // */
private int persistConsumerOffsetInterval = 5 * 1000; // private int persistConsumerOffsetInterval = 5 * 1000;
//
//
private Producer producer; // private Producer producer;
//
/** // /**
* Configure enable listener or not. // * Configure enable listener or not.
* In some particular cases, if you don't want the the listener is enabled when container startup, // * In some particular cases, if you don't want the the listener is enabled when container startup,
* the configuration pattern is like this : // * the configuration pattern is like this :
* rocketmq.consumer.listeners.<group-name>.<topic-name>.enabled=<boolean value, true or false> // * rocketmq.consumer.listeners.<group-name>.<topic-name>.enabled=<boolean value, true or false>
* <p> // * <p>
* the listener is enabled by default. // * the listener is enabled by default.
*/ // */
private Consumer consumer = new Consumer(); // private Consumer consumer = new Consumer();
//
public String getNameServer() { // public String getNameServer() {
return nameServer; // return nameServer;
} // }
//
public void setNameServer(String nameServer) { // public void setNameServer(String nameServer) {
this.nameServer = nameServer; // this.nameServer = nameServer;
} // }
//
public String getAccessChannel() { // public String getAccessChannel() {
return accessChannel; // return accessChannel;
} // }
//
public void setAccessChannel(String accessChannel) { // public void setAccessChannel(String accessChannel) {
this.accessChannel = accessChannel; // this.accessChannel = accessChannel;
} // }
//
public int getPollNameServerInteval() { // public int getPollNameServerInteval() {
return pollNameServerInteval; // return pollNameServerInteval;
} // }
//
public void setPollNameServerInteval(int pollNameServerInteval) { // public void setPollNameServerInteval(int pollNameServerInteval) {
this.pollNameServerInteval = pollNameServerInteval; // this.pollNameServerInteval = pollNameServerInteval;
} // }
//
public int getHeartbeatBrokerInterval() { // public int getHeartbeatBrokerInterval() {
return heartbeatBrokerInterval; // return heartbeatBrokerInterval;
} // }
//
public void setHeartbeatBrokerInterval(int heartbeatBrokerInterval) { // public void setHeartbeatBrokerInterval(int heartbeatBrokerInterval) {
this.heartbeatBrokerInterval = heartbeatBrokerInterval; // this.heartbeatBrokerInterval = heartbeatBrokerInterval;
} // }
//
public int getPersistConsumerOffsetInterval() { // public int getPersistConsumerOffsetInterval() {
return persistConsumerOffsetInterval; // return persistConsumerOffsetInterval;
} // }
//
public void setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval) { // public void setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval) {
this.persistConsumerOffsetInterval = persistConsumerOffsetInterval; // this.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
} // }
//
public RocketMQProperties.Producer getProducer() { // public RocketMQProperties.Producer getProducer() {
return producer; // return producer;
} // }
//
public void setProducer(RocketMQProperties.Producer producer) { // public void setProducer(RocketMQProperties.Producer producer) {
this.producer = producer; // this.producer = producer;
} // }
//
public static class Producer { // public static class Producer {
//
/** // /**
* Group name of producer. // * Group name of producer.
*/ // */
private String group; // private String group;
//
/** // /**
* Millis of send message timeout. // * Millis of send message timeout.
*/ // */
private int sendMessageTimeout = 3000; // private int sendMessageTimeout = 3000;
//
/** // /**
* Compress message body threshold, namely, message body larger than 4k will be compressed on default. // * Compress message body threshold, namely, message body larger than 4k will be compressed on default.
*/ // */
private int compressMessageBodyThreshold = 1024 * 4; // private int compressMessageBodyThreshold = 1024 * 4;
//
/** // /**
* Maximum number of retry to perform internally before claiming sending failure in synchronous mode. // * Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
* This may potentially cause message duplication which is up to application developers to resolve. // * This may potentially cause message duplication which is up to application developers to resolve.
*/ // */
private int retryTimesWhenSendFailed = 2; // private int retryTimesWhenSendFailed = 2;
//
/** // /**
* <p> Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p> // * <p> Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
* This may potentially cause message duplication which is up to application developers to resolve. // * This may potentially cause message duplication which is up to application developers to resolve.
*/ // */
private int retryTimesWhenSendAsyncFailed = 2; // private int retryTimesWhenSendAsyncFailed = 2;
//
/** // /**
* Indicate whether to retry another broker on sending failure internally. // * Indicate whether to retry another broker on sending failure internally.
*/ // */
private boolean retryNextServer = false; // private boolean retryNextServer = false;
//
/** // /**
* Maximum allowed message size in bytes. // * Maximum allowed message size in bytes.
*/ // */
private int maxMessageSize = 1024 * 1024 * 4; // private int maxMessageSize = 1024 * 1024 * 4;
//
/** // /**
* The property of "access-key". // * The property of "access-key".
*/ // */
private String accessKey; // private String accessKey;
//
/** // /**
* The property of "secret-key". // * The property of "secret-key".
*/ // */
private String secretKey; // private String secretKey;
//
/** // /**
* Switch flag instance for message trace. // * Switch flag instance for message trace.
*/ // */
private boolean enableMsgTrace = true; // private boolean enableMsgTrace = true;
//
/** // /**
* The name value of message trace topic.If you don't config,you can use the default trace topic name. // * The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/ // */
private String customizedTraceTopic = MixAll.RMQ_SYS_TRACE_TOPIC; // private String customizedTraceTopic = MixAll.RMQ_SYS_TRACE_TOPIC;
//
public String getGroup() { // public String getGroup() {
return group; // return group;
} // }
//
public void setGroup(String group) { // public void setGroup(String group) {
this.group = group; // this.group = group;
} // }
//
public int getSendMessageTimeout() { // public int getSendMessageTimeout() {
return sendMessageTimeout; // return sendMessageTimeout;
} // }
//
public void setSendMessageTimeout(int sendMessageTimeout) { // public void setSendMessageTimeout(int sendMessageTimeout) {
this.sendMessageTimeout = sendMessageTimeout; // this.sendMessageTimeout = sendMessageTimeout;
} // }
//
public int getCompressMessageBodyThreshold() { // public int getCompressMessageBodyThreshold() {
return compressMessageBodyThreshold; // return compressMessageBodyThreshold;
} // }
//
public void setCompressMessageBodyThreshold(int compressMessageBodyThreshold) { // public void setCompressMessageBodyThreshold(int compressMessageBodyThreshold) {
this.compressMessageBodyThreshold = compressMessageBodyThreshold; // this.compressMessageBodyThreshold = compressMessageBodyThreshold;
} // }
//
public int getRetryTimesWhenSendFailed() { // public int getRetryTimesWhenSendFailed() {
return retryTimesWhenSendFailed; // return retryTimesWhenSendFailed;
} // }
//
public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) { // public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
this.retryTimesWhenSendFailed = retryTimesWhenSendFailed; // this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
} // }
//
public int getRetryTimesWhenSendAsyncFailed() { // public int getRetryTimesWhenSendAsyncFailed() {
return retryTimesWhenSendAsyncFailed; // return retryTimesWhenSendAsyncFailed;
} // }
//
public void setRetryTimesWhenSendAsyncFailed(int retryTimesWhenSendAsyncFailed) { // public void setRetryTimesWhenSendAsyncFailed(int retryTimesWhenSendAsyncFailed) {
this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed; // this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
} // }
//
public boolean isRetryNextServer() { // public boolean isRetryNextServer() {
return retryNextServer; // return retryNextServer;
} // }
//
public void setRetryNextServer(boolean retryNextServer) { // public void setRetryNextServer(boolean retryNextServer) {
this.retryNextServer = retryNextServer; // this.retryNextServer = retryNextServer;
} // }
//
public int getMaxMessageSize() { // public int getMaxMessageSize() {
return maxMessageSize; // return maxMessageSize;
} // }
//
public void setMaxMessageSize(int maxMessageSize) { // public void setMaxMessageSize(int maxMessageSize) {
this.maxMessageSize = maxMessageSize; // this.maxMessageSize = maxMessageSize;
} // }
//
public String getAccessKey() { // public String getAccessKey() {
return accessKey; // return accessKey;
} // }
//
public void setAccessKey(String accessKey) { // public void setAccessKey(String accessKey) {
this.accessKey = accessKey; // this.accessKey = accessKey;
} // }
//
public String getSecretKey() { // public String getSecretKey() {
return secretKey; // return secretKey;
} // }
//
public void setSecretKey(String secretKey) { // public void setSecretKey(String secretKey) {
this.secretKey = secretKey; // this.secretKey = secretKey;
} // }
//
public boolean isEnableMsgTrace() { // public boolean isEnableMsgTrace() {
return enableMsgTrace; // return enableMsgTrace;
} // }
//
public void setEnableMsgTrace(boolean enableMsgTrace) { // public void setEnableMsgTrace(boolean enableMsgTrace) {
this.enableMsgTrace = enableMsgTrace; // this.enableMsgTrace = enableMsgTrace;
} // }
//
public String getCustomizedTraceTopic() { // public String getCustomizedTraceTopic() {
return customizedTraceTopic; // return customizedTraceTopic;
} // }
//
public void setCustomizedTraceTopic(String customizedTraceTopic) { // public void setCustomizedTraceTopic(String customizedTraceTopic) {
this.customizedTraceTopic = customizedTraceTopic; // this.customizedTraceTopic = customizedTraceTopic;
} // }
} // }
//
public Consumer getConsumer() { // public Consumer getConsumer() {
return consumer; // return consumer;
} // }
//
public void setConsumer(Consumer consumer) { // public void setConsumer(Consumer consumer) {
this.consumer = consumer; // this.consumer = consumer;
} // }
//
public static final class Consumer { // public static final class Consumer {
/** // /**
* listener configuration container // * listener configuration container
* the pattern is like this: // * the pattern is like this:
* group1.topic1 = false // * group1.topic1 = false
* group2.topic2 = true // * group2.topic2 = true
* group3.topic3 = false // * group3.topic3 = false
*/ // */
private Map<String, Map<String, Boolean>> listeners = new HashMap<>(); // private Map<String, Map<String, Boolean>> listeners = new HashMap<>();
//
public Map<String, Map<String, Boolean>> getListeners() { // public Map<String, Map<String, Boolean>> getListeners() {
return listeners; // return listeners;
} // }
//
public void setListeners(Map<String, Map<String, Boolean>> listeners) { // public void setListeners(Map<String, Map<String, Boolean>> listeners) {
this.listeners = listeners; // this.listeners = listeners;
} // }
} // }
//
} //}
\ No newline at end of file \ No newline at end of file
...@@ -5,23 +5,12 @@ import cn.freemud.amqp.Header; ...@@ -5,23 +5,12 @@ import cn.freemud.amqp.Header;
import cn.freemud.amqp.MQAction; import cn.freemud.amqp.MQAction;
import cn.freemud.amqp.MQMessage; import cn.freemud.amqp.MQMessage;
import cn.freemud.amqp.MQService; import cn.freemud.amqp.MQService;
import cn.freemud.constant.RocketMQConst;
import cn.freemud.entities.dto.delivery.CallbackUrlRequestDto; import cn.freemud.entities.dto.delivery.CallbackUrlRequestDto;
import com.alibaba.fastjson.JSON;
import com.freemud.application.sdk.api.log.ErrorLog; import com.freemud.application.sdk.api.log.ErrorLog;
import com.google.common.base.Throwables;
import org.apache.commons.lang.BooleanUtils;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/** /**
* mq发送方 * mq发送方
*/ */
...@@ -30,8 +19,8 @@ public class ProduceMQService { ...@@ -30,8 +19,8 @@ public class ProduceMQService {
@Autowired @Autowired
private MQService mqService; private MQService mqService;
@Resource // @Resource
private RocketMQTemplate deliveryRocketMqTemplate; // private RocketMQTemplate deliveryRocketMqTemplate;
@Value("${rocketmq.delivery.enable:false}") @Value("${rocketmq.delivery.enable:false}")
private Boolean rocketMQDeliveryEnable; private Boolean rocketMQDeliveryEnable;
...@@ -54,26 +43,26 @@ public class ProduceMQService { ...@@ -54,26 +43,26 @@ public class ProduceMQService {
* @param deliveryRequest * @param deliveryRequest
* @Description 发送配送信息到rocketMq: * @Description 发送配送信息到rocketMq:
*/ */
public void sendRocketMqOfDeliveryInfo(CallbackUrlRequestDto deliveryRequest) { // public void sendRocketMqOfDeliveryInfo(CallbackUrlRequestDto deliveryRequest) {
//
if (BooleanUtils.isNotTrue(rocketMQDeliveryEnable)) { // if (BooleanUtils.isNotTrue(rocketMQDeliveryEnable)) {
return; // return;
} // }
//
Header header = new Header(MQAction.UPDATE.getAction(), "delivery/callbackUrl", deliveryRequest.getOrderId(), RocketMQConst.ORDER_THIRD_PUSH_TOPIC); // Header header = new Header(MQAction.UPDATE.getAction(), "delivery/callbackUrl", deliveryRequest.getOrderId(), RocketMQConst.ORDER_THIRD_PUSH_TOPIC);
MQMessage<CallbackUrlRequestDto> mqMessage = new MQMessage(header, deliveryRequest); // MQMessage<CallbackUrlRequestDto> mqMessage = new MQMessage(header, deliveryRequest);
String destination = String.format("%s:%s", RocketMQConst.ORDER_THIRD_PUSH_TOPIC, RocketMQConst.OrderSubsidiaryTag.delivery); // String destination = String.format("%s:%s", RocketMQConst.ORDER_THIRD_PUSH_TOPIC, RocketMQConst.OrderSubsidiaryTag.delivery);
Message message = MessageBuilder // Message message = MessageBuilder
.withPayload(mqMessage) // .withPayload(mqMessage)
.setHeader("partnerId", deliveryRequest.getPartnerId()) // .setHeader("partnerId", deliveryRequest.getPartnerId())
.setHeader(MessageConst.PROPERTY_KEYS, deliveryRequest.getDeliveryId()) // .setHeader(MessageConst.PROPERTY_KEYS, deliveryRequest.getDeliveryId())
.build(); // .build();
//
try { // try {
SendResult sendResult = deliveryRocketMqTemplate.syncSendOrderly(destination, message, deliveryRequest.getOrderId()); // SendResult sendResult = deliveryRocketMqTemplate.syncSendOrderly(destination, message, deliveryRequest.getOrderId());
// log.debug("sendDelivery of status:{}, message:{},", sendResult.toString(), JSON.toJSONString(message)); //// log.debug("sendDelivery of status:{}, message:{},", sendResult.toString(), JSON.toJSONString(message));
} catch (Exception e) { // } catch (Exception e) {
ErrorLog.printErrorLog("sendDelivery.error","delivery/callbackUrl",message,e); // ErrorLog.printErrorLog("sendDelivery.error","delivery/callbackUrl",message,e);
} // }
} // }
} }
...@@ -194,7 +194,7 @@ public class ThirdDeliveryServiceImpl implements ThirdDeliveryService { ...@@ -194,7 +194,7 @@ public class ThirdDeliveryServiceImpl implements ThirdDeliveryService {
// 订单配送状态变化时发送到MQ // 订单配送状态变化时发送到MQ
if (DeliveryStatus.checkDeliveryStatueForPlatform(deliveryStatus)) { if (DeliveryStatus.checkDeliveryStatueForPlatform(deliveryStatus)) {
produceMQService.sendOfDeliveryInfo(request); produceMQService.sendOfDeliveryInfo(request);
produceMQService.sendRocketMqOfDeliveryInfo(request); // produceMQService.sendRocketMqOfDeliveryInfo(request);
} }
//配送订阅消息 //配送订阅消息
sendMicroMessage(partnerId, request.getOrderId(), deliveryStatus); sendMicroMessage(partnerId, request.getOrderId(), deliveryStatus);
......
...@@ -37,7 +37,7 @@ public class ProduceMQServiceTest { ...@@ -37,7 +37,7 @@ public class ProduceMQServiceTest {
@Test @Test
public void send() { public void send() {
CallbackUrlRequestDto callbackUrlRequestDto = buildCallbackUrlRequestDto(); CallbackUrlRequestDto callbackUrlRequestDto = buildCallbackUrlRequestDto();
produceMQService.sendRocketMqOfDeliveryInfo(callbackUrlRequestDto); // produceMQService.sendRocketMqOfDeliveryInfo(callbackUrlRequestDto);
} }
public MQMessage buildMQMessage() { public MQMessage buildMQMessage() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment