Zookeeper 使用 Java 实现分布式协调机制
2022/7/12 14:21:30
本文主要是介绍Zookeeper 使用 Java 实现分布式协调机制,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
Zookeeper 允许多个客户端在指定的一个或一些节点上添加监听事件,当被监听的节点发生状态变化时,Zookeeper 会把节点变化的细节通知到相应的客户端,这就是 Zookeeper 分布式协调机制的核心本质。
为了实现分布式协调功能,Zookeeper 引入了 Watcher 机制来进行事件监听,但是由于原生的方法需要开发人员反复注册,使用起来很不方便,所以我们通常使用第三方组件 Curator 来实现。Curator 对 Zookeeper 有关 Watcher 的原生方法进行了高度封装,引入了缓存 Cache 来实现对 ZooKeeper 服务端事件的监听,使用起来非常方便。
Curator 提供了 3 种 Cache 对象来实现对 Zookeeper 节点的状态变化监听,具体如下:
-
NodeCache : 只监听具体的某一个的节点
-
PathChildrenCache : 只监听一个节点下所有的子孙节点,但是不监听本身节点
-
TreeCache : 监听本身节点以及所有子孙节点,类似于 PathChildrenCache 和 NodeCache 的组合
好了,话不多说,直接上代码,在本篇博客的最后,会提供源代码下载。
一、搭建工程
新建一个 maven 项目,导入相关 jar 包,内容如下:
有关具体的 jar 包地址,可以在 https://mvnrepository.com 上进行查询。
<dependencies> <!--导入 Spring 的 jar 包--> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.3.18</version> </dependency> <!--导入 Spring 整合 junit 的 jar 包--> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>5.3.18</version> <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.2</version> <scope>test</scope> </dependency> <!--导入 curator 的 jar 包--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency> <!--导入日志相关的 jar 包--> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.21</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.21</version> </dependency> </dependencies>
搭建后的最终工程如下图所示,非常简单:
本篇博客的 Demo 的搭建,与上一篇博客很相似,具体实现主要在单元测试中的 zkWatchTest 类中。
二、代码实现
本篇博客除了单元测试中的类不一样之外,其它细节与上一篇博客完全相同,为了保持完整性,这里还是罗列一下。
首先需要对连接 Zookeeper 的信息进行配置,具体配置细节在 zookeeper.properties 文件中:
# zookeeper的连接字符串 # 如果是操作 zookeeper 集群,可以配置多个 zookeeper 地址 # 多个地址之间用英文逗号分隔,如 ip1:port1,ip2:port2,ip3:port3 zk.connectString=127.0.0.1:2181 # zookeeper的会话超时时间 # 单位:毫秒,默认是 60 秒 zk.sessionTimeoutMs=60000 # zookeeper的连接超时时间 # 单位:毫秒,默认是 15 秒 zk.connectionTimeoutMs=15000 # zookeeper默认操作的根节点 # 所有的增删改查操作,默认在该节点下进行 zk.namespace=jobs
然后在代码中使用 Spring 集成 Curator 组件,从 zookeeper.properties 读取连接配置信息:
package com.jobs.config; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.PropertySource; //加载 zookeeper.properties 文件内容 @PropertySource("classpath:zookeeper.properties") public class zookeeperConfig { @Value("${zk.connectString}") private String connectString; @Value("${zk.sessionTimeoutMs}") private Integer sessionTimeoutMs; @Value("${zk.connectionTimeoutMs}") private Integer connectionTimeoutMs; @Value("${zk.namespace}") private String namespace; //获取 Curator 的客户端连接 @Bean public CuratorFramework getCuratorFramework(){ //配置重试策略,如果连接失败,最多重试 1 次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeoutMs) .connectionTimeoutMs(connectionTimeoutMs) .namespace(namespace) .retryPolicy(retryPolicy) .build(); client.start(); return client; } }
package com.jobs.config; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; //采用 Spring 集成 Curator,导入 zookeeperConfig 配置类 @Configuration @Import(zookeeperConfig.class) public class springConfig { }
下面我们就使用 junit 单元测试,使用 Java 采用 Curator 实现对 Zookeeper 节点的监听。
需要注意的是:运行以下代码对节点监听后,你需要使用 Zookeeper 自带的客户端操作被监听的节点,验证监听效果。
package com.jobs.test; import com.jobs.config.springConfig; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.*; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = springConfig.class) public class zkWatchTest { @Autowired private CuratorFramework client; //注意:由于 namespace 配置的是 jobs //因此以下的所有操作,都是默认在 /jobs 节点下进行操作 @Test public void nodeCacheTest() throws Exception { //NodeCache 能够进行单节点监控“增删改”操作,并通知所连接的客户端发生的变化细节 //以下代码运行后,请使用命令行操作 zookeeper 进行测试,分别运行以下命令: //1.创建 /jobs 节点:create /jobs //需要注意的是: //上面采用的是 zookeeper 自己的客户端命令行创建节点,没有给节点存储值,默认存储值是 null //如果采用的是 java 调用 curator 的方法创建节点,没有设置存储值的话,默认是客户端 ip 地址 //2.在 jobs 下创建 test1 节点:create /jobs/test1 //3.修改 test1 节点的内容:set /jobs/test1 hehe //4.删除节点:delete /jobs/test1 //使用 NodeCache 监控 /jobs/test1 节点的变化 NodeCache nodeCache = new NodeCache(client, "/test1"); //当 /jobs/test1 节点发生增删改操作时,通知当前客户端调用监听的方法 //采用 Lambda 表达式的方式写监听事件 nodeCache.getListenable().addListener(() -> { //获取所监听的节点路径 String path = nodeCache.getPath(); if (nodeCache.getCurrentData() == null) { System.out.println("节点[" + path + "]被删除了..."); } else { //获取数据版本号 int version = nodeCache.getCurrentData().getStat().getVersion(); //获取数据值 byte[] bytes = nodeCache.getCurrentData().getData(); String data = ""; if (bytes != null) { data = new String(bytes); } if (version == 0) { System.out.println("节点[" + path + "]被创建了,存储的数据为:" + data); } else { System.out.println("节点[" + path + "]被修改了,修改后的数据为:" + data); } } }); /* //采用匿名内部类的方式写监听事件 nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { //代码同上..... } }); */ //开启监听 nodeCache.start(); //由于是在单元测试中,测试监听,为了方式程序结束,这里简单采用休眠的方式进行阻塞 //真实应用场景下,是依靠网站的线程保持监听持续运行的 //休眠 5 分钟,请在 5 分钟内,采用命令行对 /jobs/test1 节点进行增删改操作,验证效果 Thread.sleep(300000); } //------------------------------------------------ @Test public void pathChildrenCacheTest() throws Exception { //PathChildrenCache 能够对一个节点下所有的子节点监控“增删改”操作,并通知所连接的客户端发生的变化细节 //注意:只监视子节点的变化,不监视父节点的变化 //以下代码,只监视 /jobs/test2 下所有子节点的变化,不监视 /jobs/test2 的变化 //以下代码运行后,请使用命令行操作 zookeeper 进行测试,分别运行以下命令: //1.创建 /jobs/test2 节点:create /jobs/test2 //2.在 /jobs/test2 下创建 p1 和 p2 两个节点: //create /jobs/test2/p1 //create /jobs/test2/p2 //3.修改 p1 和 p2 两个节点的内容: //set /jobs/test2/p1 hehe //set /jobs/test2/p2 haha //4.删除节点: //delete /jobs/test2/p1 //delete /jobs/test2/p2 //delete /jobs/test2 PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/test2", true); //采用 Lambda 表达式的方式写监听事件 pathChildrenCache.getListenable().addListener((client, event) -> { //获取事件类型 PathChildrenCacheEvent.Type type = event.getType(); byte[] bytes; String data; String path; if (type == PathChildrenCacheEvent.Type.CHILD_ADDED) { //添加子节点 //获取添加的子节点 path = event.getData().getPath(); //获取添加的子节点中的数据 bytes = event.getData().getData(); data = bytes != null ? new String(bytes) : ""; System.out.println("添加了子节点[" + path + "],存储数据为:" + data); } else if (type == PathChildrenCacheEvent.Type.CHILD_UPDATED) { //修改子节点 //获取修改的子节点 path = event.getData().getPath(); //获取修改后的子节点中的数据 bytes = event.getData().getData(); data = bytes != null ? new String(bytes) : ""; System.out.println("子节点[" + path + "]修改后的数据为:" + data); } else if (type == PathChildrenCacheEvent.Type.CHILD_REMOVED) { //删除子节点 //获取被删除的子节点 path = event.getData().getPath(); System.out.println("子节点[" + path + "]被删除了..."); } }); /* //采用匿名内部类的方式写监听事件 pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { //代码同上..... } }); */ pathChildrenCache.start(); //由于是在单元测试中,测试监听,为了方式程序结束,这里简单采用休眠的方式进行阻塞 //真实应用场景下,是依靠网站的线程保持监听持续运行的 //休眠 5 分钟,请在 5 分钟内,采用命令行对 /jobs/test2 节点进行增删改操作,验证效果 Thread.sleep(300000); } //------------------------------------------------ @Test public void treeCacheTest() throws Exception { //TreeCache 能够对一个节点,以及该节点下所有的子节点监控“增删改”操作,并通知所连接的客户端发生的变化细节 //TreeCache 相当于 NodeCache 和 PathChildrenCache 的组合。 //TreeCache 的使用方式,跟 PathChildrenCache 很相似,这里就不详细介绍 //TreeCache 跟 PathChildrenCache 的唯一区别在于,TreeCache 除了监控所有子节点之外,还监视本身节点 TreeCache treeCache = new TreeCache(client, "/test3"); //采用 Lambda 表达式的方式写监听事件 treeCache.getListenable().addListener((client, event) -> { //获取事件类型 TreeCacheEvent.Type type = event.getType(); byte[] bytes; String data; String path; if (type == TreeCacheEvent.Type.NODE_ADDED) { //添加节点 //获取添加的节点 path = event.getData().getPath(); //获取添加的节点中的数据 bytes = event.getData().getData(); data = bytes != null ? new String(bytes) : ""; System.out.println("添加了节点[" + path + "],存储数据为:" + data); } else if (type == TreeCacheEvent.Type.NODE_UPDATED) { //修改节点 //获取修改的节点 path = event.getData().getPath(); //获取修改后的节点中的数据 bytes = event.getData().getData(); data = bytes != null ? new String(bytes) : ""; System.out.println("节点[" + path + "]修改后的数据为:" + data); } else if (type == TreeCacheEvent.Type.NODE_REMOVED) { //删除节点 //获取被删除的节点 path = event.getData().getPath(); System.out.println("节点[" + path + "]被删除了..."); } }); /* //采用匿名内部类的方式写监听事件 treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { //代码同上..... } }); */ treeCache.start(); //由于是在单元测试中,测试监听,为了方式程序结束,这里简单采用休眠的方式进行阻塞 //真实应用场景下,是依靠网站的线程保持监听持续运行的 //休眠 5 分钟,请在 5 分钟内,采用命令行对 /jobs/test2 节点进行增删改操作,验证效果 Thread.sleep(300000); } }
在运行以上单元测试方法时,请使用 Zookeeper 自带的客户端,通过命令行操作 Zookeeper 被监听的节点,进行效果验证。
Ok,有关使用 Java 通过 Curator 组件的 API 实现对 Zookeeper 的节点进行监听,从而实现 Zookeeper 的分布式协调机制,已经介绍完毕,总体非常简单,希望对大家有所帮助。
本篇博客的源代码下载地址为:https://files.cnblogs.com/files/blogs/699532/zookeeper_watcher.zip
这篇关于Zookeeper 使用 Java 实现分布式协调机制的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23Springboot应用的多环境打包入门
- 2024-11-23Springboot应用的生产发布入门教程
- 2024-11-23Python编程入门指南
- 2024-11-23Java创业入门:从零开始的编程之旅
- 2024-11-23Java创业入门:新手必读的Java编程与创业指南
- 2024-11-23Java对接阿里云智能语音服务入门详解
- 2024-11-23Java对接阿里云智能语音服务入门教程
- 2024-11-23JAVA对接阿里云智能语音服务入门教程
- 2024-11-23Java副业入门:初学者的简单教程
- 2024-11-23JAVA副业入门:初学者的实战指南