Commit 6a08de98 by Nepxion

重构EventBus逻辑

parent 607cebad
...@@ -32,6 +32,7 @@ import com.nepxion.discovery.plugin.framework.constant.PluginConstant; ...@@ -32,6 +32,7 @@ import com.nepxion.discovery.plugin.framework.constant.PluginConstant;
import com.nepxion.discovery.plugin.framework.context.PluginContextAware; import com.nepxion.discovery.plugin.framework.context.PluginContextAware;
import com.nepxion.discovery.plugin.framework.entity.RuleEntity; import com.nepxion.discovery.plugin.framework.entity.RuleEntity;
import com.nepxion.discovery.plugin.framework.event.PluginPublisher; import com.nepxion.discovery.plugin.framework.event.PluginPublisher;
import com.nepxion.discovery.plugin.framework.event.RuleChangedEvent;
// 用法参照ServiceRegistryEndpoint和ServiceRegistryAutoConfiguration // 用法参照ServiceRegistryEndpoint和ServiceRegistryAutoConfiguration
@ManagedResource(description = "Config Endpoint") @ManagedResource(description = "Config Endpoint")
...@@ -59,7 +60,7 @@ public class ConfigEndpoint implements MvcEndpoint { ...@@ -59,7 +60,7 @@ public class ConfigEndpoint implements MvcEndpoint {
try { try {
InputStream inputStream = IOUtils.toInputStream(config, PluginConstant.ENCODING_UTF_8); InputStream inputStream = IOUtils.toInputStream(config, PluginConstant.ENCODING_UTF_8);
pluginPublisher.asyncPublish(inputStream); pluginPublisher.asyncPublish(new RuleChangedEvent(inputStream));
} catch (IOException e) { } catch (IOException e) {
LOG.error("Publish config failed", e); LOG.error("Publish config failed", e);
......
...@@ -28,6 +28,8 @@ import org.springframework.web.bind.annotation.ResponseBody; ...@@ -28,6 +28,8 @@ import org.springframework.web.bind.annotation.ResponseBody;
import com.nepxion.discovery.plugin.framework.adapter.PluginAdapter; import com.nepxion.discovery.plugin.framework.adapter.PluginAdapter;
import com.nepxion.discovery.plugin.framework.context.PluginContextAware; import com.nepxion.discovery.plugin.framework.context.PluginContextAware;
import com.nepxion.discovery.plugin.framework.event.PluginPublisher;
import com.nepxion.discovery.plugin.framework.event.VersionChangedEvent;
@ManagedResource(description = "Version Endpoint") @ManagedResource(description = "Version Endpoint")
public class VersionEndpoint implements MvcEndpoint { public class VersionEndpoint implements MvcEndpoint {
...@@ -37,6 +39,9 @@ public class VersionEndpoint implements MvcEndpoint { ...@@ -37,6 +39,9 @@ public class VersionEndpoint implements MvcEndpoint {
@Autowired @Autowired
private PluginAdapter pluginAdapter; private PluginAdapter pluginAdapter;
@Autowired
private PluginPublisher pluginPublisher;
// 设置服务的动态版本 // 设置服务的动态版本
@RequestMapping(path = "send", method = RequestMethod.POST) @RequestMapping(path = "send", method = RequestMethod.POST)
@ResponseBody @ResponseBody
...@@ -49,6 +54,8 @@ public class VersionEndpoint implements MvcEndpoint { ...@@ -49,6 +54,8 @@ public class VersionEndpoint implements MvcEndpoint {
pluginAdapter.setDynamicVersion(version); pluginAdapter.setDynamicVersion(version);
pluginPublisher.asyncPublish(new VersionChangedEvent());
return ResponseEntity.ok().body("OK"); return ResponseEntity.ok().body("OK");
} }
...@@ -64,6 +71,8 @@ public class VersionEndpoint implements MvcEndpoint { ...@@ -64,6 +71,8 @@ public class VersionEndpoint implements MvcEndpoint {
pluginAdapter.clearDynamicVersion(); pluginAdapter.clearDynamicVersion();
pluginPublisher.asyncPublish(new VersionChangedEvent());
return ResponseEntity.ok().body("OK"); return ResponseEntity.ok().body("OK");
} }
......
...@@ -9,18 +9,17 @@ package com.nepxion.discovery.plugin.configcenter; ...@@ -9,18 +9,17 @@ package com.nepxion.discovery.plugin.configcenter;
* @version 1.0 * @version 1.0
*/ */
import java.io.InputStream;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import com.nepxion.discovery.plugin.configcenter.loader.AbstractConfigLoader; import com.nepxion.discovery.plugin.configcenter.loader.AbstractConfigLoader;
import com.nepxion.discovery.plugin.framework.event.PluginPublisher; import com.nepxion.discovery.plugin.framework.event.PluginPublisher;
import com.nepxion.discovery.plugin.framework.event.RuleChangedEvent;
public abstract class ConfigAdapter extends AbstractConfigLoader { public abstract class ConfigAdapter extends AbstractConfigLoader {
@Autowired @Autowired
private PluginPublisher pluginPublisher; private PluginPublisher pluginPublisher;
public void publish(InputStream inputStream) { public void publish(RuleChangedEvent ruleChangedEvent) {
pluginPublisher.asyncPublish(inputStream); pluginPublisher.asyncPublish(ruleChangedEvent);
} }
} }
\ No newline at end of file
...@@ -27,6 +27,7 @@ import org.springframework.beans.factory.annotation.Autowired; ...@@ -27,6 +27,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import com.nepxion.discovery.plugin.configcenter.constant.ConfigConstant; import com.nepxion.discovery.plugin.configcenter.constant.ConfigConstant;
import com.nepxion.discovery.plugin.configcenter.xml.Dom4JParser; import com.nepxion.discovery.plugin.configcenter.xml.Dom4JParser;
import com.nepxion.discovery.plugin.framework.config.PluginConfigParser;
import com.nepxion.discovery.plugin.framework.constant.PluginConstant; import com.nepxion.discovery.plugin.framework.constant.PluginConstant;
import com.nepxion.discovery.plugin.framework.entity.CountEntity; import com.nepxion.discovery.plugin.framework.entity.CountEntity;
import com.nepxion.discovery.plugin.framework.entity.DiscoveryEntity; import com.nepxion.discovery.plugin.framework.entity.DiscoveryEntity;
...@@ -39,7 +40,7 @@ import com.nepxion.discovery.plugin.framework.entity.RuleEntity; ...@@ -39,7 +40,7 @@ import com.nepxion.discovery.plugin.framework.entity.RuleEntity;
import com.nepxion.discovery.plugin.framework.entity.VersionEntity; import com.nepxion.discovery.plugin.framework.entity.VersionEntity;
import com.nepxion.discovery.plugin.framework.exception.PluginException; import com.nepxion.discovery.plugin.framework.exception.PluginException;
public class ConfigParser extends Dom4JParser { public class ConfigParser extends Dom4JParser implements PluginConfigParser {
private static final Logger LOG = LoggerFactory.getLogger(ConfigParser.class); private static final Logger LOG = LoggerFactory.getLogger(ConfigParser.class);
@Autowired @Autowired
......
...@@ -14,7 +14,6 @@ import org.springframework.context.annotation.Configuration; ...@@ -14,7 +14,6 @@ import org.springframework.context.annotation.Configuration;
import com.nepxion.discovery.plugin.configcenter.ConfigInitializer; import com.nepxion.discovery.plugin.configcenter.ConfigInitializer;
import com.nepxion.discovery.plugin.configcenter.ConfigParser; import com.nepxion.discovery.plugin.configcenter.ConfigParser;
import com.nepxion.discovery.plugin.configcenter.ConfigSubscriber;
@Configuration @Configuration
public class ConfigAutoConfiguration { public class ConfigAutoConfiguration {
...@@ -27,9 +26,4 @@ public class ConfigAutoConfiguration { ...@@ -27,9 +26,4 @@ public class ConfigAutoConfiguration {
public ConfigParser configParser() { public ConfigParser configParser() {
return new ConfigParser(); return new ConfigParser();
} }
@Bean
public ConfigSubscriber configSubscriber() {
return new ConfigSubscriber();
}
} }
\ No newline at end of file
package com.nepxion.discovery.plugin.framework.config;
/**
* <p>Title: Nepxion Discovery</p>
* <p>Description: Nepxion Discovery</p>
* <p>Copyright: Copyright (c) 2017-2050</p>
* <p>Company: Nepxion</p>
* @author Haojun Ren
* @version 1.0
*/
import java.io.InputStream;
public interface PluginConfigParser {
void parse(InputStream inputStream);
}
\ No newline at end of file
...@@ -19,6 +19,7 @@ import com.nepxion.discovery.plugin.framework.context.PluginContainerInitialized ...@@ -19,6 +19,7 @@ import com.nepxion.discovery.plugin.framework.context.PluginContainerInitialized
import com.nepxion.discovery.plugin.framework.context.PluginContextAware; import com.nepxion.discovery.plugin.framework.context.PluginContextAware;
import com.nepxion.discovery.plugin.framework.entity.RuleEntity; import com.nepxion.discovery.plugin.framework.entity.RuleEntity;
import com.nepxion.discovery.plugin.framework.event.PluginPublisher; import com.nepxion.discovery.plugin.framework.event.PluginPublisher;
import com.nepxion.discovery.plugin.framework.event.PluginSubscriber;
import com.nepxion.discovery.plugin.framework.listener.discovery.DiscoveryListenerExecutor; import com.nepxion.discovery.plugin.framework.listener.discovery.DiscoveryListenerExecutor;
import com.nepxion.discovery.plugin.framework.listener.discovery.IpAddressFilterDiscoveryListener; import com.nepxion.discovery.plugin.framework.listener.discovery.IpAddressFilterDiscoveryListener;
import com.nepxion.discovery.plugin.framework.listener.discovery.VersionFilterDiscoveryListener; import com.nepxion.discovery.plugin.framework.listener.discovery.VersionFilterDiscoveryListener;
...@@ -47,6 +48,11 @@ public class PluginAutoConfiguration { ...@@ -47,6 +48,11 @@ public class PluginAutoConfiguration {
} }
@Bean @Bean
public PluginSubscriber pluginSubscriber() {
return new PluginSubscriber();
}
@Bean
public PluginCache pluginCache() { public PluginCache pluginCache() {
return new PluginCache(); return new PluginCache();
} }
......
package com.nepxion.discovery.plugin.configcenter; package com.nepxion.discovery.plugin.framework.event;
/** /**
* <p>Title: Nepxion Discovery</p> * <p>Title: Nepxion Discovery</p>
...@@ -16,21 +16,26 @@ import org.slf4j.LoggerFactory; ...@@ -16,21 +16,26 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import com.google.common.eventbus.Subscribe; import com.google.common.eventbus.Subscribe;
import com.nepxion.discovery.plugin.framework.config.PluginConfigParser;
import com.nepxion.discovery.plugin.framework.context.PluginContextAware; import com.nepxion.discovery.plugin.framework.context.PluginContextAware;
import com.nepxion.eventbus.annotation.EventBus; import com.nepxion.eventbus.annotation.EventBus;
import com.netflix.loadbalancer.ServerList;
@EventBus @EventBus
public class ConfigSubscriber { public class PluginSubscriber {
private static final Logger LOG = LoggerFactory.getLogger(ConfigSubscriber.class); private static final Logger LOG = LoggerFactory.getLogger(PluginSubscriber.class);
@Autowired @Autowired
private PluginContextAware pluginContextAware; private PluginContextAware pluginContextAware;
@Autowired @Autowired
private ConfigParser configParser; private PluginConfigParser pluninConfigParser;
@Autowired(required = false)
private ServerList<?> ribbonServerList;
@Subscribe @Subscribe
public void subscribe(InputStream inputStream) { public void subscribeRuleChanged(RuleChangedEvent ruleChangedEvent) {
Boolean discoveryControlEnabled = pluginContextAware.isDiscoveryControlEnabled(); Boolean discoveryControlEnabled = pluginContextAware.isDiscoveryControlEnabled();
Boolean remoteConfigEnabled = pluginContextAware.isRemoteConfigEnabled(); Boolean remoteConfigEnabled = pluginContextAware.isRemoteConfigEnabled();
...@@ -48,6 +53,26 @@ public class ConfigSubscriber { ...@@ -48,6 +53,26 @@ public class ConfigSubscriber {
LOG.info("********** Remote config change has been subscribed **********"); LOG.info("********** Remote config change has been subscribed **********");
configParser.parse(inputStream); InputStream inputStream = ruleChangedEvent.getInputStream();
pluninConfigParser.parse(inputStream);
subscribeVersionChanged(null);
}
@Subscribe
public void subscribeVersionChanged(VersionChangedEvent versionChangedEvent) {
Boolean discoveryControlEnabled = pluginContextAware.isDiscoveryControlEnabled();
if (!discoveryControlEnabled) {
LOG.info("********** Discovery control is disabled, ignore to subscribe **********");
return;
}
if (ribbonServerList == null) {
return;
}
// 当版本更新后,强制刷新Ribbon缓存
ribbonServerList.getUpdatedListOfServers();
} }
} }
\ No newline at end of file
package com.nepxion.discovery.plugin.framework.event;
/**
* <p>Title: Nepxion Discovery</p>
* <p>Description: Nepxion Discovery</p>
* <p>Copyright: Copyright (c) 2017-2050</p>
* <p>Company: Nepxion</p>
* @author Haojun Ren
* @version 1.0
*/
import java.io.InputStream;
import java.io.Serializable;
public class RuleChangedEvent implements Serializable {
private static final long serialVersionUID = 2315578803987663866L;
private InputStream inputStream;
public RuleChangedEvent(InputStream inputStream) {
this.inputStream = inputStream;
}
public InputStream getInputStream() {
return inputStream;
}
}
\ No newline at end of file
package com.nepxion.discovery.plugin.framework.event;
/**
* <p>Title: Nepxion Discovery</p>
* <p>Description: Nepxion Discovery</p>
* <p>Copyright: Copyright (c) 2017-2050</p>
* <p>Company: Nepxion</p>
* @author Haojun Ren
* @version 1.0
*/
import java.io.Serializable;
public class VersionChangedEvent implements Serializable {
private static final long serialVersionUID = 5079797986381461496L;
public VersionChangedEvent() {
}
}
\ No newline at end of file
...@@ -48,7 +48,7 @@ public class DiscoveryConfigAdapter extends ConfigAdapter { ...@@ -48,7 +48,7 @@ public class DiscoveryConfigAdapter extends ConfigAdapter {
public void publish() { public void publish() {
try { try {
InputStream inputStream = FileUtils.openInputStream(new File("src/main/resources/rule.xml")); InputStream inputStream = FileUtils.openInputStream(new File("src/main/resources/rule.xml"));
publish(inputStream); publish(new RuleChangedEvent(inputStream));
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment