聊聊debezium的BlockingReader
2020/5/17 17:26:45
本文主要是介绍聊聊debezium的BlockingReader,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
序
本文主要研究一下debezium的BlockingReader
Reader
debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Reader.java
public interface Reader { public static enum State { /** * The reader is stopped and static. */ STOPPED, /** * The reader is running and generated records. */ RUNNING, /** * The reader has completed its work or been explicitly stopped, but not all of the generated records have been * consumed via {@link Reader#poll() polling}. */ STOPPING; } public String name(); public State state(); public void uponCompletion(Runnable handler); public default void initialize() { // do nothing } public default void destroy() { // do nothing } public void start(); public void stop(); public List<SourceRecord> poll() throws InterruptedException; } 复制代码
- Reader接口定义了name、state、uponCompletion、start、stop、poll方法
BlockingReader
debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BlockingReader.java
public class BlockingReader implements Reader { protected final Logger logger = LoggerFactory.getLogger(getClass()); private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>(); private final AtomicReference<State> state = new AtomicReference<>(); private final Metronome metronome; private final String name; private final String runningLogMessage; public BlockingReader(String name, String runningLogMessage) { this.name = name; this.metronome = Metronome.parker(ConfigurationDefaults.RETURN_CONTROL_INTERVAL, Clock.SYSTEM); this.runningLogMessage = runningLogMessage; } /** * Does nothing until the connector task is shut down, but regularly returns control back to Connect in order for being paused if requested. */ @Override public List<SourceRecord> poll() throws InterruptedException { if (state.get() == State.STOPPED) { return null; } metronome.pause(); state.compareAndSet(State.RUNNING, State.STOPPING); return null; } @Override public State state() { return state.get(); } @Override public void uponCompletion(Runnable handler) { assert this.uponCompletion.get() == null; this.uponCompletion.set(handler); } @Override public void start() { state.set(State.RUNNING); logger.info(runningLogMessage); } @Override public void stop() { try { state.set(State.STOPPED); // Cleanup Resources Runnable completionHandler = uponCompletion.getAndSet(null); // set to null so that we call it only once if (completionHandler != null) { completionHandler.run(); } } finally { logger.info("Blocking Reader has completed."); } } @Override public String name() { return name; } } 复制代码
- BlockingReader实现了Reader接口,其start方法设置state为State.RUNNING,其stop方法设置state为State.STOPPED,同时执行completionHandler.run();其poll方法在state为State.STOPPED直接返回null,否则执行metronome.pause(),然后设置state为State.STOPPED,最后返回null
TimedBlockingReader
debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TimedBlockingReader.java
public class TimedBlockingReader extends BlockingReader { protected final Logger logger = LoggerFactory.getLogger(getClass()); private final Duration timeout; private volatile Timer timer; /** * @param name Name of the reader * @param timeout Duration of time until this TimedBlockingReader should stop */ public TimedBlockingReader(String name, Duration timeout) { super(name, "The connector will wait for " + timeout.toMillis() + " ms before proceeding"); this.timeout = timeout; } @Override public void start() { super.start(); this.timer = Threads.timer(Clock.SYSTEM, timeout); } @Override public List<SourceRecord> poll() throws InterruptedException { super.poll(); // Stop when we've reached the timeout threshold if (timer != null && timer.expired()) { stop(); } return null; } } 复制代码
- TimedBlockingReader继承了BlockingReader,其start方法通过Threads.timer(Clock.SYSTEM, timeout)创建了Timer;其poll方法先执行父类的poll方法,然后在timer.expired()为true时执行stop(),最后返回null
小结
BlockingReader实现了Reader接口,其start方法设置state为State.RUNNING,其stop方法设置state为State.STOPPED,同时执行completionHandler.run();其poll方法在state为State.STOPPED直接返回null,否则执行metronome.pause(),然后设置state为State.STOPPED,最后返回null
doc
- BlockingReader
这篇关于聊聊debezium的BlockingReader的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-12-27TypeScript面试真题解析与实战指南
- 2024-12-27TypeScript大厂面试真题详解与解析
- 2024-12-26怎么使用nsenter命令进入容器?-icode9专业技术文章分享
- 2024-12-26导入文件提示存在乱码,请确定使用的是UTF-8编码怎么解决?-icode9专业技术文章分享
- 2024-12-26csv文件怎么设置编码?-icode9专业技术文章分享
- 2024-12-25TypeScript基础知识详解
- 2024-12-25安卓NDK 是什么?-icode9专业技术文章分享
- 2024-12-25caddy 可以定义日志到 文件吗?-icode9专业技术文章分享
- 2024-12-25wordfence如何设置密码规则?-icode9专业技术文章分享
- 2024-12-25有哪些方法可以实现 DLL 文件路径的管理?-icode9专业技术文章分享