Redis 消息队列
2021/10/12 19:16:36
本文主要是介绍Redis 消息队列,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
一、消息队列
消息队列的基本需求
- 消息保序:消息的产生与消费有先后顺序,消息队列需保证消费的有序性。
- 重复消息处理:生产者可能会发送重复消息,需保证消费的幂等性。
- 消息可靠性保证:消息不能丢失,消费失败后,需保证有机制可以重新消费。
消息队列可靠性的保证
- 生产者丢数据
- 消费者丢数据
- 消息队列自身丢数据
二、基于list的消息队列
Redis 的sub/pub机制,可以实现部分的消息队列机制,但遇到网络连接中断,数据库宕机等情况,数据易丢失无法重复消费。基于list的消费队列可以简单的实现消费队列的部分功能。
实现指令
- LPUSH:队列插入一条消息;
- RPOP:队列消费一条消息;
- BRPOP:阻塞式的消费一条消息;
- BRPOPLPUSH:阻塞式的消费一条消息的同时将此消息插入另一个消费队列(用于消费失败时的可重复消费);
使用示例
# 消费队列mq中插入一条消息序列为00001,消息值为msg:5的数据 127.0.0.1:6379> lpush mq 00001:msg:5 (integer) 1 # 消费一条消息 127.0.0.1:6379> rpop mq "00001:msg:5"
127.0.0.1:6379> lpush mq 00002:msg:7 00003:msg:4 (integer) 2 # 阻塞式(3s 超时)消费一条消息 127.0.0.1:6379> brpop mq 3 1) "mq" 2) "00002:msg:7" (2.51s) 127.0.0.1:6379> brpop mq 3 1) "mq" 2) "00003:msg:4"
127.0.0.1:6379> lpush mq 00004:msg:9 (integer) 1 # 阻塞式(3s 超时)消费一条消息,并将消费的数据放入mq2队列,预防消费失败时数据丢失。 127.0.0.1:6379> brpoplpush mq mq2 3 "00003:msg:9" # 消费成功后再删除掉mq2中的消息 127.0.0.1:6379> rpop mq2 "00003:msg:9"
存在问题
- 业务需自行保证序列ID唯一,避免重复消费。
- BRPOPLPUSH为了防止消费失败后可重新消费,会多产生的一条消费队列,占用内存。
- 并不支持消费组概念,当消费速度较慢时,会导致消息队列数据积压。
- 消息队列自身基于redis,本身存在数据丢失风险。
三、基于streams的消息队列
Redis5.0之后新支持了streams数据结构用于支持完备功能的消息队列,新增消费组消费,ACK确认等完备机制。
实现指令
- XADD:插入消息,保证有序,可以自动生成全局唯一 ID;
- XREAD:用于读取消息,可以按 ID 读取数据;
- XREADGROUP:按消费组形式读取消息;
- XPENDING:用来查询每个消费组内所有消费者已读取但尚未确认的消息;
- XACK:用于向消息队列确认消息处理已完成;
使用示例
- xadd key ID field string [field string …]
- ID :可自己指定唯一序列号,一般用* 指定系统自动生成,生成ID序列号:[毫秒时间戳-0开头序列],如"1633939083516-0"
127.0.0.1:6379> xadd mq * money -5 "1633939083516-0" 127.0.0.1:6379> xadd mq * money -5 money +10 "1633953837863-0"
- xrange key start end [COUNT count]
127.0.0.1:6379> xrange mq - + 1) 1) "1633939083516-0" 2) 1) "money" 2) "-5" 2) 1) "1633953837863-0" 2) 1) "money" 2) "-5" 3) "money" 4) "+10"
- xread [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
- count :读取数目
- block :阻塞毫秒数,不设置则代表非阻塞式读取
- key :消息队列名称
- id :消息序列号,0-0 代表最小开始,$ 代表从现在开始。
127.0.0.1:6379> xread count 2 block 5000 streams mq 0-0 1) 1) "mq" 2) 1) 1) "1633939083516-0" 2) 1) "money" 2) "-5" 2) 1) "1633953837863-0" 2) 1) "money" 2) "-5" 3) "money" 4) "+10"
- xgroup [CREATE key groupname id-or-$ ] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
# 创建消费mq队列的group1组,从ID 0-0开始消费。 127.0.0.1:6379> xgroup create mq group1 0-0 OK
- xreadgroup GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
# group1组的consumer1开始从0-0消费mq队列 127.0.0.1:6379> xreadgroup GROUP group1 consumer1 STREAMS mq 0-0 1) 1) "mq" 2) 1) 1) "1633939083516-0" 2) 1) "money" 2) "-5" 2) 1) "1633953837863-0" 2) 1) "money" 2) "-5" 3) "money" 4) "+10"
- xpending key group [start end count] [consumer]
# 查询当前读取但为确认消费的消息 127.0.0.1:6379> xpending mq group1 - + 2 consumer1 1) 1) "1633939083516-0" 2) "consumer1" 3) (integer) 672137 4) (integer) 1 2) 1) "1633953837863-0" 2) "consumer1" 3) (integer) 672137 4) (integer) 1
- xack key group ID [ID …]
# 确认1633939083516-0消息 127.0.0.1:6379> xack mq group1 1633939083516-0 (integer) 1 127.0.0.1:6379> xpending mq group1 - + 2 consumer1 1) 1) "1633953837863-0" 2) "consumer1" 3) (integer) 840477 4) (integer) 1
存在问题
- Redis5.0之后版本开始支持。
- 消息队列自身基于redis,依然存在数据丢失风险。
- 轻量级消息队列,适用于丢失数据不敏感业务(发短信,发通知)。
这篇关于Redis 消息队列的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-12-24Redis资料:新手入门快速指南
- 2024-12-24Redis资料:新手入门教程与实践指南
- 2024-12-24Redis资料:新手入门教程与实践指南
- 2024-12-07Redis高并发入门详解
- 2024-12-07Redis缓存入门:新手必读指南
- 2024-12-07Redis缓存入门:新手必读教程
- 2024-12-07Redis入门:新手必备的简单教程
- 2024-12-07Redis入门:新手必读的简单教程
- 2024-12-06Redis入门教程:从安装到基本操作
- 2024-12-06Redis缓存入门教程:轻松掌握缓存技巧