redis实现延时队列的两种方式
2021/8/3 19:09:26
本文主要是介绍redis实现延时队列的两种方式,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
背景
项目中的流程监控,有几种节点,需要监控每一个节点是否超时。按传统的做法,肯定是通过定时任务,去扫描然后判断,但是定时任务有缺点:1,数据量大会慢;2,时间不好控制,太短,怕一次处理不完,太长状态就会有延迟。所以就想到用延迟队列的方式去实现。
一,redis的过期key监控
1,开启过期key监听
在redis的配置里把这个注释去掉
notify-keyspace-events Ex
然后重启redis
2,使用redis过期监听实现延迟队列
继承KeyExpirationEventMessageListener类,实现父类的方法,就可以监听key过期时间了。当有key过期,就会执行这里。这里就把需要的key过滤出来,然后发送给kafka队列。
@Component @Slf4j public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener { @Autowired private KafkaProducerService kafkaProducerService; public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); } /** * 针对 redis 数据失效事件,进行数据处理 * @param message * @param pattern */ @Override public void onMessage(Message message, byte[] pattern){ if(message == null || StringUtils.isEmpty(message.toString())){ return; } String content = message.toString(); //key的格式为 flag:时效类型:运单号 示例如下 try { if(content.startsWith(AbnConstant.EMS)){ kafkaProducerService.sendMessageSync(TopicConstant.EMS_WAYBILL_ABN_QUEUE,content); }else if(content.startsWith(AbnConstant.YUNDA)){ kafkaProducerService.sendMessageSync(TopicConstant.YUNDA_WAYBILL_ABN_QUEUE,content); } } catch (Exception e) { log.error("监控过期key,发送kafka异常,",e); } } }
可以看的出来,这种方式其实是很简单的,但是有几个问题需要注意,一是,这个尽量单机运行,因为多台机器都会执行,浪费cpu,增加数据库负担。二是,机器频繁部署的时候,如果有时间间隔,会出现数据的漏处理。
二,redis的zset实现延迟队列
1,生产者实现
可以看到生产者很简单,其实就是利用zset的特性,给一个zset添加元素而已,而时间就是它的score。
public void produce(Integer taskId, long exeTime) { System.out.println("加入任务, taskId: " + taskId + ", exeTime: " + exeTime + ", 当前时间:" + LocalDateTime.now()); RedisOps.getJedis().zadd(RedisOps.key, exeTime, String.valueOf(taskId)); }
2,消费者实现
消费者的代码也不难,就是把已经过期的zset中的元素给删除掉,然后处理数据。
public void consumer() { Executors.newSingleThreadExecutor().submit(new Runnable() { @Override public void run() { while (true) { Set<String> taskIdSet = RedisOps.getJedis().zrangeByScore(RedisOps.key, 0, System.currentTimeMillis(), 0, 1); if (taskIdSet == null || taskIdSet.isEmpty()) { System.out.println("没有任务"); } else { taskIdSet.forEach(id -> { long result = RedisOps.getJedis().zrem(RedisOps.key, id); if (result == 1L) { System.out.println("从延时队列中获取到任务,taskId:" + id + " , 当前时间:" + LocalDateTime.now()); } }); } try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }); }
可以看到这种方式其实是比上个方式要好的。因为,他的那两个缺点都被克服掉了。多台机器也没事儿,也不用再担心部署时间间隔长的问题。
这篇关于redis实现延时队列的两种方式的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2025-01-02阿里云Redis项目实战入门教程
- 2025-01-02阿里云Redis资料入门详解
- 2024-12-30阿里云Redis教程:新手入门指南
- 2024-12-27阿里云Redis学习入门指南
- 2024-12-27阿里云Redis入门详解:轻松搭建与管理
- 2024-12-27阿里云Redis学习:新手入门指南
- 2024-12-24Redis资料:新手入门快速指南
- 2024-12-24Redis资料:新手入门教程与实践指南
- 2024-12-24Redis资料:新手入门教程与实践指南
- 2024-12-07Redis高并发入门详解