如何获取 Kafka 的消费者详情? —— 从 Scala 到 Java 的切换
2021/9/7 17:07:56
本文主要是介绍如何获取 Kafka 的消费者详情? —— 从 Scala 到 Java 的切换,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
目标及实现
实现如同 bin/kafka-consumer-group.sh –describe –bootstrap-server localhost:9092 –group CONSUMER_GROUP_ID的效果:
[root@node2 kafka_2.12-1.0.0]# bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group CONSUMER_GROUP_ID TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID topic-test1 0 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID topic-test1 1 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID topic-test1 2 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID topic-test1 3 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID
KafkaConsumerGroupService的核心方法是CollectGroupAssignment,其方法参数为一个consumer group的groupId,方法输出为上面示例中的列表信息。CollectGroupAssignment方法主要有以下几个步骤:
- 根据groupId调用describeConsumerGroup方法(内部原理是发送DescribeGroupsRequest请求)来获取consumer group的基本信息,参考上面示例中的CONSUMER-ID、HOST、CLIENT-ID以及TopicPartition信息,但是没有CURRENT-OFFSET、LOG-END-OFFSET、LAG信息。注意这里的LOG-END-OFFSET是消费者可见的LEO,不是生产者可见的LEO,也就是通俗意义上的HW。
- 根据groupId调用listGroupOffsets方法(内部原理是发送OffsetFetchRequest请求)来获取各个分区(Partition)的对应的消费位移CURRENT-OFFSET。
- 通过调用KafkaConsumer的endOffsets方法来获取TopicPartition对应的HW,即示例中的LOG-END-OFFSET。
- 计算Lag并组合成信息列表List。
改造
对应Java版的KafkaConsumerGroupService改造代码可以参见代码,目录结构如下图所示:
其中model中的ConsumerGroupSummary、ConsumerSummary和PartitionAssignmentState是简单的JavaBean, PartitionAssignmentState是用来保存每个TopicPartition的消费者信息的,具体内容参考如下。KafkaConsumerGroupCustomService就是本文所要陈述的Java改造办的KafkaConsumerGroupSerivice,ConsumerGroupUtils用来存放一些公用的代码。
@Data @Builder public class PartitionAssignmentState { private String group; // groupId private Node coordinator; // consumer coodinator节点信息 private String topic; private int partition; private long offset; private long lag; private String consumerId; private String host; private String clientId; private long logEndOffset; }
初始化KafkaConsumerGroupCustomService需要Kafka的服务端地址,然后初始化AdminClient和KafkaConsumer,AdminClient中包含了众多管理类方法,主要是通过发送各种自定义协议请求来完成,上面步骤中所说的describeConsumerGroup和listGroupOffsets方法也是通过AdminClient来实现的;KafkaConsumer主要是用来获取TopicPartition对应的HW(消费者可见的LogEndOffsets)的。
KafkaConsumerGroupCustomService中与scala版对应的collectGroupAssignment方法如下(详细步骤参考代码注释):
public List<PartitionAssignmentState> collectGroupAssignment( AdminClient adminClient, KafkaConsumer<String, String> consumer, String group) { //1. 获取consumer group的基本信息,包括CONSUMER-ID、HOST、 // CLIENT-ID以及TopicPartition信息 AdminClient.ConsumerGroupSummary consumerGroupSummary = adminClient.describeConsumerGroup(group, 0); List<TopicPartition> assignedTopicPartitions = new ArrayList<>(); List<PartitionAssignmentState> rowsWithConsumer = new ArrayList<>(); scala.collection.immutable.List<AdminClient.ConsumerSummary> consumers = consumerGroupSummary.consumers().get(); if (consumers != null) { //2. 获取各个分区(Partition)的对应的消费位移CURRENT-OFFSET scala.collection.immutable.Map<TopicPartition, Object> offsets = adminClient.listGroupOffsets(group); if (offsets.nonEmpty()) { String state = consumerGroupSummary.state(); // 3. 还有一个状态是Dead表示"group"对应的consumer group不存在 if (state.equals("Stable") || state.equals("Empty") || state.equals("PreparingRebalance") || state.equals("AwaitingSync")) { List<ConsumerSummary> consumerList = changeToJavaList(consumers); // 4. 获取当前有消费者的消费信息,即包含CONSUMER-ID、HOST、CLIENT-ID rowsWithConsumer = getRowsWithConsumer(consumerGroupSummary, offsets, consumer, consumerList, assignedTopicPartitions, group); } } //5. 获取当前没有消费者的消费信息 List<PartitionAssignmentState> rowsWithoutConsumer = getRowsWithoutConsumer(consumerGroupSummary, offsets, consumer, assignedTopicPartitions, group); //6. 合并结果 rowsWithConsumer.addAll(rowsWithoutConsumer); } return rowsWithConsumer; }
KafkaConsumerGroupCustomService类中包含有getRowsWithConsumer()、getRowsWithoutConsumer()、changeToJavaList等私有方法也都是在Scala语言与Java语言之间进行切换,这样可以不需要修改kafka-core的原生代码而通过外部的封装调用既可以实现获取Kafka消费者详情的功能。光看代码比较抽象,建议对此感兴趣的同学可以亲自对比一下kafka-core包中kafka.admin.ConsumerGroupCommand的KafkaConsumerGroupSerivice与笔者自定义的KafkaConsumerGroupCustomService的实现来了解下Scala语言到Java语言的转换。
如果需要打印详情可以调用KafkaConsumerGroupCustomService同目录的ConsumerGroupUtils类中的printPasList(List list)方法。注意要运行这些代码需要JDK8的环境,笔者为了让代码显得“骚气”一点就用来一点Java8的语法,如果需要Java7的代码实现可以关注私聊。
或许有些同学对于Scala和Java交叉的代码并不感冒,想要寻求一种存Java式的实现方式,那么在这里怎么实现呢?答案是通过KafkaAdminClient,它是AdminClient的Java版实现,从Kafka0.11.0.0版本开始引入的,不过KafkaAdminClient本身并没有提供describeConsumerGroup、listGroupOffsets之类的方法给我们直接使用,扩展一下也很方便,由于篇幅限制,这部分的内容将在下一篇文章中进行介绍,如果想要先一睹为快,可以参考下代码实现,详细的逻辑解析敬请期待….
这篇关于如何获取 Kafka 的消费者详情? —— 从 Scala 到 Java 的切换的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-02Java管理系统项目实战入门教程
- 2024-11-02Java监控系统项目实战教程
- 2024-11-02Java就业项目项目实战:从入门到初级工程师的必备技能
- 2024-11-02Java全端项目实战入门教程
- 2024-11-02Java全栈项目实战:从入门到初级应用
- 2024-11-02Java日志系统项目实战:初学者完全指南
- 2024-11-02Java微服务系统项目实战入门教程
- 2024-11-02Java微服务项目实战:新手入门指南
- 2024-11-02Java项目实战:新手入门教程
- 2024-11-02Java小程序项目实战:从入门到简单应用