Zookeeper

1 Zookeeper Basics

ZooKeeper is an open-source, distributed Apache project that provides coordination services for distributed frameworks.

1.1 How Zookeeper Works

From a design-pattern perspective, ZooKeeper is a distributed service-management framework built on the observer pattern. It stores and manages the data that everyone cares about and accepts registrations from observers; whenever the state of that data changes, ZooKeeper notifies every observer registered with it so they can react accordingly.

1.2 Zookeeper Characteristics

  1. Zookeeper: a cluster made up of one Leader and multiple Followers.
  2. As long as more than half of the nodes in the cluster are alive, the ZooKeeper cluster can serve requests normally; this is why ZooKeeper is best deployed on an odd number of servers.
  3. Global data consistency: every server keeps an identical copy of the data, so a client sees the same data no matter which server it connects to.
  4. Update requests are executed in order: updates from the same client are applied in the order they were sent.
  5. Atomic updates: a data update either succeeds or fails as a whole.
  6. Timeliness: a client can read the latest data within a bounded time window.

1.3 Zookeeper Data Structure

The ZooKeeper data model is very similar to a Unix file system: as a whole it can be viewed as a tree, in which each node is called a ZNode. Each ZNode can store up to 1 MB of data by default, and every ZNode is uniquely identified by its path.


1.4 Application Scenarios

Services provided by ZooKeeper include unified naming, unified configuration management, unified cluster management, dynamic registration and deregistration of server nodes, soft load balancing, and more.

2 Zookeeper Local Installation

2.1 Download and Installation

Official website: https://zookeeper.apache.org/

Here we install it with Docker.

Create the resource directories:

mkdir -p /data/docker/zookeeper/data
mkdir -p /data/docker/zookeeper/conf
mkdir -p /data/docker/zookeeper/logs

Create zoo.cfg under /data/docker/zookeeper/conf with the following content:

dataDir=/data
clientPort=2181
dataLogDir=/var/log/zookeeper
tickTime=2000
initLimit=5
syncLimit=2
autopurge.snapRetainCount=3
autopurge.purgeInterval=0
maxClientCnxns=60
standaloneEnabled=true
admin.enableServer=true
server.1=localhost:2888:3888;2181

Start the ZooKeeper container:

docker run -d --name zookeeper --privileged=true -p 2181:2181  -v /data/docker/zookeeper/data:/data -v /data/docker/zookeeper/conf:/conf -v /data/docker/zookeeper/logs:/var/log/zookeeper zookeeper:3.5.7

Enter the container and check the status:

root@750a20f12474:/apache-zookeeper-3.5.7-bin/bin# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: standalone

2.2 Configuration Parameters Explained

tickTime = 2000

Heartbeat interval: the heartbeat time between the ZooKeeper server and its clients, in milliseconds.

initLimit = 10

Leader/Follower initial connection limit: the maximum number of heartbeats (counted in units of tickTime) that may elapse while a Leader and a Follower establish their initial connection.

syncLimit = 5

Leader/Follower sync limit: if communication between the Leader and a Follower takes longer than syncLimit * tickTime, the Leader considers the Follower dead and removes it from the server list. For example, with tickTime = 2000 and syncLimit = 5, a Follower that has not responded within 10 seconds is dropped.

3 Zookeeper Cluster Installation

3.1 Download and Installation

For the cluster we use Kubernetes (K8s). Create the YAML manifest:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: zookeeper-pvc-0
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: zookeeper-pvc-1
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: zookeeper-pvc-2
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zookeeper
spec:
  serviceName: zookeeper
  replicas: 3
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
      - name: zookeeper
        image: zookeeper:3.5.7
        ports:
        - containerPort: 2181
        volumeMounts:
        # Note: Kubernetes does not expand ${MY_ID}-style variables in mountPath,
        # so mount at the paths the zookeeper image actually uses.
        - name: data-vol
          mountPath: /data
        - name: log-vol
          mountPath: /datalog
        - name: config-vol
          mountPath: /conf
      volumes:
      - name: data-vol
        persistentVolumeClaim:
          claimName: zookeeper-pvc-0
      - name: log-vol
        persistentVolumeClaim:
          claimName: zookeeper-pvc-1
      - name: config-vol
        persistentVolumeClaim:
          claimName: zookeeper-pvc-2

Apply it:

kubectl apply -f zookeeper.yaml

Test:

root@AdRainty:~# kubectl exec -it zookeeper-0 bash
bash-4.4# cd /apache-zookeeper-3.5.7-bin/bin
bash-4.4# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost, Client SSL: false.
Mode: follower

3.2 Election Mechanism

3.2.1 First Startup


  1. Server 1 starts and initiates an election. It votes for itself. With only one vote, less than a majority (3 votes), the election cannot complete and server 1 stays in the LOOKING state.
  2. Server 2 starts and another election is initiated. Servers 1 and 2 each vote for themselves and exchange ballots. Server 1 sees that server 2's myid is larger than that of its current choice (itself) and switches its vote to server 2. Now server 1 holds 0 votes and server 2 holds 2; still no majority, so the election cannot complete and servers 1 and 2 remain LOOKING.
  3. Server 3 starts and initiates an election. Servers 1 and 2 both switch their votes to server 3. Result: server 1 has 0 votes, server 2 has 0 votes, server 3 has 3 votes. Server 3 now holds more than half of the votes and becomes Leader. Servers 1 and 2 change state to FOLLOWING and server 3 changes to LEADING.
  4. Server 4 starts and initiates an election. Servers 1, 2, and 3 are no longer LOOKING and do not change their ballots. Ballot exchange result: server 3 has 3 votes, server 4 has 1. Server 4 follows the majority, changes its vote to server 3, and changes its state to FOLLOWING.
  5. Server 5 starts and, like server 4, simply joins as a follower.

3.2.2 Subsequent Startups

Suppose the ZooKeeper cluster consists of five servers with SIDs 1, 2, 3, 4, 5 and ZXIDs 8, 8, 8, 7, 7, and the server with SID 3 is currently the Leader. At some point servers 3 and 5 fail, so a new Leader election begins.


Leader election rules (a minimal sketch of this comparison follows the list):

  1. The candidate with the larger EPOCH wins outright.
  2. If the EPOCHs are equal, the candidate with the larger transaction id (zxid) wins.
  3. If the transaction ids are equal, the candidate with the larger server id wins.
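
A minimal Java sketch of this comparison rule (illustrative only, with made-up names; ZooKeeper's actual logic lives in FastLeaderElection, shown in the source-code section later):

// Illustrative sketch of the three election rules above; not ZooKeeper source code.
public class VoteComparison {

    /** Returns true if the new vote (epoch, zxid, sid) beats the current one. */
    static boolean newVoteWins(long newEpoch, long newZxid, long newSid,
                               long curEpoch, long curZxid, long curSid) {
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch;   // 1. the larger EPOCH wins outright
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;     // 2. equal EPOCH: the larger transaction id (zxid) wins
        }
        return newSid > curSid;           // 3. equal zxid: the larger server id wins
    }

    public static void main(String[] args) {
        // Remaining servers from the example above: SIDs 1, 2, 4 with ZXIDs 8, 8, 7 (same epoch).
        System.out.println(newVoteWins(1, 8, 2, 1, 8, 1)); // true: server 2 beats server 1 (larger sid)
        System.out.println(newVoteWins(1, 7, 4, 1, 8, 2)); // false: server 4 loses to server 2 (smaller zxid)
    }
}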

4 Zookeeper Operations

4.1 Command-Line Syntax

| Command | Description |
| --- | --- |
| help | Show all available commands |
| ls path | List the child nodes of the given znode; -w watches for child changes, -s shows additional (stat) information |
| create | Create a node; -s creates a sequential node, -e creates an ephemeral node (removed on restart or session timeout) |
| get path | Get the value of a node; -w watches for data changes, -s shows additional (stat) information |
| set | Set the value of a node |
| stat | Show the status of a node |
| delete | Delete a node |
| deleteall | Recursively delete a node |

4.2 Znode Data Information

[zk: localhost:2181(CONNECTED) 1] ls -s /
[zookeeper]
cZxid = 0x0
ctime = Thu Jan 01 00:00:00 UTC 1970
mZxid = 0x0
mtime = Thu Jan 01 00:00:00 UTC 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
  • czxid: the zxid of the transaction that created the znode
  • ctime: the time (in milliseconds since the epoch) at which the znode was created
  • mzxid: the zxid of the transaction that last modified the znode
  • mtime: the time (in milliseconds) at which the znode was last modified
  • pZxid: the zxid of the last change to the znode's children
  • cversion: the change number of the znode's children, i.e. how many times the child list has been modified
  • dataversion: the change number of the znode's data
  • aclVersion: the change number of the znode's ACL
  • ephemeralOwner: the session id of the owner if this is an ephemeral node, 0 otherwise
  • dataLength: the length of the znode's data
  • numChildren: the number of child nodes
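
For reference, the same fields are exposed by the Java client through org.apache.zookeeper.data.Stat. A minimal sketch (class name is illustrative; it assumes a server reachable at 127.0.0.1:2181 and an existing /test node):

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class StatDemo {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 5000, event -> { });
        // exists() returns the node's Stat, or null if the node does not exist
        Stat stat = zk.exists("/test", false);
        if (stat != null) {
            System.out.println("czxid=" + stat.getCzxid()
                + " mzxid=" + stat.getMzxid()
                + " dataVersion=" + stat.getVersion()
                + " cversion=" + stat.getCversion()
                + " ephemeralOwner=" + stat.getEphemeralOwner()
                + " dataLength=" + stat.getDataLength()
                + " numChildren=" + stat.getNumChildren());
        }
        zk.close();
    }
}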

4.3 Node Types

  • Persistent node: the node still exists after the client disconnects from ZooKeeper.
  • Persistent sequential node: the node still exists after the client disconnects; ZooKeeper just appends a monotonically increasing sequence number to its name.
  • Ephemeral node: the node is deleted once the client disconnects from ZooKeeper.
  • Ephemeral sequential node: the node is deleted once the client disconnects; ZooKeeper appends a sequence number to its name.


When a znode is created with the sequential flag, a value is appended to its name. The sequence number is a monotonically increasing counter maintained by the parent node.

In a distributed system, sequence numbers can be used to impose a global order on events, so clients can infer the order of events from them. The four node types map to CreateMode values in the Java API, as sketched below.
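
A minimal sketch using org.apache.zookeeper.CreateMode (class name, paths, and data are illustrative; it assumes a reachable server):

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class CreateModeDemo {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 5000, event -> { });
        byte[] data = "demo".getBytes();

        // persistent: survives the client session
        zk.create("/app", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // persistent sequential: the name becomes e.g. /app/task-0000000001
        zk.create("/app/task-", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        // ephemeral: removed when the session ends
        zk.create("/app/lock", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        // ephemeral sequential: removed with the session, name carries a sequence number
        zk.create("/app/lock-", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        zk.close();
    }
}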

4.4 Watchers

A client registers a watch on a znode it cares about; when that znode changes (data change, node deletion, children added or removed), ZooKeeper notifies the client. The watch mechanism ensures that any change to any data stored in ZooKeeper is quickly propagated to the applications watching that node.


  1. First there is a main() thread. When the ZooKeeper client is created in the main thread, two additional threads are created: one for network communication (connect) and one for listening (listener).
  2. The registered watch events are sent to ZooKeeper through the connect thread.
  3. ZooKeeper adds the registered watch events to its list of registered watchers.
  4. When ZooKeeper detects a data or path change, it sends the event to the listener thread.
  5. The listener thread internally invokes the process() method.

Note: watches are one-shot: each registration, including one for path changes, fires only once. To be notified repeatedly, you must re-register the watch every time it fires, as in the sketch below.
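
A minimal re-registration sketch with the raw Java API (class name is illustrative; it assumes a reachable server and an existing /test node):

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class RepeatingWatchDemo {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 5000, event -> { });

        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println(event.getType() + " " + event.getPath());
                try {
                    // the one-shot watch has already fired; register it again
                    zk.getData("/test", this, null);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        zk.getData("/test", watcher, null); // first registration
        Thread.sleep(Long.MAX_VALUE);       // keep the process alive to receive notifications
    }
}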

4.5 Client-to-Server Write Flow

4.5.1 Request Sent Directly to the Leader

  1. The client asks the Leader to write the data.
  2. The Leader writes a copy and forwards it to the other servers.
  3. As soon as more than half of the nodes report a successful write, the Leader returns an ACK to the client.
  4. The remaining nodes continue writing the data.


4.5.2 Request Sent to a Follower

  1. The client asks a follower to write the data.
  2. The follower forwards the write request to the Leader.
  3. The Leader writes a copy and forwards it to the other servers.
  4. As soon as more than half of the nodes report a successful write, the Leader tells the follower the client is connected to that the write succeeded.
  5. That follower returns the write success to the client.
  6. The remaining nodes continue writing the data.


4.6 API Operations

POM dependencies:

<dependencies>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.5.7</version>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.8.2</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.28</version>
    </dependency>
</dependencies>

The code is as follows:

@Slf4j
public class ZkClient {

    private static final int SESSION_TIMEOUT = 5000;

    private ZooKeeper zooKeeper;

    /**
     * Initialize the connection
     * @param connect connection string
     * @param watcher watcher
     */
    public void initClient(String connect, Watcher watcher) throws IOException {
        assert Objects.isNull(zooKeeper);
        zooKeeper = new ZooKeeper(connect, SESSION_TIMEOUT, watcher);
    }

    /**
     * Close the connection
     */
    public void closeClient() throws InterruptedException {
        assert !Objects.isNull(zooKeeper);
        zooKeeper.close();
    }

    /**
     * Create a node
     * @param path path
     * @param data data
     * @param createMode creation mode
     */
    public void createNode(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
        assert !Objects.isNull(zooKeeper);
        zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);
    }

    /**
     * Get the children of a node
     * @param path path
     * @param isWatch whether to set a watch
     */
    public List<String> getChildren(String path, Boolean isWatch) throws InterruptedException, KeeperException {
        assert !Objects.isNull(zooKeeper);
        return zooKeeper.getChildren(path, isWatch);
    }

    /**
     * Check whether a node exists
     * @param path path
     * @param isWatch whether to set a watch
     */
    public boolean exists(String path, Boolean isWatch) throws InterruptedException, KeeperException {
        assert !Objects.isNull(zooKeeper);
        return null != zooKeeper.exists(path, isWatch);
    }

}

Test:

public class ZkTest {

    @Test
    public void testConnect() throws Exception {
        ZkClient zkClient = new ZkClient();
        zkClient.initClient("127.0.0.1:2181", watchedEvent -> {
            System.out.println(watchedEvent.getType() + " " + watchedEvent.getPath());
            List<String> children;
            try {
                children = zkClient.getChildren("/test", Boolean.TRUE);
                System.out.println(children);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        // Thread.sleep(Long.MAX_VALUE);
        // zkClient.createNode("/adrainty", "data", CreateMode.PERSISTENT);
        
        zkClient.closeClient();
    }

}

5 Distributed Systems Fundamentals

5.1 The Paxos Algorithm

The Paxos algorithm is a consensus algorithm based on message passing with a high degree of fault tolerance. The problem Paxos solves is how to quickly and correctly agree on some value in a distributed system, while guaranteeing that no failure whatsoever can break the consistency of the system as a whole.

In a Paxos system, all nodes are first divided into Proposers, Acceptors, and Learners. (Note: a single node can play several of these roles at once.)


A complete Paxos run consists of three phases (a minimal acceptor-side sketch follows the list):

  1. Prepare phase: the Proposer generates a globally unique, increasing Proposal ID and sends a Prepare request to all Acceptors; the request only needs to carry the Proposal ID, not the proposal value. On receiving a Prepare request, an Acceptor makes "two promises and one reply":
  • It will no longer accept Prepare requests whose Proposal ID is less than or equal to that of the current request.
  • It will no longer accept Accept requests whose Proposal ID is less than that of the current request.
  • Without violating any earlier promise, it replies with the Value and Proposal ID of the highest-numbered proposal it has already accepted, or an empty value if it has accepted none.
  2. Accept phase: once the Proposer has received Promise replies from a majority of Acceptors, it takes the Value from the reply with the largest Proposal ID as the value of its proposal; if every reply carried an empty value, it is free to choose its own value. It then sends an Accept request carrying the current Proposal ID to all Acceptors. On receiving the Accept request, an Acceptor accepts and persists the Proposal ID and Value, provided doing so does not violate its earlier promises.
  3. Learn phase: once the Proposer has received Accepts from a majority of Acceptors, the decision is formed and is sent to all Learners.
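
A minimal acceptor-side sketch of the "two promises, one reply" rules above (illustrative Java, not ZooKeeper code; message passing and persistence are omitted):

// Illustrative Paxos Acceptor: tracks its promise and the last proposal it accepted.
public class Acceptor {

    /** Reply to a Prepare request: the highest-numbered proposal accepted so far, if any. */
    public static class Promise {
        public final long acceptedId;
        public final String acceptedValue;
        Promise(long acceptedId, String acceptedValue) {
            this.acceptedId = acceptedId;
            this.acceptedValue = acceptedValue;
        }
    }

    private long promisedId = -1;     // highest Proposal ID this acceptor has promised
    private long acceptedId = -1;     // Proposal ID of the last accepted proposal, -1 if none
    private String acceptedValue;     // value of the last accepted proposal, null if none

    /** Prepare phase: make the two promises and reply, or return null to reject. */
    public synchronized Promise onPrepare(long proposalId) {
        if (proposalId <= promisedId) {
            return null;              // reject: already promised an equal or higher ID
        }
        promisedId = proposalId;
        return new Promise(acceptedId, acceptedValue);
    }

    /** Accept phase: accept only if it does not violate the earlier promise. */
    public synchronized boolean onAccept(long proposalId, String value) {
        if (proposalId < promisedId) {
            return false;             // would violate the promise: reject
        }
        promisedId = proposalId;
        acceptedId = proposalId;
        acceptedValue = value;        // a real acceptor persists this before replying
        return true;
    }
}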

5.2 The ZAB Protocol

Zab borrows from the Paxos algorithm and is an atomic broadcast protocol with crash-recovery support designed specifically for ZooKeeper. Under this protocol only one server, the Leader, handles external write transaction requests; the Leader then synchronizes the data to the other Follower nodes. In other words, only the single Leader in ZooKeeper may initiate proposals.

The Zab protocol has two basic modes: message broadcast and crash recovery.

5.2.1 Message Broadcast

ZAB's handling of a transaction request resembles a two-phase commit:

  • the proposal broadcast phase
  • the commit broadcast phase

In detail:


  1. A client issues a write request.
  2. The Leader turns the client request into a transaction Proposal and assigns it a globally unique ID, the zxid.
  3. The Leader keeps a separate queue for each Follower, places the Proposals to be broadcast into those queues, and sends them in FIFO order.
  4. When a Follower receives a Proposal, it first writes it to its local transaction log on disk and, once that write succeeds, replies to the Leader with an Ack.
  5. Once the Leader has received Acks from more than half of the Followers, it considers the broadcast successful and may send the commit message (see the sketch after this list).
  6. The Leader broadcasts the commit to all Followers and also commits the transaction itself; when a Follower receives the commit, it commits that transaction.
  7. The core of Zab as ZooKeeper uses it: once any single server has committed a Proposal, the protocol must ensure that every server eventually commits that Proposal correctly.
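
A minimal sketch of the majority-ACK rule in step 5 (illustrative only, with made-up names; ZooKeeper's actual bookkeeping happens in the ACK handling shown in the source-code section later):

import java.util.HashSet;
import java.util.Set;

// Illustrative: a proposal may be committed once more than half of the ensemble has ACKed it.
public class ProposalTracker {

    private final int ensembleSize;
    private final Set<Long> ackedServerIds = new HashSet<>();

    public ProposalTracker(int ensembleSize) {
        this.ensembleSize = ensembleSize;
    }

    /** Record an ACK from a server; returns true once a majority has ACKed. */
    public synchronized boolean ack(long serverId) {
        ackedServerIds.add(serverId);
        return ackedServerIds.size() > ensembleSize / 2;
    }

    public static void main(String[] args) {
        ProposalTracker tracker = new ProposalTracker(5); // 5-server ensemble, majority is 3
        System.out.println(tracker.ack(1)); // false (1 ACK)
        System.out.println(tracker.ack(2)); // false (2 ACKs)
        System.out.println(tracker.ack(3)); // true  (3 ACKs: the commit can be broadcast)
    }
}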

5.2.2 Crash Recovery

Once the Leader crashes, or loses contact with more than half of the Followers because of network problems, the cluster enters crash-recovery mode.

Zab's crash recovery must satisfy two requirements:

  1. Any Proposal that has already been committed by the Leader must eventually be committed by all Follower servers.
  2. Any Proposal that was proposed by the Leader but never committed must be discarded.

Crash recovery consists of two parts: Leader election and data synchronization.

  1. Leader election: given the requirements above, Zab must guarantee that the newly elected Leader satisfies the following conditions:
  • The new Leader must not contain any uncommitted Proposal; that is, the new Leader must be a Follower that has committed every Proposal it holds.
  • The new Leader must hold the largest zxid in the cluster. This spares the new Leader the work of checking which Proposals to commit and which to discard.
  2. Data synchronization:
  • After the election completes, and before it starts real work (accepting transaction requests and issuing new Proposals), the Leader first confirms that every Proposal in its transaction log has been committed by more than half of the servers in the cluster.
  • The Leader must ensure that every Follower receives every transaction Proposal and applies all committed Proposals to its in-memory data. Only after a Follower has synchronized all outstanding Proposals from the Leader and applied them to memory does the Leader add it to the list of truly available Followers.

5.3 CAP Theorem

The CAP theorem tells us that a distributed system cannot satisfy all three of the following properties at the same time:

  • Consistency (C): in a distributed environment, consistency means the data is kept identical across all replicas. Under this requirement, after the system performs an update while in a consistent state, the data must still be in a consistent state.
  • Availability (A): the service provided by the system must always be available; every user request receives a response within a bounded time.
  • Partition tolerance (P): when any network partition occurs, the system must still be able to provide a service that satisfies consistency and availability, unless the entire network has failed.

ZooKeeper guarantees CP:

  • ZooKeeper cannot guarantee that every service request is served. (Note: under extreme conditions ZooKeeper may drop some requests, and consumers have to retry to get a result.) So ZooKeeper does not guarantee availability.
  • The cluster is entirely unavailable while a Leader election is in progress.

6 Source Code Analysis

6.1 Server Initialization


ZooKeeper startup begins in org.apache.zookeeper.server.quorum.QuorumPeerMain:

// QuorumPeerMain
public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            // initialize and start
            main.initializeAndRun(args);
        } 
    	......
        ServiceUtils.requestSystemExit(ExitCode.EXECUTION_FINISHED.getValue());
    }

The initializeAndRun method does the initialization and startup:

// QuorumPeerMain
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    // parse the configuration: zoo.cfg and myid
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }

    // Start and schedule the purge task that deletes expired snapshots
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
        config.getDataDir(),
        config.getDataLogDir(),
        config.getSnapRetainCount(),        // minimum number of snapshots to retain
        config.getPurgeInterval());         // default 0 means the purge task is disabled
    purgeMgr.start();

    // initialize communication
    if (args.length == 1 && config.isDistributed()) {
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}

Communication initialization in runFromConfig:

// QuorumPeerMain
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    LOG.info("Starting quorum peer, myid=" + config.getServerId());
    MetricsProvider metricsProvider;
    try {
        metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
            config.getMetricsProviderClassName(),
            config.getMetricsProviderConfiguration());
    } catch (MetricsProviderLifeCycleException error) {
        throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
    }
    try {
        ServerMetrics.metricsProviderInitialized(metricsProvider);
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;

        // create the connection factory; NIO is used by default
        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
        }

        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
        }

        // set connection parameters
        quorumPeer = getQuorumPeer();
        quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
        ........
        
        quorumPeer.initialize();

        if (config.jvmPauseMonitorToRun) {
            quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
        }

        // start ZooKeeper
        quorumPeer.start();
        ZKAuditProvider.addZKStartStopAuditLog();
        quorumPeer.join();
    } 
    ......
}

6.2 Server Data Loading

  • The data model in zk is a tree, the DataTree; each node in it is a DataNode.
  • The DataTree is kept in sync across all nodes of the zk cluster at all times.
  • On every zk node in the cluster, a complete copy of the data is held both in memory and on disk.


The previous section ended at the start method that launches ZK, so we pick up from there:

// QuorumPeer
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    // load the edit (transaction) logs and the snapshot files
    loadDataBase();
    // start the communication factory instance
    startServerCnxnFactory();
    try {
        // start the embedded AdminServer
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    // prepare the election environment
    startLeaderElection();
    startJvmPauseMonitor();
    // run the election
    super.start();
}


// ZKDatabase
public long loadDataBase() throws IOException {
    long startTime = Time.currentElapsedTime();
    long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
    initialized = true;
    long loadTime = Time.currentElapsedTime() - startTime;
    ServerMetrics.getMetrics().DB_INIT_TIME.add(loadTime);
    LOG.info("Snapshot loaded in {} ms, highest zxid is 0x{}, digest is {}",
             loadTime, Long.toHexString(zxid), dataTree.getTreeDigest());
    return zxid;
}
// FileTxnSnapLog
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
    long snapLoadingStartTime = Time.currentElapsedTime();
    // restore the data tree from the snapshot
    long deserializeResult = snapLog.deserialize(dt, sessions);
    ServerMetrics.getMetrics().STARTUP_SNAP_LOAD_TIME.add(Time.currentElapsedTime() - snapLoadingStartTime);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    boolean trustEmptyDB;
    File initFile = new File(dataDir.getParent(), "initialize");
    if (Files.deleteIfExists(initFile.toPath())) {
        LOG.info("Initialize file found, an empty database will not block voting participation");
        trustEmptyDB = true;
    } else {
        trustEmptyDB = autoCreateDB;
    }

    RestoreFinalizer finalizer = () -> {
        // replay the edit (transaction) logs
        long highestZxid = fastForwardFromEdits(dt, sessions, listener);
        // The snapshotZxidDigest will reset after replaying the txn of the
        // zxid in the snapshotZxidDigest, if it's not reset to null after
        // restoring, it means either there are not enough txns to cover that
        // zxid or that txn is missing
        DataTree.ZxidDigest snapshotZxidDigest = dt.getDigestFromLoadedSnapshot();
        if (snapshotZxidDigest != null) {
            LOG.warn(
                "Highest txn zxid 0x{} is not covering the snapshot digest zxid 0x{}, "
                + "which might lead to inconsistent state",
                Long.toHexString(highestZxid),
                Long.toHexString(snapshotZxidDigest.getZxid()));
        }
        return highestZxid;
    };

    .......

    return finalizer.run();
}

6.3 ZK Election Source


// QuorumPeer
public synchronized void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            // create the initial ballot
            // ballot components: epoch (the leader's term), zxid (a transaction id executed during that leader's term), myid (server id)
            // when voting starts, every server first votes for itself
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }

    // create the election algorithm instance
    this.electionAlg = createElectionAlgorithm(electionType);
}
// QuorumPeer
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
        case 1:
            throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
        case 2:
            throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
        case 3:
            // create the QuorumCnxManager, which handles all network communication during the election
            QuorumCnxManager qcm = createCnxnManager();
            QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
            if (oldQcm != null) {
                LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
                oldQcm.halt();
            }
            QuorumCnxManager.Listener listener = qcm.listener;
            if (listener != null) {
                // start the listener thread
                listener.start();
                // get ready to start the election
                FastLeaderElection fle = new FastLeaderElection(this, qcm);
                fle.start();
                le = fle;
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }
            break;
        default:
            assert false;
    }
    return le;
}
// FastLeaderElection
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
    this.stop = false;
    this.manager = manager;
    starter(self, manager);
}
// FastLeaderElection
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;

    // initialize the queues and the messenger
    sendqueue = new LinkedBlockingQueue<ToSend>();
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}


// QuorumPeer
@Override
public void run() {
    ......

    try {
        /*
             * Main loop
             */
        while (running) {
            switch (getPeerState()) {
                case LOOKING:
                    LOG.info("LOOKING");
                    ServerMetrics.getMetrics().LOOKING_COUNT.add(1);

                    if (Boolean.getBoolean("readonlymode.enabled")) {
                        LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                        // Create read-only server but don't start it immediately
                        final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);

                        // Instead of starting roZk immediately, wait some grace
                        // period before we decide we're partitioned.
                        //
                        // Thread is used here because otherwise it would require
                        // changes in each of election strategy classes which is
                        // unnecessary code coupling.
                        Thread roZkMgr = new Thread() {
                            public void run() {
                                try {
                                    // lower-bound grace period to 2 secs
                                    sleep(Math.max(2000, tickTime));
                                    if (ServerState.LOOKING.equals(getPeerState())) {
                                        roZk.startup();
                                    }
                                } 
                                ......
                            }
                        };
                        try {
                            roZkMgr.start();
                            reconfigFlagClear();
                            if (shuttingDownLE) {
                                shuttingDownLE = false;
                                startLeaderElection();
                            }
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } 
                        ......
                    } else {
                        try {
                            reconfigFlagClear();
                            if (shuttingDownLE) {
                                shuttingDownLE = false;
                                startLeaderElection();
                            }
                            // update the current ballot
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } 
                        ......
                    }
                    break;
                case OBSERVING:
                    try {
                        LOG.info("OBSERVING");
                        setObserver(makeObserver(logFactory));
                        observer.observeLeader();
                    } 
                    ......
                    break;
                case FOLLOWING:
                    try {
                        LOG.info("FOLLOWING");
                        setFollower(makeFollower(logFactory));
                        // as a Follower, find the Leader and connect to it
                        follower.followLeader();
                    } 
                    ......
                    break;
                case LEADING:
                    LOG.info("LEADING");
                    try {
                        setLeader(makeLeader(logFactory));
                        // as the Leader, call leader.lead()
                        leader.lead();
                        setLeader(null);
                    } 
                    ......
            }
        }
    }
    ......
}
// FastLeaderElection
public Vote lookForLeader() throws InterruptedException {
    ......
        
    try {
        // stores the latest valid vote from each server
        Map<Long, Vote> recvset = new HashMap<Long, Vote>();

        // stores votes that arrive outside the current legal election
        Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

        // maximum wait time for one election round; the default is 0.2 s
        int notTimeout = minNotificationInterval;

        // each new election round increments logicalclock
        // until valid epoch data is available, the logical clock is used in its place
        synchronized (this) {
            // update the logical clock; it must be advanced for every election round
            logicalclock.incrementAndGet();
            // update the ballot
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        ......
        
        // broadcast the ballot to the other servers
        sendNotifications();

        SyncedLearnerTracker voteSet;

        // keep voting round after round until the election succeeds
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            ......
        }
        return null;
    } 
    ......
}
// FastLeaderElection
private void sendNotifications() {
    // iterate over the voting participants and send a ballot to each server
    for (long sid : self.getCurrentAndNextConfigVoters()) {
        QuorumVerifier qv = self.getQuorumVerifier();
        // the outgoing notification message
        ToSend notmsg = new ToSend(
            ToSend.mType.notification,
            proposedLeader,
            proposedZxid,
            logicalclock.get(),
            QuorumPeer.ServerState.LOOKING,
            sid,
            proposedEpoch,
            qv.toString().getBytes());

        // put the message on the send queue
        sendqueue.offer(notmsg);
    }
}
// QuorumCnxManager
public void toSend(Long sid, ByteBuffer b) {
    // if the message is addressed to this server itself, add it directly to its own recvQueue
    if (this.mySid == sid) {
        b.position(0);
        addToRecvQueue(new Message(b.duplicate(), sid));
    } else {
        BlockingQueue<ByteBuffer> bq = queueSendMap.computeIfAbsent(sid, serverId -> new CircularBlockingQueue<>(SEND_CAPACITY));
        // otherwise queue it for the target server and connect to it
        addToSendQueue(bq, b);
        connectOne(sid);
    }
}
// QuorumCnxManager
private boolean startConnection(Socket sock, Long sid) throws IOException {
    ......

    // if the remote sid is larger than our own, drop this connection: only the server with the larger sid keeps the connection, which avoids duplicate links between peers
    if (sid > self.getId()) {
        LOG.info("Have smaller server identifier, so dropping the connection: (myId:{} --> sid:{})", self.getId(), sid);
        closeSocket(sock);
        // Otherwise proceed with the connection
    } else {
        LOG.debug("Have larger server identifier, so keeping the connection: (myId:{} --> sid:{})", self.getId(), sid);
        // otherwise keep the connection and start the send/receive workers
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);

        if (vsw != null) {
            vsw.finish();
        }

        senderWorkerMap.put(sid, sw);

        queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));

        sw.start();
        rw.start();

        return true;

    }
    return false;
}

6.4 Leader and Follower State Synchronization

When the election ends, every node updates its state according to its role: the elected Leader sets its state to Leader, and the other nodes set theirs to Follower.

Leader state update entry point: leader.lead()

Follower state update entry point: follower.followLeader()


  1. The follower must let the leader know its state: epoch, zxid, and sid.
  • It must work out which server is the leader.
  • It initiates a connection to the leader.
  • It sends its own information to the leader.
  • The leader receives the information and must return the corresponding information to the follower.
  2. Once the leader knows the follower's state, it decides which kind of data synchronization to perform: DIFF, TRUNC, or SNAP.
  3. The data synchronization is executed.
  4. Once the leader has received ACKs from more than half of the followers, it enters normal operation and cluster startup is complete.

Synchronization strategies:

  • DIFF: the two are identical; nothing needs to be done.
  • TRUNC: the follower's zxid is larger than the leader's, so the follower must roll back.
  • COMMIT: the leader's zxid is larger than the follower's, so the leader sends the missing Proposals to the follower to commit and apply.
  • If the follower has no data at all, SNAP is used: the leader serializes its entire data tree directly to the follower.


// Leader
void lead() throws IOException, InterruptedException {
    
    ......

    try {
        self.setZabState(QuorumPeer.ZabState.DISCOVERY);
        self.tick.set(0);
        zk.loadData();

        leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

        // wait for the follower nodes to report their synchronization state to the leader
        cnxAcceptor = new LearnerCnxAcceptor();
        cnxAcceptor.start();

        long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());

        ......
        
    } finally {
        zk.unregisterJMX(this);
    }
}
class LearnerCnxAcceptorHandler implements Runnable {
    
    ......

    LearnerCnxAcceptorHandler(ServerSocket serverSocket, CountDownLatch latch) {
        this.serverSocket = serverSocket;
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            Thread.currentThread().setName("LearnerCnxAcceptorHandler-" + serverSocket.getLocalSocketAddress());

            while (!stop.get()) {
                acceptConnections();
            }
        } catch (Exception e) {
            LOG.warn("Exception while accepting follower", e);
            if (fail.compareAndSet(false, true)) {
                handleException(getName(), e);
                halt();
            }
        } finally {
            latch.countDown();
        }
    }

    private void acceptConnections() throws IOException {
        Socket socket = null;
        boolean error = false;
        try {
            // block waiting for an incoming connection
            socket = serverSocket.accept();

            socket.setSoTimeout(self.tickTime * self.initLimit);
            socket.setTcpNoDelay(nodelay);

            BufferedInputStream is = new BufferedInputStream(socket.getInputStream());
            // as soon as a follower connects, create a LearnerHandler to handle its requests
            LearnerHandler fh = new LearnerHandler(socket, is, Leader.this);
            fh.start();
        } 
        ......
    }

}
// Follower
void followLeader() throws InterruptedException {
    
    .......

    try {
        self.setZabState(QuorumPeer.ZabState.DISCOVERY);
        // find out which server in the cluster is the Leader
        QuorumServer leaderServer = findLeader();
        try {
            // once found, connect to the Leader
            connectToLeader(leaderServer.addr, leaderServer.hostname);
            connectionTime = System.currentTimeMillis();
            // register with the Leader and report our own state
            long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
            if (self.isReconfigStateChange()) {
                throw new Exception("learned about role change");
            }
            long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
            if (newEpoch < self.getAcceptedEpoch()) {
                ......
                throw new IOException("Error: Epoch of leader is lower");
            }
            long startTime = Time.currentElapsedTime();
            try {
                self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId());
                self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
                syncWithLeader(newEpochZxid);
                self.setZabState(QuorumPeer.ZabState.BROADCAST);
                completedSync = true;
            } 
            ......
            if (self.getObserverMasterPort() > 0) {
                LOG.info("Starting ObserverMaster");

                om = new ObserverMaster(self, fzk, self.getObserverMasterPort());
                om.start();
            } else {
                om = null;
            }
            // create a reusable packet to reduce gc impact
            QuorumPacket qp = new QuorumPacket();
            while (this.isRunning()) {
                readPacket(qp);
                processPacket(qp);
            }
        } 
        ......
    } finally {
        ......
    }
}
// LearnerHandler
@Override
public void run() {
    try {
        learnerMaster.addLearnerHandler(this);
        // heartbeat (ack deadline) handling
        tickOfNextAckDeadline = learnerMaster.getTickOfInitialAckDeadline();

        ia = BinaryInputArchive.getArchive(bufferedInput);
        bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
        oa = BinaryOutputArchive.getArchive(bufferedOutput);

        QuorumPacket qp = new QuorumPacket();
        // read the incoming packet
        ia.readRecord(qp, "packet");

        ......

        learnerMaster.registerLearnerHandlerBean(this, sock);

        // read the lastAcceptedEpoch sent by the Follower
        // the epoch used during the election is in fact still the previous leader's epoch
        long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());

        long peerLastZxid;
        StateSummary ss = null;
        long zxid = qp.getZxid();
        // create the new epoch id
        long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
        long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);

        if (this.getVersion() < 0x10000) {
            // we are going to have to extrapolate the epoch information
            long epoch = ZxidUtils.getEpochFromZxid(zxid);
            ss = new StateSummary(epoch, zxid);
            // fake the message
            learnerMaster.waitForEpochAck(this.getSid(), ss);
        } else {
            byte[] ver = new byte[4];
            ByteBuffer.wrap(ver).putInt(0x10000);
            // send the new epoch id to the follower
            QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
            oa.writeRecord(newEpochPacket, "packet");
            messageTracker.trackSent(Leader.LEADERINFO);
            bufferedOutput.flush();
            QuorumPacket ackEpochPacket = new QuorumPacket();
            ia.readRecord(ackEpochPacket, "packet");
            messageTracker.trackReceived(ackEpochPacket.getType());
            if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                LOG.error("{} is not ACKEPOCH", ackEpochPacket.toString());
                return;
            }
            ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
            ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
            learnerMaster.waitForEpochAck(this.getSid(), ss);
        }
        peerLastZxid = ss.getLastZxid();
        
        boolean needSnap = syncFollower(peerLastZxid, learnerMaster);

        boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
        /* if we are not truncating or sending a diff just send a snapshot */
        if (needSnap) {
            syncThrottler = learnerMaster.getLearnerSnapSyncThrottler();
            syncThrottler.beginSync(exemptFromThrottle);
            ServerMetrics.getMetrics().INFLIGHT_SNAP_COUNT.add(syncThrottler.getSyncInProgress());
            try {
                long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
                oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
                messageTracker.trackSent(Leader.SNAP);
                bufferedOutput.flush();

                // Dump data to peer
                learnerMaster.getZKDatabase().serializeSnapshot(oa);
                oa.writeString("BenWasHere", "signature");
                bufferedOutput.flush();
            } finally {
                ServerMetrics.getMetrics().SNAP_COUNT.add(1);
            }
        } else {
            syncThrottler = learnerMaster.getLearnerDiffSyncThrottler();
            syncThrottler.beginSync(exemptFromThrottle);
            ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(syncThrottler.getSyncInProgress());
            ServerMetrics.getMetrics().DIFF_COUNT.add(1);
        }

        LOG.debug("Sending NEWLEADER message to {}", sid);
        if (getVersion() < 0x10000) {
            QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null);
            oa.writeRecord(newLeaderQP, "packet");
        } else {
            QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null);
            queuedPackets.add(newLeaderQP);
        }
        bufferedOutput.flush();

        // Start thread that blast packets in the queue to learner
        startSendingPackets();
        
        qp = new QuorumPacket();
        ia.readRecord(qp, "packet");

        messageTracker.trackReceived(qp.getType());
        if (qp.getType() != Leader.ACK) {
            LOG.error("Next packet was supposed to be an ACK, but received packet: {}", packetToString(qp));
            return;
        }

        LOG.debug("Received NEWLEADER-ACK message from {}", sid);

        learnerMaster.waitForNewLeaderAck(getSid(), qp.getZxid());

        syncLimitCheck.start();
        // sync ends when NEWLEADER-ACK is received
        syncThrottler.endSync();
        if (needSnap) {
            ServerMetrics.getMetrics().INFLIGHT_SNAP_COUNT.add(syncThrottler.getSyncInProgress());
        } else {
            ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(syncThrottler.getSyncInProgress());
        }
        syncThrottler = null;

        // now that the ack has been processed expect the syncLimit
        sock.setSoTimeout(learnerMaster.syncTimeout());

        /*
             * Wait until learnerMaster starts up
             */
        learnerMaster.waitForStartup();

        LOG.debug("Sending UPTODATE message to {}", sid);
        queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));

        while (true) {
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet");
            messageTracker.trackReceived(qp.getType());

            long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
            if (qp.getType() == Leader.PING) {
                traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
            }
            if (LOG.isTraceEnabled()) {
                ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
            }
            tickOfNextAckDeadline = learnerMaster.getTickOfNextAckDeadline();

            packetsReceived.incrementAndGet();

            ByteBuffer bb;
            long sessionId;
            int cxid;
            int type;

            switch (qp.getType()) {
                case Leader.ACK:
                    if (this.learnerType == LearnerType.OBSERVER) {
                        LOG.debug("Received ACK from Observer {}", this.sid);
                    }
                    syncLimitCheck.updateAck(qp.getZxid());
                    learnerMaster.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
                    break;
                case Leader.PING:
                    // Process the touches
                    ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
                    DataInputStream dis = new DataInputStream(bis);
                    while (dis.available() > 0) {
                        long sess = dis.readLong();
                        int to = dis.readInt();
                        learnerMaster.touch(sess, to);
                    }
                    break;
                case Leader.REVALIDATE:
                    ServerMetrics.getMetrics().REVALIDATE_COUNT.add(1);
                    learnerMaster.revalidateSession(qp, this);
                    break;
                case Leader.REQUEST:
                    bb = ByteBuffer.wrap(qp.getData());
                    sessionId = bb.getLong();
                    cxid = bb.getInt();
                    type = bb.getInt();
                    bb = bb.slice();
                    Request si;
                    if (type == OpCode.sync) {
                        si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
                    } else {
                        si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
                    }
                    si.setOwner(this);
                    learnerMaster.submitLearnerRequest(si);
                    requestsReceived.incrementAndGet();
                    break;
                default:
                    LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
                    break;
            }
        }
    } 
    ......
}
// LearnerHandler
boolean syncFollower(long peerLastZxid, LearnerMaster learnerMaster) {
    try {
    	......

        if (forceSnapSync) {
            // Force learnerMaster to use snapshot to sync with follower
            LOG.warn("Forcing snapshot sync - should not see this in production");
        } else if (lastProcessedZxid == peerLastZxid) {
            // DIFF: the two are identical, nothing to do
            queueOpPacket(Leader.DIFF, peerLastZxid);
            needOpPacket = false;
            needSnap = false;
        } else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
            // TRUNC: the follower's zxid is larger than the leader's, so the follower must roll back
            queueOpPacket(Leader.TRUNC, maxCommittedLog);
            currentZxid = maxCommittedLog;
            needOpPacket = false;
            needSnap = false;
        } else if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) {
            // COMMIT: the leader's zxid is larger than the follower's, so send the missing Proposals to the follower to commit and apply
            LOG.info("Using committedLog for peer sid: {}", getSid());
            Iterator<Proposal> itr = db.getCommittedLog().iterator();
            currentZxid = queueCommittedProposals(itr, peerLastZxid, null, maxCommittedLog);
            needSnap = false;
        } else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
            long sizeLimit = db.calculateTxnLogSizeLimit();
            Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(peerLastZxid, sizeLimit);
            if (txnLogItr.hasNext()) {
                // two-phase commit: queue the proposals first
                currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid, minCommittedLog, maxCommittedLog);

                if (currentZxid < minCommittedLog) {
                    // Clear out currently queued requests and revert
                    // to sending a snapshot.
                    currentZxid = peerLastZxid;
                    queuedPackets.clear();
                    needOpPacket = true;
                } else {
                    LOG.debug("Queueing committedLog 0x{}", Long.toHexString(currentZxid));
                    Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
                    currentZxid = queueCommittedProposals(committedLogItr, currentZxid, null, maxCommittedLog);
                    needSnap = false;
                }
            }
            // closing the resources
            if (txnLogItr instanceof TxnLogProposalIterator) {
                TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr;
                txnProposalItr.close();
            }
        } 
        ......
            
        if (needSnap) {
            currentZxid = db.getDataTreeLastProcessedZxid();
        }

        LOG.debug("Start forwarding 0x{} for peer sid: {}", Long.toHexString(currentZxid), getSid());
        leaderLastZxid = learnerMaster.startForwarding(this, currentZxid);
    } finally {
        rl.unlock();
    }

    if (needOpPacket && !needSnap) {
        // This should never happen, but we should fall back to sending
        // snapshot just in case.
        LOG.error("Unhandled scenario for peer sid: {} fall back to use snapshot",  getSid());
        needSnap = true;
    }

    return needSnap;
}
// Follower
protected void processPacket(QuorumPacket qp) throws Exception {
    switch (qp.getType()) {
        case Leader.PING:
            ping(qp);
            break;
        case Leader.PROPOSAL:
            // a Proposal has been received
            ServerMetrics.getMetrics().LEARNER_PROPOSAL_RECEIVED_COUNT.add(1);
            TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData());
            TxnHeader hdr = logEntry.getHeader();
            Record txn = logEntry.getTxn();
            TxnDigest digest = logEntry.getDigest();
            if (hdr.getZxid() != lastQueued + 1) {
                LOG.warn(
                    "Got zxid 0x{} expected 0x{}",
                    Long.toHexString(hdr.getZxid()),
                    Long.toHexString(lastQueued + 1));
            }
            lastQueued = hdr.getZxid();

            if (hdr.getType() == OpCode.reconfig) {
                SetDataTxn setDataTxn = (SetDataTxn) txn;
                QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
                self.setLastSeenQuorumVerifier(qv, true);
            }

            fzk.logRequest(hdr, txn, digest);
            if (hdr != null) {
                long now = Time.currentWallTime();
                long latency = now - hdr.getTime();
                if (latency >= 0) {
                    ServerMetrics.getMetrics().PROPOSAL_LATENCY.add(latency);
                }
            }
            if (om != null) {
                final long startTime = Time.currentElapsedTime();
                om.proposalReceived(qp);
                ServerMetrics.getMetrics().OM_PROPOSAL_PROCESS_TIME.add(Time.currentElapsedTime() - startTime);
            }
            break;
        case Leader.COMMIT:
            // the commit for a previously received Proposal arrives from the Leader
            ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
            fzk.commit(qp.getZxid());
            if (om != null) {
                final long startTime = Time.currentElapsedTime();
                om.proposalCommitted(qp.getZxid());
                ServerMetrics.getMetrics().OM_COMMIT_PROCESS_TIME.add(Time.currentElapsedTime() - startTime);
            }
            break;

        case Leader.COMMITANDACTIVATE:
            // get the new configuration from the request
            Request request = fzk.pendingTxns.element();
            SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();
            QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));

            // get new designated leader from (current) leader's message
            ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
            long suggestedLeaderId = buffer.getLong();
            final long zxid = qp.getZxid();
            boolean majorChange = self.processReconfig(qv, suggestedLeaderId, zxid, true);
            // commit (writes the new config to ZK tree (/zookeeper/config)
            fzk.commit(zxid);

            if (om != null) {
                om.informAndActivate(zxid, suggestedLeaderId);
            }
            if (majorChange) {
                throw new Exception("changes proposed in reconfig");
            }
            break;
        case Leader.UPTODATE:
            LOG.error("Received an UPTODATE message after Follower started");
            break;
        case Leader.REVALIDATE:
            if (om == null || !om.revalidateLearnerSession(qp)) {
                revalidate(qp);
            }
            break;
        case Leader.SYNC:
            fzk.sync();
            break;
        default:
            LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
            break;
    }
}

6.5 Server-Side Leader Startup


// Leader
private synchronized void startZkServer() {
    ...... 
        
    zk.startup();
    
    ......
}
// ZooKeeperServer
private void startupWithServerState(State state) {
    if (sessionTracker == null) {
        createSessionTracker();
    }
    startSessionTracker();
    setupRequestProcessors();

    startRequestThrottler();

    registerJMX();

    startJvmPauseMonitor();

    registerMetrics();

    setState(state);

    requestPathMetricsCollector.start();

    localSessionEnabled = sessionTracker.isLocalSessionsEnabled();

    notifyAll();
}
// ZooKeeperServer
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
    ((SyncRequestProcessor) syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor) firstProcessor).start();
}

public ZooKeeperServerListener getZooKeeperServerListener() {
    return listener;
}
@Override
public void run() {
    LOG.info(String.format("PrepRequestProcessor (sid:%d) started, reconfigEnabled=%s", zks.getServerId(), zks.reconfigEnabled));
    try {
        while (true) {
            ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
            Request request = submittedRequests.take();
            ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_TIME
                .add(Time.currentElapsedTime() - request.prepQueueStartTime);
            long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
            if (request.type == OpCode.ping) {
                traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
            }
            if (LOG.isTraceEnabled()) {
                ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
            }
            if (Request.requestOfDeath == request) {
                break;
            }

            request.prepStartTime = Time.currentElapsedTime();
            pRequest(request);
        }
    } catch (Exception e) {
        handleException(this.getName(), e);
    }
    LOG.info("PrepRequestProcessor exited loop!");
}
protected void pRequest(Request request) throws RequestProcessorException {
    // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
    // request.type + " id = 0x" + Long.toHexString(request.sessionId));
    request.setHdr(null);
    request.setTxn(null);

    try {
        // handle the request according to its type
        switch (request.type) {
            case OpCode.createContainer:
            case OpCode.create:
            case OpCode.create2:
                CreateRequest create2Request = new CreateRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
                break;
            case OpCode.createTTL:
                CreateTTLRequest createTtlRequest = new CreateTTLRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true);
                break;
            case OpCode.deleteContainer:
            case OpCode.delete:
                DeleteRequest deleteRequest = new DeleteRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
                break;
            case OpCode.setData:
                SetDataRequest setDataRequest = new SetDataRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
                break;
            case OpCode.reconfig:
                ReconfigRequest reconfigRequest = new ReconfigRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
                pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
                break;
            case OpCode.setACL:
                SetACLRequest setAclRequest = new SetACLRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
                break;
            case OpCode.check:
                CheckVersionRequest checkRequest = new CheckVersionRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
                break;
            case OpCode.multi:
                MultiOperationRecord multiRequest = new MultiOperationRecord();
                try {
                    ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
                } catch (IOException e) {
                    request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), Time.currentWallTime(), OpCode.multi));
                    throw e;
                }
                List<Txn> txns = new ArrayList<Txn>();
                //Each op in a multi-op must have the same zxid!
                long zxid = zks.getNextZxid();
                KeeperException ke = null;

                //Store off current pending change records in case we need to rollback
                Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);
                request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                                             Time.currentWallTime(), request.type));
	......
}

6.6 Server-Side Follower Startup


This was already covered above, so it is not repeated here.

6.7 Client Startup

