ConcurrentLinkedQueue 源码浅析
2021/6/7 12:24:44
本文主要是介绍ConcurrentLinkedQueue 源码浅析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
队列是一种常见的数据结构,主要特点是 FIFO,Java 为其定义了接口类:Queue,并提供了丰富的实现,有底层基于数组的[有界]队列,也有基于节点链接的无界队列,有阻塞队列,有非阻塞队列,还有并发安全的队列。
常见的队列实现的两种方式:数组、节点链接。
Java 对队列的基本实现在包:java.util 中,对并发安全实现主要存在于 Java 的 JUC 包下。
在使用 Java 的线程池工具:ThreadPoolExecutor,其使用阻塞队列来缓存任务,因为阻塞队列具备通知唤醒的功能,能够在任务添加或消耗时进行线程通知,同时保证了线程并发安全;而 JUC 中还有另外一种 Queue,是并发安全的非阻塞队列,那就是:ConcurrentLinkedQueue 。
适用场景
ConcurrentLinkedQueue 是 java 提供的一个无界非阻塞的 FIFO 队列,具备并发安全特性。适用于多线程共享访问相同的集合,要求多线程主动获取而不是线程阻塞等待通知;并且于队列的大小要求无限制,这常常是使用节点链接的形式来实现。不过,无界的场景也可能会导致内存占用过大。
底层实现(本文基于 jdk15 源码)
从名称我们可以看出来 ConcurrentLinedQueue 是基于节点链接的形式。节点的定义如下:
static final class Node<E> { volatile E item; volatile Node<E> next; Node(E item) { ITEM.set(this, item); } /** Constructs a dead dummy node. */ Node() {} void appendRelaxed(Node<E> next) { NEXT.set(this, next); } boolean casItem(E cmp, E val) { return ITEM.compareAndSet(this, cmp, val); } }
内部属性只有 item、next,是一种常见的队列节点结构,使用链接形式。
ConcurrentLinkedQueue 的算法实现基于 Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms( Maged M. Michael , Michael L. Scott),是此论文中非阻塞算法的一种修改实现。
在并发过程中,不使用任何锁如:synchronized、Lock 等,而是纯粹通过 CAS 完成。同时,修改的部分主要有:基于 JVM 的回收环境,让元素能够被自动回收,不会存在 ABA 的循环引用导致回收不了的问题;还有,提供了 remove 操作,支持内部元素删除。
添加元素 add(E)
按照前述节点链接结构,节点通过 next 链接到下一节点,而新节点的 next 总是链接到 null 中。我们可以这样实现,每次添加到队列时,从 head 开始遍历到最后一个节点,并通过 CAS(next,E),这样,我们一定程度上完成了并发的安全性(这里尚未考虑删除情况)。
boolean add(E e){ E t = head; E next; while(t != null){ if(t.next != null){ t = t.next; // 推进到下一节点 }else if(CAS(t.next,null,e)){ // CAS更新,失败则从头开始 return true; }else{ t=head; } } }
因为队列是无界的,所以此方法将会一直重试直到添加成功,并永远返回 true。这种实现虽然无锁,不过明显效率低下,所以我们在实现节点链接时,常常会引入 head、tail 指针来辅助推进,避免遍历的情况。同样地,ConcurrentLinkedQueue 使用了这种优化手段。在初始化时,队列就默认初始化了 head指针
和 tail指针
,为 dummy 节点。
public ConcurrentLinkedQueue() { head = tail = new Node<E>(); }
但是,next引用
与 tail指针
是两个不同的属性,而 CAS 在同一个时刻只能更新一个变量,如果想要确保 next引用
与 tail指针
的更新具备原子性
,又回到了需要用到锁的境地。ConcurrentLinkedQueue 是如何解决这个问题的?
答案是:延缓更新
。在 ConcurrentLinkedQueue 的注释文档中,Doug Lea 提及 LinkedTransferQueue 也同样使用了这种做法,称为:slack threshold
。在每一次插入操作之后,tail指针 没有与 next节点 一起做原子性的更新操作
。具体我们看下代码,在这里,我们模拟四个并发插入的线程:
/** * 队列为无界,所以方法永远不返回 false <br/> */ public boolean offer(E e) { final Node<E> newNode = new Node<E>(Objects.requireNonNull(e)); // p、q 前后节点 for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; if (q == null) { // 并发 CAS 控制入口 // p is last node; p 总是最后的节点(即tail) if (NEXT.compareAndSet(p, null, newNode)) { // 一次跳俩或多节点,允许失败。如果在 if 此处阻塞,将会出现 tail 滞后于 head 的情况 ✌ // (不一定是只有最后更新的 next 节点的线程才可以更新成功,也就是说更新后的 tail 不一定是最新最准确的) // 如果不准确,后续的线程依然可以通过 next 往下推进 if (p != t) // hop two nodes at a time; failure is OK TAIL.weakCompareAndSet(this, t, newNode); return true; } // Lost CAS race to another thread; re-read next } else ... } }
① 当线程一添加元素时,head=tail=new Node(),将 p(=>pointer:指针)
赋值为 tail,将 q 赋值为 p 的下一个节点,即 p=tail.next=null
,所以进入后线程一能直接 CAS 插入节点,此时 p=t=tail,所以第一个更新的线程满足 p!=t,不会更新 tail指针
。
② 并发的另外三个线程因为 CAS失败,重新进入循环。此时 q = tail.next != null
,进入第二个分支判断 p==q
,这是插入时针对并发删除的分支处理。在没有删除操作时,p 总是不等于 q,因为 p = tail; q = p.next = tail.next
else if (p == q) // 此处是与删除操作的多线程处理 // 前后节点相等,说明该节点的已经被执行过移除操作 // 节点移除后,如果此时 tail 未变更(可能阻塞了),也组织不了它被移除的命运。 // 此时,我们要做的是将指针跳到 head,一边它能够从头遍历。否则更新 tail 更好。 // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head;
目前在单纯的并发插入操作中,q 总是 p 的下一个节点(next),可能为新节点,也可能为 null。在等于 null 时表示 p 为尾结点可进行插入,否则,表示尾结点已被其他线程更新,我们需要进入重试,对当前状态进行调整;直接进入第三个分支;
③ 此时并发的另外三个线程发现尾结点被更新后,它们执行操作:
else // 此处是与添加操作的多线程并发处理 // Check for tail updates after two hops. // ① 发生在添加时,q = tail.next != null(进入发现下一级节点不为 null,无法添加),通过判断进入此分支, // 并在这里 p != t ,即在未进行移除操作,将 p 赋值为 q(即移动导下一节点),而 t=tail 没有被执行,所以 tail 前于 p。 // ② 上一步骤 p=q 之后,循环回去操作,如果没有被其他线程添加,则成功执行,并通过 p != t,更新 tail 节点更新为 newNode(最后的更新节点) // ③ 如果上一步骤 p=q 之后,循环回去操作,有其他多个线程操作了,则 q==null 返回 false。然后进入 p==q 返回false(只有删除操作能导致), // 而在此方法内 t != (t=tail) 总是返回true,所以这里更新了 t 后,尝试从新的 tail 开始,即 p=t=tail,如果回去操作成功,则是在最末的 tail 更新 // 否则,重新进入 ① // 此处一直负责推进 next 节点 p = (p != t && t != (t = tail)) ? t : q;
这里有两个迷惑点:
迷惑点1:p != t
似乎不可能成立,因为一开始就赋值 p = t
。大家需要注意,当前是在一个for(;;)
操作中,这个判断条件并非是在当前此次循环中使用,所以第一次的判断总为 false
;代码执行了:
p = q
也就是说,当线程第一次发现尾结点被更新了(tail.next != null),那么它将把 p(指针)推进到下一个节点(这是推进过程)
,重新进入循环。
三个线程并发推进,重新循环,只有一个能够 CAS(tail.next,null,e)
。执行成功的线程二,执行判断:
// 这里有可能发生阻塞,或者丢失时间片 if (p != t) // hop two nodes at a time; failure is OK TAIL.weakCompareAndSet(this, t, newNode);
此时的 p 是推进的下一节点,而 t 保持不变,所以 p != t
为 true,执行 tail指针
更新。这就是 ConcurrentLinkedQueue 注释中提及的,tail指针
的更新是跳跃式更新,总是大于等于2个节点。并且有可能在判断分支后丢失时间片,未能立即执行 CAS 操作。
④ 我们将目光转移到线程三、四,它们因为与线程二争夺 CAS 失败,所以又一次回到第三个分支判断(它判断了第②依旧失败,因为重新循环后 q 总是被赋值为 p 的next 节点,q = p.next):
p = (p != t && t != (t = tail)) ? t : q;
这里我们看到了第二个疑惑点
疑惑点2:
因为 pointer 在上一次循环被推进( p=q ),所以 p != t。进入第二个条件判断:
t != (t = tail)
乍一看 t != (t = tail)
,这个表达式让人很冲动地认为应该总是返回 false。因为 t 是对象(非原生类型),同时先执行括号内表达式,t 被赋值为 tail 后与其自身比较,难道会出现 t!=t?网上有些博文直接描述为,由于此处可能发生并发导致 t 被赋值后又被修改,这是不正确的,因为 t 是一个局部变量,不可能发生并发修改情况。
但结果是:只要 tail 被更新过,即 t != tail,那么此表达式总是返回 true。
我们在 idea工具中测试:
可以看到对原生类型, idea 可以直接推断并提示永远返回 true;对于 Object 类型,查看其字节码:
原因是:Java 是基于堆栈的程序语言,对关系运算法使用的是后缀表达式法——又称逆波兰表达式法
。
表达式的表现形式分三种:
前缀表达式:计算机常用,从右向左解析,能够消除括号 中缀表达式:人类适用 后缀表达式:计算机常用,基于堆栈的程序语言常用,能够消除括号,从左向右解析
对应的上述表达式:
前缀表达式:!= t = tail t 中缀表达式:t != (t = tail) 后缀表达式:t tail t = !=
查看后缀表达式,也就是执行了:
先入栈 t 再入栈 tail 再入栈 t 取出栈顶 t、tail,执行赋值 入栈结果,假定为 x(实际值为 tail) 取出栈顶 t、x,执行比较,最终返回 true。
后缀法的使用导致,该表达式的实际执行效果如下:
t != tail; t = tail;
效果相等,不过要记住的是:先执行了赋值,只不过赋值后被存入到了变量表中,而操作数栈中已经有了旧值 t,比较总是不相等的。如果是对象,那它就是比较的引用(对象堆地址)。
好了,疑惑点解决了,回到我们判断分支处表达式:
p = (p != t && t != (t = tail)) ? t : q;
在线程三、四因抢占失败,进入比较 t != ( t = tail )
时,如果此时线程二CAS 成功并且更新了尾指针(tail指针
),那线程三、四的表达式执行就是返回 true!
此时 true && true
,结果 p = t
,我们前面说过,疑惑点二这里,实际上已经将 t 进行了赋值:t=tail
,也就是说:在程序发现 t 已经不是最新的尾结点时(被其他线程执行插入后更新了),它将更新局部变量 t 和 p 为 最新的 tail指针
,即获取最新的尾结点,又开始了跟线程一一样的循环处境,并尝试 CAS 下一节点。
什么时候 t != (t = tail)
返回 false 呢?
就是线程二成功执行了 CAS,但尚未执行 CAS(tail) 时(可能是发生了其他情况,如:系统阻塞了,时间片丢失了等等),那么线程三、四无法获取到最新的尾结点,它们只会推进到下一个节点。
这里就有可能导致,线程三、四推进到下一个节点后,线程四成功 CAS 并更新了 tail指针,然后线程二获取到时间片执行 CAS(tail) 失败。不过 tail指针
的 CAS 失败是允许的。
最后,剩下线程三又回到分支三,并发现 tail指针
被更新了,执行tail更新,连续跳过两个节点。(如果是 n 个线程,极端情况下,可连续跳跃 n 个节点。)
这两者的不同是:推进到下一个节点的单步的,而更新为 tail指针 则可能实现跨多步的优化。
这些或许就是 Doug Lea 提及的 trick
(技巧) 吧。
获取元素 poll()
ConcurrentLinkedQueue 是 FIFO 队列,所以支持从首部 poll 出节点。内部实现依旧是采用 CAS 方式,不过此方法支持返回元素 e或者 null。
截屏2021-06-02 22.38.07
从前面我们可以看到,添加操作完全针对于 tail指针
;而获取元素则将从头部节点 head 开始获取,并保证向后推进,从不关心 tail指针
,代码如下:
// 此方法没有与添加操作产生并发冲突,所以只关注删除操作 public E poll() { restartFromHead: for (;;) { // 每次循环都推进到下一节点,注意 for 循环中的 p=q,就是推进 for (Node<E> h = head, p = h, q;; p = q) { final E item; if ((item = p.item) != null && p.casItem(item, null)) { // 并发 CAS 控制入口 // Successful CAS is the linearization point // for item to be removed from this queue. if (p != h) // hop two nodes at a time // 如果 p.next == null,则说明到了尾节点,直接返回当前指针 p, // 否则赋值给 q 返回将 head 更新为 p.next(非空节点) // 如果 p.next != null,那么 head 就是一次跳两个节点,否则就是跳一个节点 updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { // 并发删除操作的退出口,如果已到尾部,如果 !=null,将推进指针到下一个节点(for 循环中的 p=q) // 指向下一个节点,并判断下一节点是否为 null // 如果为 null,则直接更新头节点并返回 null(表示已经到尾部了),这是一个辅助操作,不管有没有成功(优化操作) updateHead(h, p); return null; } else if (p == q) // 与其他删除线程的并发处理,非退出口, // 已知节点被其他线程更新(无法获取到下一节点),从头开始(在 updateHead 中被操作了) continue restartFromHead; } }
我们这里假定 4个线程;并且已知 p => pointer,q 为 p 的下个节点。
① 头部节点是 dummy 节点,第一次分支判断为 false,进入分支二,执行推进到下个节点,如果下个节点不为 null,进入分支三判断 p == q
,重新进入分支一,只有线程一成功更新
② 如果此时线程一成功执行 updateHead,则 head
指针将被跳跃式更新到下个节点(下个节点不为空时)或当前指针节点(下个节点为空时):
updateHead(h, ((q = p.next) != null) ? q : p); final void updateHead(Node<E> h, Node<E> p) { // 更新 head 指针,并将原节点的 next 引用到其自身 if (h != p && HEAD.compareAndSet(this, h, p)) NEXT.setRelease(h, h); }
更新节点成功以便其他线程能获取到最新的位置进行元素获取,同时,将原 head节点的 next 引用到其自身,这是对算法的改进——使其支持回收环境的一个优化操作;同时此操作能够在后续的迭代方法中被使用,避免过期的 head 节点的迭代。
比如,我们可以先看方法 succ
,它用于获取后继节点,如果后继节点是本身,则直接更新为 head指针:
final Node<E> succ(Node<E> p) { // 推进到 p = p.next,表明 p 之前的已被移除并 self-ref // 所以直接更新 p=head(也不用再关注原来 p 之前[也可能是 head 之前]的元素) if (p == (p = p.next)) p = head; return p; }
可见,只有在初始化及下个节点为 null 时,head节点为 dummy Node,其 item 为 null,next 执行 null。
③ 节点更新成功 head指针
后,直接返回 item。
此时,其他线程在 CAS 失败后,将执行推进,因为线程一已将 原head节点的next引用到自身,所以他们将进入分支三:
else if (p == q) continue restartFromHead;
后续线程又重新进入循环,从 新head节点 开始,只不过线程 head#item 已不再是 null,可以直接争抢 CAS 执行。
④ 如果节点在更新 head指针
丢失时间片,那么其他线程争抢并推进到下个节点,并执行 CAS 和尝试更新 head指针,如果 head指针
由他们进行更新,这又是一次跳跃式更新
这里有两种情况:
- 线程一一直未得到时间片,最终
head指针
由线程二、三、四他们更新,实现了跳跃式更新,并且 head 的 item 不为 null,后续线程一更新头结点失败(这是允许的); - 线程一在其他线程 CAS 推进之后执行了
head指针
更新,此时链表上,head 又是一个 dummy 节点,item 为 null(后续其他线程更新头结点失败,这也是允许的)。
⑤ 随着节点推进,当线程并发 CAS 失败然后推进到最后一个节点时,即 线程三、四在 tail指针
所在处发生了并发 CAS。我们假定线程一、二、三在 CAS(next)成功后,准备执行 updateHead
都遇到了时间片丢失,那么线程四此时的处境是:
同样地,也是两种情况,更新 head节点或者得到 dummy(item=null)
的 head节点。
并发情况下,poll 操作会影响 add 操作。
因为有可能出现以下情况:
- 线程一、二并发添加元素(add),线程三并发获取元素(poll),线程一添加成功,线程三获取成功并返回,此时的状态图如下:
这就是 offer 方法中的分支:
else if (p == q) // 此处是与删除操作的并发处理 // 前后节点相等,说明该节点的已经被执行过移除操作 // 节点移除后,如果此时 tail 未变更(可能阻塞了,也阻止不了它被移除的命运)。 // 此时,我们要做的是将指针跳到 head,以便它能够从头遍历。否则更新 tail 就好。 // 也就是 tail 被更新了,则获取新尾开始循环,否则,从 head 开始 // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head;
当线程并发 add 与 poll 操作时,可能会出现线程操作的指针节点被 poll 操作更新 next 引用到自身,此时的优化技巧是:
- 如果指针判断当前 pointer 指针与 tail指针相等,则认为没有其他线程更新 tail 指针,它是不准确的,直接使用 poll操作更新后的 head指针,从新head 开始;
- 如果指针判断当前 pointer 指针与 tail指针不相等,则认为有新线程更新了 tail指针,是的 tail指针处于较新的状态,使用 tail指针是个优化操作;
为什么认为相等时,tail指针不准确,而不相等时,tail指针只处于较新而非最新状态呢?
这是因为:tail指针是延缓更新的,它甚至可能滞后于 head指针,比如以下情况:
还有以下 tail 的序号落后于 head:
删除元素 remove(e)
删除操作时 Doug Lea 基于该算法上自行实现的支持,能够删除链表中指定元素的节点。删除操作与添加 add、出队 poll 能够并发执行,这主要也是取决于他们都是针对 item 的判断,同时对 head 的延缓更新支持。
先看下 remove 的代码:
public boolean remove(Object o) { if (o == null) return false; restartFromHead: for (;;) { for (Node<E> p = head, pred = null; p != null; ) { // 下一个节点 Node<E> q = p.next; final E item; // 刚好是删除的首节点,则尝试 casItem,失败则进行推进 if ((item = p.item) != null) { if (o.equals(item) && p.casItem(item, null)) { skipDeadNodes(pred, p, p, q); return true; } // 前继节点赋值为 p;p 进行推进 pred = p; p = q; continue; } // 对并发 poll 的支持,当并发 poll 时,节点 p 被 poll 取走 // 此时,移除操作需要跳跃到下一节点 for (Node<E> c = p;; q = p.next) { // 查看 q(下一节点)是否为 null,是则表示无法推进,需要跳过已经挂了的节点 // 如果此时 c == p,说明 p 是被移走的,只剩 pred(即尾结点) // 如果此时 c != p,说明 p 尚未被移走(c是最开始的赋值,不变,p会推进),则将 q 赋值为 p // 或者 q != null 且 q.item != null,说明下一节点正常,可以进行连接 // 将 pred -> q 连接 if (q == null || q.item != null) { pred = skipDeadNodes(pred, c, p, q); p = q; break; } // 如果 p == q,即引用到自身,则该节点为已 poll,从头开始 // 如果 p != q,推进到下一个节点 p = q if (p == (p = q)) continue restartFromHead; } } return false; } }
删除的常见示意图:
就是将前置节点链接到后继节点,中间的节点进行删除,不过由于本身 ConcurrentLinkedQueue 的特殊结构和特性,以及存在并发,需要考虑的删除情况较多,有以下情况:
没有其他并发操作时,也就是说推进是单节点推进的
- 一、推进节点记录为 p(=>pointer),下一个节点赋值为 q = p.next,当匹配到 item 的节点时,CAS 其 item 为 null,然后调用方法
skipDeadNodes
来进行节点重链接,从链上移除节点,先查看方法:
private Node<E> skipDeadNodes(Node<E> pred, Node<E> c, Node<E> p, Node<E> q) { // assert pred != c; // assert p != q; // assert c.item == null; // assert p.item == null; // 能够进入到此方法,必要的条件是以上 assert: // 其中 q 要么等于 null,则可以将其前节点 p 作为后继节点 // 要么 q != null && q.item != null,符合作为后继节点的条件 // 加入后继节点为 null,说明是尾节点, // 此时如果 c == p,表示节点已被 poll 走(或 remove 走),直接以前节点返回 // 如果 c != p,则推进到后继节点 if (q == null) { // Never unlink trailing node. if (c == p) return pred; q = p; } // 如果前置节点为 null,也就是删除的是第一个节点,则返回可以形成 dummy 节点, // return (tryCasSuccessor(pred, c, q) && (pred == null || ITEM.get(pred) != null)) ? pred : p; }
此方法主要帮我们进行跨节点连接,并且返回链接后的前置节点:可能是原来的旧pred 节点;如果 pred 为死亡节点时,意味着 pred 已不存在链表中,此时则返回 p(它可能是后继节点,也可能是当前的删除节点)。
pred:删除节点的前置节点 c:第一个死亡节点 p:最后一个死亡节点 q:删除节点的后置节点
情况①:单节点的删除,当删除的节点的后继节点为 null 时,表示删除的节点是尾结点,也就是不再需要链接。此时因为是单线程删除单个节点,外部传入的参数:c 总是等于 p:skipDeadNodes(pred, p, p, q)
,所以会直接返回 pred;
情况②:如果删除的节点不是尾结点,也就是 q != null
,将进行链接,此时:
pred:前置节点 c=p:删除节点 q:后继节点
后续调用 tryCasSuccessor(pred, c, q)
,我们查看其方法实现:
private boolean tryCasSuccessor(Node<E> pred, Node<E> c, Node<E> p) { // assert p != null; // assert c.item == null; // assert c != p; // 不是处于首节点,或首节点可能为 dummy 节点 if (pred != null) return NEXT.compareAndSet(pred, c, p); // 首节点更新并更新其 next,引用到自身 if (HEAD.compareAndSet(this, c, p)) { NEXT.setRelease(c, c); return true; } return false; }
更新链接又有两种情况:
I:如果前置节点 pred 是 null,表示删除的刚好是第一个有元素的节点,那我们直接更新首节点为 p(此时 p参数在外部传入是 q,即下一节点,因为 q != null)。因为 p 已经被移除了 item,并同时将原首节点的 next 引用进行更新
II:如果前置节点 pred 不是 null,则说明删除的不是第一个元素(或 head 节点为 dummy 节点),执行了跨节点链接,直接更新 next指针
;
III:成功更新链接后,skipDeadNode
需要将前置节点进行返回,此时如果 pred 是 null,说明是第一个节点被删除,没有前置节点,直接返回 pred(依旧为 null);如果 pred 不是 null,则判断其 item 是否为 null,不为 null 则进行返回(属于一个正常的节点),如果为 null,则返回 p,来表示表示前置节点(它们的处境相同)。
多线程并发操作数据,进行 poll 和 remove
单线程操作或者无并发情况下,总是单个元素进行删除及推进,因此在这种情况下,方法:skipDeadNodes
的参数 c 总是等于 p,因为只删除了一个节点。
而多线程并发情况下,skipDeadNodes
的参数 c 就表示第一个死亡节点,p表示最后一个死亡节点。这是因为多个删除 remove 或出队 poll 操作会导致多个节点死亡。具体代码则是 remove 的第二个for循环:
第一个循环是简单地推进节点并进行重链接的操作,针对单个节点;
第二个循环则是针对并发操作下的处理,注意此循环的退出方法只有两处:一个是推进成功并进行跨节点重链后,跳出第二个循环并回调第一个循环来执行单元素删除;另外一个则是根据条件回到方法入口,重新开始方法;
- 一、当删除线程进入,并在单节点推进过程中,发现元素的
item==null
,这意味着该元素已被其他线程提前抢占并移除,此时我们保存第一个遇到的节点,赋值到 c,并尝试推进; - 二、判断推进的下一个元素是否为 null,为 null 表示已经到达尾部,进行死亡节点的跳过重链后退出循环,如果不为 null 则表示可继续推进,并判断其 item 是否为 null:
item 为 null 表示遇到了其他并发的 poll 操作或 remove 操作;
item 不为 null 表示此区间的死亡节点已通过,需要进行跳过重链;
假定有以下场景,三个线程并发执行,线程一 remove(3)、线程二 poll、线程三 remove(2);
- 一开始 head为 dummy 节点,则直接进入第二个 for循环,线程一推进到 item=1;
- 线程二执行 poll,成功将 cas(2,null),但它未成功更新 head指针;
- 线程三执行 remove(2),从 dummy 节点推进到 2 后,对其先进行 head指针的更新 cas(head,q),成功更相信 head 指针指向 item=3 的节点,并更新原 head#next 指向其自身;
- 线程二继续推进,发现 item!= null 满足,则尝试跨节点重链,发现 head 指针已被更新,无法通过 cas(head,c,q) 来进行更新,cas 失败。此时方法:
skipDeadNodes
返回的就是指针 p,而不是 pred,但并不影响它继续推进。跳出skipDeadNodes
后,将指针 p 指向下个节点 q,q#item=3,跳出回到第一个循环,成功删除 item=3。
这里就是
skipDeadNodes
的返回处理:由于存在并发的跨节点重链,对于失败的线程,说明有其他线程完成跨链操作,就不再为此操作,将 pred 进行更新,同时与后续的 p 推进(p=q)依旧形成前后关系,这是一个新的开始,不再需要考虑之前的死亡节点问题。
private Node<E> skipDeadNodes(Node<E> pred, Node<E> c, Node<E> p, Node<E> q) { if (q == null) { // Never unlink trailing node. if (c == p) return pred; q = p; } // 如果前置节点为 null,也就是删除的是第一个节点,则返回可以形成 dummy 节点, // tryCasSuccessor 失败,则返回 p,意味着不需要此线程负责跨节点了,更新前置节点为 p(过程中的最后一个死亡节点 item=null) // tryCasSuccessor 成功,如果 pred==null,直接返回,意味着后续删除操作,可以直接根据 pred=null 继续更新 head 节点 // 如果 pred != null 且 item != null,返回最后一个死亡节点 p,让其能够在后续 remove 后,通过 next 指针重链。 // 此方法是返回一个处理后的前置节点,要么是 null 要么是 普通的 node,不管哪一个,出了此方法之后,会执行 p=q, // 让方法外部的 pred 又与 p 形成前后置关系 return (tryCasSuccessor(pred, c, q) && (pred == null || ITEM.get(pred) != null)) ? pred : p;
- 线程一、三都执行完后,线程二执行 poll 的 cas(head) 失败,因为 head 已经被 线程三进行了更新
整个 remove 操作需要考虑极多的并发情况,Doug Lea 在没有算法伪代码参考的情况下,凭实力给出的算法实现却让我掉了不少头发,真是让人想给他献下膝盖。remove 操作除了上述的实例,实际还有许多考虑的情况,如:
- 前节点是否 dummy 节点
- 删除的是否 尾结点
- poll 操作超过 remove 时等等
大概是下图的一些情况:
ConcurrentLinkedQueue 的删除操作在 JDK8 中的算法实现较目前 JDK15 的易阅读,JDK15 中的实现是 Doug Lea 对算法进行一些 refactor,因此根据有抽象及分支复杂的情况。
除此上述的 add、poll、remove 之外,ConcurrentLinkedQueue 的实现还提供了其他的 Iterator 等并发实现(也是 Doug Lea 自行实现),也是具有学习意义。但本文不再赘述(已经过长)。
总结:
- ConcurrentLinkedQueue 是基于已有算法的改进版,使其支持回收与删除操作;
- 支持回收的操作有:将原 head 节点的 next 关联到其自身;
- ConcurrentLinkedQueue 基于节点链接的形式,但不要求
tail指针
和head指针
为最新状态,支持跳跃式更新,属于优化操作; - 跨节点连接可跨多个节点;
- ConcurrentLinkedQueue 的添加只关注于节点
next
的 CAS 更新,这种做法保证了并发安全与高效; - 而除了 item 的 CAS 更新要求成功,其他的如
tail
、head
均不要求成功更新 - 存在一些 trick 的代码,如:t != (t = tail)、p == (p = q) 等
注:
在 idea 中 debug ConcurrentLinkedQueue 代码时,会因为 idea 编辑器自动调用 toString 等方法,遇到不符合认知逻辑的情况,需要更改 idea 配置来避免。
参考:
波兰表示法
逆波兰表示法
java 字节码 指令集 汇编
t != (t = tail) 可以为 true ?!
并发容器-ConcurrentLinkedQueue详解
IDEA debug ConcurrentLinkedQueue 的时候踩的坑
Java进阶——数据结构‘栈’的应用——前、中、后缀表达式
这篇关于ConcurrentLinkedQueue 源码浅析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-12-25TypeScript基础知识详解
- 2024-12-25安卓NDK 是什么?-icode9专业技术文章分享
- 2024-12-25caddy 可以定义日志到 文件吗?-icode9专业技术文章分享
- 2024-12-25wordfence如何设置密码规则?-icode9专业技术文章分享
- 2024-12-25有哪些方法可以实现 DLL 文件路径的管理?-icode9专业技术文章分享
- 2024-12-25错误信息 "At least one element in the source array could not be cast down to the destination array-icode9专业技术文章分享
- 2024-12-25'flutter' 不是内部或外部命令,也不是可运行的程序 或批处理文件。错误信息提示什么意思?-icode9专业技术文章分享
- 2024-12-25flutter项目 as提示Cannot resolve symbol 'embedding'提示什么意思?-icode9专业技术文章分享
- 2024-12-24怎么切换 Git 项目的远程仓库地址?-icode9专业技术文章分享
- 2024-12-24怎么更改 Git 远程仓库的名称?-icode9专业技术文章分享