第一章 zookeeper下载地址

点击图片跳转链接 https://zookeeper.apache.org/

第二章 本地模式安装

本地模式安装

安装前准备

  1. 安装 JDK
  2. 拷贝 apache-zookeeper-3.5.7-bin.tar.gz 安装包到 Linux 系统下
    1
    scp -r .\apache-zookeeper-3.5.7-bin.tar.gz Jermyn@hadoop102:/opt/software
  3. 解压到指定目录
    1
    [Jermyn@hadoop102 module]$ tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
  4. 修改名称
    1
    [Jermyn@hadoop102 module]$ mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.7

配置修改

  1. 将/opt/module/zookeeper-3.5.7/conf 这个路径下的 zoo_sample.cfg 拷贝修改为 zoo.cfg;
    1
    [Jermyn@hadoop102 conf]$ cp zoo_sample.cfg zoo.cfg
  2. 打开 zoo.cfg 文件,修改 dataDir 路径:
    1
    2
    3
    4
    [Jermyn@hadoop102 conf]$ vim zoo.cfg

    # 修改如下内容:
    dataDir=/opt/module/zookeeper-3.5.7/zkData
  3. 在/opt/module/zookeeper-3.5.7/这个目录上创建 zkData 文件夹
    1
    [Jermyn@hadoop102 zookeeper-3.5.7]$ mkdir zkData

操作 Zookeeper

  1. 启动 Zookeeper
    1
    [Jermyn@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh start
  2. 查看进程是否启动
    1
    2
    3
    [Jermyn@hadoop102 zookeeper-3.5.7]$ jps
    4020 Jps
    4001 QuorumPeerMain
  3. 查看状态
    1
    2
    3
    4
    [Jermyn@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
    Mode: standalone
  4. 启动客户端
    1
    [Jermyn@hadoop102 zookeeper-3.5.7]$ bin/zkCli.sh
  5. 退出客户端:
    1
    [zk: localhost:2181(CONNECTED) 0] quit
  6. 停止 Zookeeper
    1
    [Jermyn@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh stop

配置参数解读

  1. tickTime = 2000:通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
  2. initLimit = 10:LF初始通信时限

    Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量)
  3. syncLimit = 5:LF同步通信时限

    Leader和Follower之间通信时间如果超过syncLimit * tickTime,Leader认为Follwer死掉,从服务器列表中删除Follwer。
  4. dataDir:保存Zookeeper中的数据

    注意:默认的tmp目录,容易被Linux系统定期删除,所以一般不用默认的tmp目录。

  5. clientPort = 2181:客户端连接端口,通常不做修改

第三章 Zookeeper 集群操作

集群操作

集群安装

  1. 集群规划
    在 hadoop102、hadoop103 和 hadoop104 三个节点上都部署 Zookeeper。
  2. 解压安装
    • 在 hadoop102 解压 Zookeeper 安装包到/opt/module/目录下
      1
      [Jermyn@hadoop102 software]$ tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
    • 修改 apache-zookeeper-3.5.7-bin 名称为 zookeeper-3.5.7
      1
      [Jermyn@hadoop102 module]$ mv apache-zookeeper-3.5.7-bin/zookeeper-3.5.7
  3. 配置服务器编号
    • 在/opt/module/zookeeper-3.5.7/这个目录下创建 zkData
      1
      [Jermyn@hadoop102 zookeeper-3.5.7]$ mkdir zkData
    • 在/opt/module/zookeeper-3.5.7/zkData 目录下创建一个 myid 的文件
      1
      [Jermyn@hadoop102 zkData]$ vi myid
      在文件中添加与 server 对应的编号(注意:上下不要有空行,左右不要有空格
      1
      2

      注意:添加 myid 文件,一定要在 Linux 里面创建,在 notepad++里面很可能乱码

    • 拷贝配置好的 zookeeper 到其他机器上
      1
      [Jermyn@hadoop102 module ]$ xsync zookeeper-3.5.7

      注意:并分别在 hadoop103、hadoop104 上修改 myid 文件中内容为 3、4

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      [Jermyn@hadoop102 module]$ xcall cat /opt/module/zookeeper-3.5.7/zkData/myid
      -----------------------Current User Is:Jermyn,Hostname Is:hadoop102-----------------------
      --> Excute Command "cat /opt/module/zookeeper-3.5.7/zkData/myid"
      2
      -----------------------Current User Is:Jermyn,Hostname Is:hadoop103-----------------------
      --> Excute Command "cat /opt/module/zookeeper-3.5.7/zkData/myid"
      3
      -----------------------Current User Is:Jermyn,Hostname Is:hadoop104-----------------------
      --> Excute Command "cat /opt/module/zookeeper-3.5.7/zkData/myid"
      4
      Excute Successfully!
      [Jermyn@hadoop102 module]$
  4. 配置zoo.cfg文件
    • 重命名/opt/module/zookeeper-3.5.7/conf 这个目录下的 zoo_sample.cfg 为 zoo.cfg
      1
      [Jermyn@hadoop102 conf]$ mv zoo_sample.cfg zoo.cfg
    • 打开 zoo.cfg 文件
      1
      2
      3
      4
      5
      6
      7
      8
      [Jermyn@hadoop102 conf]$ vim zoo.cfg
      #修改数据存储路径配置
      dataDir=/opt/module/zookeeper-3.5.7/zkData
      #增加如下配置
      #######################cluster##########################
      server.2=hadoop102:2888:3888
      server.3=hadoop103:2888:3888
      server.4=hadoop104:2888:3888
    • 配置参数解读
      server.A=B:C:D。
      1
      2
      3
      4
      5
      A 是一个数字,表示这个是第几号服务器;
      集群模式下配置一个文件 myid,这个文件在 dataDir 目录下,这个文件里面有一个数据就是 A 的值,Zookeeper 启动时读取此文件,拿到里面的数据与 zoo.cfg 里面的配置信息比较从而判断到底是哪个 server。
      B 是这个服务器的地址;
      C 是这个服务器 Follower 与集群中的 Leader 服务器交换信息的端口;
      D 是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
  5. 同步 zoo.cfg 配置文件
    1
    [Jermyn@hadoop102 conf]$ xsync zoo.cfg

    检查:

  6. 集群操作
    • 启动zookeeper
      1
      [Jermyn@hadoop102 zookeeper-3.5.7]$ xcall /opt/module/zookeeper-3.5.7/bin/zkServer.sh start
    • 查看状态
      1
      [Jermyn@hadoop102 zookeeper-3.5.7]$ xcall /opt/module/zookeeper-3.5.7/bin/zkServer.sh status

选举机制

  1. 服务器1启动,发起一次选举。服务器1投自己一票。此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态保持为LOOKING
  2. 服务器2启动,再发起一次选举。服务器1和2分别投自己一票并交换选票信息:此时服务器1发现服务器2的myid比自己目前投票推举的(服务器1)大,更改选票为推举服务器2。此时服务器1票数0票,服务器2票数2票,没有半数以上结果,选举无法完成,服务器1,2状态保持LOOKING
  3. 服务器3启动,发起一次选举。此时服务器1和2都会更改选票为服务器3。此次投票结果:服务器1为0票,服务器2为0票,服务器3为3票。此时服务器3的票数已经超过半数,服务器3当选Leader。服务器1,2更改状态FOLLOWING,服务器3更改状态为LEADING;
  4. 服务器4启动,发起一次选举。此时服务器1,2,3已经不是LOOKING状态,不会更改选票信息。交换选票信息结果:服务器3为3票,服务器4为1票。此时服务器4服从多数,更改选票信息为服务器3,并更改状态为FOLLOWING
  5. 服务器5启动,同4一样当小弟。

  1. 当ZooKeeper集群中的一台服务器出现以下两种情况之一时,就会开始进入Leader选举:
    • 服务器初始化启动。
    • 服务器运行期间无法和Leader保持连接。
  2. 而当一台机器进入Leader选举流程时,当前集群也可能会处于以下两种状态:
    • 集群中本来就已经存在一个Leader。
      对于第一种已经存在Leader的情况,机器试图去选举Leader时,会被告知当前服务器的Leader信息,对于该机器来说,仅仅需要和Leader机器建立连接,并进行状态同步即可。
    • 集群中确实不存在Leader。
      假设ZooKeeper由5台服务器组成,SID分别为1、2、3、4、5,ZXID分别为8、8、8、7、7,并且此时SID为3的服务器是Leader。某一时刻,3和5服务器出现故障,因此开始进行Leader选举。
      SID为1、2、4的机器投票情况: (EPOCH,ZXID,SID )(1,8,1) ,(EPOCH,ZXID,SID ),(1,8,2) (EPOCH,ZXID,SID )(1,7,4)
      选举Leader规则: ①EPOCH大的直接胜出 ②EPOCH相同,事务id大的胜出 ③事务id相同,服务器id大的胜出

zookeeper集群启动停止脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#!/bin/bash
############################################################################
# Desc : zkServer zookeeper集群控制脚本
# Path : /home/Jermyn/bin
# Auther : Jermyn
# Date : 2023-07-01
# Version : 1.0
############################################################################
case $1 in
"start"){
for i in hadoop102 hadoop103 hadoop104
do
echo -e "-----------------------\033[5;34mzookeeper $i 启动\033[0m-----------------------"
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
done
};;
"stop"){
for i in hadoop102 hadoop103 hadoop104
do
echo -e "-----------------------\033[5;34mzookeeper $i 停止\033[0m-----------------------"
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
done
};;
"status"){
for i in hadoop102 hadoop103 hadoop104
do
echo -e "-----------------------\033[5;34mzookeeper $i 状态\033[0m-----------------------"
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status "
done
};;
esac


#!/bin/bash
############################################################################
# Desc : zkServer zookeeper集群控制脚本
# Path : /home/Jermyn/bin
# Auther : Jermyn
# Date : 2023-07-01
# Version : 2.0
############################################################################
# 此命令需要使用到xcall脚本,确保xcall脚本存在且可以正常运行
case $1 in
"start"){
xcall /opt/module/zookeeper-3.5.7/bin/zkServer.sh start
};;
"stop"){
xcall /opt/module/zookeeper-3.5.7/bin/zkServer.sh stop
};;
"status"){
xcall /opt/module/zookeeper-3.5.7/bin/zkServer.sh status | sed "s/Mode:/$(tput setaf 1)&$(tput sgr0)/g"
};;
esac

客户端命令行操作

命令行语法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
help            显示所有操作命令
ls path 使用 ls 命令来查看当前 znode 的子节点 [可监听]
-w 监听子节点变化
-s 附加次级信息
create 普通创建
-s 含有序列
-e 临时(重启或者超时消失)
get path 获得节点的值 [可监听]
-w 监听节点内容变化
-s 附加次级信息
set 设置节点的具体值
stat 查看节点状态
delete 删除节点
deleteall 递归删除节点
  1. 启动客户端
    1
    [Jermyn@hadoop102 zookeeper-3.5.7]$ bin/zkCli.sh -server hadoop102:2181
  2. 显示所有操作命令
    1
    [zk: hadoop102:2181(CONNECTED) 1] help

znode 节点数据信息

  1. 查看当前znode中所包含的内容
    1
    2
    [zk: hadoop102:2181(CONNECTED) 0] ls /
    [zookeeper]
  2. 查看当前节点详细数据
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    [zk: hadoop102:2181(CONNECTED) 5] ls -s /
    [zookeeper]cZxid = 0x0
    ctime = Thu Jan 01 08:00:00 CST 1970
    mZxid = 0x0
    mtime = Thu Jan 01 08:00:00 CST 1970
    pZxid = 0x0
    cversion = -1
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 0
    numChildren = 1

    解释:
    (1)czxid:创建节点的事务 zxid每次修改 ZooKeeper 状态都会产生一个 ZooKeeper 事务 ID。事务 ID 是 ZooKeeper 中所有修改总的次序。每次修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之前发生。
    (2)ctime:znode 被创建的毫秒数(从 1970 年开始)
    (3)mzxid:znode 最后更新的事务 zxid
    (4)mtime:znode 最后修改的毫秒数(从 1970 年开始)
    (5)pZxid:znode 最后更新的子节点 zxid
    (6)cversion:znode 子节点变化号,znode 子节点修改次数
    (7)dataversion:znode 数据变化号
    (8)aclVersion:znode 访问控制列表的变化号
    (9)ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0。
    (10)dataLength:znode 的数据长度
    (11)numChildren:znode 子节点数量

节点类型(持久/短暂/有序号/无序号)


(1)持久化目录节点客户端与Zookeeper断开连接后,该节点依旧存在
(2)持久化顺序编号目录节点客户端与Zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号
(3)临时目录节点客户端与Zookeeper断开连接后,该节点被删除
(4)临时顺序编号目录节点客户端与 Zookeeper 断开连接后 , 该节点被删除 , 只是Zookeeper给该节点名称进行顺序编号

  1. 分别创建2个普通节点(永久节点 + 不带序号)
    1
    2
    3
    4
    5
    [zk: localhost:2181(CONNECTED) 3] create /sanguo "diaochan"
    Created /sanguo
    [zk: localhost:2181(CONNECTED) 4] create /sanguo/shuguo
    "liubei"
    Created /sanguo/shuguo

    注意:创建节点时,要赋值

  2. 获得节点的值
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    [zk: localhost:2181(CONNECTED) 5] get -s /sanguo
    diaochan
    cZxid = 0x100000003
    ctime = Wed Aug 29 00:03:23 CST 2018
    mZxid = 0x100000003
    mtime = Wed Aug 29 00:03:23 CST 2018
    pZxid = 0x100000004
    cversion = 1
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 7
    numChildren = 1
    [zk: localhost:2181(CONNECTED) 6] get -s /sanguo/shuguo
    liubei
    cZxid = 0x100000004
    ctime = Wed Aug 29 00:04:35 CST 2018
    mZxid = 0x100000004
    mtime = Wed Aug 29 00:04:35 CST 2018
    pZxid = 0x100000004
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 6
    numChildren = 0
  3. 创建带序号的节点(永久节点 + 带序号)
    • 先创建一个普通的根节点/sanguo/weiguo
      1
      2
      3
      [zk: localhost:2181(CONNECTED) 1] create /sanguo/weiguo
      "caocao"
      Created /sanguo/weiguo
    • 创建带序号的节点
      1
      2
      3
      4
      5
      6
      7
      8
      9
      [zk: localhost:2181(CONNECTED) 2] create -s
      /sanguo/weiguo/zhangliao "zhangliao"
      Created /sanguo/weiguo/zhangliao0000000000
      [zk: localhost:2181(CONNECTED) 3] create -s
      /sanguo/weiguo/zhangliao "zhangliao"
      Created /sanguo/weiguo/zhangliao0000000001
      [zk: localhost:2181(CONNECTED) 4] create -s
      /sanguo/weiguo/xuchu "xuchu"
      Created /sanguo/weiguo/xuchu0000000002

      如果原来没有序号节点,序号从 0 开始依次递增。如果原节点下已有 2 个节点,则再排序时从 2 开始,以此类推。

  4. 创建短暂节点(短暂节点 + 不带序号 or 带序号)
    • 创建短暂的不带序号的节点
      1
      2
      3
      [zk: localhost:2181(CONNECTED) 7] create -e /sanguo/wuguo
      "zhouyu"
      Created /sanguo/wuguo
    • 创建短暂的带序号的节点
      1
      2
      3
      [zk: localhost:2181(CONNECTED) 2] create -e -s /sanguo/wuguo
      "zhouyu"
      Created /sanguo/wuguo0000000001
    • 在当前客户端是能查看到的
      1
      2
      [zk: localhost:2181(CONNECTED) 3] ls /sanguo
      [wuguo, wuguo0000000001, shuguo]
    • 退出当前客户端然后再重启客户端
      1
      2
      [zk: localhost:2181(CONNECTED) 12] quit
      [Jermyn@hadoop104 zookeeper-3.5.7]$ bin/zkCli.sh
    • 再次查看根目录下短暂节点已经删除
      1
      2
      [zk: localhost:2181(CONNECTED) 0] ls /sanguo
      [shuguo]
  5. 修改节点数据值
    1
    [zk: localhost:2181(CONNECTED) 6] set /sanguo/weiguo "simayi"

监听器原理

客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、节点删除、子目录节点增加删除)时,ZooKeeper 会通知客户端。监听机制保证 ZooKeeper 保存的任何的数据的任何改变都能快速的响应到监听了该节点的应用程序。

  1. 节点的值变化监听
    • 在 hadoop104 主机上注册监听/sanguo 节点数据变化
      1
      [zk: localhost:2181(CONNECTED) 26] get -w /sanguo
    • 在 hadoop103 主机上修改/sanguo 节点的数据
      1
      [zk: localhost:2181(CONNECTED) 1] set /sanguo "xisi"
    • 观察 hadoop104 主机收到数据变化的监听
      1
      2
      WATCHER::
      WatchedEvent state:SyncConnected type:NodeDataChanged path:/sanguo

      注意:在hadoop103再多次修改/sanguo的值,hadoop104上不会再收到监听。因为注册一次,只能监听一次。想再次监听,需要再次注册。

  2. 节点的子节点变化监听(路径变化)
    • 在 hadoop104 主机上注册监听/sanguo 节点的子节点变化
      1
      2
      [zk: localhost:2181(CONNECTED) 1] ls -w /sanguo
      [shuguo, weiguo]
    • 在 hadoop103 主机/sanguo 节点上创建子节点
      1
      2
      [zk: localhost:2181(CONNECTED) 2] create /sanguo/jin "simayi"
      Created /sanguo/jin
    • 观察 hadoop104 主机收到子节点变化的监听
      1
      2
      WATCHER::
      WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/sanguo

      注意:节点的路径变化,也是注册一次,生效一次。想多次生效,就需要多次注册。

节点删除与查看

  1. 删除节点
    1
    [zk: localhost:2181(CONNECTED) 4] delete /sanguo/jin
  2. 递归删除节点
    1
    [zk: localhost:2181(CONNECTED) 15] deleteall /sanguo/shuguo
  3. 查看节点状态
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    [zk: localhost:2181(CONNECTED) 17] stat /sanguo
    cZxid = 0x100000003
    ctime = Wed Aug 29 00:03:23 CST 2018
    mZxid = 0x100000011
    mtime = Wed Aug 29 00:21:23 CST 2018
    pZxid = 0x100000014
    cversion = 9
    dataVersion = 1
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 4
    numChildren = 1

客户端 API 操作

前提:保证 hadoop102、hadoop103、hadoop104 服务器上 Zookeeper 集群服务端启动。

IDEA 环境搭建

  1. 创建一个工程:zookeeper
  2. 添加pom文件
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    <dependencies>
    <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    </dependency>
    <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.8.2</version>
    </dependency>
    <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.7</version>
    </dependency>
    <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>compile</scope>
    </dependency>
    </dependencies>
  3. 拷贝log4j.properties文件到项目根目录
    需要在项目的 src/main/resources 目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。
    1
    2
    3
    4
    5
    6
    7
    8
    log4j.rootLogger=INFO, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d %p [%c]- %m%n
    log4j.appender.logfile=org.apache.log4j.FileAppender
    log4j.appender.logfile.File=target/spring.log
    log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
    log4j.appender.logfile.layout.ConversionPattern=%d %p [%c]- %m%n
  4. 创建包名cn.jermyn.zk
  5. 创建类名称zkClient

创建 ZooKeeper 客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package cn.jermyn.zk;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;

public class zkClient {

// 逗号左右不可以有空格
private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
private int sessionTimeout = 2000;
private ZooKeeper zkClient;


/**
* 初始化
* @throws IOException
*/
@Before
public void init() throws IOException {

zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {

// 收到事件通知后的回调函数(用户的业务逻辑)
System.out.println(watchedEvent.getType() + "--"
+ watchedEvent.getPath());

// 再次启动监听
try {
List<String> children = null;
children = zkClient.getChildren("/", true);

for (String child : children) {
System.out.println(child);
}
System.out.println("----------------------------");
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException(e);
}

}
});
}

/**
* 创建一个节点
*
* @throws InterruptedException
* @throws KeeperException
*/
@Test
public void create() throws InterruptedException, KeeperException {

// 参数1:要创建的节点的路径;参数2:节点数据 ;参数3:节点权限;参数4:节点的类型
String nodeCreated = zkClient.create("/jermyn",
"blog".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}

/**
* 获取子节点
*
* @throws InterruptedException
* @throws KeeperException
*/
@Test
public void getChildren() throws InterruptedException, KeeperException {

List<String> children = zkClient.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}

// 延时阻塞
Thread.sleep(Long.MAX_VALUE);
}

/**
* 判断某一个节点是否存在
* @throws InterruptedException
* @throws KeeperException
*/
@Test
public void exist() throws InterruptedException, KeeperException {
Stat exists = zkClient.exists("/jermyn", false);
System.out.println(exists == null ? "not exists" : "exists");
}
}

客户端向服务端写数据流程

写流程之写入请求直接发送给Leader节点

写流程之写入请求发送给follower节点

第四章 服务器动态上下线监听案例

  1. 先在集群上创建/servers 节点
    1
    2
    [zk: localhost:2181(CONNECTED) 10] create /servers "servers"
    Created /servers
  2. 在 Idea 中创建包名:cn.jermyn.case1
  3. 服务器端向 Zookeeper 注册代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    package case1;

    import org.apache.zookeeper.*;

    import java.io.IOException;

    public class DistributeServer {

    private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private int sessionString = 2000;
    private ZooKeeper zooKeeper;

    /**
    * TODO 1 创建到 zk 的客户端连接
    * TODO 2 注册服务器到zk集群
    * TODO 3 业务功能
    *
    * @param args
    */
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

    DistributeServer distributeServer = new DistributeServer();

    // 1 创建到 zk 的客户端连接
    distributeServer.getConnection();

    // 2 注册服务器到zk集群
    distributeServer.register(args[0]);

    // 3 业务功能
    distributeServer.business();
    }


    /**
    * 业务逻辑
    * @throws InterruptedException
    */
    private void business() throws InterruptedException {
    Thread.sleep(Long.MAX_VALUE);
    }


    /**
    * 注册
    * @param hostname 注册的主机名称
    * @throws InterruptedException
    * @throws KeeperException
    */
    private void register(String hostname) throws InterruptedException, KeeperException {
    String create = zooKeeper.create("/servers/"+hostname,
    hostname.getBytes(),
    ZooDefs.Ids.OPEN_ACL_UNSAFE,
    CreateMode.EPHEMERAL_SEQUENTIAL);
    System.out.println(hostname + " is online");
    }

    /**
    * 创建连接
    *
    * @throws IOException
    */
    private void getConnection() throws IOException {

    zooKeeper = new ZooKeeper(connectString, sessionString, new Watcher() {
    @Override
    public void process(WatchedEvent event) {

    }
    });
    }
    }
  4. 客户端代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    package case1;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;

    /**
    * TODO 1 获取 zk 连接
    * TODO 2 获取 servers 的子节点信息,从中获取服务器信息列表
    * TODO 3 业务进程启动
    */
    public class DistributeClient {
    private static String connectString =
    "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private static int sessionTimeout = 2000;
    private ZooKeeper zk = null;
    private String parentNode = "/servers";


    /**
    * 创建到 zk 的客户端连接
    *
    * @throws IOException
    */
    public void getConnect() throws IOException {
    zk = new ZooKeeper(connectString, sessionTimeout, new
    Watcher() {

    @Override
    public void process(WatchedEvent event) {
    // 再次启动监听
    try {
    System.out.println("===============分割线================");
    getServerList();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    });
    }


    /**
    * 获取服务器列表信息
    *
    * @throws Exception
    */
    public void getServerList() throws Exception {

    // 1 获取服务器子节点信息,并且对父节点进行监听
    List<String> children = zk.getChildren(parentNode, true);
    // 2 存储服务器信息列表
    ArrayList<String> servers = new ArrayList<>();

    // 3 遍历所有节点,获取节点中的主机名称信息
    for (String child : children) {

    byte[] data = zk.getData(parentNode + "/" + child,
    false, null);
    servers.add(new String(data));
    }
    // 4 打印服务器列表信息
    System.out.println(servers);
    }

    /**
    * 业务功能
    * @throws Exception
    */
    public void business() throws Exception {
    System.out.println("client is working ...");
    Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws Exception {

    // 1 获取 zk 连接
    DistributeClient client = new DistributeClient();
    client.getConnect();

    // 2 获取 servers 的子节点信息,从中获取服务器信息列表
    client.getServerList();

    // 3 业务进程启动
    client.business();
    }
    }

第五章 ZooKeeper 分布式锁案例

比如说”进程 1”在使用该资源的时候,会先去获得锁,”进程 1”获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,”进程 1”用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。

原生 Zookeeper 实现分布式锁案例

  1. 分布式锁实现
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    package case2;

    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;

    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;

    public class DistributedLock {

    private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; // zookeeper server 列表
    private int sessionTimeout = 2000; // 超时时间
    private ZooKeeper zk;
    private String rootNode = "locks";
    private String subNode = "seq-";
    private String waitPath; // 当前 client 等待的子节点
    private CountDownLatch connectLatch = new CountDownLatch(1); //ZooKeeper 连接
    private CountDownLatch waitLatch = new CountDownLatch(1); //ZooKeeper 节点等待
    private String currentNode; // 当前 client 创建的子节点

    /**
    * 和 zk 服务建立连接,并创建根节点
    *
    * @throws IOException
    * @throws InterruptedException
    * @throws KeeperException
    */
    public DistributedLock() throws IOException, InterruptedException, KeeperException {

    zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
    @Override
    public void process(WatchedEvent event) {

    // 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
    if (event.getState() ==
    Event.KeeperState.SyncConnected) {
    connectLatch.countDown();
    }

    // 发生了 waitPath 的删除事件
    if (event.getType() ==
    Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
    waitLatch.countDown();
    }
    }
    });

    // 等待连接建立
    connectLatch.await();

    //获取根节点状态
    Stat stat = zk.exists("/" + rootNode, false);
    //如果根节点不存在,则创建根节点,根节点类型为永久节点
    if (stat == null) {
    System.out.println("根节点不存在");
    zk.create("/" + rootNode,
    new byte[0],
    ZooDefs.Ids.OPEN_ACL_UNSAFE,
    CreateMode.PERSISTENT);
    }
    }

    /**
    * 加锁方法
    */
    public void zkLock() {
    try {
    //在根节点下创建临时顺序节点,返回值为创建的节点路径
    currentNode = zk.create("/" + rootNode + "/" + subNode,
    null,
    ZooDefs.Ids.OPEN_ACL_UNSAFE,
    CreateMode.EPHEMERAL_SEQUENTIAL);

    // wait 一小会, 让结果更清晰一些
    Thread.sleep(10);
    // 注意, 没有必要监听"/locks"的子节点的变化情况

    List<String> childrenNodes = zk.getChildren("/" + rootNode, false);

    // 列表中只有一个子节点, 那肯定就是 currentNode , 说明client 获得锁
    if (childrenNodes.size() == 1) {
    return;
    } else {
    //对根节点下的所有临时顺序节点进行从小到大排序
    Collections.sort(childrenNodes);

    //当前节点名称
    String thisNode = currentNode.substring(("/" + rootNode + "/").length());

    //获取当前节点的位置
    int index = childrenNodes.indexOf(thisNode);
    if (index == -1) {
    System.out.println("数据异常");
    } else if (index == 0) {
    // index == 0, 说明 thisNode 在列表中最小, 当前client 获得锁
    return;
    } else {
    // 获得排名比 currentNode 前 1 位的节点
    this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);

    // 在 waitPath 上注册监听器, 当 waitPath 被删除时,zookeeper 会回调监听器的 process 方法
    zk.getData(waitPath, true, new Stat());

    //进入等待锁状态
    waitLatch.await();
    return;
    }
    }
    } catch (KeeperException | InterruptedException e) {
    e.printStackTrace();
    }
    }

    /**
    * 解锁方法
    */
    public void zkUnlock() {
    try {
    zk.delete(this.currentNode, -1);
    } catch (InterruptedException | KeeperException e) {
    e.printStackTrace();
    }
    }
    }
  2. 分布式锁测试
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    package case2;

    import org.apache.zookeeper.KeeperException;

    import java.io.IOException;

    import static org.junit.Assert.*;

    public class DistributedLockTest {

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

    DistributedLock lock1 = new DistributedLock();
    DistributedLock lock2 = new DistributedLock();

    new Thread(new Runnable() {
    @Override
    public void run() {
    try {
    lock1.zkLock();
    System.out.println("线程1启动,获取到锁");
    Thread.sleep(1000 * 5);
    lock1.zkUnlock();
    System.out.println("线程1释放锁");
    } catch (InterruptedException e) {
    throw new RuntimeException(e);
    }
    }
    }).start();

    new Thread(new Runnable() {
    @Override
    public void run() {
    try {
    lock2.zkLock();
    System.out.println("线程2启动,获取到锁");
    Thread.sleep(1000 * 5);
    lock2.zkUnlock();
    System.out.println("线程2释放锁");
    } catch (InterruptedException e) {
    throw new RuntimeException(e);
    }
    }
    }).start();

    }
    }

Curator 框架实现分布式锁案例

  1. 原生的 Java API 开发存在的问题
    • 会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
    • Watch 需要重复注册,不然就不能生效
    • 开发的复杂性还是比较高
    • 不支持多节点删除和创建。需要自己去递归
  2. Curator 是一个专门解决分布式锁的框架,解决了原生 JavaAPI 开发分布式遇到的问题。
    详情请查看官方文档:
    1
    https://curator.apache.org/index.html
    点击图片跳转链接 https://curator.apache.org/index.html
  3. Curator 案例实操
    • 添加依赖
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      <?xml version="1.0" encoding="UTF-8"?>
      <project xmlns="http://maven.apache.org/POM/4.0.0"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>

      <groupId>cn.jermyn</groupId>
      <artifactId>zookeeper</artifactId>
      <version>1.0-SNAPSHOT</version>

      <properties>
      <maven.compiler.source>11</maven.compiler.source>
      <maven.compiler.target>11</maven.compiler.target>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>

      <dependencies>
      <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      </dependency>
      <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>2.8.2</version>
      </dependency>
      <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.5.7</version>
      </dependency>
      <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>compile</scope>
      </dependency>
      <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>4.3.0</version>
      </dependency>
      <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>4.3.0</version>
      </dependency>
      <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-client</artifactId>
      <version>4.3.0</version>
      </dependency>

      </dependencies>

      </project>
    • 代码实现
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      package case2;

      import org.apache.curator.framework.CuratorFramework;
      import org.apache.curator.framework.CuratorFrameworkFactory;
      import org.apache.curator.framework.recipes.locks.InterProcessMutex;
      import org.apache.curator.retry.ExponentialBackoffRetry;

      public class CuratorLockTest {

      public static void main(String[] args) {

      // 创建分布式锁1
      InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");

      // 创建分布式锁2
      InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");

      new Thread(new Runnable() {
      @Override
      public void run() {
      try {
      lock1.acquire();
      System.out.println("线程1,获取到锁");

      lock1.acquire();
      System.out.println("线程1,再次获取到锁");

      Thread.sleep(5 * 1000);

      lock1.release();
      System.out.println("线程1,释放锁");

      lock1.release();
      System.out.println("线程1,再次释放锁");
      } catch (Exception e) {
      throw new RuntimeException(e);
      }
      }
      }).start();

      new Thread(new Runnable() {
      @Override
      public void run() {
      try {
      lock2.acquire();
      System.out.println("线程2,获取到锁");

      lock2.acquire();
      System.out.println("线程2,再次获取到锁");

      Thread.sleep(5 * 1000);

      lock2.release();
      System.out.println("线程2,释放锁");

      lock2.release();
      System.out.println("线程2,再次释放锁");
      } catch (Exception e) {
      throw new RuntimeException(e);
      }
      }
      }).start();
      }

      private static CuratorFramework getCuratorFramework() {

      ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);

      CuratorFramework client = CuratorFrameworkFactory.builder().
      connectString("hadoop102:2181,hadoop103:2181,hadoop104:2181").
      connectionTimeoutMs(2000).
      sessionTimeoutMs(2000).
      retryPolicy(policy).build();

      // 客户端启动
      client.start();
      System.out.println("zookeeper启动成功");

      return client;
      }
      }