Commit 7159ffb8 by 胡超

sendRocketMqOfDeliveryInfo

parent 986eb351
......@@ -328,6 +328,21 @@
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency>
<dependency>
<groupId>tk.mybatis</groupId>
<artifactId>mapper-spring-boot-starter</artifactId>
</dependency>
......
package cn.freemud.amp;
import com.alibaba.fastjson.support.spring.messaging.MappingFastJsonMessageConverter;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
@Configuration
public class RocketMqConfig {
@Value("${producer_group_delivery:order_delivery_producer_group}")
private String PRODUCER_GROUP_DELIVERY;
@Resource
private RocketMQProperties rocketMQProperties;
@Bean(name = "deliveryRocketMqTemplate", destroyMethod = "destroy")
public RocketMQTemplate rocketMQTemplate() {
DefaultMQProducer defaultMQProducer = createMQProducer(PRODUCER_GROUP_DELIVERY);
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
rocketMQTemplate.setProducer(defaultMQProducer);
rocketMQTemplate.setMessageConverter(new MappingFastJsonMessageConverter());
return rocketMQTemplate;
}
/**
* @return
*/
public DefaultMQProducer createMQProducer(String producerGroupName) {
RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
String nameServer = rocketMQProperties.getNameServer();
String groupName = StringUtils.isEmpty(producerGroupName) ? producerConfig.getGroup() : producerGroupName;
Assert.hasText(nameServer, "rocketmq.name-server must not be null");
Assert.hasText(groupName, "rocketmq.producer.group must not be null");
String accessChannel = rocketMQProperties.getAccessChannel();
String accessKey = rocketMQProperties.getProducer().getAccessKey();
String secretKey = rocketMQProperties.getProducer().getSecretKey();
DefaultMQProducer producer = null;
// 密码模式
if (!StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey)) {
producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))
, rocketMQProperties.getProducer().isEnableMsgTrace()
, rocketMQProperties.getProducer().getCustomizedTraceTopic());
producer.setVipChannelEnabled(false);
} else {
producer = new DefaultMQProducer(groupName
, rocketMQProperties.getProducer().isEnableMsgTrace()
, rocketMQProperties.getProducer().getCustomizedTraceTopic());
}
producer.setNamesrvAddr(nameServer);
if (!StringUtils.isEmpty(accessChannel)) {
producer.setAccessChannel(AccessChannel.valueOf(accessChannel));
}
producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());
producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());
return producer;
}
}
......@@ -3,23 +3,47 @@ package cn.freemud.amp.service;
import cn.freemud.amp.config.PushOrderConfig;
import cn.freemud.amqp.MQMessage;
import cn.freemud.amqp.MQService;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* mq发送方
*/
@Slf4j
@Service
public class ProduceMQService {
@Autowired
private MQService mqService;
@Resource
private RocketMQTemplate deliveryRocketMqTemplate;
/**
* 发送配送状态变更信息
*
* @param body
*/
public void sendOfDeliveryInfo(MQMessage body) {
mqService.convertAndSend(PushOrderConfig.EXCHANGE_NAME, PushOrderConfig.OPEN_PLATFORM_ORDER_DELIVERY_ROUTING_KEY, body);
}
/**
* 发送配送信息到rocketmq:
*
* @param body
*/
public void sendRocketMqOfDeliveryInfo(MQMessage body) {
String topic = "order_subsidiary";
String tag = "delivery";
String destination = String.format("%s:%s", topic, tag);
String selectorKey = "orderId";
SendResult sendResult = deliveryRocketMqTemplate.syncSendOrderly(destination, body, selectorKey);
log.info("sendDelivery of status:{}, message:{},", sendResult, JSON.toJSONString(body));
}
}
......@@ -9,3 +9,29 @@ env=dev
apollo.cluster=default
apollo.bootstrap.enabled=true
apollo.bootstrap.namespaces=micro_progeram_commons,order_service
#rocketmq.config
#Name Server地址列表,多个NameServer地址用分号隔开
rocketmq.name-server=10.10.4.18:9876;10.10.4.126:9876;10.10.4.138:9876
rocketmq.client.name=${spring.application.name}
#rocketmq.clientCallbackExecutorThreads=4
#轮询Name Server间隔时间,单位毫秒
rocketmq.pollNameServerInteval=30000
#向Broker发送心跳间隔时间,单位毫秒
rocketmq.heartbeatBrokerInterval=30000
#持久化Consumer消费进度间隔时间,单位毫秒
rocketmq.persistConsumerOffsetInterval=5000
#rocketmq.producer.config
rocketmq.producer.sendMessageTimeout=10000
#如果消息发送失败,最大重试次数,同步发送模式起作用
rocketmq.producer.retryTimesWhenSendFailed=2
#如果消息发送失败,最大重试次数,异步步发送模式起作用
rocketmq.producer.retryTimesWhenSendAsyncFailed=2
rocketmq.producer.retryNextServer=false
rocketmq.producer.accessKey=rocketmquser
rocketmq.producer.secretKey=719H8q8iVx09bETS
#客户端限制的消息大小4MB,超过报错,同时服务端也会限制,所以需要跟服务端配合使用
rocketmq.producer.maxMessageSize=4194304
#压缩消息体的阈值10KB
rocketmq.producer.compressMessageBodyThreshold=10240
rocketmq.producer.enableMsgTrace=true
\ No newline at end of file
package cn.freemud.amp.service;
import cn.freemud.amqp.Header;
import cn.freemud.amqp.MQAction;
import cn.freemud.amqp.MQMessage;
import cn.freemud.entities.dto.delivery.CallbackUrlRequestDto;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.netflix.feign.EnableFeignClients;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Random;
@RunWith(SpringRunner.class)
@SpringBootTest
@EnableDiscoveryClient
@EnableFeignClients
@EnableAutoConfiguration
@ActiveProfiles("dev")
@Slf4j
public class ProduceMQServiceTest {
@Autowired
private ProduceMQService produceMQService;
@Test
public void sendOfDeliveryInfo() {
MQMessage body = buildMQMessage();
produceMQService.sendOfDeliveryInfo(body);
}
@Test
public void send() {
MQMessage body = buildMQMessage();
produceMQService.sendRocketMqOfDeliveryInfo(body);
}
public MQMessage buildMQMessage() {
MQMessage body = new MQMessage();
Header header = new Header(MQAction.UPDATE.getAction(), "order-application-service", "orderId", "ORDER_DELIVERY");
body.setHeader(header);
String json = "{\n" +
" \"channelCode\":\"MeiTuan\",\n" +
" \"channelDeliveryId\":\"1584868781086019780\",\n" +
" \"channelName\":\"美团\",\n" +
" \"deliveryId\":\"6913353047542579209b\",\n" +
" \"deliveryStatus\":2,\n" +
" \"orderId\":\"17283381807564801000006\",\n" +
" \"partnerId\":\"1864\",\n" +
" \"riderName\":\"宋**\",\n" +
" \"riderPhone\":\"1782****316\",\n" +
" \"storeId\":\"a785e270-0f67-4805-b3db-6e2604b87909\",\n" +
" \"updateTime\":\"2020-03-22 17:20:45\"\n" +
"}";
CallbackUrlRequestDto callbackUrlRequestDto = JSON.parseObject(json, CallbackUrlRequestDto.class);
Random random = new Random(System.currentTimeMillis());
callbackUrlRequestDto.setDeliveryStatus(random.nextInt(8) + 1);
body.setBody(callbackUrlRequestDto);
return body;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment