在那做网站,深圳罗湖做网站公司,省级网站 开发建设 资质,企业管理系统软件免费目录 1 Zookeeper 概述1.1 Zookeeper 定义1.2 Zookeeper 工作机制1.3 Zookeeper 特点1.4 数据结构1.5 应用场景 2 Zookeeper 安装3 客户端命令行操作4 Zookeeper 的 Java 客户端操作4.1 IDEA 环境搭建4.2 初始化 ZooKeeper 客户端4.3 创建子节点4.4 获取子节点4.5 判断Znode是否… 目录 1 Zookeeper 概述1.1 Zookeeper 定义1.2 Zookeeper 工作机制1.3 Zookeeper 特点1.4 数据结构1.5 应用场景 2 Zookeeper 安装3 客户端命令行操作4 Zookeeper 的 Java 客户端操作4.1 IDEA 环境搭建4.2 初始化 ZooKeeper 客户端4.3 创建子节点4.4 获取子节点4.5 判断Znode是否存在4.6 获取子节点存储的数据4.7 设置节点的值4.8 删除节点 5 Zookeeper 内部原理5.1 节点类型5.2 Stat 结构体5.3 监听器原理重点5.4 选举机制重点5.5 写数据流程 1 Zookeeper 概述
1.1 Zookeeper 定义 Zookeeper 是一个 开源 的 分布式 的为分布式应用提供协调服务的 Apache 项目。
Zookeeper 从设计模式角度来理解是一个基于观察者模式设计的分布式服务管理框架它负责存储和管理大家都关心的数据然后接受观察者的注册一旦这些数据的状态发生了变化Zookeeper 就负责通知已经在 Zookeeper 上注册的那些观察者做出相应的反应。
1.2 Zookeeper 工作机制 1.3 Zookeeper 特点 Zookeeper一个领导者Leader)多个跟随者Follower组成的集群。集群中只要有 半数以上 节点存活Zookeeper 集群就能正常服务。全局数据一致性每个 Server 保存一份相同的数据副本Client 无论连接到哪个server数据都是一致的。更新请求 顺序 进行来自同一个 Client 的更新请求按其发送顺序依次执行。数据更新原子性一次数据更新要么成功要么失败保证了数据一致性。实时性在一定时间范围内Client能读到最新数据。
1.4 数据结构 ZooKeeper 数据模型的结构与 Unix 文件系统很类似整体上可以看作是一棵树每个节点称做一个 ZNode。每一个 ZNode 默认能够存储 1MB 的数据每个 ZNode 都可以通过其路径唯一标识。 ZooKeeper 中不存在文件的概念节点中存储的直接就是内容 1.5 应用场景 提供的服务包括统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。
1统一命名服务
在分布式环境下经常需要对应用/服务进行统一命名便于识别。 例如IP不容易记住而域名容易记住。 2统一配置管理
分布式环境下配置文件同步非常常见。 一般要求一个集群中所有节点的配置信息是一致的比如 Kafka 集群。对配置文件修改后希望能够快速同步到各个节点上。 配置管理可交由 ZooKeeper 实现。 可将配置信息写入 ZooKeeper 上的一个Znode。各个客户端服务器监听这个Znode。一旦 Znode 中的数据被修改ZooKeeper 将通知各个客户端服务器。 3统一集群管理
分布式环境中实时掌握每个节点的状态是必要的。 可根据节点实时状态做出一些调整。 ZooKeeper 可以实现实时监控节点状态变化 可将节点信息写入 ZooKeeper 上的一个ZNode。监听这个 ZNode 可获取它的实时状态变化。 4服务器节点动态上下线 5软负载均衡
在 Zookeeper 中记录每台服务器的访问数让访问数最少的服务器去处理最新的客户端请求 软负载均衡即从软件层面配置实现负载均衡 硬负载均衡即从硬件层面实现负载均衡。 2 Zookeeper 安装 见博客 Zookeeper 安装与部署
3 客户端命令行操作 集群启动 Zookeeper 后每一台机器上启动的都是服务端要操作客户端还需要启动客户端最好是新开一个shell窗口单独作为客户端。
[huweihadoop101 ~]$ cd /opt/module/zookeeper-3.5.7
[huweihadoop101 zookeeper-3.5.7]$ bin/zkCli.sh -server hadoop101:2181由于 zookeeper 的数据都是同步的客户端连接到 hadoop101、hadoop102、hadoop103 哪个机器都是 OK 的 1查看当前 znode 中所包含的节点
[zk: hadoop101:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: hadoop101:2181(CONNECTED) 1] ls /zookeeper
[config, quota]
[zk: hadoop101:2181(CONNECTED) 2] ls /zookeeper/config
[]2查看当前节点详细数据
[zk: hadoop101:2181(CONNECTED) 3] ls -s /
[zookeeper]cZxid 0x0
ctime Thu Jan 01 08:00:00 CST 1970
mZxid 0x0
mtime Thu Jan 01 08:00:00 CST 1970
pZxid 0x0
cversion -1
dataVersion 0
aclVersion 0
ephemeralOwner 0x0
dataLength 0
numChildren 13创建普通节点 无法同时创建多级节点除非父级节点存在也可以在创建节点时指定节点的内容 [zk: hadoop101:2181(CONNECTED) 4] create /sanguo
Created /sanguo
[zk: hadoop101:2181(CONNECTED) 5] ls /
[sanguo, zookeeper]
[zk: hadoop101:2181(CONNECTED) 6] create /sanguo/shuguo liubei
Created /sanguo/shuguo当创建临时节点时在当前客户端是能查看到的退出当前客户端然后再重启客户端再次查看会发现临时节点已经删除 4创建带序号的节点
先创建一个普通节点
[zk: hadoop101:2181(CONNECTED) 7] create /sanguo/weiguo caocao
Created /sanguo/weiguo再创建带序号的节点
[zk: hadoop101:2181(CONNECTED) 8] create -s /sanguo/weiguo caocao
Created /sanguo/weiguo0000000002
[zk: hadoop101:2181(CONNECTED) 9] ls /sanguo
[shuguo, weiguo, weiguo0000000002]如果节点下原来没有子节点序号从0开始依次递增。如果原节点下已有2个节点则再排序时从2开始以此类推。 5获取节点的值
[zk: hadoop101:2181(CONNECTED) 10] get /sanguo/shuguo
liubei6修改节点的值
[zk: hadoop101:2181(CONNECTED) 11] set /sanguo/shuguo kongming
[zk: hadoop101:2181(CONNECTED) 12] get /sanguo/shuguo
kongming7节点的值变化监听
在 hadoop102 主机上注册监听 /sanguo 节点数据变化
[huweihadoop102 zookeeper-3.5.7]$ bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 0] get -w /sanguo
null在 hadoop101 主机上修改 /sanguo 节点数据
[zk: hadoop101:2181(CONNECTED) 13] set /sanguo simayi此时在 hadoop102 主机上见听到了 /sanguo 节点数据的变化 同理ls命令也可以加参数 -w 当新创建或删除文件后可监听文件的变化 8查看节点的状态
[zk: hadoop101:2181(CONNECTED) 14] stat /sanguo
cZxid 0x200000002
ctime Sun Dec 03 15:31:07 CST 2023
mZxid 0x200000008
mtime Sun Dec 03 16:02:16 CST 2023
pZxid 0x200000005
cversion 3
dataVersion 1
aclVersion 0
ephemeralOwner 0x0
dataLength 6
numChildren 39删除节点
[zk: hadoop101:2181(CONNECTED) 15] delete /sanguo
Node not empty: /sanguo
[zk: hadoop101:2181(CONNECTED) 16] delete /sanguo/weiguo0000000002只能删除内容为空的节点 10递归删除节点
[zk: hadoop101:2181(CONNECTED) 17] deleteall /sanguo可以递归地删除内容非空的节点 4 Zookeeper 的 Java 客户端操作
4.1 IDEA 环境搭建 1创建一个Maven Module 2添加 pom 文件
dependenciesdependencygroupIdjunit/groupIdartifactIdjunit/artifactIdversionRELEASE/version/dependencydependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-core/artifactIdversion2.8.2/version/dependencydependencygroupIdorg.apache.zookeeper/groupIdartifactIdzookeeper/artifactIdversion3.5.7/version/dependency
/dependencies3配置 log4j.properties文件
需要在项目的 src/main/resources 目录下新建一个文件命名为log4j.properties在文件中填入以下内容
log4j.rootLoggerINFO, stdout
log4j.appender.stdoutorg.apache.log4j.ConsoleAppender
log4j.appender.stdout.layoutorg.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern%d %p [%c] - %m%n
log4j.appender.logfileorg.apache.log4j.FileAppender
log4j.appender.logfile.Filetarget/spring.log
log4j.appender.logfile.layoutorg.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern%d %p [%c] - %m%n4.2 初始化 ZooKeeper 客户端 public class ZookeeperTest {private ZooKeeper zkClient;private String connectString;private int sessionTimeout;/**获取客户端对象*/Beforepublic void init() throws IOException {connectString hadoop102:2181,hadoop103:2181,hadoop104:2181;int sessionTimeout 10000; // 单位毫秒一般设置10000~40000//参数1 connectString连接zk服务的地址//参数2 sessionTimeout超时时间//参数3 当前客户端默认的监控器zkClient new ZooKeeper(connectString, sessionTimeout, new Watcher() {Overridepublic void process(WatchedEvent event) {}});}/*** 关闭客户端对象*/Afterpublic void close() throws InterruptedException {zkClient.close();}
}4.3 创建子节点 Test
public void create() throws InterruptedException, KeeperException {//参数1 指定创建节点的路径//参数2 指定要创建节点下的数据//参数3 对操作用户进行权限控制//参数4 节点类型、短暂、持久、短暂带序号、持久带序号zkClient.create(/sanguo, liubei.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}4.4 获取子节点 1获取子节点列表不监听
/*** 获取节点不监听*/
Test
public void get() throws InterruptedException, KeeperException {//参数1 指定获取节点的路径//参数2 是否监听ListString children zkClient.getChildren(/, false);System.out.println(children);for (String child : children) {System.out.println(child); // 获取每一个节点名称}
}2获取子节点列表并监听
/*** 获取节点监听*/
Test
public void getAndWatch() throws InterruptedException, KeeperException {//参数1 指定获取节点的路径//参数2 是否监听//参数3 当前客户端默认的监控器ListString children zkClient.getChildren(/, new Watcher() {public void process(WatchedEvent watchedEvent) {System.out.println(根目录下的节点有变化);}});System.out.println(children);for (String child : children) {System.out.println(child); // 获取每一个节点名称}//因为设置了监听,所以当前线程不能结束Thread.sleep(Long.MAX_VALUE);
}启动再去根目录下创建一个新节点
[zk: localhost:2181(CONNECTED) 2] create /shuihu
Created /shuihu查看 IDEA 终端
4.5 判断Znode是否存在 /*** 判断Znode是否存在*/
Test
public void exist() throws InterruptedException, KeeperException {//参数1 指定判断节点的路径//参数2 是否监听Stat stat zkClient.exists(/xiyou, false);System.out.println(stat null ? not exist : exist);
}4.6 获取子节点存储的数据 /*** 获取子节点存储的数据*/
Test
public void getData() throws InterruptedException, KeeperException {//判断节点是否存在Stat stat zkClient.exists(/sanguo, false);if (stat null) {System.out.println(节点不存在...);return;}//参数1 指定判断节点的路径//参数2 是否监听byte[] data zkClient.getData(/sanguo, false, stat);System.out.println(new String(data));
}4.7 设置节点的值 /*** 设置节点的值*/
Test
public void set() throws KeeperException, InterruptedException {//判断节点是否存在Stat stat zkClient.exists(/sanguo, false);if (stat null) {System.out.println(节点不存在...);return;}//参数1 指定判断节点的路径//参数2 节点的值//参数3 版本号zkClient.setData(/sanguo, caocao.getBytes(), stat.getVersion());
}参数3 版本号也可以写 -1但不能不传这个参数 4.8 删除节点 1删除空节点
/*** 删除空节点*/
Test
public void delete() throws KeeperException, InterruptedException {//判断节点是否存在Stat stat zkClient.exists(/aaa, false);if (stat null) {System.out.println(节点不存在...);return;}zkClient.delete(/aaa, stat.getVersion());
}2删除非空节点递归实现
/*** 删除非空节点递归实现*/
//封装一个方法,方便递归调用
public void deleteAll(String path, ZooKeeper zk) throws KeeperException, InterruptedException {//判断节点是否存在Stat stat zkClient.exists(path, false);if (stat null) {System.out.println(节点不存在...);return;}//先获取当前传入节点下的所有子节点ListString children zk.getChildren(path, false);if (children.isEmpty()) {//说明传入的节点没有子节点,可以直接删除zk.delete(path, stat.getVersion());} else {//如果传入的节点有子节点,循环所有子节点for (String child : children) {//删除子节点,但是不知道子节点下面还有没有子节点,所以递归调用deleteAll(path / child, zk);}//删除完所有子节点以后,记得删除传入的节点zk.delete(path, stat.getVersion());}
}//测试deleteAll
Test
public void testDeleteAll() throws KeeperException, InterruptedException {deleteAll(/shuihu, zkClient);
}5 Zookeeper 内部原理
5.1 节点类型 持久Persistent客户端和服务器端断开连接后创建的节点不删除 短暂Ephemeral客户端和服务器端断开连接后创建的节点自己删除 持久化目录节点 客户端与Zookeeper断开连接后该节点依旧存在 持久化顺序编号目录节点 客户端与Zookeeper断开连接后该节点依旧存在只是 zookeeper 给该节点名称进行顺序编号 临时目录节点 客户端与Zookeeper断开连接后该节点被删除 临时顺序编号目录节点 客户端与Zookeeper断开连接后该节点被删除只是 zookeeper 给该节点名称进行顺序编号 注意创建 znode 时设置顺序标识znode 名称后会附加一个值顺序号是一个单调递增的计数器由父节点维护在分布式系统中顺序号可以被用于为所有的事件进行全局排序这样客户端可以通过顺序号推断事件的顺序 5.2 Stat 结构体 [zk: hadoop101:2181(CONNECTED) 18] stat /
cZxid 0x0
ctime Thu Jan 01 08:00:00 CST 1970
mZxid 0x0
mtime Thu Jan 01 08:00:00 CST 1970
pZxid 0x20000000d
cversion 1
dataVersion 0
aclVersion 0
ephemeralOwner 0x0
dataLength 0
numChildren 1czxid创建节点的事务zxid
每次修改 ZooKeeper 状态都会收到一个zxid形式的时间戳也就是ZooKeeper事务ID。事务ID是 ZooKeeper 中所有修改总的次序。每个修改都有唯一的zxid如果zxid1小于zxid2那么zxid1在zxid2之前发生。
ctimeznode被创建的毫秒数(从1970年开始)mzxidznode最后更新的事务zxidmtimeznode最后修改的毫秒数(从1970年开始)pZxidznode最后更新的子节点zxidcversionznode子节点变化号znode子节点修改次数dataversionznode数据变化号aclVersionznode访问控制列表的变化号ephemeralOwner如果是临时节点这个是znode拥有者的session id。如果不是临时节点则是0。dataLengthznode的数据长度numChildrenznode子节点数量
5.3 监听器原理重点 常见的监听
监听节点数据的变化 get -w path监听子节点增减的变化 ls -w path
1首先要有一个 main() 线程 2在 main() 线程中创建 ZooKeeper 客户端这时就会创建两个线程一个负责网络连接通信connet一个负责监听listener 3客户端通过 connet 线程将注册的监听事件发送给 ZooKeeper 4在 ZooKeeper 的注册监听器列表中将注册的监听事件添加到列表中 5ZooKeeper 监听到有数据或路径的变化就会将这个消息发送给 listener 线程 6 客户端 listener 线程内部调用 process() 方法做出相应处理 5.4 选举机制重点 半数机制集群中半数以上机器存活集群可用。所以 Zookeeper 适合安装 奇数台服务器。 一般情况下 Zookeeper 集群更推荐使用奇数台机器原因 在 Zookeeper 集群中 奇数台 和 偶数台接近的台数 机器的容错能力是一样的所以在考虑资源节省的情况我们推荐使用奇数台方案 Zookeeper 虽然在配置文件中并没有指定 Master 和 Slave。但是Zookeeper工作时是有一个节点为 Leader其他则为FollowerLeader是通过内部的 选举机制 临时产生的。
选举机制总原则集群中的每台机器都参与投票通过交换选票信息得到每台机器的最终得票 一旦出现得票数超过机器总数 一半以上 数量当前机器即为 leader。 选票过程中每台机器怎么通信的 每台机器的 ip 加上端口号 3888 以一个简单的例子来说明整个选举的过程。
假设有五台服务器组成的 Zookeeper 集群它们的 id 从 1-5同时它们都是最新启动的也就是没有历史数据在存放数据量这一点上都是一样的。假设这些服务器依序启动来看看会发生什么。 1服务器 1 启动发起一次选举。服务器 1 投自己一票。此时服务器 1 票数一票不够半数以上3票选举无法完成服务器 1 状态保持为 LOCKING
2服务器2启动再发起一次选举。服务器 1 和 2 分别投自己一票并交换选票信息此时服务器 1 发现服务器2的 ID 比自己目前投票推举的服务器1大更改选票为推举服务器2。此时服务器 1 票数 0 票服务器 2 票数 2 票没有半数以上结果选举无法完成服务器12 状态保持 LOCKING
3服务器 3 启动发起一次选举。此时服务器 1 和 2 都会更改选票为服务器 3。此次投票结果服务器1为0票服务器2为0票服务器 3 为 3 票。此时服务器 3 的票数已经超过半数服务器 3 当选 Leader。服务器 12 更改状态为 FOLLOWING服务器3更改状态为LEADING
4服务器 4 启动发现当前集群已经有 leader它自己自动成为follower
5服务器5启动同服务器 4一样。 以5台机器为例当前集群正在使用有数据/没数据leader突然宕机的情况。 当集群中的leader挂掉集群会重新选出一个leader此时首先会比较每一台机器的czxidczxid最大的被选为leader。极端情况czxid都相等的情况那么就会直接比较myid。 5.5 写数据流程