Source Code Analysis of the Checkpoint Alignment Mechanism
2021/7/26 14:06:06
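Checkpointing is the core mechanism behind Flink's state fault tolerance. Depending on how checkpoint barriers are handled, it provides one of two processing guarantees: Exactly-Once or At-Least-Once. The difference lies in barrier alignment: with alignment the job gets Exactly-Once semantics, without it the job gets At-Least-Once semantics. The official documentation explains this in the section linked below.
Note: the original post showed a screenshot of https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/internals/stream_checkpointing.html#exactly-once-vs-at-least-once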
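Alignment happens in operators that receive more than one upstream input, for example after keyBy or in a join. The rest of this article walks through the source code that implements it.
Checkpoint barriers are handled inside StreamInputProcessor/StreamTwoInputProcessor. These classes read data from the remote side and hand it to the StreamOperator. The actual reading is delegated to a CheckpointBarrierHandler, which is also responsible for alignment; both happen in its getNextNonBlocked method. The interface has two implementations, BarrierBuffer and BarrierTracker: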
    // Called when StreamInputProcessor/StreamTwoInputProcessor creates its CheckpointBarrierHandler
    public static CheckpointBarrierHandler createCheckpointBarrierHandler(
            StreamTask<?, ?> checkpointedTask,
            CheckpointingMode checkpointMode,
            IOManager ioManager,
            InputGate inputGate,
            Configuration taskManagerConfig) throws IOException {

        CheckpointBarrierHandler barrierHandler;
        if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
            long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
            if (!(maxAlign == -1 || maxAlign > 0)) {
                throw new IllegalConfigurationException(
                    TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
                    + " must be positive or -1 (infinite)");
            }

            if (taskManagerConfig.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL)) {
                barrierHandler = new BarrierBuffer(inputGate, new CachedBufferBlocker(inputGate.getPageSize()), maxAlign);
            } else {
                barrierHandler = new BarrierBuffer(inputGate, new BufferSpiller(ioManager, inputGate.getPageSize()), maxAlign);
            }
        } else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
            barrierHandler = new BarrierTracker(inputGate);
        } else {
            throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + checkpointMode);
        }

        if (checkpointedTask != null) {
            barrierHandler.registerCheckpointEventHandler(checkpointedTask);
        }

        return barrierHandler;
    }
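So BarrierBuffer implements the aligned mode and BarrierTracker implements the unaligned mode.
Aligned - BarrierBuffer
BarrierBuffer has two member fields that matter for alignment: bufferBlocker, of type BufferBlocker, and blockedChannels, a boolean array. BufferBlocker wraps an ArrayDeque that holds the data buffered during alignment, and blockedChannels records which channels are currently being aligned. The alignment flow lives in this method: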
    @Override
    public BufferOrEvent getNextNonBlocked() throws Exception {
        while (true) {
            //.....
            BufferOrEvent bufferOrEvent = next.get();
            if (isBlocked(bufferOrEvent.getChannelIndex())) {
                // the channel this data came from is being aligned:
                // add the data to the cache, i.e. the BufferBlocker
                bufferBlocker.add(bufferOrEvent);
                checkSizeLimit();
            } else if (bufferOrEvent.isBuffer()) {
                // a data buffer is returned directly
                return bufferOrEvent;
            } else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
                if (!endOfStream) {
                    // handle an event of type CheckpointBarrier
                    processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
                }
            }
            //.......
        }
    }
The processBarrier method:
    private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
        // barrierId is the ID of the checkpoint this barrier belongs to
        final long barrierId = receivedBarrier.getId();

        // single input channel: trigger the checkpoint directly
        if (totalNumberOfInputChannels == 1) {
            if (barrierId > currentCheckpointId) {
                // new checkpoint
                currentCheckpointId = barrierId;
                notifyCheckpoint(receivedBarrier);
            }
            return;
        }

        // multiple input channels: numBarriersReceived is the number of channels
        // that have already delivered a barrier for the current checkpoint;
        // numBarriersReceived > 0 means an alignment is in progress
        if (numBarriersReceived > 0) {
            // this is only true if some alignment is already in progress and was not canceled
            if (barrierId == currentCheckpointId) {
                // regular case
                onBarrier(channelIndex);
            } else if (barrierId > currentCheckpointId) {
                // a barrier for a newer checkpoint arrived while the current one is still
                // aligning (for example because the current one timed out): abort the current
                // checkpoint, reset blockedChannels and set numBarriersReceived back to 0,
                // then start the alignment for the new checkpoint (barrierId)
                LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
                        "Skipping current checkpoint.",
                    inputGate.getOwningTaskName(), barrierId, currentCheckpointId);
                notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));
                releaseBlocksAndResetBarriers();
                beginNewAlignment(barrierId, channelIndex);
            } else {
                // ignore trailing barrier from an earlier checkpoint (obsolete now)
                return;
            }
        } else if (barrierId > currentCheckpointId) {
            // numBarriersReceived == 0: start a new checkpoint alignment and
            // mark this channel as blocked (true) in blockedChannels
            beginNewAlignment(barrierId, channelIndex);
        } else {
            // either the current checkpoint was canceled (numBarriers == 0) or
            // this barrier is from an old subsumed checkpoint
            return;
        }

        // check if we have all barriers - since canceled checkpoints always have zero barriers
        // this can only happen on a non canceled checkpoint
        if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
            // actually trigger checkpoint
            if (LOG.isDebugEnabled()) {
                LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
                    inputGate.getOwningTaskName(), receivedBarrier.getId(), receivedBarrier.getTimestamp());
            }
            // alignment complete: queue the data cached in the BufferBlocker
            // for consumption, then trigger the checkpoint
            releaseBlocksAndResetBarriers();
            notifyCheckpoint(receivedBarrier);
        }
    }
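To make the aligned flow concrete, here is a minimal, self-contained sketch of the same idea. It is not Flink code: the class AlignmentSketch, its methods and the string records are hypothetical names invented for illustration, and it leaves out closed channels, checkpoint subsumption and size limits. It only models the two pieces described above: a per-channel blocked flag and a queue of records held back until every channel has delivered its barrier.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of barrier alignment (hypothetical; not Flink's BarrierBuffer). */
final class AlignmentSketch {
    // true = this channel has delivered its barrier and is blocked until alignment ends
    private final boolean[] blockedChannels;
    // records from blocked channels, held back during alignment
    private final ArrayDeque<String> buffered = new ArrayDeque<>();
    // what the operator would observe, in order
    private final List<String> emitted = new ArrayList<>();
    private int numBarriersReceived;
    private long currentCheckpointId = -1L;

    AlignmentSketch(int numChannels) {
        blockedChannels = new boolean[numChannels];
    }

    /** A data record arrives on the given channel. */
    void onRecord(int channel, String record) {
        if (blockedChannels[channel]) {
            buffered.add(record);   // channel is aligning: hold the record back
        } else {
            emitted.add(record);    // channel is open: pass the record through
        }
    }

    /** A checkpoint barrier arrives on the given channel. */
    void onBarrier(int channel, long checkpointId) {
        if (numBarriersReceived == 0) {
            if (checkpointId <= currentCheckpointId) {
                return;             // barrier of an old checkpoint: ignore
            }
            currentCheckpointId = checkpointId;   // first barrier starts a new alignment
        } else if (checkpointId != currentCheckpointId || blockedChannels[channel]) {
            return;                 // other checkpoint or duplicate: not modelled here
        }
        blockedChannels[channel] = true;
        numBarriersReceived++;

        if (numBarriersReceived == blockedChannels.length) {
            // all barriers received: the snapshot is taken before any held-back record is seen
            emitted.add("checkpoint-" + checkpointId);
            emitted.addAll(buffered);
            buffered.clear();
            Arrays.fill(blockedChannels, false);
            numBarriersReceived = 0;
        }
    }

    /** Two channels; channel 0 delivers its barrier before channel 1 does. */
    static List<String> demo() {
        AlignmentSketch s = new AlignmentSketch(2);
        s.onRecord(0, "a1");
        s.onRecord(1, "b1");
        s.onBarrier(0, 1L);
        s.onRecord(0, "a2");   // arrives after channel 0's barrier: held back
        s.onRecord(1, "b2");   // channel 1 has not delivered its barrier yet: passes
        s.onBarrier(1, 1L);
        s.onRecord(0, "a3");
        return s.emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a1, b1, b2, checkpoint-1, a2, a3]
    }
}
```

In the demo, a2 reaches the operator only after checkpoint-1, so the snapshot contains exactly the records that preceded the barrier on every channel. That is the property Exactly-Once relies on.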
Unaligned - BarrierTracker
The unaligned mode is simpler. No data is buffered; once the checkpoint barrier has arrived from every channel, the checkpoint is triggered.
    public BufferOrEvent getNextNonBlocked() throws Exception {
        while (true) {
            Optional<BufferOrEvent> next = inputGate.getNextBufferOrEvent();
            if (!next.isPresent()) {
                // buffer or input exhausted
                return null;
            }

            BufferOrEvent bufferOrEvent = next.get();
            if (bufferOrEvent.isBuffer()) {
                return bufferOrEvent;
            } else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
                processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
            } else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
                processCheckpointAbortBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
            } else {
                // some other event
                return bufferOrEvent;
            }
        }
    }
The processBarrier method:
    private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
        final long barrierId = receivedBarrier.getId();

        // single input channel: trigger the checkpoint directly
        if (totalNumberOfInputChannels == 1) {
            notifyCheckpoint(barrierId, receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
            return;
        }

        // general path for multiple input channels
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelIndex);
        }

        // find the checkpoint barrier in the queue of pending barriers
        CheckpointBarrierCount cbc = null;
        int pos = 0;

        // look for the pending entry that belongs to the same checkpoint
        for (CheckpointBarrierCount next : pendingCheckpoints) {
            if (next.checkpointId == barrierId) {
                cbc = next;
                break;
            }
            pos++;
        }

        if (cbc != null) {
            // add one to the count to that barrier and check for completion
            int numBarriersNew = cbc.incrementBarrierCount();
            if (numBarriersNew == totalNumberOfInputChannels) {
                // barriers from all channels collected: the checkpoint can be triggered;
                // drop this entry and every older pending entry in front of it
                for (int i = 0; i <= pos; i++) {
                    pendingCheckpoints.pollFirst();
                }

                // notify the listener
                if (!cbc.isAborted()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Received all barriers for checkpoint {}", barrierId);
                    }
                    notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
                }
            }
        } else {
            // first barrier of a new checkpoint
            if (barrierId > latestPendingCheckpointID) {
                latestPendingCheckpointID = barrierId;
                pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));

                // make sure we do not track too many checkpoints
                if (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {
                    pendingCheckpoints.pollFirst();
                }
            }
        }
    }
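Overall flow of the unaligned mode: when an operator receives several upstream inputs, no data is buffered for any checkpoint; records are passed straight on for processing. Only the checkpoint bookkeeping is kept, in a queue of CheckpointBarrierCount entries. Each entry identifies one checkpoint and counts how many channels have delivered its barrier. When that count equals the number of channels, the checkpoint is triggered.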
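The counting scheme summarized above can be sketched in a few lines. As before, this is a hypothetical toy model rather than Flink's BarrierTracker: the names BarrierCountSketch and Pending are made up for illustration, and aborted checkpoints and the cap on tracked checkpoints are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Toy model of barrier counting without alignment (hypothetical; not Flink's BarrierTracker). */
final class BarrierCountSketch {
    /** One pending checkpoint and the number of barriers seen for it so far. */
    private static final class Pending {
        final long checkpointId;
        int barrierCount = 1;   // created when the first barrier arrives

        Pending(long checkpointId) {
            this.checkpointId = checkpointId;
        }
    }

    private final int numChannels;
    private final ArrayDeque<Pending> pending = new ArrayDeque<>();
    // what the operator would observe, in order
    private final List<String> emitted = new ArrayList<>();
    private long latestPendingCheckpointId = -1L;

    BarrierCountSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    /** Records are never held back, whichever channel they come from. */
    void onRecord(int channel, String record) {
        emitted.add(record);
    }

    /** A checkpoint barrier arrives; only the per-checkpoint count matters. */
    void onBarrier(int channel, long checkpointId) {
        if (numChannels == 1) {
            emitted.add("checkpoint-" + checkpointId);   // single input: trigger directly
            return;
        }
        Pending match = null;
        int pos = 0;
        for (Pending p : pending) {
            if (p.checkpointId == checkpointId) {
                match = p;
                break;
            }
            pos++;
        }
        if (match != null) {
            if (++match.barrierCount == numChannels) {
                // complete: drop this entry and all older pending ones, then trigger
                for (int i = 0; i <= pos; i++) {
                    pending.pollFirst();
                }
                emitted.add("checkpoint-" + checkpointId);
            }
        } else if (checkpointId > latestPendingCheckpointId) {
            latestPendingCheckpointId = checkpointId;
            pending.addLast(new Pending(checkpointId));
        }
    }

    /** Same input sequence as an aligned run would see, but nothing is buffered. */
    static List<String> demo() {
        BarrierCountSketch s = new BarrierCountSketch(2);
        s.onRecord(0, "a1");
        s.onRecord(1, "b1");
        s.onBarrier(0, 1L);
        s.onRecord(0, "a2");   // processed before the checkpoint is triggered
        s.onRecord(1, "b2");
        s.onBarrier(1, 1L);
        s.onRecord(0, "a3");
        return s.emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a1, b1, a2, b2, checkpoint-1, a3]
    }
}
```

Here a2 is processed before checkpoint-1 even though it arrived after channel 0's barrier. Its effect is already in the snapshot, and it will be replayed after a failure, which is why this mode only gives At-Least-Once.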