1 Zookeeper 入门

Zookeeper是一个开源的分布式的,为分布式框架提供协调服务的Apache 项目。

1.1 Zookeeper工作机制

Zookeeper从设计模式角度来理解:是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的那些观察者做出相应的反应。

1.2 Zookeeper特点

  1. Zookeeper:一个领导者(Leader),多个跟随者(Follower)组成的集群。

  2. 集群中只要有半数以上节点存活,Zookeeper集群就能正常服务。所以Zookeeper适合安装奇数台服务器。

  3. 全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个Server,数据都是一致的。

  4. 更新请求顺序执行,来自同一个Client的更新请求按其发送顺序依次执行。

  5. 数据更新原子性,一次数据更新要么成功,要么失败。

  6. 实时性,在一定时间范围内,Client能读到最新数据。

1.3 Zookeeper数据结构

ZooKeeper数据模型的结构与Unix 文件系统很类似,整体上可以看作是一棵树,每个节点称做一个ZNode。每一个ZNode默认能够存储1MB 的数据,每个ZNode 都可以通过其路径唯一标识。

image-20230904085153463

1.4 应用场景

ZooKeeper提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。

2 Zookeeper本地安装

2.1 下载安装

官网首页:https://zookeeper.apache.org/

这里我们用docker安装

创建资源目录

mkdir -p /data/docker/zookeeper/data
mkdir -p /data/docker/zookeeper/conf
mkdir -p /data/docker/zookeeper/logs

在/data/docker/zookeeper/conf创建zoo.cfg,内容如下

dataDir=/data
clientPort=2181
dataLogDir=/var/log/zookeeper
tickTime=2000
initLimit=5
syncLimit=2
autopurge.snapRetainCount=3
autopurge.purgeInterval=0
maxClientCnxns=60
standaloneEnabled=true
admin.enableServer=true
server.1=localhost:2888:3888;2181

启动zookeeper容器

docker run -d --name zookeeper --privileged=true -p 2181:2181  -v /data/docker/zookeeper/data:/data -v /data/docker/zookeeper/conf:/conf -v /data/docker/zookeeper/logs:/var/log/zookeeper zookeeper:3.5.7

进入容器查看状态

root@750a20f12474:/apache-zookeeper-3.5.7-bin/bin# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: standalone

2.2 配置参数解读

tickTime = 2000

通信心跳时间:Zookeeper服务器与客户端心跳时间,单位毫秒

initLimit = 10

LF初始通信时限:Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量)

syncLimit = 5

LF同步通信时限:Leader和Follower之间通信时间如果超过syncLimit * tickTime,Leader认为Follwer死掉,从服务器列表中删除Follwer。

3 Zookeeper集群安装

3.1 下载安装

集群我们用K8S来,创建yaml文件

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: zookeeper-pvc-0
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: zookeeper-pvc-1
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: zookeeper-pvc-2
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zookeeper
spec:
  serviceName: zookeeper
  replicas: 3
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
      - name: zookeeper
        image: zookeeper:3.5.7
        ports:
        - containerPort: 2181
        volumeMounts:
        - name: data-vol
          mountPath: /data/zookeeper/${MY_ID}/data
        - name: log-vol
          mountPath: /data/zookeeper/${MY_ID}/log
        - name: config-vol
          mountPath: /data/zookeeper/${MY_ID}/zoo.cfg
      volumes:
      - name: data-vol
        persistentVolumeClaim:
          claimName: zookeeper-pvc-0
      - name: log-vol
        persistentVolumeClaim:
          claimName: zookeeper-pvc-1
      - name: config-vol
        persistentVolumeClaim:
          claimName: zookeeper-pvc-2

执行

kubectl apply -f zookeeper.yaml

测试

root@AdRainty:~# kubectl exec -it zookeeper0 bash
bash-4.4# cd /apache-zookeeper-3.5.7-bin/bin
bash-4.4# ./zkServer.sh status
root@750a20f12474:/apache-zookeeper-3.5.7-bin/bin# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost, Client SSL: false.
Mode: follower

3.2 选举机制

3.2.1 第一次启动

image-20230904101040640

  1. 服务器1启动,发起一次选举。服务器1投自己一票。此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态保持为LOOKING;

  2. 服务器2启动,再发起一次选举。服务器1和2分别投自己一票并交换选票信息:此时服务器1发现服务器2的myid比自己目前投票推举的(服务器1)大,更改选票为推举服务器2。此时服务器1票数0票,服务器2票数2票,没有半数以上结果,选举无法完成,服务器1,2状态保持LOOKING

  3. 服务器3启动,发起一次选举。此时服务器1和2都会更改选票为服务器3。此次投票结果:服务器1为0票,服务器2为0票,服务器3为3票。此时服务器3的票数已经超过半数,服务器3当选Leader。服务器1,2更改状态为FOLLOWING,服务器3更改状态LEADING;

  4. 服务器4启动,发起一次选举。此时服务器1,2,3已经不是LOOKING状态,不会更改选票信息。交换选票信息结果:服务器3为3票,服务器4为1票。此时服务器4服从多数,更改选票信息为服务器3,并更改状态为FOLLOWING;

  5. 服务器5启动,同4一样当小弟。

3.2.2 非第一次启动

假设ZooKeeper由5台服务器组成,SID分别为1、2、3、4、5,ZXID分别为8、8、8、7、7,并且此时SID为3的服务器是Leader。某一时刻,3和5服务器出现故障,因此开始进行Leader选举。

image-20230905092325698

选举Leader规则:

  1. EPOCH大的直接胜出

  2. EPOCH相同,事务id大的胜出

  3. 事务id相同,服务器id大的胜出

4 Zookeeper操作

4.1 命令行语法

命令基本语法

功能描述

help

显示所有操作命令

ls path

使用ls 命令来查看当前 znode 的子节点 -w 监听子节点变化 -s 附加次级信息

create

普通创建 -s 含有序列 -e 临时(重启或者超时消失)

get path

获得节点的值 -w 监听节点内容变化 -s 附加次级信息

set

设置节点的具体值

stat

查看节点状态

delete

删除节点

deleteall

递归删除节点

4.2 znode节点数据信息

[zk: localhost:2181(CONNECTED) 1] ls -s /
[zookeeper]cZxid = 0x0
ctime = Thu Jan 01 00:00:00 UTC 1970
mZxid = 0x0
mtime = Thu Jan 01 00:00:00 UTC 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
  • czxid:创建节点的事务 zxid

  • ctime:znode被创建的毫秒数

  • mzxid:znode最后更新的事务 zxid

  • mtime:znode最后修改的毫秒数

  • pZxid:znode最后更新的子节点 zxid

  • cversion:znode 子节点变化号,znode 子节点修改次数

  • dataversion:znode 数据变化号

  • aclVersion:znode 访问控制列表的变化号

  • ephemeralOwner:如果是临时节点,这个是znode 拥有者的session id。如果不是临时节点则是0

  • dataLength:znode 的数据长度

  • numChildren:znode 子节点数量

4.3 节点类型

  • 持久化目录节点:客户端与Zookeeper断开连接后,该节点依旧存在

  • 持久化顺序编号目录节点:客户端与Zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号

  • 临时目录节点:客户端与Zookeeper断开连接后,该节点被删除

  • 临时顺序编号目录节点:客户端与Zookeeper 断开连接后, 该节点被删除, 只是Zookeeper给该节点名称进行顺序编号。

image-20230905093801494

创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护

在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序

4.4 监听器

客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、节点删除、子目录节点增加删除)时,ZooKeeper 会通知客户端。监听机制保证ZooKeeper 保存的任何的数据的任何改变都能快速的响应到监听了该节点的应用程序。

image-20230905094618722

  1. 首先要有一个main()线程,在main线程中创建Zookeeper客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener)。

  2. 通过connect线程将注册的监听事件发送给Zookeeper。

  3. 在Zookeeper的注册监听器列表中将注册的监听事件添加到列表中。

  4. Zookeeper监听到有数据或路径变化,就会将这个消息发送给listener线程。

  5. listener线程内部调用了process()方法。

注意:节点的路径变化,也是注册一次,生效一次。想多次生效,就需要多次注册。

4.5 客户端向服务端写数据流程

4.5.1 请求直接发送给Leader节点

  1. 客户端告诉Leader节点写数据

  2. Leader节点写一份后同步给其他客户端

  3. 只要有半数节点回复Leader节点写成功,Leader节点就会像客户端返回ACK

  4. 剩下的节点继续写数据

image-20230905103729024

4.5.2 请求发送给follower节点

  1. 客户端告诉follower节点写数据

  2. follower节点告诉Leader节点写数据

  3. Leader节点写一份后同步给其他客户端

  4. 只要有半数节点回复Leader节点写成功,Leader节点就会像跟客户端建立的follower节点返回写成功

  5. follower节点向客户端返回写成功

  6. 剩下的节点继续写数据

image-20230905104047990

4.6 API操作

pom文件

<dependencies>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.5.7</version>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.8.2</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.28</version>
    </dependency>
</dependencies>

代码如下:

@Slf4j
public class ZkClient {

    private static final int SESSION_TIMEOUT = 5000;

    private ZooKeeper zooKeeper;

    /**
     * 初始化连接
     * @param connect 连接地址
     * @param watcher 监听器
     */
    public void initClient(String connect, Watcher watcher) throws IOException {
        assert Objects.isNull(zooKeeper);
        zooKeeper = new ZooKeeper(connect, SESSION_TIMEOUT, watcher);
    }

    /**
     * 关闭连接
     */
    public void closeClient() throws InterruptedException {
        assert !Objects.isNull(zooKeeper);
        zooKeeper.close();
    }

    /**
     * 创建节点
     * @param path 路径
     * @param data 数据
     * @param createMode 创建模式
     */
    public void createNode(String path, String data,CreateMode createMode) throws InterruptedException, KeeperException {
        assert !Objects.isNull(zooKeeper);
        zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);
    }

    /**
     * 获取节点下子节点信息
     * @param path 路径
     * @param isWatch 是否开启监控
     */
    public List<String> getChildren(String path, Boolean isWatch) throws InterruptedException, KeeperException {
        assert !Objects.isNull(zooKeeper);
        return zooKeeper.getChildren(path, isWatch);
    }

    /**
     * 判断节点是否存在
     * @param path 路径
     * @param isWatch 是否开启监控
     */
    public boolean exists(String path, Boolean isWatch) throws InterruptedException, KeeperException {
        assert !Objects.isNull(zooKeeper);
        return null != zooKeeper.exists(path, isWatch);
    }

}

测试:

public class ZkTest {

    @Test
    public void testConnect() throws Exception {
        ZkClient zkClient = new ZkClient();
        zkClient.initClient("127.0.0.1:2181", watchedEvent -> {
            System.out.println(watchedEvent.getType() + " " + watchedEvent.getPath());
            List<String> children;
            try {
                children = zkClient.getChildren("/test", Boolean.TRUE);
                System.out.println(children);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        // Thread.sleep(Long.MAX_VALUE);
        // zkClient.createNode("/adrainty", "data", CreateMode.PERSISTENT);
        
        zkClient.closeClient();
    }

}