ConcurrentLinkedQueue 源码浅析

2021/6/7 12:24:44

本文主要是介绍ConcurrentLinkedQueue 源码浅析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

队列是一种常见的数据结构,主要特点是 FIFO,Java 为其定义了接口类:Queue,并提供了丰富的实现,有底层基于数组的[有界]队列,也有基于节点链接的无界队列,有阻塞队列,有非阻塞队列,还有并发安全的队列。

常见的队列实现的两种方式:数组、节点链接。
image
image

Java 对队列的基本实现在包:java.util 中,对并发安全实现主要存在于 Java 的 JUC 包下。
在使用 Java 的线程池工具:ThreadPoolExecutor,其使用阻塞队列来缓存任务,因为阻塞队列具备通知唤醒的功能,能够在任务添加或消耗时进行线程通知,同时保证了线程并发安全;而 JUC 中还有另外一种 Queue,是并发安全的非阻塞队列,那就是:ConcurrentLinkedQueue 。

适用场景

ConcurrentLinkedQueue 是 java 提供的一个无界非阻塞的 FIFO 队列,具备并发安全特性。适用于多线程共享访问相同的集合,要求多线程主动获取而不是线程阻塞等待通知;并且于队列的大小要求无限制,这常常是使用节点链接的形式来实现。不过,无界的场景也可能会导致内存占用过大。

底层实现(本文基于 jdk15 源码)

从名称我们可以看出来 ConcurrentLinedQueue 是基于节点链接的形式。节点的定义如下:

static final class Node<E> {
    volatile E item;
    volatile Node<E> next;
    Node(E item) {
        ITEM.set(this, item);
    }
    /** Constructs a dead dummy node. */
    Node() {}
    void appendRelaxed(Node<E> next) {
        NEXT.set(this, next);
    }
    boolean casItem(E cmp, E val) {
        return ITEM.compareAndSet(this, cmp, val);
    }
}

内部属性只有 item、next,是一种常见的队列节点结构,使用链接形式。
ConcurrentLinkedQueue 的算法实现基于 Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms( Maged M. Michael , Michael L. Scott),是此论文中非阻塞算法的一种修改实现。

在并发过程中,不使用任何锁如:synchronized、Lock 等,而是纯粹通过 CAS 完成。同时,修改的部分主要有:基于 JVM 的回收环境,让元素能够被自动回收,不会存在 ABA 的循环引用导致回收不了的问题;还有,提供了 remove 操作,支持内部元素删除。

添加元素 add(E)

按照前述节点链接结构,节点通过 next 链接到下一节点,而新节点的 next 总是链接到 null 中。我们可以这样实现,每次添加到队列时,从 head 开始遍历到最后一个节点,并通过 CAS(next,E),这样,我们一定程度上完成了并发的安全性(这里尚未考虑删除情况)。

boolean add(E e){
    E t = head; E next;
    while(t != null){
        if(t.next != null){
            t = t.next;    // 推进到下一节点
        }else if(CAS(t.next,null,e)){    // CAS更新,失败则从头开始
            return true;
        }else{
            t=head;
        }
    }
}

因为队列是无界的,所以此方法将会一直重试直到添加成功,并永远返回 true。这种实现虽然无锁,不过明显效率低下,所以我们在实现节点链接时,常常会引入 head、tail 指针来辅助推进,避免遍历的情况。同样地,ConcurrentLinkedQueue 使用了这种优化手段。在初始化时,队列就默认初始化了 head指针tail指针,为 dummy 节点。

public ConcurrentLinkedQueue() {
    head = tail = new Node<E>();
}

image

但是,next引用tail指针 是两个不同的属性,而 CAS 在同一个时刻只能更新一个变量,如果想要确保 next引用tail指针 的更新具备原子性,又回到了需要用到锁的境地。ConcurrentLinkedQueue 是如何解决这个问题的?

答案是:延缓更新。在 ConcurrentLinkedQueue 的注释文档中,Doug Lea 提及 LinkedTransferQueue 也同样使用了这种做法,称为:slack threshold在每一次插入操作之后,tail指针 没有与 next节点 一起做原子性的更新操作。具体我们看下代码,在这里,我们模拟四个并发插入的线程:

/**
 * 队列为无界,所以方法永远不返回 false <br/>
 */
public boolean offer(E e) {
    final Node<E> newNode = new Node<E>(Objects.requireNonNull(e));
    // p、q 前后节点
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
            // 并发 CAS 控制入口
            // p is last node; p 总是最后的节点(即tail)
            if (NEXT.compareAndSet(p, null, newNode)) {
                // 一次跳俩或多节点,允许失败。如果在 if 此处阻塞,将会出现 tail 滞后于 head 的情况 ✌
                // (不一定是只有最后更新的 next 节点的线程才可以更新成功,也就是说更新后的 tail 不一定是最新最准确的)
                // 如果不准确,后续的线程依然可以通过 next 往下推进
                if (p != t) // hop two nodes at a time; failure is OK
                    TAIL.weakCompareAndSet(this, t, newNode);
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else ...
    }
}

① 当线程一添加元素时,head=tail=new Node(),将 p(=>pointer:指针) 赋值为 tail,将 q 赋值为 p 的下一个节点,即 p=tail.next=null,所以进入后线程一能直接 CAS 插入节点,此时 p=t=tail,所以第一个更新的线程满足 p!=t,不会更新 tail指针
image

② 并发的另外三个线程因为 CAS失败,重新进入循环。此时 q = tail.next != null,进入第二个分支判断 p==q,这是插入时针对并发删除的分支处理。在没有删除操作时,p 总是不等于 q,因为 p = tail; q = p.next = tail.next

else if (p == q)
    // 此处是与删除操作的多线程处理
    // 前后节点相等,说明该节点的已经被执行过移除操作
    // 节点移除后,如果此时 tail 未变更(可能阻塞了),也组织不了它被移除的命运。
    // 此时,我们要做的是将指针跳到 head,一边它能够从头遍历。否则更新 tail 更好。
    // We have fallen off list.  If tail is unchanged, it
    // will also be off-list, in which case we need to
    // jump to head, from which all live nodes are always
    // reachable.  Else the new tail is a better bet.
    p = (t != (t = tail)) ? t : head;

目前在单纯的并发插入操作中,q 总是 p 的下一个节点(next),可能为新节点,也可能为 null。在等于 null 时表示 p 为尾结点可进行插入,否则,表示尾结点已被其他线程更新,我们需要进入重试,对当前状态进行调整;直接进入第三个分支;

③ 此时并发的另外三个线程发现尾结点被更新后,它们执行操作:

else
    // 此处是与添加操作的多线程并发处理
    // Check for tail updates after two hops.
    // ① 发生在添加时,q = tail.next != null(进入发现下一级节点不为 null,无法添加),通过判断进入此分支,
    // 并在这里 p != t ,即在未进行移除操作,将 p 赋值为 q(即移动导下一节点),而 t=tail 没有被执行,所以 tail 前于 p。
    // ② 上一步骤 p=q 之后,循环回去操作,如果没有被其他线程添加,则成功执行,并通过 p != t,更新 tail 节点更新为 newNode(最后的更新节点)
    // ③ 如果上一步骤 p=q 之后,循环回去操作,有其他多个线程操作了,则 q==null 返回 false。然后进入 p==q 返回false(只有删除操作能导致),
    // 而在此方法内 t != (t=tail) 总是返回true,所以这里更新了 t 后,尝试从新的 tail 开始,即 p=t=tail,如果回去操作成功,则是在最末的 tail 更新
    // 否则,重新进入 ①
    // 此处一直负责推进 next 节点
    p = (p != t && t != (t = tail)) ? t : q;

这里有两个迷惑点:
迷惑点1:p != t 似乎不可能成立,因为一开始就赋值 p = t。大家需要注意,当前是在一个for(;;)操作中,这个判断条件并非是在当前此次循环中使用,所以第一次的判断总为 false;代码执行了:

p = q

也就是说,当线程第一次发现尾结点被更新了(tail.next != null),那么它将把 p(指针)推进到下一个节点(这是推进过程),重新进入循环。
三个线程并发推进,重新循环,只有一个能够 CAS(tail.next,null,e)。执行成功的线程二,执行判断:

// 这里有可能发生阻塞,或者丢失时间片
if (p != t) // hop two nodes at a time; failure is OK
    TAIL.weakCompareAndSet(this, t, newNode);

此时的 p 是推进的下一节点,而 t 保持不变,所以 p != t 为 true,执行 tail指针 更新。这就是 ConcurrentLinkedQueue 注释中提及的,tail指针 的更新是跳跃式更新,总是大于等于2个节点。并且有可能在判断分支后丢失时间片,未能立即执行 CAS 操作。
image
④ 我们将目光转移到线程三、四,它们因为与线程二争夺 CAS 失败,所以又一次回到第三个分支判断(它判断了第②依旧失败,因为重新循环后 q 总是被赋值为 p 的next 节点,q = p.next):

p = (p != t && t != (t = tail)) ? t : q;

这里我们看到了第二个疑惑点
疑惑点2:
因为 pointer 在上一次循环被推进( p=q ),所以 p != t。进入第二个条件判断:

t != (t = tail)

乍一看 t != (t = tail),这个表达式让人很冲动地认为应该总是返回 false。因为 t 是对象(非原生类型),同时先执行括号内表达式,t 被赋值为 tail 后与其自身比较,难道会出现 t!=t?网上有些博文直接描述为,由于此处可能发生并发导致 t 被赋值后又被修改,这是不正确的,因为 t 是一个局部变量,不可能发生并发修改情况。
但结果是:只要 tail 被更新过,即 t != tail,那么此表达式总是返回 true。

我们在 idea工具中测试:
image
可以看到对原生类型, idea 可以直接推断并提示永远返回 true;对于 Object 类型,查看其字节码:
image

原因是:Java 是基于堆栈的程序语言,对关系运算法使用的是后缀表达式法——又称逆波兰表达式法
表达式的表现形式分三种:

前缀表达式:计算机常用,从右向左解析,能够消除括号
中缀表达式:人类适用
后缀表达式:计算机常用,基于堆栈的程序语言常用,能够消除括号,从左向右解析

对应的上述表达式:

前缀表达式:!= t = tail t
中缀表达式:t != (t = tail)
后缀表达式:t tail t = !=

查看后缀表达式,也就是执行了:

先入栈 t
再入栈 tail
再入栈 t
取出栈顶 t、tail,执行赋值
入栈结果,假定为 x(实际值为 tail)
取出栈顶 t、x,执行比较,最终返回 true。

后缀法的使用导致,该表达式的实际执行效果如下:

t != tail;
t = tail;

效果相等,不过要记住的是:先执行了赋值,只不过赋值后被存入到了变量表中,而操作数栈中已经有了旧值 t,比较总是不相等的。如果是对象,那它就是比较的引用(对象堆地址)。

好了,疑惑点解决了,回到我们判断分支处表达式:

p = (p != t && t != (t = tail)) ? t : q;

在线程三、四因抢占失败,进入比较 t != ( t = tail ) 时,如果此时线程二CAS 成功并且更新了尾指针(tail指针),那线程三、四的表达式执行就是返回 true

此时 true && true,结果 p = t,我们前面说过,疑惑点二这里,实际上已经将 t 进行了赋值:t=tail,也就是说:在程序发现 t 已经不是最新的尾结点时(被其他线程执行插入后更新了),它将更新局部变量 t 和 p 为 最新的 tail指针,即获取最新的尾结点,又开始了跟线程一一样的循环处境,并尝试 CAS 下一节点。
image
什么时候 t != (t = tail) 返回 false 呢?

就是线程二成功执行了 CAS,但尚未执行 CAS(tail) 时(可能是发生了其他情况,如:系统阻塞了,时间片丢失了等等),那么线程三、四无法获取到最新的尾结点,它们只会推进到下一个节点。

这里就有可能导致,线程三、四推进到下一个节点后,线程四成功 CAS 并更新了 tail指针,然后线程二获取到时间片执行 CAS(tail) 失败。不过 tail指针的 CAS 失败是允许的。

最后,剩下线程三又回到分支三,并发现 tail指针 被更新了,执行tail更新,连续跳过两个节点。(如果是 n 个线程,极端情况下,可连续跳跃 n 个节点。)
image
这两者的不同是:推进到下一个节点的单步的,而更新为 tail指针 则可能实现跨多步的优化。
这些或许就是 Doug Lea 提及的 trick(技巧) 吧。

获取元素 poll()

ConcurrentLinkedQueue 是 FIFO 队列,所以支持从首部 poll 出节点。内部实现依旧是采用 CAS 方式,不过此方法支持返回元素 e或者 null。
截屏2021-06-02 22.38.07image
从前面我们可以看到,添加操作完全针对于 tail指针;而获取元素则将从头部节点 head 开始获取,并保证向后推进,从不关心 tail指针,代码如下:

// 此方法没有与添加操作产生并发冲突,所以只关注删除操作
public E poll() {
    restartFromHead: for (;;) {
        // 每次循环都推进到下一节点,注意 for 循环中的 p=q,就是推进
        for (Node<E> h = head, p = h, q;; p = q) {
            final E item;
            if ((item = p.item) != null && p.casItem(item, null)) {
                // 并发 CAS 控制入口
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    // 如果 p.next == null,则说明到了尾节点,直接返回当前指针 p,
                    // 否则赋值给 q 返回将 head 更新为 p.next(非空节点)
                    // 如果 p.next != null,那么 head 就是一次跳两个节点,否则就是跳一个节点
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            else if ((q = p.next) == null) {
                // 并发删除操作的退出口,如果已到尾部,如果 !=null,将推进指针到下一个节点(for 循环中的 p=q)
                // 指向下一个节点,并判断下一节点是否为 null
                // 如果为 null,则直接更新头节点并返回 null(表示已经到尾部了),这是一个辅助操作,不管有没有成功(优化操作)
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                // 与其他删除线程的并发处理,非退出口,
                // 已知节点被其他线程更新(无法获取到下一节点),从头开始(在 updateHead 中被操作了)
                continue restartFromHead;
        }
    }    

我们这里假定 4个线程;并且已知 p => pointer,q 为 p 的下个节点。
① 头部节点是 dummy 节点,第一次分支判断为 false,进入分支二,执行推进到下个节点,如果下个节点不为 null,进入分支三判断 p == q,重新进入分支一,只有线程一成功更新
image
② 如果此时线程一成功执行 updateHead,则 head指针将被跳跃式更新到下个节点(下个节点不为空时)或当前指针节点(下个节点为空时):

updateHead(h, ((q = p.next) != null) ? q : p);
final void updateHead(Node<E> h, Node<E> p) {
    // 更新 head 指针,并将原节点的 next 引用到其自身
    if (h != p && HEAD.compareAndSet(this, h, p))
        NEXT.setRelease(h, h);
}

image
更新节点成功以便其他线程能获取到最新的位置进行元素获取,同时,将原 head节点的 next 引用到其自身,这是对算法的改进——使其支持回收环境的一个优化操作;同时此操作能够在后续的迭代方法中被使用,避免过期的 head 节点的迭代。

比如,我们可以先看方法 succ,它用于获取后继节点,如果后继节点是本身,则直接更新为 head指针:

final Node<E> succ(Node<E> p) {
    // 推进到 p = p.next,表明 p 之前的已被移除并 self-ref
    // 所以直接更新 p=head(也不用再关注原来 p 之前[也可能是 head 之前]的元素)
    if (p == (p = p.next))
        p = head;
    return p;
}

可见,只有在初始化及下个节点为 null 时,head节点为 dummy Node,其 item 为 null,next 执行 null。
③ 节点更新成功 head指针 后,直接返回 item。

此时,其他线程在 CAS 失败后,将执行推进,因为线程一已将 原head节点的next引用到自身,所以他们将进入分支三:

else if (p == q)
    continue restartFromHead;

后续线程又重新进入循环,从 新head节点 开始,只不过线程 head#item 已不再是 null,可以直接争抢 CAS 执行。

④ 如果节点在更新 head指针 丢失时间片,那么其他线程争抢并推进到下个节点,并执行 CAS 和尝试更新 head指针,如果 head指针 由他们进行更新,这又是一次跳跃式更新
image
这里有两种情况:

  • 线程一一直未得到时间片,最终 head指针 由线程二、三、四他们更新,实现了跳跃式更新,并且 head 的 item 不为 null,后续线程一更新头结点失败(这是允许的);
  • 线程一在其他线程 CAS 推进之后执行了 head指针 更新,此时链表上,head 又是一个 dummy 节点,item 为 null(后续其他线程更新头结点失败,这也是允许的)。

⑤ 随着节点推进,当线程并发 CAS 失败然后推进到最后一个节点时,即 线程三、四在 tail指针所在处发生了并发 CAS。我们假定线程一、二、三在 CAS(next)成功后,准备执行 updateHead 都遇到了时间片丢失,那么线程四此时的处境是:
image
同样地,也是两种情况,更新 head节点或者得到 dummy(item=null) 的 head节点。

并发情况下,poll 操作会影响 add 操作。
因为有可能出现以下情况:

  • 线程一、二并发添加元素(add),线程三并发获取元素(poll),线程一添加成功,线程三获取成功并返回,此时的状态图如下:

image
这就是 offer 方法中的分支:

else if (p == q)
    // 此处是与删除操作的并发处理
    // 前后节点相等,说明该节点的已经被执行过移除操作
    // 节点移除后,如果此时 tail 未变更(可能阻塞了,也阻止不了它被移除的命运)。
    // 此时,我们要做的是将指针跳到 head,以便它能够从头遍历。否则更新 tail 就好。
    // 也就是 tail 被更新了,则获取新尾开始循环,否则,从 head 开始
    // We have fallen off list.  If tail is unchanged, it
    // will also be off-list, in which case we need to
    // jump to head, from which all live nodes are always
    // reachable.  Else the new tail is a better bet.
    p = (t != (t = tail)) ? t : head;

当线程并发 add 与 poll 操作时,可能会出现线程操作的指针节点被 poll 操作更新 next 引用到自身,此时的优化技巧是:

  • 如果指针判断当前 pointer 指针与 tail指针相等,则认为没有其他线程更新 tail 指针,它是不准确的,直接使用 poll操作更新后的 head指针,从新head 开始;
  • 如果指针判断当前 pointer 指针与 tail指针不相等,则认为有新线程更新了 tail指针,是的 tail指针处于较新的状态,使用 tail指针是个优化操作;

为什么认为相等时,tail指针不准确,而不相等时,tail指针只处于较新而非最新状态呢?
这是因为:tail指针是延缓更新的,它甚至可能滞后于 head指针,比如以下情况:
image
还有以下 tail 的序号落后于 head:
image

删除元素 remove(e)

删除操作时 Doug Lea 基于该算法上自行实现的支持,能够删除链表中指定元素的节点。删除操作与添加 add、出队 poll 能够并发执行,这主要也是取决于他们都是针对 item 的判断,同时对 head 的延缓更新支持。
先看下 remove 的代码:

public boolean remove(Object o) {
    if (o == null) return false;
    restartFromHead: for (;;) {
        for (Node<E> p = head, pred = null; p != null; ) {
            // 下一个节点
            Node<E> q = p.next;
            final E item;
            // 刚好是删除的首节点,则尝试 casItem,失败则进行推进
            if ((item = p.item) != null) {
                if (o.equals(item) && p.casItem(item, null)) {
                    skipDeadNodes(pred, p, p, q);
                    return true;
                }
                // 前继节点赋值为 p;p 进行推进
                pred = p; p = q; continue;
            }
            // 对并发 poll 的支持,当并发 poll 时,节点 p 被 poll 取走
            // 此时,移除操作需要跳跃到下一节点
            for (Node<E> c = p;; q = p.next) {
                // 查看 q(下一节点)是否为 null,是则表示无法推进,需要跳过已经挂了的节点
                //      如果此时 c == p,说明 p 是被移走的,只剩 pred(即尾结点)
                //      如果此时 c != p,说明 p 尚未被移走(c是最开始的赋值,不变,p会推进),则将 q 赋值为 p
                // 或者 q != null 且 q.item != null,说明下一节点正常,可以进行连接
                // 将 pred -> q 连接
                if (q == null || q.item != null) {
                    pred = skipDeadNodes(pred, c, p, q); p = q; break;
                }
                // 如果 p == q,即引用到自身,则该节点为已 poll,从头开始
                // 如果 p != q,推进到下一个节点 p = q
                if (p == (p = q)) continue restartFromHead;
            }
        }
        return false;
    }
}

image
删除的常见示意图:
image
就是将前置节点链接到后继节点,中间的节点进行删除,不过由于本身 ConcurrentLinkedQueue 的特殊结构和特性,以及存在并发,需要考虑的删除情况较多,有以下情况:

没有其他并发操作时,也就是说推进是单节点推进的

  • 一、推进节点记录为 p(=>pointer),下一个节点赋值为 q = p.next,当匹配到 item 的节点时,CAS 其 item 为 null,然后调用方法 skipDeadNodes 来进行节点重链接,从链上移除节点,先查看方法:
private Node<E> skipDeadNodes(Node<E> pred, Node<E> c, Node<E> p, Node<E> q) {
    // assert pred != c;
    // assert p != q;
    // assert c.item == null;
    // assert p.item == null;
    // 能够进入到此方法,必要的条件是以上 assert:
    //      其中 q 要么等于 null,则可以将其前节点 p 作为后继节点
    //      要么 q != null && q.item != null,符合作为后继节点的条件
    // 加入后继节点为 null,说明是尾节点,
    // 此时如果 c == p,表示节点已被 poll 走(或 remove 走),直接以前节点返回
    // 如果 c != p,则推进到后继节点
    if (q == null) {
        // Never unlink trailing node.
        if (c == p) return pred;
        q = p;
    }
    // 如果前置节点为 null,也就是删除的是第一个节点,则返回可以形成 dummy 节点,
    //
    return (tryCasSuccessor(pred, c, q)
            && (pred == null || ITEM.get(pred) != null))
        ? pred : p;
}

此方法主要帮我们进行跨节点连接,并且返回链接后的前置节点:可能是原来的旧pred 节点;如果 pred 为死亡节点时,意味着 pred 已不存在链表中,此时则返回 p(它可能是后继节点,也可能是当前的删除节点)。

pred:删除节点的前置节点
c:第一个死亡节点
p:最后一个死亡节点
q:删除节点的后置节点

情况①:单节点的删除,当删除的节点的后继节点为 null 时,表示删除的节点是尾结点,也就是不再需要链接。此时因为是单线程删除单个节点,外部传入的参数:c 总是等于 p:skipDeadNodes(pred, p, p, q),所以会直接返回 pred;
image
情况②:如果删除的节点不是尾结点,也就是 q != null,将进行链接,此时:

pred:前置节点
c=p:删除节点
q:后继节点

后续调用 tryCasSuccessor(pred, c, q),我们查看其方法实现:

private boolean tryCasSuccessor(Node<E> pred, Node<E> c, Node<E> p) {
    // assert p != null;
    // assert c.item == null;
    // assert c != p;
    // 不是处于首节点,或首节点可能为 dummy 节点
    if (pred != null)
        return NEXT.compareAndSet(pred, c, p);
    // 首节点更新并更新其 next,引用到自身
    if (HEAD.compareAndSet(this, c, p)) {
        NEXT.setRelease(c, c);
        return true;
    }
    return false;
}

更新链接又有两种情况:
I:如果前置节点 pred 是 null,表示删除的刚好是第一个有元素的节点,那我们直接更新首节点为 p(此时 p参数在外部传入是 q,即下一节点,因为 q != null)。因为 p 已经被移除了 item,并同时将原首节点的 next 引用进行更新
image
II:如果前置节点 pred 不是 null,则说明删除的不是第一个元素(或 head 节点为 dummy 节点),执行了跨节点链接,直接更新 next指针
image
III:成功更新链接后,skipDeadNode 需要将前置节点进行返回,此时如果 pred 是 null,说明是第一个节点被删除,没有前置节点,直接返回 pred(依旧为 null);如果 pred 不是 null,则判断其 item 是否为 null,不为 null 则进行返回(属于一个正常的节点),如果为 null,则返回 p,来表示表示前置节点(它们的处境相同)。

多线程并发操作数据,进行 poll 和 remove

单线程操作或者无并发情况下,总是单个元素进行删除及推进,因此在这种情况下,方法:skipDeadNodes 的参数 c 总是等于 p,因为只删除了一个节点。
而多线程并发情况下,skipDeadNodes的参数 c 就表示第一个死亡节点,p表示最后一个死亡节点。这是因为多个删除 remove 或出队 poll 操作会导致多个节点死亡。具体代码则是 remove 的第二个for循环:
image
第一个循环是简单地推进节点并进行重链接的操作,针对单个节点;
第二个循环则是针对并发操作下的处理,注意此循环的退出方法只有两处:一个是推进成功并进行跨节点重链后,跳出第二个循环并回调第一个循环来执行单元素删除;另外一个则是根据条件回到方法入口,重新开始方法;

  • 一、当删除线程进入,并在单节点推进过程中,发现元素的 item==null,这意味着该元素已被其他线程提前抢占并移除,此时我们保存第一个遇到的节点,赋值到 c,并尝试推进;
  • 二、判断推进的下一个元素是否为 null,为 null 表示已经到达尾部,进行死亡节点的跳过重链后退出循环,如果不为 null 则表示可继续推进,并判断其 item 是否为 null:

item 为 null 表示遇到了其他并发的 poll 操作或 remove 操作;
item 不为 null 表示此区间的死亡节点已通过,需要进行跳过重链;

假定有以下场景,三个线程并发执行,线程一 remove(3)、线程二 poll、线程三 remove(2);
image

  • 一开始 head为 dummy 节点,则直接进入第二个 for循环,线程一推进到 item=1;
  • 线程二执行 poll,成功将 cas(2,null),但它未成功更新 head指针;
  • 线程三执行 remove(2),从 dummy 节点推进到 2 后,对其先进行 head指针的更新 cas(head,q),成功更相信 head 指针指向 item=3 的节点,并更新原 head#next 指向其自身;
  • 线程二继续推进,发现 item!= null 满足,则尝试跨节点重链,发现 head 指针已被更新,无法通过 cas(head,c,q) 来进行更新,cas 失败。此时方法:skipDeadNodes 返回的就是指针 p,而不是 pred,但并不影响它继续推进。跳出 skipDeadNodes 后,将指针 p 指向下个节点 q,q#item=3,跳出回到第一个循环,成功删除 item=3。

这里就是 skipDeadNodes 的返回处理:由于存在并发的跨节点重链,对于失败的线程,说明有其他线程完成跨链操作,就不再为此操作,将 pred 进行更新,同时与后续的 p 推进(p=q)依旧形成前后关系,这是一个新的开始,不再需要考虑之前的死亡节点问题。

private Node<E> skipDeadNodes(Node<E> pred, Node<E> c, Node<E> p, Node<E> q) {
    if (q == null) {
        // Never unlink trailing node.
        if (c == p) return pred;
        q = p;
    }
    // 如果前置节点为 null,也就是删除的是第一个节点,则返回可以形成 dummy 节点,
    // tryCasSuccessor 失败,则返回 p,意味着不需要此线程负责跨节点了,更新前置节点为 p(过程中的最后一个死亡节点 item=null)
    // tryCasSuccessor 成功,如果 pred==null,直接返回,意味着后续删除操作,可以直接根据 pred=null 继续更新 head 节点
    //      如果 pred != null 且 item != null,返回最后一个死亡节点 p,让其能够在后续 remove 后,通过 next 指针重链。
    // 此方法是返回一个处理后的前置节点,要么是 null 要么是 普通的 node,不管哪一个,出了此方法之后,会执行 p=q,
    // 让方法外部的 pred 又与 p 形成前后置关系
    return (tryCasSuccessor(pred, c, q)
            && (pred == null || ITEM.get(pred) != null)) 
        ? pred : p;
  • 线程一、三都执行完后,线程二执行 poll 的 cas(head) 失败,因为 head 已经被 线程三进行了更新

整个 remove 操作需要考虑极多的并发情况,Doug Lea 在没有算法伪代码参考的情况下,凭实力给出的算法实现却让我掉了不少头发,真是让人想给他献下膝盖。remove 操作除了上述的实例,实际还有许多考虑的情况,如:

  • 前节点是否 dummy 节点
  • 删除的是否 尾结点
  • poll 操作超过 remove 时等等

大概是下图的一些情况:
image
ConcurrentLinkedQueue 的删除操作在 JDK8 中的算法实现较目前 JDK15 的易阅读,JDK15 中的实现是 Doug Lea 对算法进行一些 refactor,因此根据有抽象及分支复杂的情况。
除此上述的 add、poll、remove 之外,ConcurrentLinkedQueue 的实现还提供了其他的 Iterator 等并发实现(也是 Doug Lea 自行实现),也是具有学习意义。但本文不再赘述(已经过长)。

image
总结:

  • ConcurrentLinkedQueue 是基于已有算法的改进版,使其支持回收与删除操作;
  • 支持回收的操作有:将原 head 节点的 next 关联到其自身;
  • ConcurrentLinkedQueue 基于节点链接的形式,但不要求 tail指针head指针 为最新状态,支持跳跃式更新,属于优化操作;
  • 跨节点连接可跨多个节点;
  • ConcurrentLinkedQueue 的添加只关注于节点 next 的 CAS 更新,这种做法保证了并发安全与高效;
  • 而除了 item 的 CAS 更新要求成功,其他的如 tailhead 均不要求成功更新
  • 存在一些 trick 的代码,如:t != (t = tail)、p == (p = q) 等

注:
在 idea 中 debug ConcurrentLinkedQueue 代码时,会因为 idea 编辑器自动调用 toString 等方法,遇到不符合认知逻辑的情况,需要更改 idea 配置来避免。

参考:
波兰表示法
逆波兰表示法
java 字节码 指令集 汇编
t != (t = tail) 可以为 true ?!
并发容器-ConcurrentLinkedQueue详解
IDEA debug ConcurrentLinkedQueue 的时候踩的坑
Java进阶——数据结构‘栈’的应用——前、中、后缀表达式



这篇关于ConcurrentLinkedQueue 源码浅析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程