KafkaProducer源码
2021/7/27 11:36:12
本文主要是介绍KafkaProducer源码,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
文章目录
- 特征
- 线程安全
- 异步发送
- 元数据Future
- 核心参数:
- KafkaProducerUML图
- Producer接口方法
- KafkaProducer核心属性与方法
- KafkaProducer简单实例
特征
线程安全
多个线程可以交叉调用
异步发送
内部有一个消息累加器RecordAccumulator作为缓冲池,里面包含多个ProducerRecord队列,用于异步接收KafkaProducer.send()发送的的消息并把消息发送到broker,发送消息时会唤醒一个IO线程名叫Sender
元数据Future
Future send(ProducerRecord<K,V>, Callback)异步发送,发送后立即返回一个凭证Future,RecordMetadata里是集群的主题分区副本元信息
核心参数:
- acks:设为all是指isr中所有broker(0,all/-1,1)
- retries:消息发送失败后的重试次数
- batch.size和linger.ms:kafka 消息发送者为每一个分区维护一个未发送消息积压缓存区,其内存大小由batch.size指定,默认为 16K
但如果缓存区中不足100条,但发送线程此时空闲,是需要等到缓存区中积满100条才能发送还是可以立即发送呢?默认是立即发送,即 batch.size 的作用其实是客户端一次发送到broker的最大消息数量。为了提高 kafka 消息发送的高吞吐量,即控制在缓存区中未积满 batch.size 时来控制 消息发送线程的行为,是立即发送还是等待一定时间,如果linger.ms 设置为 0表示立即发送,如果设置为大于0,则消息发送线程会等待这个值后才会向broker发送。该参数值会增加响应时间,但有利于增加吞吐量。有点类似于 TCP 领域的 Nagle 算法。 - buffer.memory:消息发送者内存总大小。超过该值,往缓存区中添加消息会被阻塞。阻塞的最大时间可通过参数 max.block.ms 设置,阻塞超过该值会抛出超时异常。
- key.serializer和value.serializer
- enable.idempotence:支持消息传递幂等,可以做到消息只会被传递一次,通过 enable.idempotence 为 true 来开启。如果该值设置为 true,其 retries 将设置为 Integer.MAX_VALUE,acks 将被设置为 all。为了确保消息发送幂等性,必须避免应用程序端的任何重试,并且如果消息发送API如果返回错误,应用端应该记录最后成功发送的消息,避免消息的重复发送。
- 0.11版本后开始支持幂等和事务消息
KafkaProducerUML图
Producer接口方法
- void initTransactions()
- void beginTransaction()
初始化与开启事务,执行事务方法时调用 - void abortTransaction()
回滚事务。 - void commitTransaction()
提交事务。 - void sendOffsetsToTransaction(Map< TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId)
- void sendOffsetsToTransaction(Map< TopicPartition, OffsetAndMetadata> offsets,ConsumerGroupId)
向消费组提交当前事务中的消息偏移量。 - void flush()
忽略 linger.ms 的值,直接唤醒IO线程,将缓冲区中的消息全部发送到 broker。 - List< PartitionInfo> partitionsFor(String topic)
获取 topic 的路由信息(分区信息)。 - Map< MetricName, ? extends Metric> metrics()
获取由生产者收集的统计信息。 - Future< RecordMetadata> send(ProducerRecord<K, V> record)
- Future< RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
消息发送,该方法默认为异步发送,如果要实现同步发送的效果,对返回结果调用 get 方法即可。支持回调。 - void close()
- void close(Duration timeout)
关闭或者定时关闭事务
KafkaProducer核心属性与方法
- Logger log
日志记录 - String JMX_PREFIX = “kafka.producer”
- String NETWORK_THREAD_PREFIX = “kafka-producer-network-thread”
- final String PRODUCER_METRIC_GROUP_NAME = “producer-metrics”
- String clientId
客户端ID。创建 KafkaProducer 时可通过 client.id 定义 clientId,如果未指定,则默认 producer-seq,seq 在线程内递增,强烈建议客户端显式指定 clientId。 - Metrics metrics
指标相关存储类,例如消息体大小,耗时等监控相关指标。 - Partitioner partitioner
分区负载均衡算法,通过参数 partitioner.class 指定。 - int maxRequestSize
调用 send 方法发送的最大请求大小,包括 key、消息体序列化后的消息总大小不能超过该值。通过参数 max.request.size 指定。 - long totalMemorySize
生产者缓存所占内存的总大小,通过参数 buffer.memory指定。 - ProducerMetadata metadata
例如 topic 的路由信息,由 KafkaProducer 自动更新。 - RecordAccumulator accumulator
消息累加器 - Sender sender
封装消息发送的逻辑,即向 broker 发送消息的处理逻辑。 - Thread ioThread
消息发送的后台线程,一个独立的线程,内部使用 Sender 来向 broker 发送消息。 - CompressionType compressionType
默认不启用压缩,compression.type配置可选值:none、gzip、snappy、lz4、zstd。 - Sensor errors
错误信息收集器,类似metrics,用于监控。 - Time time
获取系统时间或线程睡眠等。 - Serializer keySerializer
键序列化器 - Serializer valueSerializer
值序列化器 - ProducerConfig producerConfig
生产者配置 - long maxBlockTimeMs
最大阻塞时间,缓存已满时,消息发送会阻塞,等待多久会抛出异常,通过参数max.block.ms指定。 - ProducerInterceptors<K, V> interceptors
生产者消息拦截器,消息发送前定制化处理。 - ApiVersions apiVersions
维护 api 版本的相关元信息,该类只能在 kafka 内部使用。 - TransactionManager transactionManager
事务环境下上下文初始化结果
KafkaProducer简单实例
package persistent.prestige.demo.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; import java.util.concurrent.Future; public class KafkaProducerTest { public static void main(String[] args){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092,localhost:9082,localhost:9072,"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); try { for (int i = 0; i < 100; i++) { Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("TOPIC_ORDER", Integer.toString(i), Integer.toString(i))); RecordMetadata recordMetadata = future.get(); System.out.printf("offset:" + recordMetadata.offset()); } } catch (Throwable e) { e.printStackTrace(); } finally { producer.close(); } } }
这篇关于KafkaProducer源码的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-12-23DevExpress 怎么实现右键菜单(Context Menu)显示中文?-icode9专业技术文章分享
- 2024-12-22怎么通过控制台去看我的页面渲染的内容在哪个文件中呢-icode9专业技术文章分享
- 2024-12-22el-tabs 组件只被引用了一次,但有时会渲染两次是什么原因?-icode9专业技术文章分享
- 2024-12-22wordpress有哪些好的安全插件?-icode9专业技术文章分享
- 2024-12-22wordpress如何查看系统有哪些cron任务?-icode9专业技术文章分享
- 2024-12-21Svg Sprite Icon教程:轻松入门与应用指南
- 2024-12-20Excel数据导出实战:新手必学的简单教程
- 2024-12-20RBAC的权限实战:新手入门教程
- 2024-12-20Svg Sprite Icon实战:从入门到上手的全面指南
- 2024-12-20LCD1602显示模块详解