Zookeeper源码部分 第2章 ZK服务端初始化源码解析

2022/6/5 1:20:26

本文主要是介绍Zookeeper源码部分 第2章 ZK服务端初始化源码解析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

ZK服务端初始化源码解析

image-20220604222359643

2.2.1 ZK服务端启动脚本分析

1)Zookeeper服务的启动命令是zkServer.sh start

zkServer.sh

#!/usr/bin/env bash
# use POSTIX interface, symlink is followed automatically
ZOOBIN="${BASH_SOURCE-$0}"
ZOOBIN="$(dirname "${ZOOBIN}")"
ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"

if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then
  . "$ZOOBINDIR"/../libexec/zkEnv.sh
else
  . "$ZOOBINDIR"/zkEnv.sh  //相当于获取zkEnv.sh中的环境变量(ZOOCFG="zoo.cfg")
fi

# See the following page for extensive details on setting
# up the JVM to accept JMX remote management:
# http://java.sun.com/javase/6/docs/technotes/guides/management/agent.html
# by default we allow local JMX connections
if [ "x$JMXLOCALONLY" = "x" ]
then
    JMXLOCALONLY=false
fi

if [ "x$JMXDISABLE" = "x" ] || [ "$JMXDISABLE" = 'false' ]
then
  echo "ZooKeeper JMX enabled by default" >&2
  if [ "x$JMXPORT" = "x" ]
  then
    # for some reason these two options are necessary on jdk6 on Ubuntu
    #   accord to the docs they are not necessary, but otw jconsole cannot
    #   do a local attach
    ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"
  else
    if [ "x$JMXAUTH" = "x" ]
    then
      JMXAUTH=false
    fi
    if [ "x$JMXSSL" = "x" ]
    then
      JMXSSL=false
    fi
    if [ "x$JMXLOG4J" = "x" ]
    then
      JMXLOG4J=true
    fi
    echo "ZooKeeper remote JMX Port set to $JMXPORT" >&2
    echo "ZooKeeper remote JMX authenticate set to $JMXAUTH" >&2
    echo "ZooKeeper remote JMX ssl set to $JMXSSL" >&2
    echo "ZooKeeper remote JMX log4j set to $JMXLOG4J" >&2
    ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=$JMXPORT -Dcom.sun.management.jmxremote.authenticate=$JMXAUTH -Dcom.sun.management.jmxremote.ssl=$JMXSSL -Dzookeeper.jmx.log4j.disable=$JMXLOG4J org.apache.zookeeper.server.quorum.QuorumPeerMain"
  fi
else
    echo "JMX disabled by user request" >&2
    ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"
fi

if [ "x$SERVER_JVMFLAGS" != "x" ]
then
    JVMFLAGS="$SERVER_JVMFLAGS $JVMFLAGS"
fi

… …

case $1 in
start)
    echo  -n "Starting zookeeper ... "
    if [ -f "$ZOOPIDFILE" ]; then
      if kill -0 `cat "$ZOOPIDFILE"` > /dev/null 2>&1; then
         echo $command already running as process `cat "$ZOOPIDFILE"`.
         exit 1
      fi
    fi
    nohup "$JAVA" $ZOO_DATADIR_AUTOCREATE "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" 
    "-Dzookeeper.log.file=${ZOO_LOG_FILE}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" 
    -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -9 %p' 
    -cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &
    … …
    ;;
stop)
    echo -n "Stopping zookeeper ... "
    if [ ! -f "$ZOOPIDFILE" ]
    then
      echo "no zookeeper to stop (could not find file $ZOOPIDFILE)"
    else
      $KILL $(cat "$ZOOPIDFILE")
      rm "$ZOOPIDFILE"
      sleep 1
      echo STOPPED
    fi
    exit 0
    ;;
restart)
    shift
    "$0" stop ${@}
    sleep 3
    "$0" start ${@}
    ;;
status)
    … …
    ;;
*)
    echo "Usage: $0 [--config <conf-dir>] {start|start-foreground|stop|restart|status|print-cmd}" >&2

esac

2)zkServer.sh start底层的实际执行内容

nohup "$JAVA" 
+ 一堆提交参数 
+ $ZOOMAIN(org.apache.zookeeper.server.quorum.QuorumPeerMain)
+ "$ZOOCFG" (zkEnv.sh文件中ZOOCFG="zoo.cfg")

3)所以程序的入口是QuorumPeerMain.java类

2.2.2 ZK服务端启动入口

1)ctrl + n,查找QuorumPeerMain

​ QuorumPeerMain.java

public static void main(String[] args) {
    // 创建了一个zk节点
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        // 初始化节点并运行,args相当于提交参数中的zoo.cfg
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        ... ...
    }
    LOG.info("Exiting normally");
    System.exit(0);
}

2)initializeAndRun

protected void initializeAndRun(String[] args)
    throws ConfigException, IOException, AdminServerException
{
	// 管理zk的配置信息
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
		// 1解析参数,zoo.cfg和myid
        config.parse(args[0]);
    }

    // 2启动定时任务,对过期的快照,执行删除(默认该功能关闭)
    // Start and schedule the the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();

    if (args.length == 1 && config.isDistributed()) {
        // 3 启动集群
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}

2.2.3 解析参数zoo.cfg和myid

QuorumPeerConfig.java

public void parse(String path) throws ConfigException {
    LOG.info("Reading configuration from: " + path);
   
    try {
		// 校验文件路径及是否存在
        File configFile = (new VerifyingFileFactory.Builder(LOG)
            .warnForRelativePath()
            .failForNonExistingPath()
            .build()).create(path);
            
        Properties cfg = new Properties();
        FileInputStream in = new FileInputStream(configFile);
        try {
			// 加载配置文件
            cfg.load(in);
            configFileStr = path;
        } finally {
            in.close();
        }
        // 解析配置文件
        parseProperties(cfg);
    } catch (IOException e) {
        throw new ConfigException("Error processing " + path, e);
    } catch (IllegalArgumentException e) {
        throw new ConfigException("Error processing " + path, e);
    }   
    
    ... ...
}

QuorumPeerConfig.java

public void parseProperties(Properties zkProp)
throws IOException, ConfigException {
    int clientPort = 0;
    int secureClientPort = 0;
    String clientPortAddress = null;
    String secureClientPortAddress = null;
    VerifyingFileFactory vff = new VerifyingFileFactory.Builder(LOG).warnForRelativePath().build();
	
	// 读取zoo.cfg文件中的属性值,并赋值给QuorumPeerConfig的类对象
    for (Entry<Object, Object> entry : zkProp.entrySet()) {
        String key = entry.getKey().toString().trim();
        String value = entry.getValue().toString().trim();
        if (key.equals("dataDir")) {
            dataDir = vff.create(value);
        } else if (key.equals("dataLogDir")) {
            dataLogDir = vff.create(value);
        } else if (key.equals("clientPort")) {
            clientPort = Integer.parseInt(value);
        } else if (key.equals("localSessionsEnabled")) {
            localSessionsEnabled = Boolean.parseBoolean(value);
        } else if (key.equals("localSessionsUpgradingEnabled")) {
            localSessionsUpgradingEnabled = Boolean.parseBoolean(value);
        } else if (key.equals("clientPortAddress")) {
            clientPortAddress = value.trim();
        } else if (key.equals("secureClientPort")) {
            secureClientPort = Integer.parseInt(value);
        } else if (key.equals("secureClientPortAddress")){
            secureClientPortAddress = value.trim();
        } else if (key.equals("tickTime")) {
            tickTime = Integer.parseInt(value);
        } else if (key.equals("maxClientCnxns")) {
            maxClientCnxns = Integer.parseInt(value);
        } else if (key.equals("minSessionTimeout")) {
            minSessionTimeout = Integer.parseInt(value);
        } 
		... ...
    }

    ... ...

	if (dynamicConfigFileStr == null) {
        setupQuorumPeerConfig(zkProp, true);
        if (isDistributed() && isReconfigEnabled()) {
            // we don't backup static config for standalone mode.
            // we also don't backup if reconfig feature is disabled.
            backupOldConfig();
        }
    }
}

QuorumPeerConfig.java

void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode)
        throws IOException, ConfigException {
    quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode);
    setupMyId();
    setupClientPort();
    setupPeerType();
    checkValidity();
}

QuorumPeerConfig.java

private void setupMyId() throws IOException {
    File myIdFile = new File(dataDir, "myid");
    // standalone server doesn't need myid file.
    if (!myIdFile.isFile()) {
        return;
    }
    BufferedReader br = new BufferedReader(new FileReader(myIdFile));
    String myIdString;
    try {
        myIdString = br.readLine();
    } finally {
        br.close();
    }
    try {
        // 将解析myid文件中的id赋值给serverId
        serverId = Long.parseLong(myIdString);
        MDC.put("myid", myIdString);
    } catch (NumberFormatException e) {
        throw new IllegalArgumentException("serverid " + myIdString
                + " is not a number");
    }
}

2.2.4 过期快照删除

可以启动定时任务,对过期的快照,执行删除。默认该功能时关闭的

protected void initializeAndRun(String[] args)
    throws ConfigException, IOException, AdminServerException
{
	// 管理zk的配置信息
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
		// 1解析参数,zoo.cfg和myid
        config.parse(args[0]);
    }

    // 2启动定时任务,对过期的快照,执行删除(默认是关闭)
    // config.getSnapRetainCount() = 3 最少保留的快照个数
    // config.getPurgeInterval() = 0 默认0表示关闭
    // Start and schedule the the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();

    if (args.length == 1 && config.isDistributed()) {
        // 3 启动集群
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}

protected int snapRetainCount = 3;
protected int purgeInterval = 0;
public void start() {
    if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
        LOG.warn("Purge task is already running.");
        return;
    }
	// 默认情况purgeInterval=0,该任务关闭,直接返回
    // Don't schedule the purge task with zero or negative purge interval.
    if (purgeInterval <= 0) {
        LOG.info("Purge task is not scheduled.");
        return;
    }
	// 创建一个定时器
    timer = new Timer("PurgeTask", true);
	// 创建一个清理快照任务
    TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
	// 如果purgeInterval设置的值是1,表示1小时检查一次,判断是否有过期快照,有则删除
    timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));

    purgeTaskStatus = PurgeTaskStatus.STARTED;
}

static class PurgeTask extends TimerTask {
    private File logsDir;
    private File snapsDir;
    private int snapRetainCount;

    public PurgeTask(File dataDir, File snapDir, int count) {
        logsDir = dataDir;
        snapsDir = snapDir;
        snapRetainCount = count;
    }

    @Override
    public void run() {
        LOG.info("Purge task started.");
        try {
			// 清理过期的数据
            PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
        } catch (Exception e) {
            LOG.error("Error occurred while purging.", e);
        }
        LOG.info("Purge task completed.");
    }
}

public static void purge(File dataDir, File snapDir, int num) throws IOException {
    if (num < 3) {
        throw new IllegalArgumentException(COUNT_ERR_MSG);
    }

    FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);

    List<File> snaps = txnLog.findNRecentSnapshots(num);
    int numSnaps = snaps.size();
    if (numSnaps > 0) {
        purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
    }
}

2.2.5 初始化通信组件

protected void initializeAndRun(String[] args)
    throws ConfigException, IOException, AdminServerException
{
	// 管理zk的配置信息
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
		// 1解析参数,zoo.cfg和myid
        config.parse(args[0]);
    }

    // 2启动定时任务,对过期的快照,执行删除(默认是关闭)
    // config.getSnapRetainCount() = 3 最少保留的快照个数
    // config.getPurgeInterval() = 0 默认0表示关闭
    // Start and schedule the the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();

    if (args.length == 1 && config.isDistributed()) {
        // 3 启动集群(集群模式)
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        // 本地模式
        ZooKeeperServerMain.main(args);
    }
}

1)通信协议默认NIO(可以支持Netty)

public void runFromConfig(QuorumPeerConfig config)
        throws IOException, AdminServerException
{
  … …

  LOG.info("Starting quorum peer");
  try {
      ServerCnxnFactory cnxnFactory = null;
      ServerCnxnFactory secureCnxnFactory = null;

	  // 通信组件初始化,默认是NIO通信
      if (config.getClientPortAddress() != null) {
          cnxnFactory = ServerCnxnFactory.createFactory();
          cnxnFactory.configure(config.getClientPortAddress(),
                  config.getMaxClientCnxns(), false);
      }

      if (config.getSecureClientPortAddress() != null) {
          secureCnxnFactory = ServerCnxnFactory.createFactory();
          secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                  config.getMaxClientCnxns(), true);
      }

	  // 把解析的参数赋值给该zookeeper节点
      quorumPeer = getQuorumPeer();
      quorumPeer.setTxnFactory(new FileTxnSnapLog(
                  config.getDataLogDir(),
                  config.getDataDir()));
      quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
      quorumPeer.enableLocalSessionsUpgrading(
          config.isLocalSessionsUpgradingEnabled());
      //quorumPeer.setQuorumPeers(config.getAllMembers());
      quorumPeer.setElectionType(config.getElectionAlg());
      quorumPeer.setMyid(config.getServerId());
      quorumPeer.setTickTime(config.getTickTime());
      quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
      quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
      quorumPeer.setInitLimit(config.getInitLimit());
      quorumPeer.setSyncLimit(config.getSyncLimit());
      quorumPeer.setConfigFileName(config.getConfigFilename());
      // 管理zk数据的存储
      quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
      quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
      if (config.getLastSeenQuorumVerifier()!=null) {
          quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
    }
      quorumPeer.initConfigInZKDatabase();
      // 管理zk的通信
      quorumPeer.setCnxnFactory(cnxnFactory);
      quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
      quorumPeer.setSslQuorum(config.isSslQuorum());
      quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
      quorumPeer.setLearnerType(config.getPeerType());
      quorumPeer.setSyncEnabled(config.getSyncEnabled());
      quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
      if (config.sslQuorumReloadCertFiles) {
          quorumPeer.getX509Util().enableCertFileReloading();
      }
… …
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
      quorumPeer.initialize();
      
	  // 启动zk
      quorumPeer.start();
      quorumPeer.join();
  } catch (InterruptedException e) {
      // warn, but generally this is ok
      LOG.warn("Quorum Peer interrupted", e);
  }
}

static public ServerCnxnFactory createFactory() throws IOException {
    String serverCnxnFactoryName =
        System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
    if (serverCnxnFactoryName == null) {
        serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
    }
    try {
        ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
                .getDeclaredConstructor().newInstance();
        LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
        return serverCnxnFactory;
    } catch (Exception e) {
        IOException ioe = new IOException("Couldn't instantiate "
                + serverCnxnFactoryName);
        ioe.initCause(e);
        throw ioe;
    }
}

public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory";

zookeeperAdmin.md 文件中
* *serverCnxnFactory* :
	 (Java system property: zookeeper.serverCnxnFactory)
	Specifies ServerCnxnFactory implementation. 
    This should be set to `NettyServerCnxnFactory` in order to use TLS based server communication.
     Default is `NIOServerCnxnFactory`.

2)初始化NIO服务端Socket(并未启动)

ctrl + alt +B 查找configure实现类,NIOServerCnxnFactory.java

public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {
    if (secure) {
        throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
    }
    configureSaslLogin();

    maxClientCnxns = maxcc;
    sessionlessCnxnTimeout = Integer.getInteger(
        ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
    // We also use the sessionlessCnxnTimeout as expiring interval for
    // cnxnExpiryQueue. These don't need to be the same, but the expiring
    // interval passed into the ExpiryQueue() constructor below should be
    // less than or equal to the timeout.
    cnxnExpiryQueue =
        new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
    expirerThread = new ConnectionExpirerThread();

    int numCores = Runtime.getRuntime().availableProcessors();
    // 32 cores sweet spot seems to be 4 selector threads
    numSelectorThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
        Math.max((int) Math.sqrt((float) numCores/2), 1));
    if (numSelectorThreads < 1) {
        throw new IOException("numSelectorThreads must be at least 1");
    }

    numWorkerThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
    workerShutdownTimeoutMS = Long.getLong(
        ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);

    ... ...
    for(int i=0; i<numSelectorThreads; ++i) {
        selectorThreads.add(new SelectorThread(i));
    }

	// 初始化NIO服务端socket,绑定2181端口,可以接收客户端请求
    this.ss = ServerSocketChannel.open();
    ss.socket().setReuseAddress(true);
    LOG.info("binding to port " + addr);
	// 绑定2181端口
    ss.socket().bind(addr);
    ss.configureBlocking(false);
    acceptThread = new AcceptThread(ss, addr, selectorThreads);
}



这篇关于Zookeeper源码部分 第2章 ZK服务端初始化源码解析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程