Kafka Producer源码解析二:消息发送 send

2020/3/8 17:02:03

本文主要是介绍Kafka Producer源码解析二:消息发送 send,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

1.文章描述

在构建好kafka对象之后 如果我们想要发送消息 就要调用producer的send方法,参数包括消息体和回调函数 在上一篇文章中我们看到了kafkaproducer的整体架构 了解了发送消息主要涉及到三个点

  • 1.主线程
  • 2.RecordAccumulator消息收集池
  • 3.sender线程 这篇文章我们来看看这三个点是如何协同工作,以及一些其他的组件

2.send方法的时序图

  • 调用doSend方法 执行拦截器逻辑 interceptoes
  • doSend 调用produer本身的doSend方法 开始执行具体逻辑
  • waitOnMetadata 方法 等待更新元数据集合(元数据包含server集群相关信息)
  • serialize执行序列化器
  • partition 分区器
  • ensurValidRecordSize 确保消息大小
  • append 往RecordAccumulator里追加消息
  • wakeup 唤醒sender线程

3.源码解析

3.1 拦截器 ProducerInterceptor

首先来看一个interceptor的列子

    public class KafkaIntercepter implements ProducerInterceptor {
    @Override
    public ProducerRecord onSend(ProducerRecord producerRecord) {
        System.out.println("拦截器执行");
        return null;
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        //用于正常返回时的回调方法
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {
        //自己配置的类  kafka都是通过反射来进行加载的  利用的是无餐构造  不可能会初始化你所有的数据  所以kafka在反射生成对象之后  会调用configure初始化数据
    }
}
复制代码
当我们调用send方法的时候,producer做的第一件事就是去执行拦截器 注意拦截器事一个list,被封装在ProducerInterceptors里面
复制代码

并且执行每一个拦截器onSend方法

3.2 执行完拦截器便开始真正的doSend逻辑

这一段代码有点长  我们首先看看源码 然后再分开说明  看不懂没关系 先看看上面的时序图 再结合源码看  就会有个大概的了解 再看后面具体的讲解
复制代码
    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            throwIfProducerClosed();
            // first make sure the metadata for the topic is available
            long nowMs = time.milliseconds();
            ClusterAndWaitTime clusterAndWaitTime;
            try {
                '//等待元数据更新'
                clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
            } catch (KafkaException e) {
                if (metadata.isClosed())
                    throw new KafkaException("Producer closed while send in progress", e);
                throw e;
            }
            nowMs += clusterAndWaitTime.waitedOnMetadataMs;
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            
            '//开始序列化key和value'
            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer", cce);
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer", cce);
            }
            
            '//执行分区器'
            int partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);

            setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();

            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                    compressionType, serializedKey, serializedValue, headers);
            
            '//校验消息大小'
            ensureValidRecordSize(serializedSize);
            long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
            if (log.isTraceEnabled()) {
                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            }
            // producer callback will make sure to call both 'callback' and interceptor callback
            Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

            if (transactionManager != null && transactionManager.isTransactional()) {
                transactionManager.failIfNotReadyForSend();
            }
            
            '//调用append最追加消息'
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

            if (result.abortForNewBatch) {
                int prevPartition = partition;
                partitioner.onNewBatch(record.topic(), cluster, prevPartition);
                partition = partition(record, serializedKey, serializedValue, cluster);
                tp = new TopicPartition(record.topic(), partition);
                if (log.isTraceEnabled()) {
                    log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
                }
                // producer callback will make sure to call both 'callback' and interceptor callback
                interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

                result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
            }

            if (transactionManager != null && transactionManager.isTransactional())
                transactionManager.maybeAddPartitionToTransaction(tp);

            if (result.batchIsFull || result.newBatchCreated) {
                '//如果消息满了  就唤醒sender线程起来执行'
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        } catch (ApiException e) {
            log.debug("Exception occurred during message send:", e);
            if (callback != null)
                callback.onCompletion(null, e);
            this.errors.record();
            this.interceptors.onSendError(record, tp, e);
            return new FutureFailure(e);
        } catch (InterruptedException e) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, e);
            throw new InterruptException(e);
        } catch (BufferExhaustedException e) {
            this.errors.record();
            this.metrics.sensor("buffer-exhausted-records").record();
            this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (KafkaException e) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (Exception e) {
            // we notify interceptor about all exceptions, since onSend is called before anything else in this method
            this.interceptors.onSendError(record, tp, e);
            throw e;
        }
    }
复制代码

3.2.1 waitOnMetadata 方法

这个方法首先会从缓存中获取元数据 然后获取当前topic的分区 判断分区是否存在 如果存在就直接返回 如果不存在 就唤醒sender线程 并且等待更新 更新之后返回

Cluster cluster = metadata.fetch();

        if (cluster.invalidTopics().contains(topic))
            throw new InvalidTopicException(topic);

        '//将topic添加进元数据缓存'
        metadata.add(topic, nowMs);

        Integer partitionsCount = cluster.partitionCountForTopic(topic);
        // Return cached metadata if we have it, and if the record's partition is either undefined'
        // or within the known partition range
        //返回缓存的数据
        if (partitionsCount != null && (partition == null || partition < partitionsCount))
            return new ClusterAndWaitTime(cluster, 0);

        long remainingWaitMs = maxWaitMs;
        long elapsed = 0;
        // Issue metadata requests until we have metadata for the topic and the requested partition,
        // or until maxWaitTimeMs is exceeded. This is necessary in case the metadata
        // is stale and the number of partitions for this topic has increased in the meantime.
        do {
            if (partition != null) {
                log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
            } else {
                log.trace("Requesting metadata update for topic {}.", topic);
            }

            metadata.add(topic, nowMs + elapsed);

            '//获取元数据版本号 并且设置needUpdate为true  同时唤醒sender线程'
            int version = metadata.requestUpdate();
            sender.wakeup();
            try {
                '//等待更新元数据'
                metadata.awaitUpdate(version, remainingWaitMs);
            } catch (TimeoutException ex) {
                // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
                throw new TimeoutException(
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs));
            }
            '//重新从缓存中获取'
            cluster = metadata.fetch();
            elapsed = time.milliseconds() - nowMs;
            if (elapsed >= maxWaitMs) {
                throw new TimeoutException(partitionsCount == null ?
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs) :
                        String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
                                partition, topic, partitionsCount, maxWaitMs));
            }
            metadata.maybeThrowExceptionForTopic(topic);
            remainingWaitMs = maxWaitMs - elapsed;
            partitionsCount = cluster.partitionCountForTopic(topic);
        } while (partitionsCount == null || (partition != null && partition >= partitionsCount));

        return new ClusterAndWaitTime(cluster, elapsed);
    }
复制代码
  • 1.元数据缓存对象Cluster 里面封装了多个map和list

  • 2.metadata.add 方法 将topic添加进topic map中 value为过期时间

  • 3.cluster.partitionCountForTopic 当前主题下的分区数量

  • 4.metadata.requestUpdate() 更新needUpdate标识为true ,sender会检查这个字段 如果为true 则更新元数据集合。然后就是返回当前元数据集合的版本号 ,sender线程在更新完元数据之后 会对当前版本号自加 我们判断元数据是否更新完成其主要就是依赖这个版本号来做判断的

    public synchronized int requestUpdate() {
        this.needUpdate = true;
        return this.updateVersion;
    }
复制代码
  • 5.sender.wakeup() 唤醒sender线程
  • 6.metadata.awaitUpdate(version, remainingWaitMs) 等待更新元数据集合 可以看到这里又将版本号回传了回去,阻塞等待是调用的SystemTimer的waitObjec方法来的部等待sender线程更新完成
    public synchronized void awaitUpdate(final int lastVersion, final long timeoutMs) throws InterruptedException {
        long currentTimeMs = time.milliseconds();
        long deadlineMs = currentTimeMs + timeoutMs < 0 ? Long.MAX_VALUE : currentTimeMs + timeoutMs;
        time.waitObject(this, () -> {
            // Throw fatal exceptions, if there are any. Recoverable topic errors will be handled by the caller.
            maybeThrowFatalException();
            //版本号作为是否更新完成的依据
            return updateVersion() > lastVersion || isClosed();
        }, deadlineMs);

        if (isClosed())
            throw new KafkaException("Requested metadata update after close");
    }
复制代码

waitObject方法 真正执行等待的方法 while死循环做检查

    @Override
    public void waitObject(Object obj, Supplier<Boolean> condition, long deadlineMs) throws InterruptedException {
        synchronized (obj) {
            while (true) {
                if (condition.get())
                    return;

                long currentTimeMs = milliseconds();
                if (currentTimeMs >= deadlineMs)
                    throw new TimeoutException("Condition not satisfied before deadline");
                //wait等待
                obj.wait(deadlineMs - currentTimeMs);
            }
        }
    }
复制代码

3.2.2 元数据更新完成之后便开始序列化key 和value ,执行分区器,然后进行压缩 判断消息大小 最后调用append追加消息 如果消息满了 则唤醒sender线程发送消息 最后返回future 由用户自主选择是否同步阻塞等待

关于accumulator.append 和sender线程的分析会在接下来的文章做分析



这篇关于Kafka Producer源码解析二:消息发送 send的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程