Commit 9a334b5f by 胡超

配送通知rocketmq

parent fbc288ca
...@@ -16,7 +16,7 @@ import org.springframework.util.StringUtils; ...@@ -16,7 +16,7 @@ import org.springframework.util.StringUtils;
import javax.annotation.Resource; import javax.annotation.Resource;
@Configuration @Configuration
public class RocketMqConfig { public class RocketMQConfig {
@Value("${producer_group_delivery:order_delivery_producer_group}") @Value("${producer_group_delivery:order_delivery_producer_group}")
private String PRODUCER_GROUP_DELIVERY; private String PRODUCER_GROUP_DELIVERY;
......
package cn.freemud.amp.service; package cn.freemud.amp.service;
import cn.freemud.amp.config.PushOrderConfig; import cn.freemud.amp.config.PushOrderConfig;
import cn.freemud.amqp.Header;
import cn.freemud.amqp.MQAction;
import cn.freemud.amqp.MQMessage; import cn.freemud.amqp.MQMessage;
import cn.freemud.amqp.MQService; import cn.freemud.amqp.MQService;
import cn.freemud.constant.RocketMQConst;
import cn.freemud.entities.dto.delivery.CallbackUrlRequestDto;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.google.common.base.Throwables;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
...@@ -25,25 +32,35 @@ public class ProduceMQService { ...@@ -25,25 +32,35 @@ public class ProduceMQService {
private RocketMQTemplate deliveryRocketMqTemplate; private RocketMQTemplate deliveryRocketMqTemplate;
/** /**
* 发送配送状态变更信息 * @Description 发现配送信息到rabbitMQ
* * @param deliveryRequest
* @param body
*/ */
public void sendOfDeliveryInfo(MQMessage body) { public void sendOfDeliveryInfo(CallbackUrlRequestDto deliveryRequest) {
mqService.convertAndSend(PushOrderConfig.EXCHANGE_NAME, PushOrderConfig.OPEN_PLATFORM_ORDER_DELIVERY_ROUTING_KEY, body); Header header = new Header(MQAction.UPDATE.getAction(), "delivery/callbackUrl", deliveryRequest.getOrderId(), "open-platform-order-delivery-queue");
MQMessage<CallbackUrlRequestDto> mqMessage = new MQMessage<>(header, deliveryRequest);
mqService.convertAndSend(PushOrderConfig.EXCHANGE_NAME, PushOrderConfig.OPEN_PLATFORM_ORDER_DELIVERY_ROUTING_KEY, mqMessage);
} }
/** /**
* 发送配送信息到rocketmq: * @Description 发送配送信息到rocketMq:
* * @param deliveryRequest
* @param body
*/ */
public void sendRocketMqOfDeliveryInfo(MQMessage body) { public void sendRocketMqOfDeliveryInfo(CallbackUrlRequestDto deliveryRequest) {
String topic = "order_subsidiary"; Header header = new Header(MQAction.UPDATE.getAction(), "delivery/callbackUrl", deliveryRequest.getOrderId(), RocketMQConst.ORDER_DELIVERY_TOPIC);
String tag = "delivery"; MQMessage<CallbackUrlRequestDto> mqMessage = new MQMessage(header, deliveryRequest);
String destination = String.format("%s:%s", topic, tag);
String selectorKey = "orderId"; String destination = String.format("%s:%s", RocketMQConst.ORDER_DELIVERY_TOPIC, RocketMQConst.OrderSubsidiary.DELIVERY);
SendResult sendResult = deliveryRocketMqTemplate.syncSendOrderly(destination, body, selectorKey);
log.info("sendDelivery of status:{}, message:{},", sendResult, JSON.toJSONString(body)); Message message = MessageBuilder
.withPayload(mqMessage)
.setHeader("partnerId", deliveryRequest.getPartnerId())
.build();
try {
SendResult sendResult = deliveryRocketMqTemplate.syncSendOrderly(destination, message, deliveryRequest.getOrderId());
log.info("sendDelivery of status:{}, message:{},", sendResult.toString(), JSON.toJSONString(message));
} catch (Exception e) {
log.error("sendDelivery.error,message:{},cause:{}", JSON.toJSONString(message), Throwables.getStackTraceAsString(e));
}
} }
} }
package cn.freemud.constant;
/**
* @Description服务配置
* @mark topic 、producerGroup、consumerGroup 最大长度32
*/
public class RocketMQConst {
// 配送Topic
public static final String ORDER_DELIVERY_TOPIC = "order.delivery.topic";
// 配送生产者名称
public static final String ORDER_DELIVERY_PRODUCER_GROUP = "order.delivery.producer";
// 订单附属业务推送开放平台topic
public static final String ORDER_SUBSIDIARY_TOPIC = "order.subsidiary.topic";
// 订单附属业务推送开放平台生产者名称
public static final String ORDER_SUBSIDIARY_PRODUCER_GROUP = "order.subsidiary.producer";
// 订单原始消息topic
public static final String ORDER_ORIGIN_TOPIC = "order.origin.topic";
// 订单原始消息生产者名称
public static final String ORDER_ORIGIN_PRODUCER_GROUP = "order.origin.producer";
public enum OrderSubsidiary {
DELIVERY, INVOICE;
}
}
...@@ -64,6 +64,11 @@ public enum DeliveryStatus { ...@@ -64,6 +64,11 @@ public enum DeliveryStatus {
return null; return null;
} }
/**
* @Description: 校验配送单状态是否需要推送给开放平台
* @param status
* @return
*/
public static boolean checkDeliveryStatueForPlatform(int status) { public static boolean checkDeliveryStatueForPlatform(int status) {
DeliveryStatus deliveryStatus = DeliveryStatus.getDeliveryStatusByCode(status); DeliveryStatus deliveryStatus = DeliveryStatus.getDeliveryStatusByCode(status);
if (null == deliveryStatus) return false; if (null == deliveryStatus) return false;
......
...@@ -131,9 +131,7 @@ public class ThirdDeliveryServiceImpl implements ThirdDeliveryService { ...@@ -131,9 +131,7 @@ public class ThirdDeliveryServiceImpl implements ThirdDeliveryService {
// 订单配送状态变化时发送到MQ // 订单配送状态变化时发送到MQ
if (DeliveryStatus.checkDeliveryStatueForPlatform(deliveryStatus)) { if (DeliveryStatus.checkDeliveryStatueForPlatform(deliveryStatus)) {
Header header = new Header(MQAction.UPDATE.getAction(), "delivery/callbackUrl", request.getOrderId(), "open-platform-order-delivery-queue"); produceMQService.sendRocketMqOfDeliveryInfo(request);
MQMessage<CallbackUrlRequestDto> message = new MQMessage<>(header, request);
produceMQService.sendOfDeliveryInfo(message);
} }
// else { // else {
// return ResponseUtil.error(ResponseResult.SYSTEM_ERROR.getCode(), "订单回调状态有误"); // return ResponseUtil.error(ResponseResult.SYSTEM_ERROR.getCode(), "订单回调状态有误");
......
...@@ -32,14 +32,14 @@ public class ProduceMQServiceTest { ...@@ -32,14 +32,14 @@ public class ProduceMQServiceTest {
@Test @Test
public void sendOfDeliveryInfo() { public void sendOfDeliveryInfo() {
MQMessage body = buildMQMessage(); CallbackUrlRequestDto callbackUrlRequestDto = buildCallbackUrlRequestDto();
produceMQService.sendOfDeliveryInfo(body); produceMQService.sendOfDeliveryInfo(callbackUrlRequestDto);
} }
@Test @Test
public void send() { public void send() {
MQMessage body = buildMQMessage(); CallbackUrlRequestDto callbackUrlRequestDto = buildCallbackUrlRequestDto();
produceMQService.sendRocketMqOfDeliveryInfo(body); produceMQService.sendRocketMqOfDeliveryInfo(callbackUrlRequestDto);
} }
public MQMessage buildMQMessage() { public MQMessage buildMQMessage() {
...@@ -47,7 +47,15 @@ public class ProduceMQServiceTest { ...@@ -47,7 +47,15 @@ public class ProduceMQServiceTest {
Header header = new Header(MQAction.UPDATE.getAction(), "order-application-service", "orderId", "ORDER_DELIVERY"); Header header = new Header(MQAction.UPDATE.getAction(), "order-application-service", "orderId", "ORDER_DELIVERY");
body.setHeader(header); body.setHeader(header);
CallbackUrlRequestDto callbackUrlRequestDto = buildCallbackUrlRequestDto();
Random random = new Random(System.currentTimeMillis());
callbackUrlRequestDto.setDeliveryStatus(random.nextInt(8) + 1);
body.setBody(callbackUrlRequestDto);
return body;
}
public CallbackUrlRequestDto buildCallbackUrlRequestDto() {
String json = "{\n" + String json = "{\n" +
" \"channelCode\":\"MeiTuan\",\n" + " \"channelCode\":\"MeiTuan\",\n" +
" \"channelDeliveryId\":\"1584868781086019780\",\n" + " \"channelDeliveryId\":\"1584868781086019780\",\n" +
...@@ -63,11 +71,6 @@ public class ProduceMQServiceTest { ...@@ -63,11 +71,6 @@ public class ProduceMQServiceTest {
"}"; "}";
CallbackUrlRequestDto callbackUrlRequestDto = JSON.parseObject(json, CallbackUrlRequestDto.class); CallbackUrlRequestDto callbackUrlRequestDto = JSON.parseObject(json, CallbackUrlRequestDto.class);
return callbackUrlRequestDto;
Random random = new Random(System.currentTimeMillis());
callbackUrlRequestDto.setDeliveryStatus(random.nextInt(8) + 1);
body.setBody(callbackUrlRequestDto);
return body;
} }
} }
...@@ -79,6 +79,7 @@ public class DefaultPromotionService implements IPromotionService { ...@@ -79,6 +79,7 @@ public class DefaultPromotionService implements IPromotionService {
shoppingCartGoodsDto.setOriginalTotalAmount(shoppingCartGoodsResponseVo.getOriginalTotalAmount()); shoppingCartGoodsDto.setOriginalTotalAmount(shoppingCartGoodsResponseVo.getOriginalTotalAmount());
shoppingCartGoodsDto.setTotalAmount(shoppingCartGoodsResponseVo.getTotalAmount()); shoppingCartGoodsDto.setTotalAmount(shoppingCartGoodsResponseVo.getTotalAmount());
shoppingCartGoodsDto.setTotalDiscountAmount(shoppingCartGoodsResponseVo.getTotalDiscountAmount()); shoppingCartGoodsDto.setTotalDiscountAmount(shoppingCartGoodsResponseVo.getTotalDiscountAmount());
// todo 包装费
shoppingCartGoodsDto.setPackageAmount(shoppingCartGoodsResponseVo.getNewPackAmount()); shoppingCartGoodsDto.setPackageAmount(shoppingCartGoodsResponseVo.getNewPackAmount());
List<CartGoods> cartGoodsList = shoppingCartGoodsResponseVo.getProducts(); List<CartGoods> cartGoodsList = shoppingCartGoodsResponseVo.getProducts();
List<ActivityCalculationDiscountResponseDto.CalculationDiscountResult.ApportionGoods> apportionGoodsList = List<ActivityCalculationDiscountResponseDto.CalculationDiscountResult.ApportionGoods> apportionGoodsList =
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment