Zookeeper源码部分 第2章 ZK服务端加载数据源码解析
2022/6/5 1:20:13
本文主要是介绍Zookeeper源码部分 第2章 ZK服务端加载数据源码解析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
2.3 ZK服务端加载数据源码解析
(1)zk的数据模型是一棵树(DataTree),树上的每个节点叫做DataNode。
(2)zk集群中的DataTree时刻保持状态同步
(3)ZooKeeper集群中的每个zk节点,都在内存和磁盘中各保存一份完整的数据:
- 内存数据:DataTree
- 磁盘数据:快照文件 + 编辑日志
2.3.1 冷启动数据恢复快照数据
1)启动集群
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException { … … LOG.info("Starting quorum peer"); try { ServerCnxnFactory cnxnFactory = null; ServerCnxnFactory secureCnxnFactory = null; // 通信组件初始化,默认是NIO通信 if (config.getClientPortAddress() != null) { cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false); } if (config.getSecureClientPortAddress() != null) { secureCnxnFactory = ServerCnxnFactory.createFactory(); secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), true); } // 把解析的参数赋值给该Zookeeper节点 quorumPeer = getQuorumPeer(); quorumPeer.setTxnFactory(new FileTxnSnapLog( config.getDataLogDir(), config.getDataDir())); quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled()); quorumPeer.enableLocalSessionsUpgrading( config.isLocalSessionsUpgradingEnabled()); //quorumPeer.setQuorumPeers(config.getAllMembers()); quorumPeer.setElectionType(config.getElectionAlg()); quorumPeer.setMyid(config.getServerId()); quorumPeer.setTickTime(config.getTickTime()); quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout()); quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout()); quorumPeer.setInitLimit(config.getInitLimit()); quorumPeer.setSyncLimit(config.getSyncLimit()); quorumPeer.setConfigFileName(config.getConfigFilename()); // 管理zk数据的存储 quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory())); quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false); if (config.getLastSeenQuorumVerifier()!=null) { quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false); } quorumPeer.initConfigInZKDatabase(); // 管理zk的通信 quorumPeer.setCnxnFactory(cnxnFactory); quorumPeer.setSecureCnxnFactory(secureCnxnFactory); quorumPeer.setSslQuorum(config.isSslQuorum()); quorumPeer.setUsePortUnification(config.shouldUsePortUnification()); quorumPeer.setLearnerType(config.getPeerType()); 
quorumPeer.setSyncEnabled(config.getSyncEnabled()); quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs()); if (config.sslQuorumReloadCertFiles) { quorumPeer.getX509Util().enableCertFileReloading(); } quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize); quorumPeer.initialize(); // 启动zk quorumPeer.start(); quorumPeer.join(); } catch (InterruptedException e) { // warn, but generally this is ok LOG.warn("Quorum Peer interrupted", e); } }
2)冷启动恢复数据
QuorumPeer.java
public synchronized void start() { if (!getView().containsKey(myid)) { throw new RuntimeException("My id " + myid + " not in the peer list"); } // 冷启动数据恢复 loadDataBase(); startServerCnxnFactory(); try { // 启动通信工厂实例对象 adminServer.start(); } catch (AdminServerException e) { LOG.warn("Problem starting AdminServer", e); System.out.println(e); } // 准备选举环境 startLeaderElection(); // 执行选举 super.start(); } private void loadDataBase() { try { // 加载磁盘数据到内存,恢复DataTree // zk的操作分两种:事务操作和非事务操作 // 事务操作:zk.cteate();都会被分配一个全局唯一的zxid,zxid组成:64位: (前32位:epoch每个leader任期的代号;后32位:txid为事务id) // 非事务操作:zk.getData() // 数据恢复过程: // (1)从快照文件中恢复大部分数据,并得到一个lastProcessZXid // (2)再从编辑日志中执行replay,执行到最后一条日志并更新lastProcessZXid // (3)最终得到,datatree和lastProcessZXid,表示数据恢复完成 zkDb.loadDataBase(); // load the epochs long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid; long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid); try { currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME); } catch(FileNotFoundException e) { // pick a reasonable epoch number // this should only happen once when moving to a // new code version currentEpoch = epochOfZxid; LOG.info(CURRENT_EPOCH_FILENAME + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation", currentEpoch); writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch); } if (epochOfZxid > currentEpoch) { throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid); } try { acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME); } catch(FileNotFoundException e) { // pick a reasonable epoch number // this should only happen once when moving to a // new code version acceptedEpoch = epochOfZxid; LOG.info(ACCEPTED_EPOCH_FILENAME + " not found! Creating with a reasonable default of {}. 
This should only happen when you are upgrading your installation", acceptedEpoch); writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch); } if (acceptedEpoch < currentEpoch) { throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch) + " is less than the current epoch, " + ZxidUtils.zxidToString(currentEpoch)); } } catch(IOException ie) { LOG.error("Unable to load database on disk", ie); throw new RuntimeException("Unable to run quorum server ", ie); } } public long loadDataBase() throws IOException { long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener); initialized = true; return zxid; } public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException { // 恢复快照文件数据到DataTree long deserializeResult = snapLog.deserialize(dt, sessions); FileTxnLog txnLog = new FileTxnLog(dataDir); RestoreFinalizer finalizer = () -> { // 恢复编辑日志数据到DataTree long highestZxid = fastForwardFromEdits(dt, sessions, listener); return highestZxid; }; if (-1L == deserializeResult) { /* this means that we couldn't find any snapshot, so we need to * initialize an empty database (reported in ZOOKEEPER-2325) */ if (txnLog.getLastLoggedZxid() != -1) { // ZOOKEEPER-3056: provides an escape hatch for users upgrading // from old versions of zookeeper (3.4.x, pre 3.5.3). 
if (!trustEmptySnapshot) { throw new IOException(EMPTY_SNAPSHOT_WARNING + "Something is broken!"); } else { LOG.warn("{}This should only be allowed during upgrading.", EMPTY_SNAPSHOT_WARNING); return finalizer.run(); } } /* TODO: (br33d) we should either put a ConcurrentHashMap on restore() * or use Map on save() */ save(dt, (ConcurrentHashMap<Long, Integer>)sessions); /* return a zxid of zero, since we the database is empty */ return 0; } return finalizer.run(); } ctrl + alt +B 查找deserialize实现类FileSnap.java public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException { // we run through 100 snapshots (not all of them) // if we cannot get it running within 100 snapshots // we should give up List<File> snapList = findNValidSnapshots(100); if (snapList.size() == 0) { return -1L; } File snap = null; boolean foundValid = false; // 依次遍历每一个快照的数据 for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) { snap = snapList.get(i); LOG.info("Reading snapshot " + snap); // 反序列化环境准备 try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap)); CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) { InputArchive ia = BinaryInputArchive.getArchive(crcIn); // 反序列化,恢复数据到DataTree deserialize(dt, sessions, ia); long checkSum = crcIn.getChecksum().getValue(); long val = ia.readLong("val"); if (val != checkSum) { throw new IOException("CRC corruption in snapshot : " + snap); } foundValid = true; break; } catch (IOException e) { LOG.warn("problem reading snap file " + snap, e); } } if (!foundValid) { throw new IOException("Not able to find valid snapshots in " + snapDir); } dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX); return dt.lastProcessedZxid; } public void deserialize(DataTree dt, Map<Long, Integer> sessions, InputArchive ia) throws IOException { FileHeader header = new FileHeader(); header.deserialize(ia, "fileheader"); if (header.getMagic() != SNAP_MAGIC) { throw new 
IOException("mismatching magic headers " + header.getMagic() + " != " + FileSnap.SNAP_MAGIC); } // 恢复快照数据到DataTree SerializeUtils.deserializeSnapshot(dt,ia,sessions); } public static void deserializeSnapshot(DataTree dt,InputArchive ia, Map<Long, Integer> sessions) throws IOException { int count = ia.readInt("count"); while (count > 0) { long id = ia.readLong("id"); int to = ia.readInt("timeout"); sessions.put(id, to); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, "loadData --- session in archive: " + id + " with timeout: " + to); } count--; } // 恢复快照数据到DataTree dt.deserialize(ia, "tree"); } public void deserialize(InputArchive ia, String tag) throws IOException { aclCache.deserialize(ia); nodes.clear(); pTrie.clear(); String path = ia.readString("path"); // 从快照中恢复每一个datanode节点数据到DataTree while (!"/".equals(path)) { // 每次循环创建一个节点对象 DataNode node = new DataNode(); ia.readRecord(node, "node"); // 将DataNode恢复到DataTree nodes.put(path, node); synchronized (node) { aclCache.addUsage(node.acl); } int lastSlash = path.lastIndexOf('/'); if (lastSlash == -1) { root = node; } else { // 处理父节点 String parentPath = path.substring(0, lastSlash); DataNode parent = nodes.get(parentPath); if (parent == null) { throw new IOException("Invalid Datatree, unable to find " + "parent " + parentPath + " of path " + path); } // 处理子节点 parent.addChild(path.substring(lastSlash + 1)); // 处理临时节点和永久节点 long eowner = node.stat.getEphemeralOwner(); EphemeralType ephemeralType = EphemeralType.get(eowner); if (ephemeralType == EphemeralType.CONTAINER) { containers.add(path); } else if (ephemeralType == EphemeralType.TTL) { ttls.add(path); } else if (eowner != 0) { HashSet<String> list = ephemerals.get(eowner); if (list == null) { list = new HashSet<String>(); ephemerals.put(eowner, list); } list.add(path); } } path = ia.readString("path"); } nodes.put("/", root); // we are done with deserializing the // the datatree // update the quotas - create path trie // and 
also update the stat nodes setupQuota(); aclCache.purgeUnused(); }
2.3.2 冷启动数据恢复编辑日志
回到FileTxnSnapLog.java类中的restore方法
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException { // 恢复快照文件数据到DataTree long deserializeResult = snapLog.deserialize(dt, sessions); FileTxnLog txnLog = new FileTxnLog(dataDir); RestoreFinalizer finalizer = () -> { // 恢复编辑日志数据到DataTree long highestZxid = fastForwardFromEdits(dt, sessions, listener); return highestZxid; }; … … return finalizer.run(); } public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException { // 在此之前,已经从快照文件中恢复了大部分数据,接下来只需从快照的zxid + 1位置开始恢复 TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1); // 快照中最大的zxid,在执行编辑日志时,这个值会不断更新,直到所有操作执行完 long highestZxid = dt.lastProcessedZxid; TxnHeader hdr; try { // 从lastProcessedZxid事务编号器开始,不断的从编辑日志中恢复剩下的还没有恢复的数据 while (true) { // iterator points to // the first valid txn when initialized // 获取事务头信息(有zxid) hdr = itr.getHeader(); if (hdr == null) { //empty logs return dt.lastProcessedZxid; } if (hdr.getZxid() < highestZxid && highestZxid != 0) { LOG.error("{}(highestZxid) > {}(next log) for type {}", highestZxid, hdr.getZxid(), hdr.getType()); } else { highestZxid = hdr.getZxid(); } try { // 根据编辑日志恢复数据到DataTree,每执行一次,对应的事务id,highestZxid + 1 processTransaction(hdr,dt,sessions, itr.getTxn()); } catch(KeeperException.NoNodeException e) { throw new IOException("Failed to process transaction type: " + hdr.getType() + " error: " + e.getMessage(), e); } listener.onTxnLoaded(hdr, itr.getTxn()); if (!itr.next()) break; } } finally { if (itr != null) { itr.close(); } } return highestZxid; } public void processTransaction(TxnHeader hdr,DataTree dt, Map<Long, Integer> sessions, Record txn) throws KeeperException.NoNodeException { ProcessTxnResult rc; switch (hdr.getType()) { case OpCode.createSession: sessions.put(hdr.getClientId(), ((CreateSessionTxn) txn).getTimeOut()); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK, "playLog --- create session in log: 0x" + 
Long.toHexString(hdr.getClientId()) + " with timeout: " + ((CreateSessionTxn) txn).getTimeOut()); } // give dataTree a chance to sync its lastProcessedZxid rc = dt.processTxn(hdr, txn); break; case OpCode.closeSession: sessions.remove(hdr.getClientId()); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK, "playLog --- close session in log: 0x" + Long.toHexString(hdr.getClientId())); } rc = dt.processTxn(hdr, txn); break; default: // 创建节点、删除节点和其他的各种事务操作等 rc = dt.processTxn(hdr, txn); } / * Snapshots are lazily created. So when a snapshot is in progress, * there is a chance for later transactions to make into the * snapshot. Then when the snapshot is restored, NONODE/NODEEXISTS * errors could occur. It should be safe to ignore these. */ if (rc.err != Code.OK.intValue()) { LOG.debug( "Ignoring processTxn failure hdr: {}, error: {}, path: {}", hdr.getType(), rc.err, rc.path); } } public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) { ProcessTxnResult rc = new ProcessTxnResult(); try { rc.clientId = header.getClientId(); rc.cxid = header.getCxid(); rc.zxid = header.getZxid(); rc.type = header.getType(); rc.err = 0; rc.multiResult = null; switch (header.getType()) { case OpCode.create: CreateTxn createTxn = (CreateTxn) txn; rc.path = createTxn.getPath(); createNode( createTxn.getPath(), createTxn.getData(), createTxn.getAcl(), createTxn.getEphemeral() ? header.getClientId() : 0, createTxn.getParentCVersion(), header.getZxid(), header.getTime(), null); break; case OpCode.create2: CreateTxn create2Txn = (CreateTxn) txn; rc.path = create2Txn.getPath(); Stat stat = new Stat(); createNode( create2Txn.getPath(), create2Txn.getData(), create2Txn.getAcl(), create2Txn.getEphemeral() ? 
header.getClientId() : 0, create2Txn.getParentCVersion(), header.getZxid(), header.getTime(), stat); rc.stat = stat; break; case OpCode.createTTL: CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn; rc.path = createTtlTxn.getPath(); stat = new Stat(); createNode( createTtlTxn.getPath(), createTtlTxn.getData(), createTtlTxn.getAcl(), EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()), createTtlTxn.getParentCVersion(), header.getZxid(), header.getTime(), stat); rc.stat = stat; break; case OpCode.createContainer: CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn; rc.path = createContainerTxn.getPath(); stat = new Stat(); createNode( createContainerTxn.getPath(), createContainerTxn.getData(), createContainerTxn.getAcl(), EphemeralType.CONTAINER_EPHEMERAL_OWNER, createContainerTxn.getParentCVersion(), header.getZxid(), header.getTime(), stat); rc.stat = stat; break; case OpCode.delete: case OpCode.deleteContainer: DeleteTxn deleteTxn = (DeleteTxn) txn; rc.path = deleteTxn.getPath(); deleteNode(deleteTxn.getPath(), header.getZxid()); break; case OpCode.reconfig: case OpCode.setData: SetDataTxn setDataTxn = (SetDataTxn) txn; rc.path = setDataTxn.getPath(); rc.stat = setData(setDataTxn.getPath(), setDataTxn .getData(), setDataTxn.getVersion(), header .getZxid(), header.getTime()); break; case OpCode.setACL: SetACLTxn setACLTxn = (SetACLTxn) txn; rc.path = setACLTxn.getPath(); rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(), setACLTxn.getVersion()); break; case OpCode.closeSession: killSession(header.getClientId(), header.getZxid()); break; case OpCode.error: ErrorTxn errTxn = (ErrorTxn) txn; rc.err = errTxn.getErr(); break; case OpCode.check: CheckVersionTxn checkTxn = (CheckVersionTxn) txn; rc.path = checkTxn.getPath(); break; case OpCode.multi: MultiTxn multiTxn = (MultiTxn) txn ; List<Txn> txns = multiTxn.getTxns(); rc.multiResult = new ArrayList<ProcessTxnResult>(); boolean failed = false; for (Txn subtxn : txns) { if (subtxn.getType() == 
OpCode.error) { failed = true; break; } } .. … } } catch (KeeperException e) { ... ... } catch (IOException e) { ... ... } ... ... return rc; }
2.4 ZK选举源码解析
这篇关于Zookeeper源码部分 第2章 ZK服务端加载数据源码解析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-26Java语音识别项目资料:新手入门教程
- 2024-11-26JAVA语音识别项目资料:新手入门教程
- 2024-11-26Java语音识别项目资料:入门与实践指南
- 2024-11-26Java云原生资料入门教程
- 2024-11-26Java云原生资料入门教程
- 2024-11-26Java云原生资料:新手入门教程
- 2024-11-25Java创意资料:新手入门的创意学习指南
- 2024-11-25JAVA对接阿里云智能语音服务资料详解:新手入门指南
- 2024-11-25Java对接阿里云智能语音服务资料详解
- 2024-11-25Java对接阿里云智能语音服务资料详解