Commit 73cf2012 by 胡超

RocketMQ topic 、producerGroup、consumerGroup 最大长度32,格式为 ^[%|a-zA-Z0-9_-]

parent 9362076c
...@@ -6,6 +6,7 @@ import org.apache.rocketmq.acl.common.AclClientRPCHook; ...@@ -6,6 +6,7 @@ import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
...@@ -27,6 +28,7 @@ public class RocketMQConfig { ...@@ -27,6 +28,7 @@ public class RocketMQConfig {
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate(); RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
rocketMQTemplate.setProducer(defaultMQProducer); rocketMQTemplate.setProducer(defaultMQProducer);
rocketMQTemplate.setMessageConverter(new MappingFastJsonMessageConverter()); rocketMQTemplate.setMessageConverter(new MappingFastJsonMessageConverter());
rocketMQTemplate.setMessageQueueSelector(new SelectMessageQueueByHash()); // MessageQueueSelector, default SelectMessageQueueByHash
return rocketMQTemplate; return rocketMQTemplate;
} }
......
...@@ -11,6 +11,7 @@ import com.alibaba.fastjson.JSON; ...@@ -11,6 +11,7 @@ import com.alibaba.fastjson.JSON;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
...@@ -49,18 +50,19 @@ public class ProduceMQService { ...@@ -49,18 +50,19 @@ public class ProduceMQService {
Header header = new Header(MQAction.UPDATE.getAction(), "delivery/callbackUrl", deliveryRequest.getOrderId(), RocketMQConst.ORDER_THIRD_PUSH_TOPIC); Header header = new Header(MQAction.UPDATE.getAction(), "delivery/callbackUrl", deliveryRequest.getOrderId(), RocketMQConst.ORDER_THIRD_PUSH_TOPIC);
MQMessage<CallbackUrlRequestDto> mqMessage = new MQMessage(header, deliveryRequest); MQMessage<CallbackUrlRequestDto> mqMessage = new MQMessage(header, deliveryRequest);
String destination = String.format("%s:%s", RocketMQConst.ORDER_THIRD_PUSH_TOPIC, RocketMQConst.OrderSubsidiaryTag.DELIVERY); String destination = String.format("%s:%s", RocketMQConst.ORDER_THIRD_PUSH_TOPIC, RocketMQConst.OrderSubsidiaryTag.delivery);
Message message = MessageBuilder Message message = MessageBuilder
.withPayload(mqMessage) .withPayload(mqMessage)
.setHeader("partnerId", deliveryRequest.getPartnerId()) .setHeader("partnerId", deliveryRequest.getPartnerId())
.setHeader(MessageConst.PROPERTY_KEYS, deliveryRequest.getDeliveryId())
.build(); .build();
try { try {
SendResult sendResult = deliveryRocketMqTemplate.syncSendOrderly(destination, message, deliveryRequest.getOrderId()); SendResult sendResult = deliveryRocketMqTemplate.syncSendOrderly(destination, message, deliveryRequest.getOrderId());
log.info("sendDelivery of status:{}, message:{},", sendResult.toString(), JSON.toJSONString(message)); log.info("sendDelivery of status:{}, message:{},", sendResult.toString(), JSON.toJSONString(message));
} catch (Exception e) { } catch (Exception e) {
log.error("sendDelivery.error,message:{},cause:{}", JSON.toJSONString(message), Throwables.getStackTraceAsString(e)); log.error("sendDelivery.error, message:{}, cause:{}", JSON.toJSONString(message), Throwables.getStackTraceAsString(e));
} }
} }
} }
...@@ -2,7 +2,7 @@ package cn.freemud.constant; ...@@ -2,7 +2,7 @@ package cn.freemud.constant;
/** /**
* @Description RocketMQ服务配置 * @Description RocketMQ服务配置
* @mark topic 、producerGroup、consumerGroup 最大长度32 * @mark topic 、producerGroup、consumerGroup 最大长度32,格式为 ^[%|a-zA-Z0-9_-]
*/ */
public class RocketMQConst { public class RocketMQConst {
...@@ -13,7 +13,7 @@ public class RocketMQConst { ...@@ -13,7 +13,7 @@ public class RocketMQConst {
public enum OrderSubsidiaryTag { public enum OrderSubsidiaryTag {
DELIVERY, INVOICE; delivery, invoice;
} }
} }
package cn.freemud.controller.test; package cn.freemud.controller.test;
import cn.freemud.amp.service.ProduceMQService;
import cn.freemud.base.entity.BaseResponse; import cn.freemud.base.entity.BaseResponse;
import cn.freemud.entities.dto.delivery.CallbackUrlRequestDto;
import cn.freemud.entities.vo.CheckBeforeCreateOrderRequestVo; import cn.freemud.entities.vo.CheckBeforeCreateOrderRequestVo;
import cn.freemud.monitorcenter.tools.HealthUtil; import cn.freemud.monitorcenter.tools.HealthUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.freemud.application.sdk.api.log.ApiAnnotation; import com.freemud.application.sdk.api.log.ApiAnnotation;
import com.freemud.application.sdk.api.log.LogParams; import com.freemud.application.sdk.api.log.LogParams;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/** /**
* All rights Reserved, Designed By www.freemud.com * All rights Reserved, Designed By www.freemud.com
* *
...@@ -58,5 +64,42 @@ public class TestController { ...@@ -58,5 +64,42 @@ public class TestController {
return platformBaseResponse; return platformBaseResponse;
} }
@Resource
ProduceMQService produceMQService;
@ApiAnnotation(logMessage = "test")
@PostMapping("/test")
public PlatformBaseResponse test(String order,String partnerId,String channel) {
CallbackUrlRequestDto deliveryRequest = buildCallbackUrlRequestDto();
deliveryRequest.setOrderId(order);
deliveryRequest.setPartnerId(partnerId);
deliveryRequest.setChannelCode(channel);
produceMQService.sendRocketMqOfDeliveryInfo(deliveryRequest);
PlatformBaseResponse platformBaseResponse = new PlatformBaseResponse();
platformBaseResponse.setResponseBody(JSON.toJSONString(sendResult));
platformBaseResponse.setStatusCode("100");
return platformBaseResponse;
}
public CallbackUrlRequestDto buildCallbackUrlRequestDto() {
String json = "{\n" +
" \"channelCode\":\"MeiTuan\",\n" +
" \"channelDeliveryId\":\"1584868781086019780\",\n" +
" \"channelName\":\"美团\",\n" +
" \"deliveryId\":\"6913353047542579209b\",\n" +
" \"deliveryStatus\":2,\n" +
" \"orderId\":\"17283381807564801000006\",\n" +
" \"partnerId\":\"1864\",\n" +
" \"riderName\":\"宋**\",\n" +
" \"riderPhone\":\"1782****316\",\n" +
" \"storeId\":\"a785e270-0f67-4805-b3db-6e2604b87909\",\n" +
" \"updateTime\":\"2020-03-22 17:20:45\"\n" +
"}";
CallbackUrlRequestDto callbackUrlRequestDto = JSON.parseObject(json, CallbackUrlRequestDto.class);
return callbackUrlRequestDto;
}
} }
...@@ -9,29 +9,3 @@ env=dev ...@@ -9,29 +9,3 @@ env=dev
apollo.cluster=default apollo.cluster=default
apollo.bootstrap.enabled=true apollo.bootstrap.enabled=true
apollo.bootstrap.namespaces=micro_progeram_commons,order_service apollo.bootstrap.namespaces=micro_progeram_commons,order_service
#rocketmq.config
#Name Server地址列表,多个NameServer地址用分号隔开
rocketmq.name-server=10.10.4.18:9876;10.10.4.126:9876;10.10.4.138:9876
rocketmq.client.name=${spring.application.name}
#rocketmq.clientCallbackExecutorThreads=4
#轮询Name Server间隔时间,单位毫秒
rocketmq.pollNameServerInteval=30000
#向Broker发送心跳间隔时间,单位毫秒
rocketmq.heartbeatBrokerInterval=30000
#持久化Consumer消费进度间隔时间,单位毫秒
rocketmq.persistConsumerOffsetInterval=5000
#rocketmq.producer.config
rocketmq.producer.sendMessageTimeout=10000
#如果消息发送失败,最大重试次数,同步发送模式起作用
rocketmq.producer.retryTimesWhenSendFailed=2
#如果消息发送失败,最大重试次数,异步步发送模式起作用
rocketmq.producer.retryTimesWhenSendAsyncFailed=2
rocketmq.producer.retryNextServer=false
rocketmq.producer.accessKey=rocketmquser
rocketmq.producer.secretKey=719H8q8iVx09bETS
#客户端限制的消息大小4MB,超过报错,同时服务端也会限制,所以需要跟服务端配合使用
rocketmq.producer.maxMessageSize=4194304
#压缩消息体的阈值10KB
rocketmq.producer.compressMessageBodyThreshold=10240
rocketmq.producer.enableMsgTrace=true
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment