kafka源码解析之kakfaProducer
2020/2/27 17:16:01
本文主要是介绍kafka源码解析之kakfaProducer,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
前言
kafka作为消息队列领域的中流砥柱,作为后端开发,除了会使用以外,还需要深入的研究和分析其内在实现和设计,借鉴其优秀的设计理念。同时也可以帮助后续在实际问题定位和解决上提供帮助。本文主要介绍kafka发送到底是如何实现的
KafkaProducer的使用
下面的代码中使用最简单的配置来展示如何通过kafka发送一条数据:
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class KafkaProducerTest { public static void main(String[] args) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); producer.send(new ProducerRecord<String, String>("test-topic", "this is a test message")); producer.close(); } } 复制代码
- 从上述代码中可以看出发送一条数据其实很简单:
- 初始化一个
KafkaProducer
实例 - 调用prdoucer的
send
函数发送数据
- 初始化一个
kafka发送流程解析
根据入口方法send,一步步的对调用链的核心内容进行解析,来揭开kafka发送的神秘面纱
入口函数send(ProducerRecord<K, V> record, Callback callback)
函数解析
该方法主要是kafka发送消息的入口函数
话不多说,源码走起:
@Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { // 先走一波拦截器,拦截器是提供给用户做消息定制化处理逻辑。 ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record); //核心调用都在这个方法实现里面 return doSend(interceptedRecord, callback); } 复制代码
- 拦截器(
ProducerInterceptors
接口)主要是给用户提供一个钩子,让用户在消息记录发送之前,或者producer回调方法执行之前,对消息或者回调信息做一些定制的逻辑处理。- 通过参数 "interceptor.classes" 配置,可以是以
,
分隔的多个类全量名。
- 通过参数 "interceptor.classes" 配置,可以是以
- 然后调用
doSend(ProducerRecord<K, V> record, Callback callback)
函数,核心调用逻辑应该在这里面
核心方法doSend(ProducerRecord<K, V> record, Callback callback)
函数解析
该方法封装了kafka发送的核心逻辑,包含了到sender实际发送之前的所有处理流程。
先描述以下整体流程,然后再看源码,这样大家更有带入感:
- 获取当前topic的metadata(主要包含分区信息),主要流程是确认当前的topic的元数据是有效的,如果不存在或者需要更新,则需要阻塞从broker中拉取更新。
- 根据序列化配置对record的key和value进行序列化,得到byte数组
- 根据分区策略配置获取当前record需要发送的partioner号,然后从前面的metadata获取partioner元数据
- 向数据追加器 追加record数据,kafka为了提高效率,会将几条数据合并为一条数据,然后发送
- 追加完成后,如果已经又满足发送条件的数据,则通过唤醒sender来真正执行发送
让我们通过源码,来看一下是否是上述的流程:
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { TopicPartition tp = null; try { //第一步,确认当前的topic是有效的,然后获取集群信息(内部包含topic的信息) ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs); //上述调用中,存在阻塞等待更新的场景,需要减去这部分耗时 long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs); Cluster cluster = clusterAndWaitTime.cluster; //第二步,对key和value进行序列化 byte[] serializedKey; try { serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); } catch (ClassCastException cce) { ... } byte[] serializedValue; try { serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value()); } catch (ClassCastException cce) { ... } // 第三步,确认当前数据的分区号 int partition = partition(record, serializedKey, serializedValue, cluster); tp = new TopicPartition(record.topic(), partition); //计算包装后的kafka消息体的大小 setReadOnly(record.headers()); Header[] headers = record.headers().toArray(); int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(), compressionType, serializedKey, serializedValue, headers); ensureValidRecordSize(serializedSize); long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp(); //将拦截器和回调钩子形成一个调用链,在异步回调结果的时候会进行调用 Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp); //第四步,通过消息追加器(RecordAccumulator)追加数据 RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs); //第五步, 如果满足发送时机,则唤醒sender函数。由他执行真正的发送逻辑 if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); } return result.future; } //异常捕获处理逻辑 ... } 复制代码
整个代码的实现流程还是比较清晰的,整体流程上文已经描述,需要注意以下几个核心依赖类:
ProducerInterceptors
拦截器接口MetaData
元数据实体,里面封装和包含了kafka集群的整个元数据信息org.apache.kafka.common.serialization.Serializer
序列化接口,包含key序列化和value序列化Partitioner
分区策略接口和PartitionInfo分区信息实体RecordAccumulator
消息累加器,主要负责消息的合并和加入到缓存中ProducerBatch
kafka实际发送的一条消息实体,是由一条或者多条Record合并而成。Sender
其实是一个Runnable接口的实现类,不断的循环,读取ProducerBatch,然后真正执行发送流程
他们的组合关系,如图所示
- 关系图中没有Sender是因为后续会有专门一篇文章解析整个流程
waitOnMetadata 解析
核心功能是获取topic的详细信息,内部封装了阻塞更新,核心依赖是
org.apache.kafka.clients.Metadata
类
//只保留核心代码,去除不在本文解析范围内的代码 private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException { //在Metadata中维护了topic的过期更新时间映射关系,add操作更新topic的过期更新时间,如果第一次加入,则需要立即更新(更新标记为) metadata.add(topic); //fetch是一个synchronized同步方法,保障了线程安全,获取一个最新可用的集群信息 Cluster cluster = metadata.fetch(); // 从集群的topic数据中,获取当前topic的分区数,主要是判断该topic的信息是否有效 Integer partitionsCount = cluster.partitionCountForTopic(topic); //有效的话就直接返回 if (partitionsCount != null && (partition == null || partition < partitionsCount)) return new ClusterAndWaitTime(cluster, 0); // 下面就是阻塞等待topic元数据信息更新成功或者超时 long begin = time.milliseconds(); long remainingWaitMs = maxWaitMs; long elapsed; do { metadata.add(topic); // 返回当前版本号,初始值为0,每次更新完成后会自增,并将 needUpdate 设置为 true int version = metadata.requestUpdate(); //将sender唤醒,向broker发送更新metadata请求 sender.wakeup(); try { //阻塞等待更新 metadata.awaitUpdate(version, remainingWaitMs); } catch (TimeoutException ex) { //处理超时异常 } cluster = metadata.fetch(); //kafka存在一个最大阻塞时间,需要减去这部分阻塞耗时。如果超时,则直接返回异常,消息发送失败 elapsed = time.milliseconds() - begin; if (elapsed >= maxWaitMs) throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); if (cluster.unauthorizedTopics().contains(topic)) throw new TopicAuthorizationException(topic); remainingWaitMs = maxWaitMs - elapsed; //判断是否更新完成 partitionsCount = cluster.partitionCountForTopic(topic); } while (partitionsCount == null); if (partition != null && partition >= partitionsCount) { throw new KafkaException( String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount)); } return new ClusterAndWaitTime(cluster, elapsed); } 复制代码
- 如果不存在或者topic信息已失效需要更新,则会通过do...while来等待集群的信息的更新成功。do..while的核心逻辑
- metadata 中会有一个版本号,从0开始,每次更新的时候累加,在awaitUpdate()中会await等待版本号累加来判断是否更新完成。
- 是否需要更新,在Metadata中维护了一个标记位
needUpdate
来控制是否需要更新 - 当设置更新后,sender.wakeup() 唤醒 sender 线程,由sender去执行发送流程
最终函数的返回是ClusterAndWaitTime
,最新的集群信息和阻塞耗时
key 和value序列化解析
key和value的序列化需要实现
org.apache.kafka.common.serialization.Serializer
接口
kafka已经内置了很多序列化,在org.apache.kafka.common.serialization
包下面,最为常用的就是StringSerializer。一般Serializer 和Deserializer 是对应的
分配当前消息的发送分区
通过实现
org.apache.kafka.clients.producer.Partitioner
接口来设计自定义消息分区规则。
kafka 会提供一个默认的分区策略,一般都使用这个分区策略org.apache.kafka.clients.producer.internals.DefaultPartitioner
,分区规则如下:
- 如果key为null,则采用轮询来计算分区和分配
- 如果key不为null则使用称之为murmur的Hash算法(非加密型Hash函数,具备高运算性能及低碰撞率)来计算分区分配。
消息的追加和合并
kafka为了提高效率,会对多条消息进行合并,同时也为了防止消息量比较少时,无法达到合并大小,会有一个最大发送周期的配置,任意一个触发,都会将消息发送出去
看一下RecordAccumulator.append方法的实现逻辑
public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long maxTimeToBlock) throws InterruptedException { //有一个计数器,来确认并发追加的数量。来确保消息不会处理丢失 appendsInProgress.incrementAndGet(); ByteBuffer buffer = null; if (headers == null) headers = Record.EMPTY_HEADERS; try { //为每一个topic+partionerNum维护一个双端队列,用来保存已经合并后的批量消息体(ProducerBatch) Deque<ProducerBatch> dq = getOrCreateDeque(tp); //防止并发执行 synchronized (dq) { ... //尝试往最后一个ProducerBatch追加消息,如果有足够的内存,则返回结果 RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq); if (appendResult != null) return appendResult; } //如果队列为空或者ProducerBatch没有足够空间了,则需要新建一个ProducerBatch byte maxUsableMagic = apiVersions.maxUsableProduceMagic(); //kafka会要求设置一个标准批消息的大小(batchSize),为了提供NIO的Buffer重复利用,省去重新分配和回收的开销,如果消息超过标准大小,则以当前消息大小申请空间 int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)); //从总缓存池中申请缓存块,这个申请的过程会在后面的文章中专门描述 buffer = free.allocate(size, maxTimeToBlock); synchronized (dq) { //这里重复尝试的原因是free.allocate会有阻塞等待,可能其他线程已经新建了一个批消息,则直接追加,提高利用效率 if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq); if (appendResult != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... return appendResult; } //根据buffer + record新建一个ProducerBatch MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic); ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds()); FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds())); //追加到双端队列中 dq.addLast(batch); incomplete.add(batch); //主要是finally代码块中需要回收没有使用的缓存块 buffer = null; return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true); } } finally { //如果在第二次synchronized (dq),然后追加到别的线程创建的ProducerBatch,则需要将申请到的buffer给回收掉 if (buffer != null) free.deallocate(buffer); appendsInProgress.decrementAndGet(); } } 复制代码
- 从代码中可以看出在append主要做了一件事:对消息进行合并,放到缓存区中。这中间的细节会在下一篇文章中详细描述,包括怎么进行合并,缓存块如何进行划分和高效利用
sender发送
sender是Runnable接口的实现类,独立一个线程不断的从RecordAccumulator的双端队列中,取符合发送条件的批消息进行发送
- 这里面的细节后续会专门描述。可以理解为数据组装和数据发送进行了解耦,kafakaProducer主要负责的就是数据组装,如果有满足发送条件的,则将将sender线程唤起,然后起来干活了。
小结
kafkaProducer中的主要流程已经介绍完成了,相信大家对kafkaProducer如何对数据进行发送有了一个整体的了解,后续关于合并,实际如何发送会做专门的讲解
这篇关于kafka源码解析之kakfaProducer的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-01UniApp 中组件的生命周期是多少-icode9专业技术文章分享
- 2024-11-01如何使用Svg Sprite Icon简化网页图标管理
- 2024-10-31Excel数据导出课程:新手从入门到精通的实用教程
- 2024-10-31Excel数据导入课程:新手入门指南
- 2024-10-31RBAC的权限课程:新手入门教程
- 2024-10-31Svg Sprite Icon课程:新手入门必备指南
- 2024-10-31怎么配置 L2TP 允许多用户连接-icode9专业技术文章分享
- 2024-10-31怎么在FreeBSD上 安装 OpenResty-icode9专业技术文章分享
- 2024-10-31运行 modprobe l2tp_ppp 时收到“module not found”消息提醒是什么-icode9专业技术文章分享
- 2024-10-31FreeBSD的下载命令有哪些-icode9专业技术文章分享