【金秋打卡】第7天 Go中间件集成学习-go使用kafka
2022/10/31 3:24:55
本文主要是介绍【金秋打卡】第7天 Go中间件集成学习-go使用kafka,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
课程名称:海量数据高并发场景,构建Go+ES8企业级搜索微服务
课程章节:6-4
课程讲师:少林码僧
课程内容:
★如何处理消息丢失问题?
先说一个前置设置:当生产者向集群生产消息的时候,有三种模式。
acks = 0 的时候,是生产者不管集群有没有返回ack响应,都只管发自己的
acks = 1 , 至少要等待集群的leader写入成功,无需等待follower写入成功。但是当leader已经宕掉,新的leader还未选举出来,生产者就会收到错误信息。生产者可以根据设置,重发消息,但是如果新的leader节点(raft算法选举)还没有同步到你发的消息,依然是报错的。这时候,可以采用回调的方式
acks = -1/all, 此模式要保证除了leader收到消息,还要保证 min.insync.replicas=2 这个配置中设定的副本数量收到消息,比如2就是至少两个2副本。一般只有金融系统才需要这么严格。
如果业务对消息丢失人容忍度很低,可以设置acks=-1/all
设置发送重试的次数,次数越多丢的几率越小。比如在go kafka的sdk中设置 config.Producer.Retry.Max = 5 //默认是3
不允许落后的broker成为新的leader
Broker配置文件中设置:unclean.leader.election.enable:false
这个配置是为了限制kafka选举的时候,谁有资格成为leader。当该配置为false的时候,落后于leader的follower将没有资格选为leader。因为在raft算法中,如果一个节点被选举为leader,那么其他节点都要和这个节点进行同步,如果比该节点先进,那么他将放弃他领先于leader的部分,这在kafka上就成为了丢数据
增加消息副本
以空间来换取可靠性。Broker配置文件中配置 replocation.factor:3,就是包含主分区在内所有副本为3,每次新消息将被保存为3份。
设置至少2个副本写入成功
Broker配置文件中设置:min.insync.replicas:2
需要注意的是,副本数量必须大于该值,replication.factor 要设置大于 min.insync.replicas,如果相同,那么一个副本挂掉就全挂了。
★如何避免重复消费的问题?
前置知识:
▲消息传递的三种质量标准
●至多一次(At most once) 允许消息丢失,不允许消息重复
●至少一次(At least once) 允许消息重复,不允许消息丢失
●只有一次(Exactly once) 不允许消息重复,也不允许消息丢失
市面上的队列,包括kafka ,rabbitMQ 等都是默认的 至少一次 送达,那么避免重复消费就有以下几个解决方式:
通过幂等性来处理重复消费问题
▲使用业务主键来做ES的文档_id
▲利用数据库的主键或者唯一索引来实现幂等性
▲通过Token机制或者全局唯一ID的机制记录消息的消费状态来实现,分布式中实现起来比较困难,需要加分布式锁,耗费资源较大
★如何处理消息乱序的问题?
不同应用服务中,多个消费者消费一个topic的时候,并不能保证消费的顺序。
设置分区策略为Hash
在go中,只需要指定消息的key即可,在kafka中,如果有key,那么会对key进行hash,然后对Partition数量进行取模来确定要去哪个Partition、
★哪些情况会触发kafka消费者的rebalance
▲ 什么是消费者的reblance
Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 consumer 如何达成一致,来分配订阅 Topic 的每个分区。
例如:某 Group 下有 20 个 consumer 实例,它订阅了一个具有 100 个 partition 的 Topic 。正常情况下,kafka 会为每个 Consumer 平均的分配 5 个分区。这个分配的过程就是 Rebalance。
▲触发条件
消费组中的消费者加入或者离开,这也是发生最多的情况
partion 数量的变化
对topic的订阅发生变化
▲go的kafka的sdk一些坑
主流的go的sdk由于在设计上出现一些缺陷(可以选用其他的sdk),导致修改完topic配置,新增了partion后,go如果不重启消费者和生产者,有可能一直无法识别新的partion,7天后新分区上的msg可能就丢了
▲如何避免消费者rebalance
将消费者代码尽可能隔离出来,拆分成单独的微服务
消费者所在的服务如果出现panic,会导致整个服务挂掉重启,这就会造成消费者的rebalance
合理设置这个max.poll.interval.ms这个参数
消费者消费时间过长,导致间隔过大,长时间不通知就会被kafka判定为死亡,踢出了消费组。根据业务处理最长时间来设置
- 合理设置 session.timeout.ms 和 heartbeat.interval.ms
这篇关于【金秋打卡】第7天 Go中间件集成学习-go使用kafka的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-12-20go-zero 框架的 RPC 服务 启动start和停止 底层是怎么实现的?-icode9专业技术文章分享
- 2024-12-19Go-Zero 框架的 RPC 服务启动和停止的基本机制和过程是怎么实现的?-icode9专业技术文章分享
- 2024-12-18怎么在golang中使用gRPC测试mock数据?-icode9专业技术文章分享
- 2024-12-15掌握PageRank算法核心!你离Google优化高手只差一步!
- 2024-12-15GORM 中的标签 gorm:"index"是什么?-icode9专业技术文章分享
- 2024-12-11怎么在 Go 语言中获取 Open vSwitch (OVS) 的桥接信息(Bridge)?-icode9专业技术文章分享
- 2024-12-11怎么用Go 语言的库来与 Open vSwitch 进行交互?-icode9专业技术文章分享
- 2024-12-11怎么在 go-zero 项目中发送阿里云短信?-icode9专业技术文章分享
- 2024-12-11怎么使用阿里云 Go SDK (alibaba-cloud-sdk-go) 发送短信?-icode9专业技术文章分享
- 2024-12-10搭建个人博客网站之一、使用hugo创建个人博客网站