Commit 0cc3d288 by 陈文顺

zookeeper curator

parent e10ddcb0
......@@ -23,7 +23,40 @@
<artifactId>fastjson</artifactId>
<version>1.2.32</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.9.1</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.3.9.RELEASE</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-core -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package cn.freemud.demo.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* Hello world!
*
*/
public class App
{
public static void main( String[] args ) throws Exception {
System.out.println( "Hello World!" );
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
client.start();
// client.create().forPath("/ashun");
final int count = 0;
final InterProcessMutex lock = new InterProcessMutex(client, "/ashun/LOCK");
final CountDownLatch began = new CountDownLatch(1);
//region 启动线程
new Thread(new Runnable() {
private int flag = 1;
public void run() {
try {
began.await();
if ( lock.acquire(3, TimeUnit.SECONDS) )
{
try
{
// do some work inside of the critical section here
System.out.println("Thread :"+Thread.currentThread()+" running...."+flag);
}
finally
{
lock.release();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
//endregion
//region 启动线程
new Thread(new Runnable() {
public void run() {
try {
began.await();
if ( lock.acquire(3, TimeUnit.SECONDS) )
{
try
{
// do some work inside of the critical section here
System.out.println("Thread :"+Thread.currentThread()+" running....");
}
finally
{
lock.release();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
//endregion
//region 启动线程
new Thread(new Runnable() {
public void run() {
try {
began.await();
if ( lock.acquire(3, TimeUnit.SECONDS) )
{
try
{
// do some work inside of the critical section here
System.out.println("Thread :"+Thread.currentThread()+" running....");
}
finally
{
lock.release();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
//endregion
//region 启动线程
new Thread(new Runnable() {
public void run() {
try {
began.await();
if ( lock.acquire(3, TimeUnit.SECONDS) )
{
try
{
// do some work inside of the critical section here
System.out.println("Thread :"+Thread.currentThread()+" running....");
}
finally
{
lock.release();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
//endregion
//region 启动线程
new Thread(new Runnable() {
public void run() {
try {
began.await();
if ( lock.acquire(3, TimeUnit.SECONDS) )
{
try
{
// do some work inside of the critical section here
System.out.println("Thread :"+Thread.currentThread()+" running....");
}
finally
{
lock.release();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
//endregion
//region 启动线程
new Thread(new Runnable() {
public void run() {
try {
began.await();
if ( lock.acquire(3, TimeUnit.SECONDS) )
{
try
{
// do some work inside of the critical section here
System.out.println("Thread :"+Thread.currentThread()+" running....");
}
finally
{
lock.release();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
//endregion
began.countDown();
// client.close();
}
public class Runner implements Runnable{
public void run() {
}
}
}
package cn.freemud.demo.zookeeper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.concurrent.CountDownLatch;
/**
* Created by chenwenshun on 2017/7/9.
*/
public class App2 {
public static void main(String[] args) throws Exception {
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:application.xml");
CuratorFramework client = (CuratorFramework)applicationContext.getBean("curatorFramework");
// RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
// CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
// client.start();
// client.create().forPath("/ashun");
try {
client.getChildren().usingWatcher(new Watcher() {
public void process(WatchedEvent event) {
System.out.println("已经触发了" + event.getType() + "事件!");
}
}).forPath("/ashun/LOCK");
} catch (Exception e) {
e.printStackTrace();
}
// InterProcessMutex lock = new InterProcessMutex(client, "/ashun/LOCK");
CountDownLatch began = new CountDownLatch(1);
Q q = new Q(0);
for (int i = 0; i < 10; i++) {
InterProcessMutex lock = new InterProcessMutex(client, "/ashun/LOCK");
// new Thread(new Runner(lock, began, q), "thread_"+i).start();
new Thread(new Runner2(began, q), "thread_"+i).start();
}
// client.setData().forPath("/ashun/LOCK","haha".getBytes());
began.countDown();
System.out.println( "Hello World!" );
}
}
package cn.freemud.demo.zookeeper;
/**
* Created by chenwenshun on 2017/7/9.
*/
public class Q {
private volatile int n;
public Q(int n) {
this.n = n;
}
public int getN() {
return n;
}
public void setN(int n) {
this.n = n;
}
}
package cn.freemud.demo.zookeeper;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* Created by chenwenshun on 2017/7/9.
*/
public class Runner implements Runnable {
private InterProcessMutex lock;
private CountDownLatch began;
private Q q;
public Runner(InterProcessMutex lock, CountDownLatch began, Q q) {
this.lock = lock;
this.began = began;
this.q = q;
}
public void run() {
try {
began.await();
boolean getLock = false;
try
{
getLock = lock.acquire(11, TimeUnit.SECONDS);
if (getLock){
// do some work inside of the critical section here
TimeUnit.SECONDS.sleep(1);
q.setN(q.getN()+1);
System.out.println("Thread :"+Thread.currentThread()+" running.... >>>"+q.getN());
}else{
System.out.println("Thread :"+Thread.currentThread()+" You do not own the lock.... >>>");
}
}
finally
{
// if (getLock)
lock.release();
}
}
catch (InterruptedException e) {
e.printStackTrace();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
package cn.freemud.demo.zookeeper;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* Created by chenwenshun on 2017/7/9.
*/
public class Runner2 implements Runnable {
private CountDownLatch began;
private Q q;
public Runner2(CountDownLatch began, Q q) {
this.began = began;
this.q = q;
}
public void run() {
try {
began.await();
TimeUnit.SECONDS.sleep(3);
q.setN(q.getN()+1);
System.out.println("Thread :"+Thread.currentThread()+" running.... >>>"+q.getN());
}
catch (InterruptedException e) {
e.printStackTrace();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task.xsd
">
<bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>
<!--<task:annotation-driven/> -->
<context:component-scan base-package="com" use-default-filters="false">
<!-- 扫描符合@Service @Repository的类 -->
<context:include-filter type="annotation" expression="org.springframework.stereotype.Service" />
<context:include-filter type="annotation" expression="org.springframework.stereotype.Repository" />
<context:include-filter type="annotation" expression="org.springframework.stereotype.Component"/>
</context:component-scan>
<!-- 重连策略 -->
<bean id="retryPolicy" class="org.apache.curator.retry.ExponentialBackoffRetry">
<!-- 间隔时间基数 -->
<constructor-arg index="0" value="1000" />
<!-- 重连策略 -->
<constructor-arg index="1" value="3000" />
</bean>
<bean id="curatorFramework" class="org.apache.curator.framework.CuratorFrameworkFactory" factory-method="newClient" init-method="start">
<constructor-arg index="0" value="localhost:2181" />
<!--<constructor-arg index="0" value="server1:port,server2:port,server3:port" />-->
<!-- sessionTimeoutMs会话超时时间,单位为毫秒。默认是60000ms -->
<constructor-arg index="1" value="5000" />
<!-- connectionTimeoutMs连接创建超时时间,单位毫秒,默认15000ms -->
<constructor-arg index="2" value="3000" />
<constructor-arg index="3" ref="retryPolicy" />
</bean>
</beans>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<springProperty scope="context"
name="springAppName"
source="spring.application.name"/>
<!-- %m输出的信息,%p日志级别,%t线程名,%d日期,%c类的全名,,,, -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<charset>UTF-8</charset>
<pattern>%d %p (%file:%line\)- ${springAppName:-} %X{X-B3-TraceId:-} %X{X-B3-SpanId:-} %m%n</pattern>
</encoder>
</appender>
<!--<appender name="FREEMUD_API" class="ch.qos.logback.core.rolling.RollingFileAppender">-->
<!--<file>/data/log/mall/order/order.log</file>-->
<!--<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">-->
<!--<fileNamePattern>/data/log/mall/order/order-%d{yyyy-MM-dd}.%i.log</fileNamePattern>-->
<!--<maxFileSize>10MB</maxFileSize>-->
<!--</rollingPolicy>-->
<!--<encoder>-->
<!--<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{5} - %msg%n</pattern>-->
<!--</encoder>-->
<!--</appender>-->
<root level="info">
<appender-ref ref="STDOUT"/>
<!--<appender-ref ref="FREEMUD_API"/>-->
</root>
</configuration>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment