【并发编程】基于优先级队列实现的无界阻塞队列DelayQueue
2022/2/4 22:12:27
本文主要是介绍【并发编程】基于优先级队列实现的无界阻塞队列DelayQueue,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
DelayQueue是什么
- DelayQueue 是一个支持延时获取元素的阻塞队列。
- 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;
- 在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。
DelayQueue的使用场景
- 商城订单超时关闭。
- 异步短信通知功能。
- 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭。
- 缓存过期清除。缓存中的对象,超过了存活时间,需要从缓存中移出。
- 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。
DelayQueue的特点
- 不是先进先出,而是会按照延迟时间的长短来排序,下一个即将执行的任务会排到队列的最前面。
- 使用的锁:ReentrantLock。
- 使用的数据结构:PriorityQueue。与PriorityBlockingQueue类似,不过没有阻塞功能。
- 阻塞的对象:Condition available
DelayQueue的入队出队逻辑
- 入队:不阻塞,无界队列,与优先级队列入队相同,available。
- 出队:为空时阻塞。不为空时检查堆顶元素过期时间,小于等于0则出队,大于0,说明没过期,则阻塞。
DelayQueue的数据结构源码分析
// 用于保证队列操作的线程安全 private final transient ReentrantLock lock = new ReentrantLock(); // 优先级队列,存储元素,用于保证延迟低的优先执行 private final PriorityQueue<E> q = new PriorityQueue<E>(); // 用于标记当前是否有线程在排队(仅用于取元素时):leader指向的是第一个从队列获取元素阻塞的线程 private Thread leader = null; // 条件,用于表示现在是否有可取的元素:当新元素到达,或新线程可能需要成为leader时被通知 private final Condition available = lock.newCondition();
DelayQueue的构造方法源码分析
/** * 无参构造,什么也不做 */ public DelayQueue() {} /** * 有数据需要初始化的构造方法。 */ public DelayQueue(Collection<? extends E> c) { this.addAll(c); } /** * 添加全部元素 */ public boolean addAll(Collection<? extends E> c) { // 添加的元素为空,抛异常 if (c == null) throw new NullPointerException(); // 传入的是当前对象,抛异常 if (c == this) throw new IllegalArgumentException(); // 定义修改的标志 boolean modified = false; for (E e : c) // 添加元素 if (add(e)) // 添加成功,修改标志 modified = true; // 返回修改的标志 return modified; }
DelayQueue的入队方法源码分析
/** * DelayQueue的入队方法 */ public void put(E e) { offer(e); } /** * 有返回值的入队方法 */ public boolean offer(E e) { // 得到当前队列的锁 final ReentrantLock lock = this.lock; // 获取锁 lock.lock(); try { // 调用优先级队列的offer操作 q.offer(e); // 调用优先级队列的窥探操作:入队的元素位于队列头部,说明当前元素延迟最小 if (q.peek() == e) { // 将 leader 置空 leader = null; // available条件队列转同步队列,准备唤醒阻塞在available上的线程 available.signal(); } // 返回入队成功 return true; } finally { // 解锁,真正唤醒阻塞的线程 lock.unlock(); } }
DelayQueue的出队方法源码分析
/** * DelayQueue的出队方法 */ public E take() throws InterruptedException { // 得到当前队列的锁 final ReentrantLock lock = this.lock; // 获取锁操作:优先响应中断 lock.lockInterruptibly(); try { for (;;) { // 取出堆顶元素( 最早过期的元素,但是不弹出对象) E first = q.peek(); // 如果堆顶元素为空,说明队列中还没有元素,直接阻塞等待 if (first == null) available.await(); else { // 得到堆顶元素的到期时间 long delay = first.getDelay(NANOSECONDS); if (delay <= 0) // 如果小于0说明已到期,直接调用poll()方法弹出堆顶元素 return q.poll(); // 下面的逻辑是delay大于0 ,要阻塞了 // 将first置为空方便gc first = null; // don't retain ref while waiting // 如果有线程争抢的Leader线程,则进行无限期等待。 if (leader != null) available.await(); else { // 如果leader为null,把当前线程赋值给它 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 等待剩余等待时间 available.awaitNanos(delay); } finally { // 如果leader还是当前线程就把它置为空,让其它线程有机会获取元素 if (leader == thisThread) leader = null; } } } } } finally { // 成功出队后,如果leader为空且堆顶还有元素,就唤醒下一个等待的线程 if (leader == null && q.peek() != null) // available条件队列转同步队列,准备唤醒阻塞在available上的线程 available.signal(); // 解锁,真正唤醒阻塞的线程 lock.unlock(); } }
结束语
- 获取更多本文的前置知识文章,以及新的有价值的文章,让我们一起成为架构师!
- 关注公众号,可以让你对MySQL有非常深入的了解
- 关注公众号,每天持续高效的了解并发编程!
- 关注公众号,后续持续高效的了解spring源码!
- 这个公众号,无广告!!!每日更新!!!
这篇关于【并发编程】基于优先级队列实现的无界阻塞队列DelayQueue的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-09-10RabbitMQ教程:初学者指南
- 2024-09-10RabbitMQ教程:初学者指南
- 2024-09-01Kafka事务实现原理
- 2024-08-09KubeSphere 部署 Kafka 集群实战指南
- 2024-07-24百行代码实现 Kafka 运行在 S3 之上
- 2024-07-18如何使用观测云监测 AutoMQ 集群状态
- 2024-07-18活动回顾 | AutoMQ 联合 GreptimeDB 共同探讨新能源汽车数据基础设施
- 2024-07-15AutoMQ vs Kafka: 来自小红书的独立深度评测与对比
- 2024-07-15AutoMQ 生态集成 Kafdrop-ui
- 2024-07-15AutoMQ 与蚂蚁数科达成战略合作