Commit c89b7f02 by ping.wu

支付结果查询mq-(会员,点餐下单公共),死信队列

parent 556ef644
package cn.freemud.amp.config;
import com.freemud.application.sdk.api.config.TopicExchangeConfig;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
public class DlPayQueryMqConfig {
/**
* 消费队列
*/
public static final String KGD_PAY_QUERY_QUEUE = "kgd_pay_query_queue";
public static final String KGD_PAYMENT_QUERY_KEY = "kgd_pay_query_key";
/**
* 死信队列
*/
public static final String KGD_PAY_QUERY_DL_QUEUE = "kgd_pay_query_dl_queue";
public static final String KGD_PAY_QUERY_DL_KEY = "kgd_pay_query_dl_key";
@Bean("dlPayMqTopicExchange")
public Exchange dlPaymentMqTopicExchange() {
return ExchangeBuilder.topicExchange(TopicExchangeConfig.EXCHANGE_NAME).durable(true).build();
}
/**
* 写入RabbitMQ使用的默认队列
*
* @return
*/
@Bean("payQueueName")
public Queue paymentChangeQueue() {
return QueueBuilder.durable(KGD_PAY_QUERY_QUEUE).build();
}
/**
* 绑定消息队列
*
* @param queue 消息队列
* @param topicExchange RabbitMQ Fanout Exchange
* @return 消息队列绑定
*/
@Bean("bindingPayQueueChange")
Binding bindingPaymentChange(@Qualifier("payQueueName") Queue queue, @Qualifier("dlPayMqTopicExchange") Exchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with(KGD_PAYMENT_QUERY_KEY).noargs();
}
@Bean("payChangeDlQueue")
public Queue paymentChangeDlQueue() {
Map<String, Object> delayQueueParams = new HashMap<>(2);
// DLX,dead letter发送到的exchange
delayQueueParams.put("x-dead-letter-exchange", TopicExchangeConfig.EXCHANGE_NAME);
// dead letter携带的routing key
delayQueueParams.put("x-dead-letter-routing-key", KGD_PAYMENT_QUERY_KEY);
//时间
//delayQueueParams.put("x-message-ttl", 20*1000);
return QueueBuilder.durable(KGD_PAY_QUERY_DL_QUEUE).withArguments(delayQueueParams).build();
}
@Bean("bindingPayDlChange")
Binding bindingPaymentDlChange(@Qualifier("payChangeDlQueue") Queue queue, @Qualifier("dlPayMqTopicExchange") Exchange topicExchange) {
Map<String, Object> args = new HashMap<>(2);
args.put("x-dead-letter-exchange", TopicExchangeConfig.EXCHANGE_NAME);
args.put("x-dead-letter-routing-key", KGD_PAYMENT_QUERY_KEY);
return BindingBuilder.bind(queue).to(topicExchange).with(KGD_PAY_QUERY_DL_KEY).and(args);
}
}
...@@ -103,4 +103,14 @@ public class ExposureOrderController { ...@@ -103,4 +103,14 @@ public class ExposureOrderController {
return ResponseUtil.success(exposureOrderService.getMallPayConfig(req)); return ResponseUtil.success(exposureOrderService.getMallPayConfig(req));
} }
/**
* 公共支付接口查询mq,死信队列
*/
@ApiAnnotation(logMessage = "支付结果查询mq-(会员,点餐下单公共),死信队列")
@PostMapping("/payQueryMq")
public BaseResponse payQueryMq(@Validated @LogParams @RequestBody PayQueryMqVo putDeadLetterVo) {
payService.putPayQueryDelMq(putDeadLetterVo.getPartnerId(), putDeadLetterVo.getStoreId(), putDeadLetterVo.getFmId(), putDeadLetterVo.getOrderId(), putDeadLetterVo.getPayChanelType());
return ResponseUtil.success();
}
} }
package cn.freemud.entities.vo;
import lombok.Data;
import javax.validation.constraints.NotBlank;
@Data
public class PayQueryMqVo {
@NotBlank(message = "商户号不能为空")
private String partnerId;
@NotBlank(message = "门店id不能为空")
private String storeId;
@NotBlank(message = "fmId不能为空")
private String fmId;
@NotBlank(message = "订单号不能为空")
private String orderId;
private Integer payChanelType;
}
package cn.freemud.service.impl; package cn.freemud.service.impl;
import cn.freemud.amp.config.DlPayQueryMqConfig;
import cn.freemud.amp.config.DlPaymentMqConfig; import cn.freemud.amp.config.DlPaymentMqConfig;
import cn.freemud.amqp.Header; import cn.freemud.amqp.Header;
import cn.freemud.amqp.MQAction; import cn.freemud.amqp.MQAction;
...@@ -120,4 +121,22 @@ public class OrderQueueService { ...@@ -120,4 +121,22 @@ public class OrderQueueService {
Message delmessage = new Message(notifyMsgBytes, messageProperties); Message delmessage = new Message(notifyMsgBytes, messageProperties);
mqService.convertAndSend(TopicExchangeConfig.EXCHANGE_NAME, DlPaymentMqConfig.OPEN_STORE_PAYMENT_QUERY_DL_KEY, delmessage); mqService.convertAndSend(TopicExchangeConfig.EXCHANGE_NAME, DlPaymentMqConfig.OPEN_STORE_PAYMENT_QUERY_DL_KEY, delmessage);
} }
/**
* 发支付补偿mq,查询支付结果回调支付成功接口
*/
public void putPayQueryDelMq(PaymentMqMessageDto dto) {
MQMessage<PaymentMqMessageDto> mqMessage = new MQMessage<PaymentMqMessageDto>();
mqMessage.setBody(dto);
Integer ttl = 20 * 1000;
Header header = new Header();
header.setKey("1");
mqMessage.setHeader(header);
byte[] notifyMsgBytes = JSON.toJSONString(mqMessage).getBytes();
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration(ttl.toString());
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
Message delmessage = new Message(notifyMsgBytes, messageProperties);
mqService.convertAndSend(TopicExchangeConfig.EXCHANGE_NAME, DlPayQueryMqConfig.KGD_PAY_QUERY_DL_KEY, delmessage);
}
} }
...@@ -1220,6 +1220,24 @@ public class PayServiceImpl { ...@@ -1220,6 +1220,24 @@ public class PayServiceImpl {
} }
} }
/**
* 获取预支付成功,将信息放入死心队列,当支付成功没有回掉的时候处理
*/
public void putPayQueryDelMq(String partnerId, String storeId, String fmId, String orderId, Integer payChanelType) {
PaymentMqMessageDto dto = new PaymentMqMessageDto();
dto.setFmId(fmId);
dto.setOrderId(orderId);
dto.setPartnerId(partnerId);
dto.setStoreId(storeId);
dto.setPayChannelType(payChanelType);
dto.setTrackingNo(LogThreadLocal.getTrackingNo());
try {
orderQueueService.putPayQueryDelMq(dto);
} catch (Exception e) {
AppLogUtil.errorLog("paymentQueueService.paymentCallback,orderId:{}", orderId, null, e);
}
}
/** /**
* 获取门店支付信息 * 获取门店支付信息
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment