Commit 9420dae2 by Nepxion

增加全局监听

parent 5c80e7d5
......@@ -39,29 +39,52 @@ public class NacosConfigAdapter extends ConfigAdapter {
@Override
public String getConfig() throws Exception {
String config = getConfig(true);
if (StringUtils.isNotEmpty(config)) {
return config;
} else {
LOG.info("No global config is retrieved from Nacos server");
}
config = getConfig(false);
if (StringUtils.isNotEmpty(config)) {
return config;
} else {
LOG.info("No partial config is retrieved from Nacos server");
}
return null;
}
private String getConfig(boolean globalConfig) throws Exception {
String groupKey = pluginContextAware.getGroupKey();
String group = pluginAdapter.getGroup();
String serviceId = pluginAdapter.getServiceId();
LOG.info("Get config from Nacos server, {}={}, serviceId={}", groupKey, group, serviceId);
LOG.info("Get config from Nacos server, {}={}, serviceId={}, globalConfig={}", groupKey, group, serviceId, globalConfig);
return nacosOperation.getConfig(group, serviceId);
return nacosOperation.getConfig(group, globalConfig ? group : serviceId);
}
@PostConstruct
public void subscribeConfig() {
subscribeConfig(true);
subscribeConfig(false);
}
private void subscribeConfig(boolean globalConfig) {
String groupKey = pluginContextAware.getGroupKey();
String group = pluginAdapter.getGroup();
String serviceId = pluginAdapter.getServiceId();
LOG.info("Subscribe config from Nacos server, {}={}, serviceId={}", groupKey, group, serviceId);
LOG.info("Subscribe config from Nacos server, {}={}, serviceId={}, globalConfig={}", groupKey, group, serviceId, globalConfig);
try {
nacosOperation.subscribeConfig(group, serviceId, new NacosSubscribeCallback() {
nacosOperation.subscribeConfig(group, globalConfig ? group : serviceId, new NacosSubscribeCallback() {
@Override
public void callback(String config) {
if (StringUtils.isNotEmpty(config)) {
LOG.info("Get config updated event from Nacos server, {}={}, serviceId={}", groupKey, group, serviceId);
LOG.info("Get config updated event from Nacos server, {}={}, serviceId={}, globalConfig={}", groupKey, group, serviceId, globalConfig);
RuleEntity ruleEntity = pluginAdapter.getRule();
String rule = null;
......@@ -71,17 +94,17 @@ public class NacosConfigAdapter extends ConfigAdapter {
if (!StringUtils.equals(rule, config)) {
fireRuleUpdated(new RuleUpdatedEvent(config), true);
} else {
LOG.info("Retrieved config is same as current config, ignore to update, {}={}, serviceId={}", groupKey, group, serviceId);
LOG.info("Retrieved config is same as current config, ignore to update, {}={}, serviceId={}, globalConfig={}", groupKey, group, serviceId, globalConfig);
}
} else {
LOG.info("Get config cleared event from Nacos server, {}={}, serviceId={}", groupKey, group, serviceId);
LOG.info("Get config cleared event from Nacos server, {}={}, serviceId={}, globalConfig={}", groupKey, group, serviceId, globalConfig);
fireRuleCleared(new RuleClearedEvent(), true);
}
}
});
} catch (Exception e) {
LOG.error("Subscribe config failed", e);
LOG.error("Subscribe config from Nacos server failed, " + groupKey + "=" + group + ", serviceId=" + serviceId + ", globalConfig=" + globalConfig, e);
}
}
}
\ No newline at end of file
......@@ -37,28 +37,52 @@ public class RedisConfigAdapter extends ConfigAdapter {
@Override
public String getConfig() throws Exception {
String config = getConfig(true);
if (StringUtils.isNotEmpty(config)) {
return config;
} else {
LOG.info("No global config is retrieved from Redis server");
}
config = getConfig(false);
if (StringUtils.isNotEmpty(config)) {
return config;
} else {
LOG.info("No partial config is retrieved from Redis server");
}
return null;
}
private String getConfig(boolean globalConfig) throws Exception {
String groupKey = pluginContextAware.getGroupKey();
String group = pluginAdapter.getGroup();
String serviceId = pluginAdapter.getServiceId();
LOG.info("Get config from Redis server, {}={}, serviceId={}", groupKey, group, serviceId);
LOG.info("Get config from Redis server, {}={}, serviceId={}, globalConfig={}", groupKey, group, serviceId, globalConfig);
return redisOperation.getConfig(group, serviceId);
return redisOperation.getConfig(group, globalConfig ? group : serviceId);
}
public void subscribeConfig(String config) {
public void subscribeGlobalConfig(String config) {
subscribeConfig(config, true);
}
public void subscribePartialConfig(String config) {
subscribeConfig(config, false);
}
private void subscribeConfig(String config, boolean globalConfig) {
String groupKey = pluginContextAware.getGroupKey();
String group = pluginAdapter.getGroup();
String serviceId = pluginAdapter.getServiceId();
LOG.info("Subscribe config from Redis server, {}={}, serviceId={}", groupKey, group, serviceId);
try {
redisOperation.subscribeConfig(config, new RedisSubscribeCallback() {
@Override
public void callback(String config) {
if (StringUtils.isNotEmpty(config)) {
LOG.info("Get config updated event from Redis server, {}={}, serviceId={}", groupKey, group, serviceId);
LOG.info("Get config updated event from Redis server, {}={}, serviceId={}, globalConfig={}", groupKey, group, serviceId, globalConfig);
RuleEntity ruleEntity = pluginAdapter.getRule();
String rule = null;
......@@ -68,17 +92,17 @@ public class RedisConfigAdapter extends ConfigAdapter {
if (!StringUtils.equals(rule, config)) {
fireRuleUpdated(new RuleUpdatedEvent(config), true);
} else {
LOG.info("Retrieved config is same as current config, ignore to update, {}={}, serviceId={}", groupKey, group, serviceId);
LOG.info("Retrieved config is same as current config, ignore to update, {}={}, serviceId={}, globalConfig={}", groupKey, group, serviceId, globalConfig);
}
} else {
LOG.info("Get config cleared event from Redis server, {}={}, serviceId={}", groupKey, group, serviceId);
LOG.info("Get config cleared event from Redis server, {}={}, serviceId={}, globalConfig={}", groupKey, group, serviceId, globalConfig);
fireRuleCleared(new RuleClearedEvent(), true);
}
}
});
} catch (Exception e) {
LOG.error("Subscribe config failed", e);
LOG.error("Subscribe config from Redis server failed, " + groupKey + "=" + group + ", serviceId=" + serviceId + ", globalConfig=" + globalConfig, e);
}
}
}
\ No newline at end of file
......@@ -9,6 +9,8 @@ package com.nepxion.discovery.plugin.configcenter.extension.redis.configuration;
* @version 1.0
*/
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
......@@ -20,30 +22,54 @@ import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import com.nepxion.discovery.plugin.configcenter.ConfigAdapter;
import com.nepxion.discovery.plugin.configcenter.extension.redis.adapter.RedisConfigAdapter;
import com.nepxion.discovery.plugin.framework.adapter.PluginAdapter;
import com.nepxion.discovery.plugin.framework.context.PluginContextAware;
@Configuration
public class RedisConfigAutoConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(RedisConfigAutoConfiguration.class);
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Autowired
protected PluginContextAware pluginContextAware;
@Autowired
private PluginAdapter pluginAdapter;
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(MessageListenerAdapter messageListenerAdapter) {
public RedisMessageListenerContainer redisMessageListenerContainer(MessageListenerAdapter globalMessageListenerAdapter, MessageListenerAdapter partialMessageListenerAdapter) {
String group = pluginAdapter.getGroup();
String serviceId = pluginAdapter.getServiceId();
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
redisMessageListenerContainer.addMessageListener(messageListenerAdapter, new PatternTopic(group + "-" + serviceId));
redisMessageListenerContainer.addMessageListener(globalMessageListenerAdapter, new PatternTopic(group + "-" + group));
redisMessageListenerContainer.addMessageListener(partialMessageListenerAdapter, new PatternTopic(group + "-" + serviceId));
return redisMessageListenerContainer;
}
@Bean
public MessageListenerAdapter messageListenerAdapter(ConfigAdapter configAdapter) {
return new MessageListenerAdapter(configAdapter, "subscribeConfig");
public MessageListenerAdapter globalMessageListenerAdapter(ConfigAdapter configAdapter) {
String groupKey = pluginContextAware.getGroupKey();
String group = pluginAdapter.getGroup();
String serviceId = pluginAdapter.getServiceId();
LOG.info("Subscribe config from Redis server, {}={}, serviceId={}, globalConfig=true", groupKey, group, serviceId);
return new MessageListenerAdapter(configAdapter, "subscribeGlobalConfig");
}
@Bean
public MessageListenerAdapter partialMessageListenerAdapter(ConfigAdapter configAdapter) {
String groupKey = pluginContextAware.getGroupKey();
String group = pluginAdapter.getGroup();
String serviceId = pluginAdapter.getServiceId();
LOG.info("Subscribe config from Redis server, {}={}, serviceId={}, globalConfig=false", groupKey, group, serviceId);
return new MessageListenerAdapter(configAdapter, "subscribePartialConfig");
}
@Bean
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment