Kafka消息队列
2022/1/9 23:34:17
本文主要是介绍Kafka消息队列,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
之前也学习过消息队列,但一直没有使用的场景,今天项目中遇到了 kafka 那便有了应用场景
1. Kafka
Kafka 是一个分布式、支持分区,多副本的基于 zookeeper 的消息队列。使用消息队列,是应用 A 将要处理的信息发送到消息队列然后继续下面的任务,需要该信息的应用 B 从消息队列里面获取信息再做处理,这样做像是多此一举,应用 A 直接发信息给应用 B 不就可以了吗?存在即合理,使用消息队列其作用如下:
- 异步处理:用户注册后发送邮件、短信、验证码等可以异步处理,使注册这个过程写入数据库后就可立即返回
- 流量消峰:秒杀活动超过阈值的请求丢弃转向错误页面,然后根据消息队列的消息做业务处理
- 日志处理:可以将error的日志单独给消息队列进行持久化处理
- 应用解耦:购物的下单操作,订单系统与库存系统中间加消息队列,使二者解耦,若后者故障也不会导致消息丢失
之前 笔者也写过 RabbitMQ 的笔记,传送门
2. 生产消费模型
结合 kafka 的下面这些名词来解释其模型会更加容易理解
名称 | 解释 |
---|---|
Broker | kafka 的实例,部署多台 kafka 就是有多个 broker |
Topic | 消息订阅的话题,是这些消息的分类,类似于消息订阅的频道 |
Producer | 生产者,负责往 kafka 发送消息 |
Consumer | 消费者,从 kafka 读取消息来进行消费 |
3. 安装部署
kafka 和依赖的 zookeeper 是 java 编写的工具,其需要 jdk8 及其以上。笔者这里使用 Docker 安装,偷懒了贪图方便快捷
# 使用 wurstmeister 制作的镜像 docker pull wurstmeister/zookeeper docker pull wurstmeister/kafka # 启动 zookeeper docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper # 单机启动 kafka docker run -d --name kafka -p 9092:9092 \ -e KAFKA_BROKER_ID=0 \ -e KAFKA_ZOOKEEPER_CONNECT=xxx.xxx.xxx.xxx:2181 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://xxx.xxx.xxx.xxx:9092 \ -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
4. Quickstart
kafka 官网也有很好的介绍,quickstart
# 进入kafka容器 docker exec -it kafka /bin/sh # 进入 bin 目录 cd /opt/kafka_2.13-2.8.1/bin # partitions 分区 # replication 副本因子 # 创建一个主题(参数不懂可直接填写,后面会讲解说明) ./kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic quickstart-events --bootstrap-server localhost:9092 # 查看 ./kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092 # 写入 topic(回车表示一条消息,ctrl + c 结束输入) # 消息默认存储 7 天,下一步的消费可以验证 ./kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092 This is my first event This is my second event # 读取 topic(运行多次可以读取消息,因为默认存储 7 天) ./kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092 ./kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
5. SpringBoot 集成
SpringBoot 集成了 Kafka,添加依赖后可使用内置的 KafkaTemplate 模板方法来操作 kafka 消息队列
5.1 添加依赖
<!-- sprinboot版本管理中有kafka可不写版本号 --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
5.2 配置文件
server: port: 8080 spring: # 消息队列 kafka: producer: # broker地址,重试次数,确认接收个数,消息的编解码方式 bootstrap-servers: 101.200.197.22:9092 retries: 3 acks: 1 key-serializer: org.springframework.kafka.support.serializer.StringSerializer value-serializer: org.springframework.kafka.support.serializer.StringSerializer consumer: # broker地址,自动提交,分区offset设置 bootstrap-servers: 101.200.197.22:9092 enable-auto-commit: false auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
5.3 生产者
@RestController @RequestMapping("/kafka") public class Producer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; @GetMapping("/producer1") public String sendMessage1(@RequestParam(value = "message", defaultValue = "123") String message) throws ExecutionException, InterruptedException { ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("topic1", message); SendResult<String, Object> sendResult = future.get(); return sendResult.toString(); } @GetMapping("/producer2") public String sendMessage2(@RequestParam(value = "message", defaultValue = "123") String message) throws ExecutionException, InterruptedException { ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("topic1", message); future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { @Override public void onFailure(Throwable ex) { System.out.println("faile"); } @Override public void onSuccess(SendResult<String, Object> result) { System.out.println("success"); } }); return ""; } }
5.4 消费者
@Component public class Consumer { @KafkaListener(topics = {"topic1"}) public void onMessage(ConsumerRecord<?, ?> record) { System.out.println(record.value()); } }
6. 存储目录结构
kafka |____kafka-logs |____topic1 | |____00000000000000000000.log(存储接收的消息) | |____consumer_offsets-01(消费者偏移量) | |____consumer_offsets-02 |____topic2 |____00000000000000000000.log |____consumer_offsets-01 |____consumer_offsets-02
每台 broker 实例接收到消息后将之存储到 00000.log 里面,保存的方式是先入先出。消息被消费后不会被删除,相反可以设置 topic 的消息保留时间,重要的是 Kafka 的性能在数据大小方面实际上是恒定的,因此长时间存储数据是完全没问题的
消费者会将自己消费偏移量 offset 提交给 topic 在 _consumer_offsets 里面保存,然后通过偏移量来确定消息的位置,默认从上次消费的位置开始,添加参数 --frombeginning 则从头开始消费,可获取之前所有存储的消息。kafka 也会定期清除内部的消息,直到保存最新的一条(文件保存的消息默认保存 7 天)
7. 消费组
这个在笔者配置消费者的时候发现的问题,启动时报错说没有指定消费组
- 每条分区消息只能被同组的一个消费者消费,consumer1 和 consumer2 同组,所以只有其中一个能消费同条消息
- 每条分区消息能被不同组的单个消费者消费,consumer2 和 consumer4 不同组,所以都能消费同条消息
- 以上二个规则同时成立
- 其作用是可以保证消费顺序,同个分区里的消息会被同个消费者顺序消费
8. 分区和副本
topic 消息保存的文件 0000.log 可以进行物理切分,这就是分区的概念,类似于数据库的分库分表。这样做的好处在于单个保存的文件不会太大从而影响性能,最重要的是分区后不是单个文件串行执行了,而是多区多文件可并行执行提高了并发能力
分区:消费者会消费同一 topic 的不同分区,所以会保存不同分区的偏移量,其格式为:GroupId + topic + 分区号
副本:副本是对分区的备份,集群中不同的分区在不同的 broker 上,但副本会对该分区备份到指定数量的 broker 上,这些副本有 leader 和 follower 的区别,leader负责读写,挂了再重新选举,副本为了保持数据一致性
9. 常见问题
9.1 生产者同步和异步消息
生产者发送消息给 broker,之后 broker 会响应 ack 给生产者,生产者等待接收 ack 信号 3 秒,超时则重试 3 次
生产者 ack 确认配置:
- ack = 0:不需要同步消息
- ack = 1:则 leader 收到消息,并保存到本地 log 之后才响应 ack 信息
- ack 默认配置为 2
9.2 消费者自动提交和手动提交
- 自动提交:消费者 pull 消息之后马上将自身的偏移量提交到 broker 中,这个过程是自动的
- 手动提交:消费者 pull 消息时或之后,在代码里将偏移量提交到 broker
- 二者区别:防止消费者 pull 消息之后挂掉,在消息还没消费但又提交了偏移量
9.3 消息丢失和重复消费
- 消息丢失
- 生产者:配置 ack ,以及配置副本和分区数值一致
- 消费者:设置手动提交
- 重复消费
- 设置唯一主键,Mysql 主键唯一则插入失败
- 分布式锁
9.4 顺序消费方案
- 生产者:关闭重试,使用同步发送,成功了再发下一条
- 消费者:消息发送到一个分区中,只有一个消费组的消费者能接收消息
这篇关于Kafka消息队列的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-12-21MQ-2烟雾传感器详解
- 2024-12-09Kafka消息丢失资料:新手入门指南
- 2024-12-07Kafka消息队列入门:轻松掌握Kafka消息队列
- 2024-12-07Kafka消息队列入门:轻松掌握消息队列基础知识
- 2024-12-07Kafka重复消费入门:轻松掌握Kafka消费的注意事项与实践
- 2024-12-07Kafka重复消费入门教程
- 2024-12-07RabbitMQ入门详解:新手必看的简单教程
- 2024-12-07RabbitMQ入门:新手必读教程
- 2024-12-06Kafka解耦学习入门教程
- 2024-12-06Kafka入门教程:快速上手指南