全站最硬核 百万字强肝RocketMq源码 火热更新中~(八十六)延时队列

2022/1/30 11:04:17

本文主要是介绍全站最硬核 百万字强肝RocketMq源码 火热更新中~(八十六)延时队列,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

offset

在rocketMQ中,offset用来管理每个消息队列的不同消费组的消费进度。对offset的管理分为本地模式和远程模式,本地模式是以文本文件的形式存储在客户端,而远程模式是将数据保存到broker端,对应的数据结构分别为LocalFileOffsetStore和RemoteBrokerOffsetStore。
默认情况下,当消费模式为广播模式时,offset使用本地模式存储,因为每条消息会被所有的消费者消费,每个消费者管理自己的消费进度,各个消费者之间不存在消费进度的交集;当消费模式为集群消费时,则使用远程模式管理offset,消息会被多个消费者消费,不同的是每个消费者只负责消费其中部分消息,添加或删除消费者,都会使负载发生变动,容易造成消费进度冲突,因此需要集中管理。同时,RocketMQ也提供接口供用户自己实现offset管理(实现OffsetStore接口)。
生产环境上一般使用集群模式,这里主要记录集群模式下offset的管理,即RemoteBrokerOffsetStore。
在这里插入图片描述

broke端

offset的存储与加载

rocketMQ的broker端中,offset的是以json的形式持久化到磁盘文件中,文件路径为${user.home}/store/config/consumerOffset.json。其内容示例如下:

{
    "offsetTable": {
        "test-topic@test-group": {
            "0": 88526, 
            "1": 88528
        }
    }
}

broker端启动后,会调用BrokerController.initialize()方法,方法中会对offset进行加载,consumerOffsetManager.load()。获取文件内容后,序列化为ConsumerOffsetManager对象,实质是其属性ConcurrentMap<String,ConcurrentMap<Integer, Long>> **offsetTable,**offsetTable的数据结构为ConcurrentMap,是一个线程安全的容器,key的形式为topic@group(每个topic下不同消费组的消费进度),value也是一个ConcurrentMap,key为queueId,value为消费位移(这里不是offset而是位移)。通过对全局ConsumerOffsetManager对象就可以对各个topic下不同消费组的消费位移进行获取与管理。

/**ConsumerOffsetManager.offsetTable*/
private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
        new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);

/**ConsumerOffsetManager.decode*/
public void decode(String jsonString) {
        if (jsonString != null) {
            // 序列化成功后复制给全局ConsumerOffsetManager对象
            ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
            if (obj != null) {
                this.offsetTable = obj.offsetTable;
            }
        }
    }


这篇关于全站最硬核 百万字强肝RocketMq源码 火热更新中~(八十六)延时队列的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程