C# Kafka重置到最新的偏移量,即从指定的Partition订阅消息使用Assign方法
2023/4/11 18:52:21
本文主要是介绍C# Kafka重置到最新的偏移量,即从指定的Partition订阅消息使用Assign方法,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
在使用Kafka的过程中,消费者断掉之后,再次开始消费时,消费者会从断掉时的位置重新开始消费。
场景再现:比如昨天消费者晚上断掉了,今天上午我们会发现kafka消费的数据不是最新的,而是昨天晚上的数据,由于数据量比较多,也不会及时的消费到今天上午的数据,这个时候就需要我们对偏移量进行重置为最新的,以获取最新的数据。
前提,我们使用的AutoOffsetReset配置是Latest,即从连接到Kafka那一刻开始消费之后产生的消息,之前发布的消息不在消费,这也是默认的配置。
关于AutoOffsetReset这个枚举的配置项如下:
-
latest
(default) which means consumers will read messages from the tail of the partition
最新(默认) ,这意味着使用者将从分区的尾部读取消息,只消费最新的信息,即自从消费者上线后才开始推送来的消息。那么会导致忽略掉之前没有处理的消息。 -
earliest
which means reading from the oldest offset in the partition
这意味着从分区中最早的偏移量读取;自动从消费者上次开始消费的位置开始,进行消费。 -
none
throw exception to the consumer if no previous offset is found for the consumer's group
如果没有为使用者的组找到以前的偏移量,则不会向使用者抛出异常。
接下来,我们直接使用下面这一段代码即可:
使用Assign订阅指定的分区,注意最后还需要使用Subscribe方法订阅
consumer.Assign(new TopicPartitionOffset(new TopicPartition(topic, new Partition(1)),Offset.End));//从指定的Partition订阅消息使用Assign方法 consumer.Subscribe(topic);//订阅消息使用Subscribe方法
从指定的分区获取数据,并且指定了对应的偏移量
关于Offset这个枚举不同配置项的说明如下:
Offset 可以被设置为 Beginning、End、Stored 和 Unset。这些值的含义如下:
-
Beginning:从 Kafka 分区的最早消息(Offset 为 0)开始消费。如果分区中有新消息产生,消费者会继续消费这些消息。
-
End:从 Kafka 分区的最新消息开始消费。如果消费者在启动后到达了 Kafka 分区的末尾,它将停止消费,并等待新消息的到来。
-
Stored:从消费者存储的 Offset 开始消费。这个 Offset 通常是消费者在上次停止消费时存储的 Offset。如果存储的 Offset 失效或者已过期,消费者会从最新的消息(End)开始消费。
-
Unset:在消费者启动时,Offset 没有被设置。在这种情况下,消费者将根据 auto.offset.reset 配置项的值来决定从哪里开始消费。如果 auto.offset.reset 的值为 latest,则从最新的消息开始消费;如果 auto.offset.reset 的值为 earliest,则从最早的消息开始消费。
需要注意的是,如果设置了 Stored 的 Offset,但是在 Kafka 中找不到对应的消息,消费者将会从最新的消息(End)开始消费。
因此,存储的 Offset 必须要有效才能够被正确地使用
这篇关于C# Kafka重置到最新的偏移量,即从指定的Partition订阅消息使用Assign方法的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2022-03-01沐雪多租宝商城源码从.NetCore3.1升级到.Net6的步骤
- 2024-12-06使用Microsoft.Extensions.AI在.NET中生成嵌入向量
- 2024-11-18微软研究:RAG系统的四个层次提升理解与回答能力
- 2024-11-15C#中怎么从PEM格式的证书中提取公钥?-icode9专业技术文章分享
- 2024-11-14云架构设计——如何用diagrams.net绘制专业的AWS架构图?
- 2024-05-08首个适配Visual Studio平台的国产智能编程助手CodeGeeX正式上线!C#程序员必备效率神器!
- 2024-03-30C#设计模式之十六迭代器模式(Iterator Pattern)【行为型】
- 2024-03-29c# datetime tryparse
- 2024-02-21list find index c#
- 2024-01-24convert toint32 c#