AQS(AbstractQueuedSynchronizer)源码初识

2021/7/18 17:06:48

本文主要是介绍AQS(AbstractQueuedSynchronizer)源码初识,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

        前几天在对比Synchronized和ReentrantLock的关系和区别时,以及学习使用Semaphore、CountDownLatch和CyclicBarrier时,发现里面底层都有这样一个同步器。这让我觉得学习它们的底层原理,就不得不学习AQS自身的底层原理,那么,我们就来吧。

这里参考了队列同步器(AQS)详解 和 这才是图文并茂:我写了1万多字,就是为了让你了解AQS是怎么运行的

目录

成员变量

Node

独占式同步组件的设计

同步器提供的模板方法

供子类重写的方法

具体方法实现

acquire方法

Release


成员变量

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 7373984972572414691L;

    /**
		创建一个新的AbstractQueuedSynchronizer实例,初始同步状态为零。
     */
    protected AbstractQueuedSynchronizer() { }
  
    
    static final class Node{...} //数据结构中的节点,下面细说。
  
  
    /**
		等待队列的头部,延迟初始化。 除初始化外,仅通过 setHead 方法进行修改。 注意:如果 head 存在,则保证其     waitStatus 不会被 CANCELLED。
     */
    private transient volatile Node head;

    /**
		等待队列的尾部,延迟初始化。 仅通过方法 enq 修改以添加新的等待节点
     */
    private transient volatile Node tail;

    /**
		同步状态。
     */
    private volatile int state;
  
}

        可以看出同步器本身其实就是一个双向链表,我们直接能获取的节点只有head和tail。主要是我们要考擦这个节点Node的含义。其中head节点为正在运行线程的的节点。

         

Node

static final class Node {
        /** 指示节点在共享模式下等待的标记 */
        static final Node SHARED = new Node();
        /** 指示节点正在以独占模式等待的标记 */
        static final Node EXCLUSIVE = null;

        /** 指示线程已取消的 waitStatus 值*/
        static final int CANCELLED =  1;
        /** 指示后继节点需要被唤醒的waitStatus 值 */
        static final int SIGNAL    = -1;
        /** waitStatus 值指示线程正在等待条件(进入等待队列) */
        static final int CONDITION = -2;
        /**指示下一个acquireShared 应无条件传播的waitStatus 值*/
        static final int PROPAGATE = -3;


        volatile int waitStatus;

        /**
        前驱节点
         */
        volatile Node prev;

        /**
		  后继节点
         */
        volatile Node next;

        /**
            该节点装载的线程 在构造时初始化并在使用后归零。
         */
        volatile Thread thread;

        /**
链接到下一个等待条件的节点,或特殊值 SHARED。 因为条件队列只有在独占模式下才会被访问,所以我们只需要一个简单的链接队列来保存节点,因为它们正在等待条件。 然后将它们转移到队列以重新获取。 并且因为条件只能是独占的,所以我们通过使用特殊值来表示共享模式来保存字段。
         */
        Node nextWaiter;

        /**
			如果节点在共享模式下等待,则返回 true。
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

         /** 上一个节点*/
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

        代码李定义了一个表示当前Node节点等待状态的字段waitStatus,取值总共有CANCELLED(-1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)、0,这5个值代表了不同的特定场景。

  • CANCELLED:表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
  • SIGNAL:表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL(记住这个-1的值,因为后面我们讲的时候经常会提到)
  • CONDITION:表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。(注:Condition是AQS的一个组件,后面会细说)
  • PROPAGATE:共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
  • 0:新结点入队时的默认状态。

当waitStatus为负值表示结点处于有效等待状态,为正值的时候表示结点已被取消。

独占式同步组件的设计

同步器提供的模板方法

//独占式获取同步状态,如果当前线程获取同步状态成功,立即返回。否则,将会进入同步队列等待,
//该方法将会重复调用重写的tryAcquire(int arg)方法
public final void acquire(int arg) {...}


//与acquire(int arg)基本相同,但是该方法响应中断。
public final void acquireInterruptibly(int arg){...}



//独占式释放同步状态,该方法会在释放同步状态后,将同步队列中第一个节点包含的线程唤醒
public final boolean release(int arg) {...}

供子类重写的方法

//独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态
protected boolean tryAcquire(int arg)
 
//独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态
protected boolean tryRelease(int arg)
 
//当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占
protected boolean isHeldExclusively()

具体方法实现

acquire方法

/**
以独占模式获取,忽略中断。 通过至少调用一次tryAcquire ,成功返回。 否则线程会排队,可能会反复阻塞和解除阻塞,调用tryAcquire直到成功。 此方法可用于实现方法Lock.lock 。
*/
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && //如果tryacquire失败
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//生成节点并加入同步队列
            selfInterrupt();
    }

/**
为当前线程和给定mode创建节点并排入队列
*/
private Node addWaiter(Node mode) { //共享/独占性,null为独占
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) { //
            node.prev = pred;//连接到尾部之后
            if (compareAndSetTail(pred, node)) { //CAS设置,期望当前尾部是pred,是的话改为node。
               //如果CAS尝试成功,就说明"设置当前节点node的前驱"与"CAS设置tail"之间没有别的线程设置tail								成功
                //只需要将"之前的tail"的后继节点指向node即可
                pred.next = node;
                return node;
            }
        }
        enq(node);//否则,通过死循环来保证节点的正确添加
        return node;
    }


/**
每个节点以死循环方式来判断自身是否头结点。

*/
    final boolean acquireQueued(final Node node, int arg) { //自旋
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {//前驱节点是头节点并且获取到了同步状态
                    setHead(node); //设置其为首节点
                    p.next = null; // help GC 断开引用
                    failed = false;
                    return interrupted; //自旋退出
                }
                if (shouldParkAfterFailedAcquire(p, node) && //获取同步状态失败后判断是否需要阻塞或中断
                    parkAndCheckInterrupt()) //阻塞当前线程
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

        上述逻辑可由下图表示:

         当前线程获取同步状态失败时,同步器会将当前线程、等待状态等信息构造成一个Node并加入同步器队列中,同时会阻塞当前线程。

        同时可以看出每个同步器队列中的节点都在进行自旋,自省地观察自己的前驱节点是否为head节点,如果是就尝试tryAcquire,成功即可退出自旋。

可以看到节点和节点之间在循环检查的过程中基本不相互通信,而是简单地判断自己的前驱是否为头节点,这样就使得节点的释放规则符合FIFO。并且也便于对过早通知的处理(过早通知是指:前驱节点不是头节点的线程由于中断而被唤醒)。 

Release

        head节点的线程在释放同步状态时,将会唤醒后续节点,后续节点将在获取同步状态成功时将自己设置为head节点。

 

public final boolean release(int arg) {
        if (tryRelease(arg)) { //释放同步状态成功的话
            Node h = head;
            if (h != null && h.waitStatus != 0) //独占即为SIGNAL -1 要唤醒后面的
                unparkSuccessor(h);
            return true;
        }
        return false;
    }


private void unparkSuccessor(Node node) {

        int ws = node.waitStatus; //当前节点waitStatus
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);//更新为0

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev) //从尾部往前找
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread); //唤醒
    }



这篇关于AQS(AbstractQueuedSynchronizer)源码初识的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程