A Look at Debezium's ChangeEventQueue
2020/5/13 17:25:54
Preface
This article takes a look at Debezium's ChangeEventQueue.
ChangeEventQueueMetrics
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueueMetrics.java
public interface ChangeEventQueueMetrics {

    int totalCapacity();

    int remainingCapacity();
}
- The ChangeEventQueueMetrics interface defines the totalCapacity and remainingCapacity methods.
ChangeEventQueue
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.java
public class ChangeEventQueue<T> implements ChangeEventQueueMetrics {

    private static final Logger LOGGER = LoggerFactory.getLogger(ChangeEventQueue.class);

    private final Duration pollInterval;
    private final int maxBatchSize;
    private final int maxQueueSize;
    private final BlockingQueue<T> queue;
    private final Metronome metronome;
    private final Supplier<PreviousContext> loggingContextSupplier;

    private volatile RuntimeException producerException;

    private ChangeEventQueue(Duration pollInterval, int maxQueueSize, int maxBatchSize, Supplier<LoggingContext.PreviousContext> loggingContextSupplier) {
        this.pollInterval = pollInterval;
        this.maxBatchSize = maxBatchSize;
        this.maxQueueSize = maxQueueSize;
        this.queue = new LinkedBlockingDeque<>(maxQueueSize);
        this.metronome = Metronome.sleeper(pollInterval, Clock.SYSTEM);
        this.loggingContextSupplier = loggingContextSupplier;
    }

    public static class Builder<T> {

        private Duration pollInterval;
        private int maxQueueSize;
        private int maxBatchSize;
        private Supplier<LoggingContext.PreviousContext> loggingContextSupplier;

        public Builder<T> pollInterval(Duration pollInterval) {
            this.pollInterval = pollInterval;
            return this;
        }

        public Builder<T> maxQueueSize(int maxQueueSize) {
            this.maxQueueSize = maxQueueSize;
            return this;
        }

        public Builder<T> maxBatchSize(int maxBatchSize) {
            this.maxBatchSize = maxBatchSize;
            return this;
        }

        public Builder<T> loggingContextSupplier(Supplier<LoggingContext.PreviousContext> loggingContextSupplier) {
            this.loggingContextSupplier = loggingContextSupplier;
            return this;
        }

        public ChangeEventQueue<T> build() {
            return new ChangeEventQueue<T>(pollInterval, maxQueueSize, maxBatchSize, loggingContextSupplier);
        }
    }

    /**
     * Enqueues a record so that it can be obtained via {@link #poll()}. This method
     * will block if the queue is full.
     *
     * @param record
     *            the record to be enqueued
     * @throws InterruptedException
     *             if this thread has been interrupted
     */
    public void enqueue(T record) throws InterruptedException {
        if (record == null) {
            return;
        }

        // The calling thread has been interrupted, let's abort
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Enqueuing source record '{}'", record);
        }

        // this will also raise an InterruptedException if the thread is interrupted while waiting for space in the queue
        queue.put(record);
    }

    /**
     * Returns the next batch of elements from this queue. May be empty in case no
     * elements have arrived in the maximum waiting time.
     *
     * @throws InterruptedException
     *             if this thread has been interrupted while waiting for more
     *             elements to arrive
     */
    public List<T> poll() throws InterruptedException {
        LoggingContext.PreviousContext previousContext = loggingContextSupplier.get();

        try {
            LOGGER.debug("polling records...");
            List<T> records = new ArrayList<>();
            final Timer timeout = Threads.timer(Clock.SYSTEM, Temporals.max(pollInterval, ConfigurationDefaults.RETURN_CONTROL_INTERVAL));
            while (!timeout.expired() && queue.drainTo(records, maxBatchSize) == 0) {
                throwProducerExceptionIfPresent();

                LOGGER.debug("no records available yet, sleeping a bit...");
                // no records yet, so wait a bit
                metronome.pause();
                LOGGER.debug("checking for more records...");
            }
            return records;
        }
        finally {
            previousContext.restore();
        }
    }

    public void producerException(final RuntimeException producerException) {
        this.producerException = producerException;
    }

    private void throwProducerExceptionIfPresent() {
        if (producerException != null) {
            throw producerException;
        }
    }

    @Override
    public int totalCapacity() {
        return maxQueueSize;
    }

    @Override
    public int remainingCapacity() {
        return queue.remainingCapacity();
    }
}
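- ChangeEventQueue implements the ChangeEventQueueMetrics interface. Its constructor creates a bounded BlockingQueue (a LinkedBlockingDeque with capacity maxQueueSize) and a Metronome, and stores the loggingContextSupplier. The enqueue method ignores null records, throws InterruptedException if the calling thread has already been interrupted, and otherwise calls queue.put(record), which blocks while the queue is full. The poll method first obtains previousContext from loggingContextSupplier.get(), then creates a timeout from the larger of pollInterval and ConfigurationDefaults.RETURN_CONTROL_INTERVAL. While the timeout has not expired and queue.drainTo(records, maxBatchSize) returns 0, it rethrows any recorded producer exception and calls metronome.pause(). The loop ends once timeout.expired() is true or drainTo returns a non-zero count, and the finally block then calls previousContext.restore(). totalCapacity returns maxQueueSize, and remainingCapacity returns queue.remainingCapacity().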
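The put/drainTo interplay described above can be tried without Debezium on the classpath. The following is a minimal sketch, not Debezium code: the class name BatchQueueSketch and the maxWait parameter are hypothetical, the timeout is tracked with System.nanoTime() instead of Threads.timer, and a plain Thread.sleep stands in for the Metronome.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical stand-in for ChangeEventQueue, reduced to the queueing logic.
final class BatchQueueSketch<T> {

    private final BlockingQueue<T> queue;
    private final int maxBatchSize;
    private final Duration pollInterval;

    BatchQueueSketch(int maxQueueSize, int maxBatchSize, Duration pollInterval) {
        // Bounded queue: put() blocks once maxQueueSize elements are waiting.
        this.queue = new LinkedBlockingDeque<>(maxQueueSize);
        this.maxBatchSize = maxBatchSize;
        this.pollInterval = pollInterval;
    }

    // Producer side: null records are ignored, full queues block the caller.
    void enqueue(T record) throws InterruptedException {
        if (record == null) {
            return;
        }
        queue.put(record);
    }

    // Consumer side: returns at most maxBatchSize records, or an empty list
    // if nothing arrived before maxWait elapsed.
    List<T> poll(Duration maxWait) throws InterruptedException {
        List<T> records = new ArrayList<>();
        long deadline = System.nanoTime() + maxWait.toNanos();
        while (System.nanoTime() - deadline < 0
                && queue.drainTo(records, maxBatchSize) == 0) {
            // Nothing available yet, so wait one poll interval and retry.
            Thread.sleep(pollInterval.toMillis());
        }
        return records;
    }

    int remainingCapacity() {
        return queue.remainingCapacity();
    }
}
```

With a capacity of 4 and a batch size of 2, three enqueued records come back as a batch of two followed by a batch of one, and a further poll returns an empty list once maxWait has passed.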
Threads
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/Threads.java
public class Threads {

    //......

    public static interface TimeSince {
        /**
         * Reset the elapsed time to 0.
         */
        void reset();

        /**
         * Get the time that has elapsed since the last call to {@link #reset() reset}.
         *
         * @return the number of milliseconds
         */
        long elapsedTime();
    }

    public static interface Timer {

        /**
         * @return true if current time is greater than start time plus requested time period
         */
        boolean expired();

        Duration remaining();
    }

    public static Timer timer(Clock clock, Duration time) {
        final TimeSince start = timeSince(clock);
        start.reset();

        return new Timer() {

            @Override
            public boolean expired() {
                return start.elapsedTime() > time.toMillis();
            }

            @Override
            public Duration remaining() {
                return time.minus(start.elapsedTime(), ChronoUnit.MILLIS);
            }
        };
    }

    public static TimeSince timeSince(Clock clock) {
        return new TimeSince() {
            private long lastTimeInMillis;

            @Override
            public void reset() {
                lastTimeInMillis = clock.currentTimeInMillis();
            }

            @Override
            public long elapsedTime() {
                long elapsed = clock.currentTimeInMillis() - lastTimeInMillis;
                return elapsed <= 0L ? 0L : elapsed;
            }
        };
    }

    //......
}
- Threads defines the Timer interface, which declares the expired and remaining methods. The timer method creates a TimeSince via timeSince, resets it, and returns an anonymous Timer: expired returns true once the elapsed time is strictly greater than the requested duration, and remaining returns the requested duration minus the elapsed time.
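The boundary behaviour of this timer is easy to miss, so here is a small sketch of the same arithmetic. It is an illustration under assumptions rather than Debezium's class: TimerSketch is a hypothetical name, and a LongSupplier replaces Debezium's Clock so that time can be controlled by the caller.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

// Hypothetical re-creation of the Timer logic with an injectable clock.
final class TimerSketch {

    private final LongSupplier clockMillis;
    private final long startMillis;
    private final Duration period;

    TimerSketch(LongSupplier clockMillis, Duration period) {
        this.clockMillis = clockMillis;
        this.period = period;
        // Equivalent of calling reset() right after construction.
        this.startMillis = clockMillis.getAsLong();
    }

    // Strict comparison: exactly "period" elapsed does not count as expired.
    boolean expired() {
        return elapsedMillis() > period.toMillis();
    }

    // Goes negative once the period has been exceeded.
    Duration remaining() {
        return period.minusMillis(elapsedMillis());
    }

    // Clamped at zero so a clock that moves backwards never yields negative time.
    private long elapsedMillis() {
        long elapsed = clockMillis.getAsLong() - startMillis;
        return Math.max(elapsed, 0L);
    }
}
```

Because the comparison is strict, a timer for 500 ms is still not expired at exactly 500 ms of elapsed time, and remaining() turns negative rather than stopping at zero once the period is exceeded.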
LoggingContext
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/LoggingContext.java
public class LoggingContext {

    /**
     * The key for the connector type MDC property.
     */
    public static final String CONNECTOR_TYPE = "dbz.connectorType";

    /**
     * The key for the connector logical name MDC property.
     */
    public static final String CONNECTOR_NAME = "dbz.connectorName";

    /**
     * The key for the connector context name MDC property.
     */
    public static final String CONNECTOR_CONTEXT = "dbz.connectorContext";

    private LoggingContext() {
    }

    /**
     * A snapshot of an MDC context that can be {@link #restore()}.
     */
    public static final class PreviousContext {
        private static final Map<String, String> EMPTY_CONTEXT = Collections.emptyMap();
        private final Map<String, String> context;

        protected PreviousContext() {
            Map<String, String> context = MDC.getCopyOfContextMap();
            this.context = context != null ? context : EMPTY_CONTEXT;
        }

        /**
         * Restore this logging context.
         */
        public void restore() {
            MDC.setContextMap(context);
        }
    }

    //......
}
- LoggingContext defines PreviousContext. Its constructor copies the current MDC with MDC.getCopyOfContextMap(), falling back to an empty map when that call returns null, and its restore method sets the copied data back into the MDC.
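The snapshot-and-restore idea can be sketched without SLF4J. In the example below everything is hypothetical: MdcSnapshotSketch and its ThreadLocal map only imitate the per-thread behaviour of org.slf4j.MDC, and Snapshot plays the role of PreviousContext.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical miniature of the MDC snapshot pattern; not SLF4J or Debezium code.
final class MdcSnapshotSketch {

    // Per-thread key/value context, standing in for org.slf4j.MDC.
    private static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    private MdcSnapshotSketch() {
    }

    static void put(String key, String value) {
        CONTEXT.get().put(key, value);
    }

    static String get(String key) {
        return CONTEXT.get().get(key);
    }

    // Captures the context as it is at the moment of the call.
    static Snapshot snapshot() {
        return new Snapshot();
    }

    static final class Snapshot {
        private final Map<String, String> saved = new HashMap<>(CONTEXT.get());

        // Replaces the whole current context with the saved copy,
        // discarding keys that were added or changed in the meantime.
        void restore() {
            CONTEXT.set(new HashMap<>(saved));
        }
    }
}
```

Taking the snapshot before a block of work and calling restore() in a finally block, as poll() does with previousContext, leaves the thread's logging context as it was before, whatever the block changed.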
Metronome
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/Metronome.java
@FunctionalInterface
public interface Metronome {

    public void pause() throws InterruptedException;

    public static Metronome sleeper(Duration period, Clock timeSystem) {
        long periodInMillis = period.toMillis();
        return new Metronome() {
            private long next = timeSystem.currentTimeInMillis() + periodInMillis;

            @Override
            public void pause() throws InterruptedException {
                for (;;) {
                    final long now = timeSystem.currentTimeInMillis();
                    if (next <= now) {
                        break;
                    }
                    Thread.sleep(next - now);
                }
                next = next + periodInMillis;
            }

            @Override
            public String toString() {
                return "Metronome (sleep for " + periodInMillis + " ms)";
            }
        };
    }

    //......
}
- The Metronome interface defines the pause method. Its static sleeper method creates an anonymous Metronome whose pause method uses Thread.sleep to wait until the next tick, rechecking the clock after each sleep, and then advances the tick by one period.
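The detail worth noticing in sleeper is that the next tick is computed from the previous tick, not from the time pause() returns, so ticks stay on a fixed grid. A minimal sketch of that behaviour follows; MetronomeSketch and nextTick() are hypothetical names, and System.currentTimeMillis() replaces Debezium's Clock.

```java
import java.time.Duration;

// Hypothetical re-creation of the sleeper logic using the system clock.
final class MetronomeSketch {

    private final long periodMillis;
    private long next;

    MetronomeSketch(Duration period) {
        this.periodMillis = period.toMillis();
        // First tick is one period after construction.
        this.next = System.currentTimeMillis() + periodMillis;
    }

    // Blocks until the current tick is reached, then schedules the following one.
    void pause() throws InterruptedException {
        long now;
        while ((now = System.currentTimeMillis()) < next) {
            // Sleep only for the time still missing; the loop rechecks the
            // clock in case the sleep returned early.
            Thread.sleep(next - now);
        }
        // Advance from the previous tick, not from "now", so the time spent
        // between pauses does not push later ticks back.
        next += periodMillis;
    }

    // Exposed for illustration only: the timestamp of the upcoming tick.
    long nextTick() {
        return next;
    }
}
```

If the caller spends longer than one period between two pause() calls, the next call returns immediately because its tick is already in the past.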
Summary
ChangeEventQueue implements the ChangeEventQueueMetrics interface. Its constructor creates a BlockingQueue and a Metronome, and stores the loggingContextSupplier. The enqueue method calls queue.put(record). The poll method first obtains previousContext from loggingContextSupplier.get(), then creates a timeout and, in a while loop, calls queue.drainTo(records, maxBatchSize) and metronome.pause() until either timeout.expired() is true or drainTo returns a non-zero count; finally it calls previousContext.restore(). totalCapacity returns maxQueueSize, and remainingCapacity returns queue.remainingCapacity().
doc
- ChangeEventQueue