Commit 2eb49431 by 胡超

RocketMQProperties

parent 7cec384c
package cn.freemud.amp;
package cn.freemud.amp.config;
import cn.freemud.constant.RocketMQConst;
import com.alibaba.fastjson.support.spring.messaging.MappingFastJsonMessageConverter;
......@@ -7,7 +7,6 @@ import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
......@@ -71,6 +70,9 @@ public class RocketMQConfig {
producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());
producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());
producer.setPollNameServerInterval(rocketMQProperties.getPollNameServerInteval());
producer.setHeartbeatBrokerInterval(rocketMQProperties.getHeartbeatBrokerInterval());
producer.setPersistConsumerOffsetInterval(rocketMQProperties.getPersistConsumerOffsetInterval());
return producer;
}
......
package cn.freemud.amp.config;
import org.apache.rocketmq.common.MixAll;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* {@link org.apache.rocketmq.spring.autoconfigure.RocketMQProperties}
*/
@Primary
@Component
@ConfigurationProperties(prefix = "rocketmq")
public class RocketMQProperties {
/**
* The name server for rocketMQ, formats: `host:port;host:port`.
*/
private String nameServer;
/**
* Enum type for accesChannel, values: LOCAL, CLOUD
* {@link org.apache.rocketmq.client.AccessChannel}
*/
private String accessChannel;
/**
* 轮询nameServer间隔时间,默认30S
*/
private int pollNameServerInteval = 30 * 1000;
/**
* 向broker发送心跳间隔时间,默认30S
*/
private int heartbeatBrokerInterval = 30 * 1000;
/**
* 持久化Consumer进度间隔时间,默认5S
*/
private int persistConsumerOffsetInterval = 5 * 1000;
private Producer producer;
/**
* Configure enable listener or not.
* In some particular cases, if you don't want the the listener is enabled when container startup,
* the configuration pattern is like this :
* rocketmq.consumer.listeners.<group-name>.<topic-name>.enabled=<boolean value, true or false>
* <p>
* the listener is enabled by default.
*/
private Consumer consumer = new Consumer();
public String getNameServer() {
return nameServer;
}
public void setNameServer(String nameServer) {
this.nameServer = nameServer;
}
public String getAccessChannel() {
return accessChannel;
}
public void setAccessChannel(String accessChannel) {
this.accessChannel = accessChannel;
}
public int getPollNameServerInteval() {
return pollNameServerInteval;
}
public void setPollNameServerInteval(int pollNameServerInteval) {
this.pollNameServerInteval = pollNameServerInteval;
}
public int getHeartbeatBrokerInterval() {
return heartbeatBrokerInterval;
}
public void setHeartbeatBrokerInterval(int heartbeatBrokerInterval) {
this.heartbeatBrokerInterval = heartbeatBrokerInterval;
}
public int getPersistConsumerOffsetInterval() {
return persistConsumerOffsetInterval;
}
public void setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval) {
this.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
}
public RocketMQProperties.Producer getProducer() {
return producer;
}
public void setProducer(RocketMQProperties.Producer producer) {
this.producer = producer;
}
public static class Producer {
/**
* Group name of producer.
*/
private String group;
/**
* Millis of send message timeout.
*/
private int sendMessageTimeout = 3000;
/**
* Compress message body threshold, namely, message body larger than 4k will be compressed on default.
*/
private int compressMessageBodyThreshold = 1024 * 4;
/**
* Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private int retryTimesWhenSendFailed = 2;
/**
* <p> Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private int retryTimesWhenSendAsyncFailed = 2;
/**
* Indicate whether to retry another broker on sending failure internally.
*/
private boolean retryNextServer = false;
/**
* Maximum allowed message size in bytes.
*/
private int maxMessageSize = 1024 * 1024 * 4;
/**
* The property of "access-key".
*/
private String accessKey;
/**
* The property of "secret-key".
*/
private String secretKey;
/**
* Switch flag instance for message trace.
*/
private boolean enableMsgTrace = true;
/**
* The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
private String customizedTraceTopic = MixAll.RMQ_SYS_TRACE_TOPIC;
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
public int getSendMessageTimeout() {
return sendMessageTimeout;
}
public void setSendMessageTimeout(int sendMessageTimeout) {
this.sendMessageTimeout = sendMessageTimeout;
}
public int getCompressMessageBodyThreshold() {
return compressMessageBodyThreshold;
}
public void setCompressMessageBodyThreshold(int compressMessageBodyThreshold) {
this.compressMessageBodyThreshold = compressMessageBodyThreshold;
}
public int getRetryTimesWhenSendFailed() {
return retryTimesWhenSendFailed;
}
public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
}
public int getRetryTimesWhenSendAsyncFailed() {
return retryTimesWhenSendAsyncFailed;
}
public void setRetryTimesWhenSendAsyncFailed(int retryTimesWhenSendAsyncFailed) {
this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
}
public boolean isRetryNextServer() {
return retryNextServer;
}
public void setRetryNextServer(boolean retryNextServer) {
this.retryNextServer = retryNextServer;
}
public int getMaxMessageSize() {
return maxMessageSize;
}
public void setMaxMessageSize(int maxMessageSize) {
this.maxMessageSize = maxMessageSize;
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public boolean isEnableMsgTrace() {
return enableMsgTrace;
}
public void setEnableMsgTrace(boolean enableMsgTrace) {
this.enableMsgTrace = enableMsgTrace;
}
public String getCustomizedTraceTopic() {
return customizedTraceTopic;
}
public void setCustomizedTraceTopic(String customizedTraceTopic) {
this.customizedTraceTopic = customizedTraceTopic;
}
}
public Consumer getConsumer() {
return consumer;
}
public void setConsumer(Consumer consumer) {
this.consumer = consumer;
}
public static final class Consumer {
/**
* listener configuration container
* the pattern is like this:
* group1.topic1 = false
* group2.topic2 = true
* group3.topic3 = false
*/
private Map<String, Map<String, Boolean>> listeners = new HashMap<>();
public Map<String, Map<String, Boolean>> getListeners() {
return listeners;
}
public void setListeners(Map<String, Map<String, Boolean>> listeners) {
this.listeners = listeners;
}
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment