ZooKeeper源码分析十之集群启动

2021/12/29 12:07:23

本文主要是介绍ZooKeeper源码分析十之集群启动,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

一、ZooKeeper集群

我们先模拟搭建一个ZooKeeper集群环境:
第一步,把下载好的文件复制好三份
在这里插入图片描述
第二步,准备三个数据目录
在这里插入图片描述
第三步,在每个数据目录下创建myid文件,文件内容分别写入1、2、3
第四步,修改每个zoo.cfg配置文件,具体修改内容如下,不同的服务使用不同的clientPort端口

tickTime=2000
initLimit=10
syncLimit=5
dataDir=D:\\zookeeper\\data1
clientPort=2181
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

最后依次启动即可。

二、源码分析

接下来我们将从源码的角度分析ZooKeeper集群启动过程以及关于集群的一些知识。回到QuorumPeerMain启动类中,其main方法会调用initializeAndRun:

    protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {      //解析配置文件,这在分析单机启动的时候,已经见到介绍过了
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        }
        //判断当前是否是集群启动模式
        if (args.length == 1 && config.isDistributed()) {
            runFromConfig(config);
        } else {
            ZooKeeperServerMain.main(args);
        }
    }

我们看看ZooKeeper是怎么判断当前启动是集群还是非集群config.isDistributed():

```java
    public boolean isDistributed() {
        return quorumVerifier != null && (!standaloneEnabled || quorumVerifier.getVotingMembers().size() > 1);
    }

quorumVerifier 的实例是在解析配置文件的parse方法中实例化的如下所示:

        if (dynamicConfigFileStr == null) {
            setupQuorumPeerConfig(zkProp, true);
            if (isDistributed() && isReconfigEnabled()) {
                backupOldConfig();
            }
        }
    void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode) throws IOException, ConfigException {
       //解析配置文件中的server.*配置
        quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode);
        //解析data目录下的myid文件,设置为当前serverId属性
        setupMyId();
        //解析客户端端口
        setupClientPort();
        //设置当前节点是否参与选举
        setupPeerType();
        //验证当前配置的合法性
        checkValidity();
    }

继续执行runFromConfig方法:

    public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
       //去除了部分代码
        try {
           //此时也会初始化两个网络IO处理工厂
            ServerCnxnFactory cnxnFactory = null;
            ServerCnxnFactory secureCnxnFactory = null;
            //初始化工厂,这和单机启动一样
            if (config.getClientPortAddress() != null) {
                cnxnFactory = ServerCnxnFactory.createFactory();
                cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
            }

            if (config.getSecureClientPortAddress() != null) {
                secureCnxnFactory = ServerCnxnFactory.createFactory();
                secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
            }
           //实例化QuorumPeer 对象
            quorumPeer = getQuorumPeer();
            //设置日志文件处理器(已经分析过)
            quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
            quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
            quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
            quorumPeer.setElectionType(config.getElectionAlg());
            quorumPeer.setMyid(config.getServerId());
            quorumPeer.setTickTime(config.getTickTime());
            quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
            quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
            quorumPeer.setInitLimit(config.getInitLimit());
            quorumPeer.setSyncLimit(config.getSyncLimit());
            quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
            quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
            quorumPeer.setConfigFileName(config.getConfigFilename());
            quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
            quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
            quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
            if (config.getLastSeenQuorumVerifier() != null) {
                quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
            }
            quorumPeer.initConfigInZKDatabase();
            quorumPeer.setCnxnFactory(cnxnFactory);
            quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
            quorumPeer.setSslQuorum(config.isSslQuorum());
            quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
            quorumPeer.setLearnerType(config.getPeerType());
            quorumPeer.setSyncEnabled(config.getSyncEnabled());
            quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
            if (config.sslQuorumReloadCertFiles) {
                quorumPeer.getX509Util().enableCertFileReloading();
            }
            quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
            quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
            quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());

            // sets quorum sasl authentication configurations
            quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
            if (quorumPeer.isQuorumSaslAuthEnabled()) {
                quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
                quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
                quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
                quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
                quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
            }
            quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
            quorumPeer.initialize();
            if (config.jvmPauseMonitorToRun) {
                quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
            }
           //此时我们需要关注的就是这个start方法,这也是集群启动的入口方法
            quorumPeer.start();
            ZKAuditProvider.addZKStartStopAuditLog();
            quorumPeer.join();
        } catch (InterruptedException e) {
        } finally {
            try {
                metricsProvider.stop();
            } catch (Throwable error) {
                LOG.warn("Error while stopping metrics", error);
            }
        }
    }

start方法

    public synchronized void start() {
        //加载数据库
        loadDataBase();
        //启动客户端监听程序(接收客户端请求),和单机模式一样
        startServerCnxnFactory();
        try {
             //启动管理后台程序
            adminServer.start();
        } catch (AdminServerException e) {
            LOG.warn("Problem starting AdminServer", e);
            System.out.println(e);
        }
        //开始leader选举
        startLeaderElection();
        startJvmPauseMonitor();
        //启动当前线程,执行run方法
        super.start();
    }

从启动上逻辑看,集群启动,相比单机启动多了个leader选举过程,但是还是有个中差异。在加载数据库的时候,集群模式下,多了两个参数currentEpoch和acceptedEpoch,至于这两个参数具体作用是什么先不分析。

三、总结

有了单机模式下启动过程的分析,集群模式的启动过程分析起来,就相对简单了些,难点在于启动过程中leader的选举,以及怎么去维护当前集群,以及集群中处理请求的过程,所以接下来将从集群模式下去一一分析这些问题。

以上,有任何不对的地方,请留言指正,敬请谅解。



这篇关于ZooKeeper源码分析十之集群启动的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程