Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
O
order-group
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
order-group-application
order-group
Commits
5894c2c8
Commit
5894c2c8
authored
Oct 25, 2022
by
ping.wu
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
rocketmq资源回收,代码移除
parent
f4df913b
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
397 additions
and
408 deletions
+397
-408
order-application-service/pom.xml
+10
-10
order-application-service/src/main/java/cn/freemud/amp/config/RocketMQConfig.java
+80
-80
order-application-service/src/main/java/cn/freemud/amp/config/RocketMQProperties.java
+281
-281
order-application-service/src/main/java/cn/freemud/amp/service/ProduceMQService.java
+24
-35
order-application-service/src/main/java/cn/freemud/service/delivery/ThirdDeliveryServiceImpl.java
+1
-1
order-application-service/src/test/java/cn/freemud/amp/service/ProduceMQServiceTest.java
+1
-1
No files found.
order-application-service/pom.xml
View file @
5894c2c8
...
...
@@ -374,16 +374,16 @@
<!-- <artifactId>assortment-payment-sdk</artifactId>-->
<!-- <version>2.6.10.RELEASE</version>-->
<!-- </dependency>-->
<dependency
>
<groupId>
org.apache.rocketmq
</groupId
>
<artifactId>
rocketmq-spring-boot-starter
</artifactId
>
<version>
2.0.4
</version
>
</dependency
>
<dependency
>
<groupId>
com.alibaba
</groupId
>
<artifactId>
fastjson
</artifactId
>
<version>
1.2.51
</version
>
</dependency
>
<!-- <dependency>--
>
<!-- <groupId>org.apache.rocketmq</groupId>--
>
<!-- <artifactId>rocketmq-spring-boot-starter</artifactId>--
>
<!-- <version>2.0.4</version>--
>
<!-- </dependency>--
>
<!-- <dependency>--
>
<!-- <groupId>com.alibaba</groupId>--
>
<!-- <artifactId>fastjson</artifactId>--
>
<!-- <version>1.2.51</version>--
>
<!-- </dependency>--
>
<dependency>
<groupId>
tk.mybatis
</groupId>
<artifactId>
mapper-spring-boot-starter
</artifactId>
...
...
order-application-service/src/main/java/cn/freemud/amp/config/RocketMQConfig.java
View file @
5894c2c8
package
cn
.
freemud
.
amp
.
config
;
import
cn.freemud.constant.RocketMQConst
;
import
com.alibaba.fastjson.support.spring.messaging.MappingFastJsonMessageConverter
;
import
org.apache.rocketmq.acl.common.AclClientRPCHook
;
import
org.apache.rocketmq.acl.common.SessionCredentials
;
import
org.apache.rocketmq.client.AccessChannel
;
import
org.apache.rocketmq.client.producer.DefaultMQProducer
;
import
org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash
;
import
org.apache.rocketmq.spring.core.RocketMQTemplate
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.util.Assert
;
import
org.springframework.util.StringUtils
;
import
javax.annotation.Resource
;
@Configuration
public
class
RocketMQConfig
{
@Resource
private
RocketMQProperties
rocketMQProperties
;
@Bean
(
name
=
"deliveryRocketMqTemplate"
,
destroyMethod
=
"destroy"
)
public
RocketMQTemplate
rocketMQTemplate
()
{
DefaultMQProducer
defaultMQProducer
=
createMQProducer
(
RocketMQConst
.
ORDER_THIRD_PUSH_PRODUCER_GROUP
);
RocketMQTemplate
rocketMQTemplate
=
new
RocketMQTemplate
();
rocketMQTemplate
.
setProducer
(
defaultMQProducer
);
rocketMQTemplate
.
setMessageConverter
(
new
MappingFastJsonMessageConverter
());
rocketMQTemplate
.
setMessageQueueSelector
(
new
SelectMessageQueueByHash
());
// MessageQueueSelector, default SelectMessageQueueByHash
return
rocketMQTemplate
;
}
/**
* @return
*/
public
DefaultMQProducer
createMQProducer
(
String
producerGroupName
)
{
RocketMQProperties
.
Producer
producerConfig
=
rocketMQProperties
.
getProducer
();
String
nameServer
=
rocketMQProperties
.
getNameServer
();
String
groupName
=
StringUtils
.
isEmpty
(
producerGroupName
)
?
producerConfig
.
getGroup
()
:
producerGroupName
;
Assert
.
hasText
(
nameServer
,
"rocketmq.name-server must not be null"
);
Assert
.
hasText
(
groupName
,
"rocketmq.producer.group must not be null"
);
String
accessChannel
=
rocketMQProperties
.
getAccessChannel
();
String
accessKey
=
rocketMQProperties
.
getProducer
().
getAccessKey
();
String
secretKey
=
rocketMQProperties
.
getProducer
().
getSecretKey
();
DefaultMQProducer
producer
=
null
;
// 密码模式
if
(!
StringUtils
.
isEmpty
(
accessKey
)
&&
!
StringUtils
.
isEmpty
(
secretKey
))
{
producer
=
new
DefaultMQProducer
(
groupName
,
new
AclClientRPCHook
(
new
SessionCredentials
(
accessKey
,
secretKey
))
,
rocketMQProperties
.
getProducer
().
isEnableMsgTrace
()
,
rocketMQProperties
.
getProducer
().
getCustomizedTraceTopic
());
producer
.
setVipChannelEnabled
(
false
);
}
else
{
producer
=
new
DefaultMQProducer
(
groupName
,
rocketMQProperties
.
getProducer
().
isEnableMsgTrace
()
,
rocketMQProperties
.
getProducer
().
getCustomizedTraceTopic
());
}
producer
.
setNamesrvAddr
(
nameServer
);
if
(!
StringUtils
.
isEmpty
(
accessChannel
))
{
producer
.
setAccessChannel
(
AccessChannel
.
valueOf
(
accessChannel
));
}
producer
.
setSendMsgTimeout
(
producerConfig
.
getSendMessageTimeout
());
producer
.
setRetryTimesWhenSendFailed
(
producerConfig
.
getRetryTimesWhenSendFailed
());
producer
.
setRetryTimesWhenSendAsyncFailed
(
producerConfig
.
getRetryTimesWhenSendAsyncFailed
());
producer
.
setMaxMessageSize
(
producerConfig
.
getMaxMessageSize
());
producer
.
setCompressMsgBodyOverHowmuch
(
producerConfig
.
getCompressMessageBodyThreshold
());
producer
.
setRetryAnotherBrokerWhenNotStoreOK
(
producerConfig
.
isRetryNextServer
());
producer
.
setPollNameServerInterval
(
rocketMQProperties
.
getPollNameServerInteval
());
producer
.
setHeartbeatBrokerInterval
(
rocketMQProperties
.
getHeartbeatBrokerInterval
());
producer
.
setPersistConsumerOffsetInterval
(
rocketMQProperties
.
getPersistConsumerOffsetInterval
());
return
producer
;
}
}
//
package cn.freemud.amp.config;
//
//
import cn.freemud.constant.RocketMQConst;
//
import com.alibaba.fastjson.support.spring.messaging.MappingFastJsonMessageConverter;
//
import org.apache.rocketmq.acl.common.AclClientRPCHook;
//
import org.apache.rocketmq.acl.common.SessionCredentials;
//
import org.apache.rocketmq.client.AccessChannel;
//
import org.apache.rocketmq.client.producer.DefaultMQProducer;
//
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
//
import org.apache.rocketmq.spring.core.RocketMQTemplate;
//
import org.springframework.context.annotation.Bean;
//
import org.springframework.context.annotation.Configuration;
//
import org.springframework.util.Assert;
//
import org.springframework.util.StringUtils;
//
//
import javax.annotation.Resource;
//
//
@Configuration
//
public class RocketMQConfig {
//
//
@Resource
//
private RocketMQProperties rocketMQProperties;
//
//
@Bean(name = "deliveryRocketMqTemplate", destroyMethod = "destroy")
//
public RocketMQTemplate rocketMQTemplate() {
//
DefaultMQProducer defaultMQProducer = createMQProducer(RocketMQConst.ORDER_THIRD_PUSH_PRODUCER_GROUP);
//
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
//
rocketMQTemplate.setProducer(defaultMQProducer);
//
rocketMQTemplate.setMessageConverter(new MappingFastJsonMessageConverter());
//
rocketMQTemplate.setMessageQueueSelector(new SelectMessageQueueByHash()); // MessageQueueSelector, default SelectMessageQueueByHash
//
return rocketMQTemplate;
//
}
//
//
/**
//
* @return
//
*/
//
public DefaultMQProducer createMQProducer(String producerGroupName) {
//
RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
//
String nameServer = rocketMQProperties.getNameServer();
//
String groupName = StringUtils.isEmpty(producerGroupName) ? producerConfig.getGroup() : producerGroupName;
//
Assert.hasText(nameServer, "rocketmq.name-server must not be null");
//
Assert.hasText(groupName, "rocketmq.producer.group must not be null");
//
//
String accessChannel = rocketMQProperties.getAccessChannel();
//
String accessKey = rocketMQProperties.getProducer().getAccessKey();
//
String secretKey = rocketMQProperties.getProducer().getSecretKey();
//
//
DefaultMQProducer producer = null;
//
// 密码模式
//
if (!StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey)) {
//
producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))
//
, rocketMQProperties.getProducer().isEnableMsgTrace()
//
, rocketMQProperties.getProducer().getCustomizedTraceTopic());
//
producer.setVipChannelEnabled(false);
//
} else {
//
producer = new DefaultMQProducer(groupName
//
, rocketMQProperties.getProducer().isEnableMsgTrace()
//
, rocketMQProperties.getProducer().getCustomizedTraceTopic());
//
}
//
//
producer.setNamesrvAddr(nameServer);
//
//
if (!StringUtils.isEmpty(accessChannel)) {
//
producer.setAccessChannel(AccessChannel.valueOf(accessChannel));
//
}
//
//
producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
//
producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
//
producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
//
producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
//
producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());
//
producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());
//
producer.setPollNameServerInterval(rocketMQProperties.getPollNameServerInteval());
//
producer.setHeartbeatBrokerInterval(rocketMQProperties.getHeartbeatBrokerInterval());
//
producer.setPersistConsumerOffsetInterval(rocketMQProperties.getPersistConsumerOffsetInterval());
//
//
return producer;
//
}
//
//
}
order-application-service/src/main/java/cn/freemud/amp/config/RocketMQProperties.java
View file @
5894c2c8
package
cn
.
freemud
.
amp
.
config
;
import
org.apache.rocketmq.common.MixAll
;
import
org.springframework.boot.context.properties.ConfigurationProperties
;
import
org.springframework.context.annotation.Primary
;
import
org.springframework.stereotype.Component
;
import
java.util.HashMap
;
import
java.util.Map
;
/**
* {@link org.apache.rocketmq.spring.autoconfigure.RocketMQProperties}
*/
@Primary
@Component
@ConfigurationProperties
(
prefix
=
"rocketmq"
)
public
class
RocketMQProperties
{
/**
* The name server for rocketMQ, formats: `host:port;host:port`.
*/
private
String
nameServer
;
/**
* Enum type for accesChannel, values: LOCAL, CLOUD
* {@link org.apache.rocketmq.client.AccessChannel}
*/
private
String
accessChannel
;
/**
* 轮询nameServer间隔时间,默认30S
*/
private
int
pollNameServerInteval
=
30
*
1000
;
/**
* 向broker发送心跳间隔时间,默认30S
*/
private
int
heartbeatBrokerInterval
=
30
*
1000
;
/**
* 持久化Consumer进度间隔时间,默认5S
*/
private
int
persistConsumerOffsetInterval
=
5
*
1000
;
private
Producer
producer
;
/**
* Configure enable listener or not.
* In some particular cases, if you don't want the the listener is enabled when container startup,
* the configuration pattern is like this :
* rocketmq.consumer.listeners.<group-name>.<topic-name>.enabled=<boolean value, true or false>
* <p>
* the listener is enabled by default.
*/
private
Consumer
consumer
=
new
Consumer
();
public
String
getNameServer
()
{
return
nameServer
;
}
public
void
setNameServer
(
String
nameServer
)
{
this
.
nameServer
=
nameServer
;
}
public
String
getAccessChannel
()
{
return
accessChannel
;
}
public
void
setAccessChannel
(
String
accessChannel
)
{
this
.
accessChannel
=
accessChannel
;
}
public
int
getPollNameServerInteval
()
{
return
pollNameServerInteval
;
}
public
void
setPollNameServerInteval
(
int
pollNameServerInteval
)
{
this
.
pollNameServerInteval
=
pollNameServerInteval
;
}
public
int
getHeartbeatBrokerInterval
()
{
return
heartbeatBrokerInterval
;
}
public
void
setHeartbeatBrokerInterval
(
int
heartbeatBrokerInterval
)
{
this
.
heartbeatBrokerInterval
=
heartbeatBrokerInterval
;
}
public
int
getPersistConsumerOffsetInterval
()
{
return
persistConsumerOffsetInterval
;
}
public
void
setPersistConsumerOffsetInterval
(
int
persistConsumerOffsetInterval
)
{
this
.
persistConsumerOffsetInterval
=
persistConsumerOffsetInterval
;
}
public
RocketMQProperties
.
Producer
getProducer
()
{
return
producer
;
}
public
void
setProducer
(
RocketMQProperties
.
Producer
producer
)
{
this
.
producer
=
producer
;
}
public
static
class
Producer
{
/**
* Group name of producer.
*/
private
String
group
;
/**
* Millis of send message timeout.
*/
private
int
sendMessageTimeout
=
3000
;
/**
* Compress message body threshold, namely, message body larger than 4k will be compressed on default.
*/
private
int
compressMessageBodyThreshold
=
1024
*
4
;
/**
* Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private
int
retryTimesWhenSendFailed
=
2
;
/**
* <p> Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private
int
retryTimesWhenSendAsyncFailed
=
2
;
/**
* Indicate whether to retry another broker on sending failure internally.
*/
private
boolean
retryNextServer
=
false
;
/**
* Maximum allowed message size in bytes.
*/
private
int
maxMessageSize
=
1024
*
1024
*
4
;
/**
* The property of "access-key".
*/
private
String
accessKey
;
/**
* The property of "secret-key".
*/
private
String
secretKey
;
/**
* Switch flag instance for message trace.
*/
private
boolean
enableMsgTrace
=
true
;
/**
* The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
private
String
customizedTraceTopic
=
MixAll
.
RMQ_SYS_TRACE_TOPIC
;
public
String
getGroup
()
{
return
group
;
}
public
void
setGroup
(
String
group
)
{
this
.
group
=
group
;
}
public
int
getSendMessageTimeout
()
{
return
sendMessageTimeout
;
}
public
void
setSendMessageTimeout
(
int
sendMessageTimeout
)
{
this
.
sendMessageTimeout
=
sendMessageTimeout
;
}
public
int
getCompressMessageBodyThreshold
()
{
return
compressMessageBodyThreshold
;
}
public
void
setCompressMessageBodyThreshold
(
int
compressMessageBodyThreshold
)
{
this
.
compressMessageBodyThreshold
=
compressMessageBodyThreshold
;
}
public
int
getRetryTimesWhenSendFailed
()
{
return
retryTimesWhenSendFailed
;
}
public
void
setRetryTimesWhenSendFailed
(
int
retryTimesWhenSendFailed
)
{
this
.
retryTimesWhenSendFailed
=
retryTimesWhenSendFailed
;
}
public
int
getRetryTimesWhenSendAsyncFailed
()
{
return
retryTimesWhenSendAsyncFailed
;
}
public
void
setRetryTimesWhenSendAsyncFailed
(
int
retryTimesWhenSendAsyncFailed
)
{
this
.
retryTimesWhenSendAsyncFailed
=
retryTimesWhenSendAsyncFailed
;
}
public
boolean
isRetryNextServer
()
{
return
retryNextServer
;
}
public
void
setRetryNextServer
(
boolean
retryNextServer
)
{
this
.
retryNextServer
=
retryNextServer
;
}
public
int
getMaxMessageSize
()
{
return
maxMessageSize
;
}
public
void
setMaxMessageSize
(
int
maxMessageSize
)
{
this
.
maxMessageSize
=
maxMessageSize
;
}
public
String
getAccessKey
()
{
return
accessKey
;
}
public
void
setAccessKey
(
String
accessKey
)
{
this
.
accessKey
=
accessKey
;
}
public
String
getSecretKey
()
{
return
secretKey
;
}
public
void
setSecretKey
(
String
secretKey
)
{
this
.
secretKey
=
secretKey
;
}
public
boolean
isEnableMsgTrace
()
{
return
enableMsgTrace
;
}
public
void
setEnableMsgTrace
(
boolean
enableMsgTrace
)
{
this
.
enableMsgTrace
=
enableMsgTrace
;
}
public
String
getCustomizedTraceTopic
()
{
return
customizedTraceTopic
;
}
public
void
setCustomizedTraceTopic
(
String
customizedTraceTopic
)
{
this
.
customizedTraceTopic
=
customizedTraceTopic
;
}
}
public
Consumer
getConsumer
()
{
return
consumer
;
}
public
void
setConsumer
(
Consumer
consumer
)
{
this
.
consumer
=
consumer
;
}
public
static
final
class
Consumer
{
/**
* listener configuration container
* the pattern is like this:
* group1.topic1 = false
* group2.topic2 = true
* group3.topic3 = false
*/
private
Map
<
String
,
Map
<
String
,
Boolean
>>
listeners
=
new
HashMap
<>();
public
Map
<
String
,
Map
<
String
,
Boolean
>>
getListeners
()
{
return
listeners
;
}
public
void
setListeners
(
Map
<
String
,
Map
<
String
,
Boolean
>>
listeners
)
{
this
.
listeners
=
listeners
;
}
}
}
\ No newline at end of file
//package cn.freemud.amp.config;
//
//import org.apache.rocketmq.common.MixAll;
//import org.springframework.boot.context.properties.ConfigurationProperties;
//import org.springframework.context.annotation.Primary;
//import org.springframework.stereotype.Component;
//
//import java.util.HashMap;
//import java.util.Map;
//
///**
// * {@link org.apache.rocketmq.spring.autoconfigure.RocketMQProperties}
// */
//@Primary
//@Component
//@ConfigurationProperties(prefix = "rocketmq")
//public class RocketMQProperties {
//
// /**
// * The name server for rocketMQ, formats: `host:port;host:port`.
// */
// private String nameServer;
//
// /**
// * Enum type for accesChannel, values: LOCAL, CLOUD
// * {@link org.apache.rocketmq.client.AccessChannel}
// */
// private String accessChannel;
//
// /**
// * 轮询nameServer间隔时间,默认30S
// */
// private int pollNameServerInteval = 30 * 1000;
//
// /**
// * 向broker发送心跳间隔时间,默认30S
// */
// private int heartbeatBrokerInterval = 30 * 1000;
// /**
// * 持久化Consumer进度间隔时间,默认5S
// */
// private int persistConsumerOffsetInterval = 5 * 1000;
//
//
// private Producer producer;
//
// /**
// * Configure enable listener or not.
// * In some particular cases, if you don't want the the listener is enabled when container startup,
// * the configuration pattern is like this :
// * rocketmq.consumer.listeners.<group-name>.<topic-name>.enabled=<boolean value, true or false>
// * <p>
// * the listener is enabled by default.
// */
// private Consumer consumer = new Consumer();
//
// public String getNameServer() {
// return nameServer;
// }
//
// public void setNameServer(String nameServer) {
// this.nameServer = nameServer;
// }
//
// public String getAccessChannel() {
// return accessChannel;
// }
//
// public void setAccessChannel(String accessChannel) {
// this.accessChannel = accessChannel;
// }
//
// public int getPollNameServerInteval() {
// return pollNameServerInteval;
// }
//
// public void setPollNameServerInteval(int pollNameServerInteval) {
// this.pollNameServerInteval = pollNameServerInteval;
// }
//
// public int getHeartbeatBrokerInterval() {
// return heartbeatBrokerInterval;
// }
//
// public void setHeartbeatBrokerInterval(int heartbeatBrokerInterval) {
// this.heartbeatBrokerInterval = heartbeatBrokerInterval;
// }
//
// public int getPersistConsumerOffsetInterval() {
// return persistConsumerOffsetInterval;
// }
//
// public void setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval) {
// this.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
// }
//
// public RocketMQProperties.Producer getProducer() {
// return producer;
// }
//
// public void setProducer(RocketMQProperties.Producer producer) {
// this.producer = producer;
// }
//
// public static class Producer {
//
// /**
// * Group name of producer.
// */
// private String group;
//
// /**
// * Millis of send message timeout.
// */
// private int sendMessageTimeout = 3000;
//
// /**
// * Compress message body threshold, namely, message body larger than 4k will be compressed on default.
// */
// private int compressMessageBodyThreshold = 1024 * 4;
//
// /**
// * Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
// * This may potentially cause message duplication which is up to application developers to resolve.
// */
// private int retryTimesWhenSendFailed = 2;
//
// /**
// * <p> Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
// * This may potentially cause message duplication which is up to application developers to resolve.
// */
// private int retryTimesWhenSendAsyncFailed = 2;
//
// /**
// * Indicate whether to retry another broker on sending failure internally.
// */
// private boolean retryNextServer = false;
//
// /**
// * Maximum allowed message size in bytes.
// */
// private int maxMessageSize = 1024 * 1024 * 4;
//
// /**
// * The property of "access-key".
// */
// private String accessKey;
//
// /**
// * The property of "secret-key".
// */
// private String secretKey;
//
// /**
// * Switch flag instance for message trace.
// */
// private boolean enableMsgTrace = true;
//
// /**
// * The name value of message trace topic.If you don't config,you can use the default trace topic name.
// */
// private String customizedTraceTopic = MixAll.RMQ_SYS_TRACE_TOPIC;
//
// public String getGroup() {
// return group;
// }
//
// public void setGroup(String group) {
// this.group = group;
// }
//
// public int getSendMessageTimeout() {
// return sendMessageTimeout;
// }
//
// public void setSendMessageTimeout(int sendMessageTimeout) {
// this.sendMessageTimeout = sendMessageTimeout;
// }
//
// public int getCompressMessageBodyThreshold() {
// return compressMessageBodyThreshold;
// }
//
// public void setCompressMessageBodyThreshold(int compressMessageBodyThreshold) {
// this.compressMessageBodyThreshold = compressMessageBodyThreshold;
// }
//
// public int getRetryTimesWhenSendFailed() {
// return retryTimesWhenSendFailed;
// }
//
// public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
// this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
// }
//
// public int getRetryTimesWhenSendAsyncFailed() {
// return retryTimesWhenSendAsyncFailed;
// }
//
// public void setRetryTimesWhenSendAsyncFailed(int retryTimesWhenSendAsyncFailed) {
// this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
// }
//
// public boolean isRetryNextServer() {
// return retryNextServer;
// }
//
// public void setRetryNextServer(boolean retryNextServer) {
// this.retryNextServer = retryNextServer;
// }
//
// public int getMaxMessageSize() {
// return maxMessageSize;
// }
//
// public void setMaxMessageSize(int maxMessageSize) {
// this.maxMessageSize = maxMessageSize;
// }
//
// public String getAccessKey() {
// return accessKey;
// }
//
// public void setAccessKey(String accessKey) {
// this.accessKey = accessKey;
// }
//
// public String getSecretKey() {
// return secretKey;
// }
//
// public void setSecretKey(String secretKey) {
// this.secretKey = secretKey;
// }
//
// public boolean isEnableMsgTrace() {
// return enableMsgTrace;
// }
//
// public void setEnableMsgTrace(boolean enableMsgTrace) {
// this.enableMsgTrace = enableMsgTrace;
// }
//
// public String getCustomizedTraceTopic() {
// return customizedTraceTopic;
// }
//
// public void setCustomizedTraceTopic(String customizedTraceTopic) {
// this.customizedTraceTopic = customizedTraceTopic;
// }
// }
//
// public Consumer getConsumer() {
// return consumer;
// }
//
// public void setConsumer(Consumer consumer) {
// this.consumer = consumer;
// }
//
// public static final class Consumer {
// /**
// * listener configuration container
// * the pattern is like this:
// * group1.topic1 = false
// * group2.topic2 = true
// * group3.topic3 = false
// */
// private Map<String, Map<String, Boolean>> listeners = new HashMap<>();
//
// public Map<String, Map<String, Boolean>> getListeners() {
// return listeners;
// }
//
// public void setListeners(Map<String, Map<String, Boolean>> listeners) {
// this.listeners = listeners;
// }
// }
//
//}
\ No newline at end of file
order-application-service/src/main/java/cn/freemud/amp/service/ProduceMQService.java
View file @
5894c2c8
...
...
@@ -5,23 +5,12 @@ import cn.freemud.amqp.Header;
import
cn.freemud.amqp.MQAction
;
import
cn.freemud.amqp.MQMessage
;
import
cn.freemud.amqp.MQService
;
import
cn.freemud.constant.RocketMQConst
;
import
cn.freemud.entities.dto.delivery.CallbackUrlRequestDto
;
import
com.alibaba.fastjson.JSON
;
import
com.freemud.application.sdk.api.log.ErrorLog
;
import
com.google.common.base.Throwables
;
import
org.apache.commons.lang.BooleanUtils
;
import
org.apache.rocketmq.client.producer.SendResult
;
import
org.apache.rocketmq.common.message.MessageConst
;
import
org.apache.rocketmq.spring.core.RocketMQTemplate
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.messaging.Message
;
import
org.springframework.messaging.support.MessageBuilder
;
import
org.springframework.stereotype.Service
;
import
javax.annotation.Resource
;
/**
* mq发送方
*/
...
...
@@ -30,8 +19,8 @@ public class ProduceMQService {
@Autowired
private
MQService
mqService
;
@Resource
private
RocketMQTemplate
deliveryRocketMqTemplate
;
//
@Resource
//
private RocketMQTemplate deliveryRocketMqTemplate;
@Value
(
"${rocketmq.delivery.enable:false}"
)
private
Boolean
rocketMQDeliveryEnable
;
...
...
@@ -54,26 +43,26 @@ public class ProduceMQService {
* @param deliveryRequest
* @Description 发送配送信息到rocketMq:
*/
public
void
sendRocketMqOfDeliveryInfo
(
CallbackUrlRequestDto
deliveryRequest
)
{
if
(
BooleanUtils
.
isNotTrue
(
rocketMQDeliveryEnable
))
{
return
;
}
Header
header
=
new
Header
(
MQAction
.
UPDATE
.
getAction
(),
"delivery/callbackUrl"
,
deliveryRequest
.
getOrderId
(),
RocketMQConst
.
ORDER_THIRD_PUSH_TOPIC
);
MQMessage
<
CallbackUrlRequestDto
>
mqMessage
=
new
MQMessage
(
header
,
deliveryRequest
);
String
destination
=
String
.
format
(
"%s:%s"
,
RocketMQConst
.
ORDER_THIRD_PUSH_TOPIC
,
RocketMQConst
.
OrderSubsidiaryTag
.
delivery
);
Message
message
=
MessageBuilder
.
withPayload
(
mqMessage
)
.
setHeader
(
"partnerId"
,
deliveryRequest
.
getPartnerId
())
.
setHeader
(
MessageConst
.
PROPERTY_KEYS
,
deliveryRequest
.
getDeliveryId
())
.
build
();
try
{
SendResult
sendResult
=
deliveryRocketMqTemplate
.
syncSendOrderly
(
destination
,
message
,
deliveryRequest
.
getOrderId
());
// log.debug("sendDelivery of status:{}, message:{},", sendResult.toString(), JSON.toJSONString(message));
}
catch
(
Exception
e
)
{
ErrorLog
.
printErrorLog
(
"sendDelivery.error"
,
"delivery/callbackUrl"
,
message
,
e
);
}
}
//
public void sendRocketMqOfDeliveryInfo(CallbackUrlRequestDto deliveryRequest) {
//
//
if (BooleanUtils.isNotTrue(rocketMQDeliveryEnable)) {
//
return;
//
}
//
//
Header header = new Header(MQAction.UPDATE.getAction(), "delivery/callbackUrl", deliveryRequest.getOrderId(), RocketMQConst.ORDER_THIRD_PUSH_TOPIC);
//
MQMessage<CallbackUrlRequestDto> mqMessage = new MQMessage(header, deliveryRequest);
//
String destination = String.format("%s:%s", RocketMQConst.ORDER_THIRD_PUSH_TOPIC, RocketMQConst.OrderSubsidiaryTag.delivery);
//
Message message = MessageBuilder
//
.withPayload(mqMessage)
//
.setHeader("partnerId", deliveryRequest.getPartnerId())
//
.setHeader(MessageConst.PROPERTY_KEYS, deliveryRequest.getDeliveryId())
//
.build();
//
//
try {
//
SendResult sendResult = deliveryRocketMqTemplate.syncSendOrderly(destination, message, deliveryRequest.getOrderId());
//
//
log.debug("sendDelivery of status:{}, message:{},", sendResult.toString(), JSON.toJSONString(message));
//
} catch (Exception e) {
//
ErrorLog.printErrorLog("sendDelivery.error","delivery/callbackUrl",message,e);
//
}
//
}
}
order-application-service/src/main/java/cn/freemud/service/delivery/ThirdDeliveryServiceImpl.java
View file @
5894c2c8
...
...
@@ -194,7 +194,7 @@ public class ThirdDeliveryServiceImpl implements ThirdDeliveryService {
// 订单配送状态变化时发送到MQ
if
(
DeliveryStatus
.
checkDeliveryStatueForPlatform
(
deliveryStatus
))
{
produceMQService
.
sendOfDeliveryInfo
(
request
);
produceMQService
.
sendRocketMqOfDeliveryInfo
(
request
);
//
produceMQService.sendRocketMqOfDeliveryInfo(request);
}
//配送订阅消息
sendMicroMessage
(
partnerId
,
request
.
getOrderId
(),
deliveryStatus
);
...
...
order-application-service/src/test/java/cn/freemud/amp/service/ProduceMQServiceTest.java
View file @
5894c2c8
...
...
@@ -37,7 +37,7 @@ public class ProduceMQServiceTest {
@Test
public
void
send
()
{
CallbackUrlRequestDto
callbackUrlRequestDto
=
buildCallbackUrlRequestDto
();
produceMQService
.
sendRocketMqOfDeliveryInfo
(
callbackUrlRequestDto
);
//
produceMQService.sendRocketMqOfDeliveryInfo(callbackUrlRequestDto);
}
public
MQMessage
buildMQMessage
()
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment