Zookeeper源码部分 第2章 ZK服务端初始化源码解析
2022/6/5 1:20:26
本文主要是介绍Zookeeper源码部分 第2章 ZK服务端初始化源码解析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
ZK服务端初始化源码解析
2.2.1 ZK服务端启动脚本分析
1)Zookeeper服务的启动命令是zkServer.sh start
zkServer.sh
#!/usr/bin/env bash # use POSTIX interface, symlink is followed automatically ZOOBIN="${BASH_SOURCE-$0}" ZOOBIN="$(dirname "${ZOOBIN}")" ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)" if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then . "$ZOOBINDIR"/../libexec/zkEnv.sh else . "$ZOOBINDIR"/zkEnv.sh //相当于获取zkEnv.sh中的环境变量(ZOOCFG="zoo.cfg") fi # See the following page for extensive details on setting # up the JVM to accept JMX remote management: # http://java.sun.com/javase/6/docs/technotes/guides/management/agent.html # by default we allow local JMX connections if [ "x$JMXLOCALONLY" = "x" ] then JMXLOCALONLY=false fi if [ "x$JMXDISABLE" = "x" ] || [ "$JMXDISABLE" = 'false' ] then echo "ZooKeeper JMX enabled by default" >&2 if [ "x$JMXPORT" = "x" ] then # for some reason these two options are necessary on jdk6 on Ubuntu # accord to the docs they are not necessary, but otw jconsole cannot # do a local attach ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain" else if [ "x$JMXAUTH" = "x" ] then JMXAUTH=false fi if [ "x$JMXSSL" = "x" ] then JMXSSL=false fi if [ "x$JMXLOG4J" = "x" ] then JMXLOG4J=true fi echo "ZooKeeper remote JMX Port set to $JMXPORT" >&2 echo "ZooKeeper remote JMX authenticate set to $JMXAUTH" >&2 echo "ZooKeeper remote JMX ssl set to $JMXSSL" >&2 echo "ZooKeeper remote JMX log4j set to $JMXLOG4J" >&2 ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=$JMXPORT -Dcom.sun.management.jmxremote.authenticate=$JMXAUTH -Dcom.sun.management.jmxremote.ssl=$JMXSSL -Dzookeeper.jmx.log4j.disable=$JMXLOG4J org.apache.zookeeper.server.quorum.QuorumPeerMain" fi else echo "JMX disabled by user request" >&2 ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain" fi if [ "x$SERVER_JVMFLAGS" != "x" ] then JVMFLAGS="$SERVER_JVMFLAGS $JVMFLAGS" fi … … case $1 in start) echo -n "Starting zookeeper ... " if [ -f "$ZOOPIDFILE" ]; then if kill -0 `cat "$ZOOPIDFILE"` > /dev/null 2>&1; then echo $command already running as process `cat "$ZOOPIDFILE"`. exit 1 fi fi nohup "$JAVA" $ZOO_DATADIR_AUTOCREATE "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.log.file=${ZOO_LOG_FILE}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -9 %p' -cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null & … … ;; stop) echo -n "Stopping zookeeper ... " if [ ! -f "$ZOOPIDFILE" ] then echo "no zookeeper to stop (could not find file $ZOOPIDFILE)" else $KILL $(cat "$ZOOPIDFILE") rm "$ZOOPIDFILE" sleep 1 echo STOPPED fi exit 0 ;; restart) shift "$0" stop ${@} sleep 3 "$0" start ${@} ;; status) … … ;; *) echo "Usage: $0 [--config <conf-dir>] {start|start-foreground|stop|restart|status|print-cmd}" >&2 esac
2)zkServer.sh start底层的实际执行内容
nohup "$JAVA" + 一堆提交参数 + $ZOOMAIN(org.apache.zookeeper.server.quorum.QuorumPeerMain) + "$ZOOCFG" (zkEnv.sh文件中ZOOCFG="zoo.cfg")
3)所以程序的入口是QuorumPeerMain.java类
2.2.2 ZK服务端启动入口
1)ctrl + n,查找QuorumPeerMain
QuorumPeerMain.java
public static void main(String[] args) { // 创建了一个zk节点 QuorumPeerMain main = new QuorumPeerMain(); try { // 初始化节点并运行,args相当于提交参数中的zoo.cfg main.initializeAndRun(args); } catch (IllegalArgumentException e) { ... ... } LOG.info("Exiting normally"); System.exit(0); }
2)initializeAndRun
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException { // 管理zk的配置信息 QuorumPeerConfig config = new QuorumPeerConfig(); if (args.length == 1) { // 1解析参数,zoo.cfg和myid config.parse(args[0]); } // 2启动定时任务,对过期的快照,执行删除(默认该功能关闭) // Start and schedule the the purge task DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config .getDataDir(), config.getDataLogDir(), config .getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start(); if (args.length == 1 && config.isDistributed()) { // 3 启动集群 runFromConfig(config); } else { LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode"); // there is only server in the quorum -- run as standalone ZooKeeperServerMain.main(args); } }
2.2.3 解析参数zoo.cfg和myid
QuorumPeerConfig.java
public void parse(String path) throws ConfigException { LOG.info("Reading configuration from: " + path); try { // 校验文件路径及是否存在 File configFile = (new VerifyingFileFactory.Builder(LOG) .warnForRelativePath() .failForNonExistingPath() .build()).create(path); Properties cfg = new Properties(); FileInputStream in = new FileInputStream(configFile); try { // 加载配置文件 cfg.load(in); configFileStr = path; } finally { in.close(); } // 解析配置文件 parseProperties(cfg); } catch (IOException e) { throw new ConfigException("Error processing " + path, e); } catch (IllegalArgumentException e) { throw new ConfigException("Error processing " + path, e); } ... ... }
QuorumPeerConfig.java
public void parseProperties(Properties zkProp) throws IOException, ConfigException { int clientPort = 0; int secureClientPort = 0; String clientPortAddress = null; String secureClientPortAddress = null; VerifyingFileFactory vff = new VerifyingFileFactory.Builder(LOG).warnForRelativePath().build(); // 读取zoo.cfg文件中的属性值,并赋值给QuorumPeerConfig的类对象 for (Entry<Object, Object> entry : zkProp.entrySet()) { String key = entry.getKey().toString().trim(); String value = entry.getValue().toString().trim(); if (key.equals("dataDir")) { dataDir = vff.create(value); } else if (key.equals("dataLogDir")) { dataLogDir = vff.create(value); } else if (key.equals("clientPort")) { clientPort = Integer.parseInt(value); } else if (key.equals("localSessionsEnabled")) { localSessionsEnabled = Boolean.parseBoolean(value); } else if (key.equals("localSessionsUpgradingEnabled")) { localSessionsUpgradingEnabled = Boolean.parseBoolean(value); } else if (key.equals("clientPortAddress")) { clientPortAddress = value.trim(); } else if (key.equals("secureClientPort")) { secureClientPort = Integer.parseInt(value); } else if (key.equals("secureClientPortAddress")){ secureClientPortAddress = value.trim(); } else if (key.equals("tickTime")) { tickTime = Integer.parseInt(value); } else if (key.equals("maxClientCnxns")) { maxClientCnxns = Integer.parseInt(value); } else if (key.equals("minSessionTimeout")) { minSessionTimeout = Integer.parseInt(value); } ... ... } ... ... if (dynamicConfigFileStr == null) { setupQuorumPeerConfig(zkProp, true); if (isDistributed() && isReconfigEnabled()) { // we don't backup static config for standalone mode. // we also don't backup if reconfig feature is disabled. backupOldConfig(); } } }
QuorumPeerConfig.java
void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode) throws IOException, ConfigException { quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode); setupMyId(); setupClientPort(); setupPeerType(); checkValidity(); }
QuorumPeerConfig.java
private void setupMyId() throws IOException { File myIdFile = new File(dataDir, "myid"); // standalone server doesn't need myid file. if (!myIdFile.isFile()) { return; } BufferedReader br = new BufferedReader(new FileReader(myIdFile)); String myIdString; try { myIdString = br.readLine(); } finally { br.close(); } try { // 将解析myid文件中的id赋值给serverId serverId = Long.parseLong(myIdString); MDC.put("myid", myIdString); } catch (NumberFormatException e) { throw new IllegalArgumentException("serverid " + myIdString + " is not a number"); } }
2.2.4 过期快照删除
可以启动定时任务,对过期的快照,执行删除。默认该功能时关闭的
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException { // 管理zk的配置信息 QuorumPeerConfig config = new QuorumPeerConfig(); if (args.length == 1) { // 1解析参数,zoo.cfg和myid config.parse(args[0]); } // 2启动定时任务,对过期的快照,执行删除(默认是关闭) // config.getSnapRetainCount() = 3 最少保留的快照个数 // config.getPurgeInterval() = 0 默认0表示关闭 // Start and schedule the the purge task DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config .getDataDir(), config.getDataLogDir(), config .getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start(); if (args.length == 1 && config.isDistributed()) { // 3 启动集群 runFromConfig(config); } else { LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode"); // there is only server in the quorum -- run as standalone ZooKeeperServerMain.main(args); } } protected int snapRetainCount = 3; protected int purgeInterval = 0;
public void start() { if (PurgeTaskStatus.STARTED == purgeTaskStatus) { LOG.warn("Purge task is already running."); return; } // 默认情况purgeInterval=0,该任务关闭,直接返回 // Don't schedule the purge task with zero or negative purge interval. if (purgeInterval <= 0) { LOG.info("Purge task is not scheduled."); return; } // 创建一个定时器 timer = new Timer("PurgeTask", true); // 创建一个清理快照任务 TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount); // 如果purgeInterval设置的值是1,表示1小时检查一次,判断是否有过期快照,有则删除 timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval)); purgeTaskStatus = PurgeTaskStatus.STARTED; } static class PurgeTask extends TimerTask { private File logsDir; private File snapsDir; private int snapRetainCount; public PurgeTask(File dataDir, File snapDir, int count) { logsDir = dataDir; snapsDir = snapDir; snapRetainCount = count; } @Override public void run() { LOG.info("Purge task started."); try { // 清理过期的数据 PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount); } catch (Exception e) { LOG.error("Error occurred while purging.", e); } LOG.info("Purge task completed."); } } public static void purge(File dataDir, File snapDir, int num) throws IOException { if (num < 3) { throw new IllegalArgumentException(COUNT_ERR_MSG); } FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir); List<File> snaps = txnLog.findNRecentSnapshots(num); int numSnaps = snaps.size(); if (numSnaps > 0) { purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1)); } }
2.2.5 初始化通信组件
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException { // 管理zk的配置信息 QuorumPeerConfig config = new QuorumPeerConfig(); if (args.length == 1) { // 1解析参数,zoo.cfg和myid config.parse(args[0]); } // 2启动定时任务,对过期的快照,执行删除(默认是关闭) // config.getSnapRetainCount() = 3 最少保留的快照个数 // config.getPurgeInterval() = 0 默认0表示关闭 // Start and schedule the the purge task DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config .getDataDir(), config.getDataLogDir(), config .getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start(); if (args.length == 1 && config.isDistributed()) { // 3 启动集群(集群模式) runFromConfig(config); } else { LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode"); // there is only server in the quorum -- run as standalone // 本地模式 ZooKeeperServerMain.main(args); } }
1)通信协议默认NIO(可以支持Netty)
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException { … … LOG.info("Starting quorum peer"); try { ServerCnxnFactory cnxnFactory = null; ServerCnxnFactory secureCnxnFactory = null; // 通信组件初始化,默认是NIO通信 if (config.getClientPortAddress() != null) { cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false); } if (config.getSecureClientPortAddress() != null) { secureCnxnFactory = ServerCnxnFactory.createFactory(); secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), true); } // 把解析的参数赋值给该zookeeper节点 quorumPeer = getQuorumPeer(); quorumPeer.setTxnFactory(new FileTxnSnapLog( config.getDataLogDir(), config.getDataDir())); quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled()); quorumPeer.enableLocalSessionsUpgrading( config.isLocalSessionsUpgradingEnabled()); //quorumPeer.setQuorumPeers(config.getAllMembers()); quorumPeer.setElectionType(config.getElectionAlg()); quorumPeer.setMyid(config.getServerId()); quorumPeer.setTickTime(config.getTickTime()); quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout()); quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout()); quorumPeer.setInitLimit(config.getInitLimit()); quorumPeer.setSyncLimit(config.getSyncLimit()); quorumPeer.setConfigFileName(config.getConfigFilename()); // 管理zk数据的存储 quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory())); quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false); if (config.getLastSeenQuorumVerifier()!=null) { quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false); } quorumPeer.initConfigInZKDatabase(); // 管理zk的通信 quorumPeer.setCnxnFactory(cnxnFactory); quorumPeer.setSecureCnxnFactory(secureCnxnFactory); quorumPeer.setSslQuorum(config.isSslQuorum()); quorumPeer.setUsePortUnification(config.shouldUsePortUnification()); quorumPeer.setLearnerType(config.getPeerType()); quorumPeer.setSyncEnabled(config.getSyncEnabled()); quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs()); if (config.sslQuorumReloadCertFiles) { quorumPeer.getX509Util().enableCertFileReloading(); } … … quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize); quorumPeer.initialize(); // 启动zk quorumPeer.start(); quorumPeer.join(); } catch (InterruptedException e) { // warn, but generally this is ok LOG.warn("Quorum Peer interrupted", e); } } static public ServerCnxnFactory createFactory() throws IOException { String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY); if (serverCnxnFactoryName == null) { serverCnxnFactoryName = NIOServerCnxnFactory.class.getName(); } try { ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName) .getDeclaredConstructor().newInstance(); LOG.info("Using {} as server connection factory", serverCnxnFactoryName); return serverCnxnFactory; } catch (Exception e) { IOException ioe = new IOException("Couldn't instantiate " + serverCnxnFactoryName); ioe.initCause(e); throw ioe; } } public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory"; zookeeperAdmin.md 文件中 * *serverCnxnFactory* : (Java system property: zookeeper.serverCnxnFactory) Specifies ServerCnxnFactory implementation. This should be set to `NettyServerCnxnFactory` in order to use TLS based server communication. Default is `NIOServerCnxnFactory`.
2)初始化NIO服务端Socket(并未启动)
ctrl + alt +B 查找configure实现类,NIOServerCnxnFactory.java
public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException { if (secure) { throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn"); } configureSaslLogin(); maxClientCnxns = maxcc; sessionlessCnxnTimeout = Integer.getInteger( ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000); // We also use the sessionlessCnxnTimeout as expiring interval for // cnxnExpiryQueue. These don't need to be the same, but the expiring // interval passed into the ExpiryQueue() constructor below should be // less than or equal to the timeout. cnxnExpiryQueue = new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout); expirerThread = new ConnectionExpirerThread(); int numCores = Runtime.getRuntime().availableProcessors(); // 32 cores sweet spot seems to be 4 selector threads numSelectorThreads = Integer.getInteger( ZOOKEEPER_NIO_NUM_SELECTOR_THREADS, Math.max((int) Math.sqrt((float) numCores/2), 1)); if (numSelectorThreads < 1) { throw new IOException("numSelectorThreads must be at least 1"); } numWorkerThreads = Integer.getInteger( ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores); workerShutdownTimeoutMS = Long.getLong( ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000); ... ... for(int i=0; i<numSelectorThreads; ++i) { selectorThreads.add(new SelectorThread(i)); } // 初始化NIO服务端socket,绑定2181端口,可以接收客户端请求 this.ss = ServerSocketChannel.open(); ss.socket().setReuseAddress(true); LOG.info("binding to port " + addr); // 绑定2181端口 ss.socket().bind(addr); ss.configureBlocking(false); acceptThread = new AcceptThread(ss, addr, selectorThreads); }
这篇关于Zookeeper源码部分 第2章 ZK服务端初始化源码解析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-26Mybatis官方生成器资料详解与应用教程
- 2024-11-26Mybatis一级缓存资料详解与实战教程
- 2024-11-26Mybatis一级缓存资料详解:新手快速入门
- 2024-11-26SpringBoot3+JDK17搭建后端资料详尽教程
- 2024-11-26Springboot单体架构搭建资料:新手入门教程
- 2024-11-26Springboot单体架构搭建资料详解与实战教程
- 2024-11-26Springboot框架资料:新手入门教程
- 2024-11-26Springboot企业级开发资料入门教程
- 2024-11-26SpringBoot企业级开发资料详解与实战教程
- 2024-11-26Springboot微服务资料:新手入门全攻略