redisson延迟队列解决延迟任务
2022/10/2 14:18:18
本文主要是介绍redisson延迟队列解决延迟任务,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
场景:
针对客户端提交的合成视频任务,按照提交时间延迟60秒进行执行
方案甄选:
1、使用redis的Java扩展库redisson提供的RBlocingQueue + RDelayDeque来实现
2、使用类似kafka时间轮的方式实现延迟队列
经过在本地测试发现,第一种实现起来工作量要小一些,易于操作,也易于理解;第二种借鉴kafka的时间轮方式,执行延迟任务时间更加精确,但是操作成本要高一些;如果对执行任务时间精度没有太高要求,可以直接选择第一种redisson的方式;
代码示例:
向RDelayedDueue队列存入要进行延迟处理的任务对象:
@Slf4j @Component public class SynthesisResultConsumer<T> { @Autowired private SynthesisResultComponent synthesisResultComponent; @Autowired private RDelayedQueue<SynthesisResultQryDto> delayedQueue; private static final int RETRY_MAX = 60; @Async public void accept(SynthesisResultQryDto qryDto) { qryDto.setRetryNum(qryDto.getRetryNum() + 1); SynthesisResultDto resultDto = synthesisResultComponent.getSynthesisResult(qryDto.getVideoId()); //合成结束 更新返回结果 if (resultDto.getIsEnd()) { synthesisResultComponent.updateSynthesisResult(resultDto); log.info("accept 查询合成视频结果 结果已出,videoId:{}", qryDto.getVideoId()); } else if (qryDto.getRetryNum() >= RETRY_MAX) { log.error("accept 查询合成视频结果 超过60次,请检查是否存异常,videoId:{}", qryDto.getVideoId()); } else { log.warn("accept 查询合成视频结果 本次查询未得到结果加入队列等待下次执行,videoId:{}", qryDto.getVideoId()); delayedQueue.offer(qryDto, 60, TimeUnit.SECONDS); } } }
从RBlockingQueue阻塞队列中取出到期的任务执行:
@Slf4j @Component public class SynthesisResultDelayJob { @Autowired private SynthesisResultConsumer<SynthesisResultQryDto> synthesisResultConsumer; @Autowired private RBlockingDeque<SynthesisResultQryDto> blockingDeque; @PostConstruct public void listen() { new Thread(() -> { while (true) { try { log.info("listen 本次监听时间:{}", DateUtil.date2String(new Date(), "yyyy-MM-dd HH:mm:ss")); SynthesisResultQryDto dto = blockingDeque.take(); log.info("listen 从队列中获取需要查询结果的合成视频任务信息:{}", JSON.toJSONString(dto)); synthesisResultConsumer.accept(dto); } catch (InterruptedException e) { log.error("listen InterruptedException,error msg:{}", ExceptionUtils.getMessage(e)); } } }).start(); } }
配置类:
@Configuration public class SynthesisResultDelayConfig { @Autowired private RedissonClient redissonClient; @Bean("blockQueue") public RBlockingDeque<SynthesisResultQryDto> getBlockQueue() { RBlockingDeque<SynthesisResultQryDto> blockingDeque = redissonClient.getBlockingDeque("prompter:synthesis:task:result"); return blockingDeque; } @Bean("delayedQueue") public RDelayedQueue<SynthesisResultQryDto> getDelayQueue() { RBlockingDeque<SynthesisResultQryDto> blockingDeque = getBlockQueue(); RDelayedQueue<SynthesisResultQryDto> delayedQueue = redissonClient.getDelayedQueue(blockingDeque); return delayedQueue; } }
依赖:
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.11.4</version> </dependency>
上面的示例可以看出,这里的延迟队列使用了两个队列,一个是RBlockingQueue,这个队列是用来取延迟任务的;一个是RDelayedDeque,这个队列是用来存延迟任务的;offer方法中第二个参数表示任务延迟时间;
总结:
横向比较kafka时间轮的实现方案和redisson的延迟队列的实现方案,可以充分理解不同方案的优劣和使用场景,选择最终的方案要综合考虑实现成本和使用场景,最终选定最优的方案
这篇关于redisson延迟队列解决延迟任务的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-12-07Redis高并发入门详解
- 2024-12-07Redis缓存入门:新手必读指南
- 2024-12-07Redis缓存入门:新手必读教程
- 2024-12-07Redis入门:新手必备的简单教程
- 2024-12-07Redis入门:新手必读的简单教程
- 2024-12-06Redis入门教程:从安装到基本操作
- 2024-12-06Redis缓存入门教程:轻松掌握缓存技巧
- 2024-12-04Redis入门:简单教程详解
- 2024-11-29Redis开发入门教程:从零开始学习Redis
- 2024-11-27Redis入门指南:快速掌握Redis基础操作