网站建设平台协议书,深圳 网站设计公司,网站建设员工资,蚌埠网站开发文章目录 一、概述二、导入依赖包三、创建锁的过程3.1 通过 create 创建节点信息3.2 AsyncCallback.StringCallback 回调函数3.3 AsyncCallback.Children2Callback 的回调函数3.4 Watcher 的回调函数 四、完整示例4.1 完整分布式锁代码4.2 测试类 如果您还没有安装Zookeeper请看… 文章目录 一、概述二、导入依赖包三、创建锁的过程3.1 通过 create 创建节点信息3.2 AsyncCallback.StringCallback 回调函数3.3 AsyncCallback.Children2Callback 的回调函数3.4 Watcher 的回调函数 四、完整示例4.1 完整分布式锁代码4.2 测试类 如果您还没有安装Zookeeper请看ZooKeeper 安装说明Zookeeper 命令使用方法和数据说明Zookeeper Java 开发入门。 一、概述 情景假设有10个客户端分散的10台主机要执行一个任务这个任务某些过程需要保持原子性。那么我们就需要一个分布式锁。 原理通过在Zookeeper中创建序列节点来实现获得锁删除节点来释放锁。其实质是一个按先来后到的排序过程实现过程如下 客户端发起请求创建锁序列节点/lock/xxxxxxxx 获取所有锁节点判断自己是否为最小节点 如果自己是最小序列节点则立即获得锁。否则不能获得锁但要监控前一个序列节点的状态。 获得锁的客户端开始执行任务。 执行完任务后释放锁。 由于后一个节点监控了前一个节点当前一个节点删除时后一个客户端会收到回调。 在这个回调中再获取所有锁节点判断自己是否为最小节点。 以此类推直到全部结束。 流程如下 如果您对没有做过 Zookeeper 开发强烈建立先看 Zookeeper Java 开发入门。
二、导入依赖包 在 pom.xml 文件中导入 Zookeeper 包注意一般这个包的版本要和您安装的 Zookeeper 服务端版本一致。 dependencygroupIdorg.apache.zookeeper/groupIdartifactIdzookeeper/artifactIdversion3.8.2/version
/dependency三、创建锁的过程
3.1 通过 create 创建节点信息
通过 create 创建序列节点信息。他是异步方式创建成功后会调用 AsyncCallback.StringCallback.processResult 回调函数。 public void lock() throws InterruptedException, LockException {zooKeeper.create(/lock, xxx.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, context);countDownLatch.await();if(StringUtils.isEmpty(this.lockNodePath)){throw new LockException(创建锁失败);}System.out.println(this.appName 获得锁);}3.2 AsyncCallback.StringCallback 回调函数
在 AsyncCallback.StringCallback 的回调函数中通过 getChildren 方法获取 ZooKeeper 锁节点下的所有节点信息。这个方法是异步的调用成功后会调用 AsyncCallback.Children2Callback.processResult 回调函数。 // AsyncCallback.StringCallbackOverridepublic void processResult(int i, String s, Object o, String s1) {if(StringUtils.isEmpty(s1)){// 这里是创建锁失败的情况。this.countDownLatch.countDown();return;}System.out.println(this.appName create lock nodes1);this.lockNodePath s1;// 获取 ZooKeeper 锁节点下的所有节点信息以此来判断我是不是第一个创建节点如果就获得锁否则监控前一个节点信息。zooKeeper.getChildren(/, false, this, context);}3.3 AsyncCallback.Children2Callback 的回调函数
在 AsyncCallback.Children2Callback 的回调函数判断我是不是第一个创建节点如果就获得锁否则监控前一个节点信息。监控前一个节点信息使用 exists 方法这个方法设置了 Watcher 的 processResult 回调函数 // AsyncCallback.Children2CallbackOverridepublic void processResult(int i, String s, Object o, ListString list, Stat stat) {Collections.sort(list);// for (String s1 : list) {
// System.out.println(\t this.lockNodePath previous lock nodes1);
// }int index list.indexOf(lockNodePath.substring(1));if(0 index){// 如果我现在是第一个节点则获得锁try {zooKeeper.setData(/, this.lockNodePath.getBytes(), -1);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}this.countDownLatch.countDown();}else {// 我不是第一个节点监控前一个节点信息等他删除后我就是可能是第一个了String watchNodePath / list.get(index - 1);System.out.println(\t this.lockNodePath watch node watchNodePath);zooKeeper.exists(watchNodePath, this, new StatCallback() {Overridepublic void processResult(int i, String s, Object o, Stat stat) {}}, context);}}3.4 Watcher 的回调函数
在 Watcher 的回调函数我们通过判断 watchedEvent.getType() 为 NodeDeleted 类型时重新获取 ZooKeeper 锁节点下的所有节点信息这使得消息回到了 “3.3”步判断谁是第一个节点然后获得得完成整个流程。 // WatcherOverridepublic void process(WatchedEvent watchedEvent) {switch (watchedEvent.getType()) {case None:break;case NodeCreated:break;case NodeDeleted:zooKeeper.getChildren(/, false, this, context);break;case NodeDataChanged:break;case NodeChildrenChanged:break;case DataWatchRemoved:break;case ChildWatchRemoved:break;case PersistentWatchRemoved:break;}}
四、完整示例
4.1 完整分布式锁代码
package top.yiqifu.study.p131;import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;public class ZookeeperLock implements Watcher, AsyncCallback.StringCallback,AsyncCallback.Children2Callback {private String appName;private ZooKeeper zooKeeper;private Object context;private String lockNodePath;private CountDownLatch countDownLatch new CountDownLatch(1);public ZookeeperLock(String name, ZooKeeper zk){this.appName name;this.zooKeeper zk;this.context this;}public void lock() throws InterruptedException, LockException {zooKeeper.create(/lock, xxx.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, context);countDownLatch.await();if(StringUtils.isEmpty(this.lockNodePath)){throw new LockException(创建锁失败);}System.out.println(this.appName 获得锁);}public void unlock() throws KeeperException, InterruptedException, LockException {if(StringUtils.isEmpty(this.lockNodePath)){throw new LockException(没有获得锁无法释放锁);}zooKeeper.delete(lockNodePath, -1);System.out.println(this.appName 释放锁);}// AsyncCallback.StringCallbackOverridepublic void processResult(int i, String s, Object o, String s1) {if(StringUtils.isEmpty(s1)){// 这里是创建锁失败的情况。this.countDownLatch.countDown();return;}System.out.println(this.appName create lock nodes1);this.lockNodePath s1;// 获取 ZooKeeper 锁节点下的所有节点信息以此来判断我是不是第一个创建节点如果就获得锁否则监控前一个节点信息。zooKeeper.getChildren(/, false, this, context);}// AsyncCallback.Children2CallbackOverridepublic void processResult(int i, String s, Object o, ListString list, Stat stat) {Collections.sort(list);// for (String s1 : list) {
// System.out.println(\t this.lockNodePath previous lock nodes1);
// }int index list.indexOf(lockNodePath.substring(1));if(0 index){// 如果我现在是第一个节点则获得锁try {zooKeeper.setData(/, this.lockNodePath.getBytes(), -1);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}this.countDownLatch.countDown();}else {// 我不是第一个节点监控前一个节点信息等他删除后我就是可能是第一个了String watchNodePath / list.get(index - 1);System.out.println(\t this.lockNodePath watch node watchNodePath);zooKeeper.exists(watchNodePath, this, new StatCallback() {Overridepublic void processResult(int i, String s, Object o, Stat stat) {}}, context);}}// WatcherOverridepublic void process(WatchedEvent watchedEvent) {switch (watchedEvent.getType()) {case None:break;case NodeCreated:break;case NodeDeleted:zooKeeper.getChildren(/, false, this, context);break;case NodeDataChanged:break;case NodeChildrenChanged:break;case DataWatchRemoved:break;case ChildWatchRemoved:break;case PersistentWatchRemoved:break;}}public class LockException extends Exception{public LockException(String message){super(message);}}
}
4.2 测试类
package top.yiqifu.study.p131;import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;public class Test06_ZookeeperLock {public static void main(String[] args) {try {// 创建 ZooKeeper 对象final ZooKeeper zooKeeper testCreateZookeeper();int clientCount 10;final CountDownLatch countDownLatch new CountDownLatch(clientCount);for (int i 0; i clientCount; i) {new Thread(new Runnable(){Overridepublic void run() {TestLock(zooKeeper);countDownLatch.countDown();}}).start();}countDownLatch.await();} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}private static void TestLock(ZooKeeper zooKeeper){try {String appName Thread.currentThread().getName();ZookeeperLock zookeeperLock new ZookeeperLock(appName, zooKeeper);// 加锁获得分布式锁zookeeperLock.lock();System.out.println(appName 执行任务);Thread.sleep(1000);// 释放锁zookeeperLock.unlock();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();} catch (ZookeeperLock.LockException e) {e.printStackTrace();}}private static ZooKeeper testCreateZookeeper() throws IOException, InterruptedException, KeeperException {final CountDownLatch countDownLatch new CountDownLatch(1);// ZooKeeper 集群地址没连接池的概念是Session的概念//String connectionString 192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181;String connectionString 192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181/aaa;// ZooKeeper Session 数据超时时间也就是这个Session关闭后与这个Session相关的数据在ZooKeeper中保存多信。Integer sessionTimeout 3000;// ZooKeeper Session 级别 WatcherWatch只发生在读方法上如 get、existsfinal ZooKeeper zooKeeper new ZooKeeper(connectionString, sessionTimeout, new Watcher() {Overridepublic void process(WatchedEvent watchedEvent) {try {Event.KeeperState state watchedEvent.getState();Event.EventType type watchedEvent.getType();String path watchedEvent.getPath();switch (state) {case Unknown:break;case Disconnected:break;case NoSyncConnected:break;case SyncConnected:countDownLatch.countDown();break;case AuthFailed:break;case ConnectedReadOnly:break;case SaslAuthenticated:break;case Expired:break;case Closed:break;}switch (type) {case None:break;case NodeCreated:break;case NodeDeleted:break;case NodeDataChanged:break;case NodeChildrenChanged:break;case DataWatchRemoved:break;case ChildWatchRemoved:break;case PersistentWatchRemoved:break;}System.out.println(Session watch state state);System.out.println(Session watch type type);System.out.println(Session watch path path);} catch (Exception e) {e.printStackTrace();}}});countDownLatch.await();ZooKeeper.States state zooKeeper.getState();switch (state) {case CONNECTING:break;case ASSOCIATING:break;case CONNECTED:break;case CONNECTEDREADONLY:break;case CLOSED:break;case AUTH_FAILED:break;case NOT_CONNECTED:break;}System.out.println(ZooKeeper state state);return zooKeeper;}}