【并发编程】支持按优先级排序的无界阻塞队列PriorityBlockingQueue
2022/2/4 22:12:24
本文主要是介绍【并发编程】支持按优先级排序的无界阻塞队列PriorityBlockingQueue,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
什么是PriorityBlockingQueue
- PriorityBlockingQueue是一个无界的基于数组的优先级阻塞队列。
- 数组的默认长度是11,虽然指定了数组的长度,但是可以无限的扩充,直到资源消耗尽为止。
- 每次出队都返回优先级别最高的或者最低的元素。
- 默认情况下元素采用自然顺序升序排序,当然我们也可以通过构造函数来指定Comparator来对元素进行排序。
- PriorityBlockingQueue不能保证同优先级元素的顺序。
PriorityBlockingQueue的使用场景
- 抢购活动,会员级别高的用户优先抢购到商品
- 银行办理业务,vip客户插队
PriorityBlockingQueue的特点
- 优先级高的先出队,优先级低的后出队。
- 使用的数据结构是数组+二叉堆:默认容量11,可指定初始容量,会自动扩容,最大容量是(Integer.MAX_VALUE - 8)
- 使用的锁是是ReentrantLock,存取是同一把锁
PriorityBlockingQueue的入队出队逻辑
- 入队:无界队列不阻塞。根据比较器进行堆化(排序)自下而上。传入比较器对象就按照比较器的顺序排序,如果比较器为 null,则按照自然顺序排序。
- 出队:阻塞对象NotEmpty,队列为空时阻塞。优先级最高的元素在堆顶(弹出堆顶元素)。弹出前比较两个子节点再进行堆化(自上而下)。
二叉堆与数组
- 完全二叉树:除了最后一行,其他行都满的二叉树,而且最后一行所有叶子节点都从左向右开始排序。
- 二叉堆:完全二叉树的基础上,加以一定的条件约束的一种特殊的二叉树。
- 大顶堆(最大堆):父结点的键值总是大于或等于任何一个子节点的键值。
- 小顶堆(最小堆):父结点的键值总是小于或等于任何一个子节点的键值。
- 二叉堆与数组的转换规则
// 当前节点为i int i; // 当前节点的父节点计算 int parent = (i -1 )/ 2 ; // 左面的子节点计算 int leftChild = 2 * i + 1; // 右面的子节点计算 int rightChild = 2 * i + 2;
PriorityBlockingQueue的使用方式
// 创建优先级阻塞队列 Comparator为null,自然排序 PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<Integer>(5); // 自定义Comparator PriorityBlockingQueue<Integer> queue1 = new PriorityBlockingQueue<Integer>(5, new Comparator<Integer>() { @Override public int compare(Integer o1, Integer o2) { return o2 - o1; } });
PriorityBlockingQueue的构造方法源码分析
/** * 无参构造,直接调用俩个参数的构造 */ public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } /** * 一个参数的构造,直接调用俩个参数的构造 * initialCapacity:数组的大小 */ public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } /** * 俩个参数的构造 * initialCapacity:数组的大小 * comparator:排序规则 */ public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { // 数组长度小于0,抛异常 if (initialCapacity < 1) throw new IllegalArgumentException(); // 初始化锁 this.lock = new ReentrantLock(); // 初始化无数据等待的条件队列 this.notEmpty = lock.newCondition(); // 设置排序规则 this.comparator = comparator; // 初始化数组 this.queue = new Object[initialCapacity]; } /** * 需要初始化数组的构造方法 */ public PriorityBlockingQueue(Collection<? extends E> c) { // 初始化锁 this.lock = new ReentrantLock(); // 初始化无数据等待的条件队列 this.notEmpty = lock.newCondition(); // 一个标记:如果不知道堆的顺序,则为true boolean heapify = true; // true if not known to be in heap order // 一个标记:如果必须筛选空值,则为true boolean screen = true; // true if must screen for nulls // 判断集合是否实现了SortedSet(一个可自动为元素排序的集合) if (c instanceof SortedSet<?>) { // 数据强转 SortedSet<? extends E> ss = (SortedSet<? extends E>) c; // 设置排序规则 this.comparator = (Comparator<? super E>) ss.comparator(); // 标记已经知道如何排序了 heapify = false; } // 传入的集合是当前这种类型 else if (c instanceof PriorityBlockingQueue<?>) { // 强转! PriorityBlockingQueue<? extends E> pq = (PriorityBlockingQueue<? extends E>) c; // 设置排序规则 this.comparator = (Comparator<? super E>) pq.comparator(); // 不筛选空的值:由于PriorityBlockingQueue本身不容许空的值,所以传入的集合中肯定没有空值 screen = false; // 如果类型相同:说明元素已经是有序的 if (pq.getClass() == PriorityBlockingQueue.class) // exact match heapify = false; } // 元素转数组 Object[] a = c.toArray(); // 得到数组的长度 int n = a.length; // If c.toArray incorrectly doesn't return Object[], copy it. // 如果集合c转化为数组不是Object数组,就将其转化为Object数组 if (a.getClass() != Object[].class) a = Arrays.copyOf(a, n, Object[].class); // 数组中可能有空值,进行筛选。 // 数组只有一个元素的时候(不存在顺序),或者排序器存在的时候才会进行筛选 if (screen && (n == 1 || this.comparator != null)) { // 原理数组中的每个元素 for (int i = 0; i < n; ++i) // 元素为null,抛异常! if (a[i] == null) throw new NullPointerException(); } // 将数组赋值到PriorityBlockingQueue的实际数组中 this.queue = a; // 赋值数组长度 this.size = n; // 堆的顺序可能不对的时候,进行数组建堆 if (heapify) // 数组建堆:基于堆的排序算法 heapify(); }
PriorityBlockingQueue的入队方法源码分析
/** * PriorityBlockingQueue的入队方法 */ public void put(E e) { // 不需要阻塞,直接调用offer方法 offer(e); // never need to block } /** * 有返回值的入队方法 */ public boolean offer(E e) { // 入队元素为null,抛异常 if (e == null) throw new NullPointerException(); // 得到当前队列的锁 final ReentrantLock lock = this.lock; // 加锁 lock.lock(); // 定义俩个变量:n是size的长度 cap是数组的长度 int n, cap; // 定义临时一个数组 Object[] array; // size的长度大于数组的长度 while ((n = size) >= (cap = (array = queue).length)) // 尝试扩容 tryGrow(array, cap); try { // 得到排序器 Comparator<? super E> cmp = comparator; // 将当前节点放在应该放的二叉堆上的位置。这里进行了排序! if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); // 长度加一 size = n + 1; // 消费者线程条件队列转同步队列 notEmpty.signal(); } finally { // 唤醒真正的消费者线程 lock.unlock(); } // 返回入队成功 return true; } /** * 扩容方法 */ private void tryGrow(Object[] array, int oldCap) { // 释放锁:防止阻塞出队的操作 lock.unlock(); // must release and then re-acquire main lock // 定义新的数组 Object[] newArray = null; // 通过CAS的方式去获取锁,执行下面的代码 if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { // 新的长度:长度小于64每次+2,大于64每次变为1.5倍 int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); // 扩容后超过最大容量 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow // 旧的长度加一 int minCap = oldCap + 1; // 旧的长度加一就超过了最大的长度,抛异常! if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); // 直接设置为最大的长度 newCap = MAX_ARRAY_SIZE; } // 新的长度大于旧的长度并且没有其他线程改原有的长度 if (newCap > oldCap && queue == array) // 新数组赋值 newArray = new Object[newCap]; } finally { // 扩容完成,交给其他线程处理 allocationSpinLock = 0; } } // 另一个线程正在分配,释放时间片 if (newArray == null) // back off if another thread is allocating Thread.yield(); // 加锁 lock.lock(); // 新数组不为空(获取过扩容机制)并且数组没有改变 if (newArray != null && queue == array) { // 新数组直接执行 queue = newArray; // 复制数据 System.arraycopy(array, 0, newArray, 0, oldCap); } }
PriorityBlockingQueue的出队方法源码分析
/** * PriorityBlockingQueue的出队方法 */ public E take() throws InterruptedException { // 获取当前队列的锁对象 final ReentrantLock lock = this.lock; // 获取锁操作:优先响应中断 lock.lockInterruptibly(); // 定义结果变量 E result; try { // 出队的数据为null,进行等待 while ( (result = dequeue()) == null) notEmpty.await(); } finally { // 释放锁,唤醒下一个线程 lock.unlock(); } // 返回出队的元素 return result; } /** * 出队 */ private E dequeue() { // 长度减一 int n = size - 1; // 长度小于0,直接返回null,说明没有元素了 if (n < 0) return null; else { // 定义新的数组 Object[] array = queue; // 得到第一个元素(二叉堆的最顶级节点) E result = (E) array[0]; // 得到最后一个节点 E x = (E) array[n]; // 最后一个节点变为null array[n] = null; // 得到排序器 Comparator<? super E> cmp = comparator; // 将最后一个节点放在应该放的二叉堆上的位置。线放到头结点,然后进行排序 if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); // 长度变化 size = n; // 返回出队的具体元素 return result; } }
结束语
- 获取更多本文的前置知识文章,以及新的有价值的文章,让我们一起成为架构师!
- 关注公众号,可以让你对MySQL有非常深入的了解
- 关注公众号,每天持续高效的了解并发编程!
- 关注公众号,后续持续高效的了解spring源码!
- 这个公众号,无广告!!!每日更新!!!
这篇关于【并发编程】支持按优先级排序的无界阻塞队列PriorityBlockingQueue的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2025-01-04BOT+EPC模式在基础设施项目中的应用与优势
- 2025-01-03用LangChain构建会检索和搜索的智能聊天机器人指南
- 2025-01-03图像文字理解,OCR、大模型还是多模态模型?PalliGema2在QLoRA技术上的微调与应用
- 2025-01-03混合搜索:用LanceDB实现语义和关键词结合的搜索技术(应用于实际项目)
- 2025-01-03停止思考数据管道,开始构建数据平台:介绍Analytics Engineering Framework
- 2025-01-03如果 Azure-Samples/aks-store-demo 使用了 Score 会怎样?
- 2025-01-03Apache Flink概述:实时数据处理的利器
- 2025-01-01使用 SVN合并操作时,怎么解决冲突的情况?-icode9专业技术文章分享
- 2025-01-01告别Anaconda?试试这些替代品吧
- 2024-12-31自学记录鸿蒙API 13:实现人脸比对Core Vision Face Comparator