使用redis的zset实现简单的延时队列
2021/12/2 2:06:05
本文主要是介绍使用redis的zset实现简单的延时队列,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
一、需求切入点
在公司做的一个系统业务需要有个定时提醒的功能(数据在mysql中),要求提醒的时间差精准到分钟
解决方案有:
- 使用定时器,每分钟执行一次,查符合提醒的数据,发起提醒(数据库连接与系统的负载都承受不住的!!)
- 将待提醒数据提前查出存进redis中,根据提醒时间设置过期时间,做redis的过期监听,监听到过期的数据再做业务处理(优点 : 不用实时查数据库,一定程度上减少系统压力 缺点: 一旦系统重启或者系统出现异常,可能导致一些过期的数据没有监听到,造成数据没有推送)
- 使用一个延时队列,利用redis的zset(sort set,有序不重复集合,关联分数score进行排序),将提醒时间作为分数,提取符合条件的score对应的集合发起提醒(本文所述也是围绕这个方案)
二、延时队列的基本操作流程- 基本流程图
- 代码实现
生产者,只关心数据进队列
- 基本流程图
public class MessageProvider { // 延时队列的服务 通过这个bean来统一管理数据 private final DelayingQueueService delayingQueueService; private static String APPOINTMENT_REMIND = "APPOINTMENT_REMIND"; /** * 往队列中添加消息 * @param messageContent */ public void sendMessage(String messageContent, long delay) { try { //业务代码 …… // 将分装好的数据写进队列 delayingQueueService.push(queueMessage); } } catch (Exception e) { e.printStackTrace(); } } /** * 撤回消息 业务延伸 * @param members */ public void withdrawMessage(Long members){ delayingQueueService.removeByMembersId(members); } }
消费者,只关心需要消费的数据
// 从延伸队列拉取符合消费的数据 List<QueueMessage> msgList = delayingQueueService.pull(); ``` msgList.stream().forEach(msg -> { // 拿出已经到期的预约提示 发起提醒 if (current >= msg.getDelayTime()) { try { // 进行业务消费 …… //成功消费后移除消息 delayingQueueService.remove(msg); } ```
延时队列实现
public class DelayingQueueService { private static ObjectMapper mapper = Jackson2ObjectMapperBuilder.json().build(); //key:membersId value:message private static ConcurrentHashMap <Long,String> membersMap= new ConcurrentHashMap<>(); private final StringRedisTemplate redisTemplate; /** * 可以不同业务用不同的key */ public static final String QUEUE_NAME = "message:queue"; /** * 锁key */ public static final String LOCK_KEY="message_lock_key"; /** * 插入消息 * * @param queueMessage * @return */ @SneakyThrows public Boolean push(QueueMessage queueMessage) { String messageStr = mapper.writeValueAsString(queueMessage); Boolean addFlag = redisTemplate.opsForZSet().add(QUEUE_NAME, messageStr, queueMessage.getDelayTime()); membersMap.put(membersId,messageStr); return addFlag; } /** * 移除消息 * * @param queueMessage * @return */ @SneakyThrows public Boolean remove(QueueMessage queueMessage) { Long remove = redisTemplate.opsForZSet().remove(QUEUE_NAME, mapper.writeValueAsString(queueMessage)); if(remove>0){ membersMap.remove(membersId); } return remove > 0 ? true : false; } /** * 拉取最新需要 * 被消费的消息 * rangeByScore 根据score范围获取 0-当前时间戳可以拉取当前时间及以前的需要被消费的消息 * * @return */ public List<QueueMessage> pull() { List<QueueMessage> msgList =new ArrayList<>(); try { Set<String> strings = redisTemplate.opsForZSet().rangeByScore(QUEUE_NAME, 0, System.currentTimeMillis()); if (strings == null) { return null; } msgList = strings.stream().map(msg -> { QueueMessage message = null; try { message = mapper.readValue(msg, QueueMessage.class); } catch (JsonProcessingException e) { e.printStackTrace(); } return message; }).collect(Collectors.toList()); } catch (Exception e) { log.error(e.toString()); } return msgList; } //获得锁 public Boolean getLock(){ boolean lock = false; //获得锁 lock = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY,QUEUE_NAME+"is locking !",30, TimeUnit.SECONDS); return lock; } public void releaseLock(){ redisTemplate.delete(LOCK_KEY); } }
三、结束语
本文所述的方法也是存在一些小的缺点,比如,数据的正常操作依赖于第三方组件,如果redis挂掉了,这个服务就down掉了,实现延时队列的方法有很多种,基于业务与系统本身的情况,兼容利弊去做一些取舍,以达到最好的效果
这篇关于使用redis的zset实现简单的延时队列的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-18Redis安装入门:新手必读指南
- 2024-11-08阿里云Redis项目实战入门教程
- 2024-11-08阿里云Redis资料:新手入门与初级使用指南
- 2024-11-08阿里云Redis教程:新手入门及实用指南
- 2024-11-07阿里云Redis学习入门:新手必读指南
- 2024-11-07阿里云Redis学习入门:从零开始的操作指南
- 2024-11-07阿里云Redis学习:初学者指南
- 2024-11-06阿里云Redis入门教程:轻松搭建与使用指南
- 2024-11-02Redis项目实战:新手入门教程
- 2024-10-22Redis入门教程:轻松掌握数据存储与操作