Commit 25965abf by Nepxion

修复在Spring上下文销毁的时候,对远程配置中心反订阅

parent a77beef7
......@@ -40,8 +40,8 @@ public class NacosOperation {
return configService.publishConfig(serviceId, group, config);
}
public void subscribeConfig(String group, String serviceId, NacosSubscribeCallback subscribeCallback) throws NacosException {
configService.addListener(serviceId, group, new Listener() {
public Listener subscribeConfig(String group, String serviceId, Executor executor, NacosSubscribeCallback subscribeCallback) throws NacosException {
Listener configListener = new Listener() {
@Override
public void receiveConfigInfo(String config) {
subscribeCallback.callback(config);
......@@ -49,8 +49,16 @@ public class NacosOperation {
@Override
public Executor getExecutor() {
return null;
return executor;
}
});
};
configService.addListener(serviceId, group, configListener);
return configListener;
}
public void unsubscribeConfig(String group, String serviceId, Listener configListener) {
configService.removeListener(serviceId, group, configListener);
}
}
\ No newline at end of file
package com.nepxion.discovery.common.nacos.thread;
/**
* <p>Title: Nepxion Discovery</p>
* <p>Description: Nepxion Discovery</p>
* <p>Copyright: Copyright (c) 2017-2050</p>
* <p>Company: Nepxion</p>
* @author Haojun Ren
* @version 1.0
*/
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
// Copy from Alibaba Sentinel project
public class NamedThreadFactory implements ThreadFactory {
private AtomicInteger count = new AtomicInteger(1);
private ThreadGroup group;
private String namePrefix;
private boolean daemon;
public NamedThreadFactory(String namePrefix) {
this(namePrefix, false);
}
public NamedThreadFactory(String namePrefix, boolean daemon) {
SecurityManager securityManager = System.getSecurityManager();
this.group = securityManager != null ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup();
this.namePrefix = namePrefix;
this.daemon = daemon;
}
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(group, runnable, namePrefix + "-thread-" + count.getAndIncrement(), 0);
thread.setDaemon(daemon);
return thread;
}
}
\ No newline at end of file
......@@ -9,6 +9,11 @@ package com.nepxion.discovery.plugin.configcenter.nacos.adapter;
* @version 1.0
*/
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
......@@ -16,9 +21,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import com.alibaba.nacos.api.config.listener.Listener;
import com.nepxion.discovery.common.entity.RuleEntity;
import com.nepxion.discovery.common.nacos.operation.NacosOperation;
import com.nepxion.discovery.common.nacos.operation.NacosSubscribeCallback;
import com.nepxion.discovery.common.nacos.thread.NamedThreadFactory;
import com.nepxion.discovery.plugin.configcenter.adapter.ConfigAdapter;
import com.nepxion.discovery.plugin.framework.adapter.PluginAdapter;
import com.nepxion.discovery.plugin.framework.context.PluginContextAware;
......@@ -28,6 +35,8 @@ import com.nepxion.discovery.plugin.framework.event.RuleUpdatedEvent;
public class NacosConfigAdapter extends ConfigAdapter {
private static final Logger LOG = LoggerFactory.getLogger(NacosConfigAdapter.class);
private ExecutorService executorService = new ThreadPoolExecutor(2, 4, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), new NamedThreadFactory("Nacos"), new ThreadPoolExecutor.DiscardOldestPolicy());
@Autowired
protected PluginContextAware pluginContextAware;
......@@ -37,6 +46,9 @@ public class NacosConfigAdapter extends ConfigAdapter {
@Autowired
private NacosOperation nacosOperation;
private Listener partialListener;
private Listener globalListener;
@Override
public String getConfig() throws Exception {
String config = getConfig(false);
......@@ -68,11 +80,11 @@ public class NacosConfigAdapter extends ConfigAdapter {
@PostConstruct
public void subscribeConfig() {
subscribeConfig(false);
subscribeConfig(true);
partialListener = subscribeConfig(false);
globalListener = subscribeConfig(true);
}
private void subscribeConfig(boolean globalConfig) {
private Listener subscribeConfig(boolean globalConfig) {
String groupKey = pluginContextAware.getGroupKey();
String group = pluginAdapter.getGroup();
String serviceId = pluginAdapter.getServiceId();
......@@ -80,7 +92,7 @@ public class NacosConfigAdapter extends ConfigAdapter {
LOG.info("Subscribe {} config from Nacos server, {}={}, serviceId={}", getConfigType(globalConfig), groupKey, group, serviceId);
try {
nacosOperation.subscribeConfig(group, globalConfig ? group : serviceId, new NacosSubscribeCallback() {
return nacosOperation.subscribeConfig(group, globalConfig ? group : serviceId, executorService, new NacosSubscribeCallback() {
@Override
public void callback(String config) {
if (StringUtils.isNotEmpty(config)) {
......@@ -106,6 +118,30 @@ public class NacosConfigAdapter extends ConfigAdapter {
} catch (Exception e) {
LOG.error("Subscribe " + getConfigType(globalConfig) + " config from Nacos server failed, " + groupKey + "=" + group + ", serviceId=" + serviceId, e);
}
return null;
}
@Override
public void close() {
unsubscribeConfig(partialListener, false);
unsubscribeConfig(globalListener, true);
executorService.shutdownNow();
}
private void unsubscribeConfig(Listener configListener, boolean globalConfig) {
if (configListener == null) {
return;
}
String groupKey = pluginContextAware.getGroupKey();
String group = pluginAdapter.getGroup();
String serviceId = pluginAdapter.getServiceId();
LOG.info("Unsubscribe {} config from Nacos server, {}={}, serviceId={}", getConfigType(globalConfig), groupKey, group, serviceId);
nacosOperation.unsubscribeConfig(group, globalConfig ? group : serviceId, configListener);
}
private String getConfigType(boolean globalConfig) {
......
......@@ -14,6 +14,7 @@ import org.springframework.context.annotation.Configuration;
import com.nepxion.discovery.plugin.configcenter.adapter.ConfigAdapter;
import com.nepxion.discovery.plugin.configcenter.nacos.adapter.NacosConfigAdapter;
import com.nepxion.discovery.plugin.configcenter.nacos.context.NacosConfigContextClosedHandler;
@Configuration
public class NacosConfigAutoConfiguration {
......@@ -21,4 +22,9 @@ public class NacosConfigAutoConfiguration {
public ConfigAdapter configAdapter() {
return new NacosConfigAdapter();
}
@Bean
public NacosConfigContextClosedHandler configContextClosedHandler() {
return new NacosConfigContextClosedHandler();
}
}
\ No newline at end of file
package com.nepxion.discovery.plugin.configcenter.nacos.context;
/**
* <p>Title: Nepxion Discovery</p>
* <p>Description: Nepxion Discovery</p>
* <p>Copyright: Copyright (c) 2017-2050</p>
* <p>Company: Nepxion</p>
* @author Haojun Ren
* @version 1.0
*/
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import com.nepxion.discovery.plugin.configcenter.nacos.adapter.NacosConfigAdapter;
public class NacosConfigContextClosedHandler implements ApplicationListener<ContextClosedEvent> {
@Autowired
private NacosConfigAdapter nacosConfigAdapter;
@Override
public void onApplicationEvent(ContextClosedEvent event) {
nacosConfigAdapter.close();
}
}
\ No newline at end of file
......@@ -13,6 +13,9 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import com.nepxion.discovery.common.entity.RuleEntity;
import com.nepxion.discovery.common.redis.operation.RedisOperation;
......@@ -35,6 +38,15 @@ public class RedisConfigAdapter extends ConfigAdapter {
@Autowired
private RedisOperation redisOperation;
@Autowired
private RedisMessageListenerContainer redisMessageListenerContainer;
@Autowired
private MessageListenerAdapter partialMessageListenerAdapter;
@Autowired
private MessageListenerAdapter globalMessageListenerAdapter;
@Override
public String getConfig() throws Exception {
String config = getConfig(false);
......@@ -106,6 +118,22 @@ public class RedisConfigAdapter extends ConfigAdapter {
}
}
@Override
public void close() {
unsubscribeConfig(partialMessageListenerAdapter, false);
unsubscribeConfig(globalMessageListenerAdapter, true);
}
private void unsubscribeConfig(MessageListenerAdapter messageListenerAdapter, boolean globalConfig) {
String groupKey = pluginContextAware.getGroupKey();
String group = pluginAdapter.getGroup();
String serviceId = pluginAdapter.getServiceId();
LOG.info("Unsubscribe {} config from Redis server, {}={}, serviceId={}", getConfigType(globalConfig), groupKey, group, serviceId);
redisMessageListenerContainer.removeMessageListener(messageListenerAdapter, new PatternTopic(group + "-" + (globalConfig ? group : serviceId)));
}
private String getConfigType(boolean globalConfig) {
return globalConfig ? "global" : "partial";
}
......
......@@ -21,6 +21,7 @@ import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import com.nepxion.discovery.plugin.configcenter.adapter.ConfigAdapter;
import com.nepxion.discovery.plugin.configcenter.redis.adapter.RedisConfigAdapter;
import com.nepxion.discovery.plugin.configcenter.redis.context.RedisConfigContextClosedHandler;
import com.nepxion.discovery.plugin.framework.adapter.PluginAdapter;
import com.nepxion.discovery.plugin.framework.context.PluginContextAware;
......@@ -76,4 +77,9 @@ public class RedisConfigAutoConfiguration {
public ConfigAdapter configAdapter() {
return new RedisConfigAdapter();
}
@Bean
public RedisConfigContextClosedHandler configContextClosedHandler() {
return new RedisConfigContextClosedHandler();
}
}
\ No newline at end of file
package com.nepxion.discovery.plugin.configcenter.redis.context;
/**
* <p>Title: Nepxion Discovery</p>
* <p>Description: Nepxion Discovery</p>
* <p>Copyright: Copyright (c) 2017-2050</p>
* <p>Company: Nepxion</p>
* @author Haojun Ren
* @version 1.0
*/
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import com.nepxion.discovery.plugin.configcenter.redis.adapter.RedisConfigAdapter;
public class RedisConfigContextClosedHandler implements ApplicationListener<ContextClosedEvent> {
@Autowired
private RedisConfigAdapter redisConfigAdapter;
@Override
public void onApplicationEvent(ContextClosedEvent event) {
redisConfigAdapter.close();
}
}
\ No newline at end of file
......@@ -10,5 +10,5 @@ package com.nepxion.discovery.plugin.configcenter.loader;
*/
public abstract class RemoteConfigLoader implements ConfigLoader {
public abstract void close();
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment