java基础----AQS---CountDownLatch

2022/7/22 1:31:10

本文主要是介绍java基础----AQS---CountDownLatch,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

AQS---abstractQueuedSynchronizer java并发的主要实现类,内部实现了获取锁和释放锁的主要流程,自定义的同步器,如果是排他锁,实现tryAcquire、tryRelease;如果是共享锁,实现tryAcquireShared,tryReleaseShared。
排他锁和共享锁的区别是,排他锁在同一时刻只能有一个线程获取锁,而共享锁则可以运行多个线程获取到锁去执行。

1. 内部主要实现原理:
volatile类型的state表示同步锁的状态,FIFO的双向链表用来存放未获取锁而进入等待区的线程。


(1)CountDownLatch 实现

Demo:
CountDownLatch countDownLatch = new CountDownLatch(5); //建立state是5的AQS
for (int i=0;i<5;i++) {
    threadPoolExecutor.submit(() -> {
        try {
            System.out.println("threadName:"+Thread.currentThread().getName());
        }catch (Exception ex) {
            ex.printStackTrace();
        }finally {
            countDownLatch.countDown();//每执行完一个线程,countdownlatch-1,到0以后会通知阻塞的主线程继续执行
        }
    });
}
countDownLatch.await();//主线程阻塞

运行流程:
1. 初始处设置 new CountDownLatch(5) 是建立state是5的AQS
2. Demo里有一个主线程和5个子线程,主线程中启动5个子线程提交到线程池中运行。
3. 主线程继续运行,在countDownLatch.await(),进入
/**
 * Causes the current thread to wait until the latch has counted down to
 * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
 *
 * <p>If the current count is zero then this method returns immediately.
 *
 * <p>If the current count is greater than zero then the current
 * thread becomes disabled for thread scheduling purposes and lies
 * dormant until one of two things happen:
 * <ul>
 * <li>The count reaches zero due to invocations of the
 * {@link #countDown} method; or
 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
 * the current thread.
 * </ul>
 *
 * <p>If the current thread:
 * <ul>
 * <li>has its interrupted status set on entry to this method; or
 * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
 * </ul>
 * then {@link InterruptedException} is thrown and the current thread's
 * interrupted status is cleared.
 *
 * @throws InterruptedException if the current thread is interrupted
 *         while waiting
 */
public void await() throws InterruptedException {//如果计数器(就是刚刚设置的Sync的state)是0,就返回;如果大于0,那么就会一直阻塞,直到计数器变为0唤醒他,或者是他被中断
    sync.acquireSharedInterruptibly(1);
}

/**
 * Causes the current thread to wait until the latch has counted down to
 * zero, unless the thread is {@linkplain Thread#interrupt interrupted},
 * or the specified waiting time elapses.
 *
 * <p>If the current count is zero then this method returns immediately
 * with the value {@code true}.
 *
 * <p>If the current count is greater than zero then the current
 * thread becomes disabled for thread scheduling purposes and lies
 * dormant until one of three things happen:
 * <ul>
 * <li>The count reaches zero due to invocations of the
 * {@link #countDown} method; or
 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
 * the current thread; or
 * <li>The specified waiting time elapses.
 * </ul>
 *
 * <p>If the count reaches zero then the method returns with the
 * value {@code true}.
 *
 * <p>If the current thread:
 * <ul>
 * <li>has its interrupted status set on entry to this method; or
 * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
 * </ul>
 * then {@link InterruptedException} is thrown and the current thread's
 * interrupted status is cleared.
 *
 * <p>If the specified waiting time elapses then the value {@code false}
 * is returned.  If the time is less than or equal to zero, the method
 * will not wait at all.
 *
 * @param timeout the maximum time to wait
 * @param unit the time unit of the {@code timeout} argument
 * @return {@code true} if the count reached zero and {@code false}
 *         if the waiting time elapsed before the count reached zero
 * @throws InterruptedException if the current thread is interrupted
 *         while waiting
 */
public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); //和上面方法类似,多了个阻塞的时间,如果在指定的时间内还未获取锁,那么就返回。
}

/**
 * Acquires in shared mode, aborting if interrupted.  Implemented
 * by first checking interrupt status, then invoking at least once
 * {@link #tryAcquireShared}, returning on success.  Otherwise the
 * thread is queued, possibly repeatedly blocking and unblocking,
 * invoking {@link #tryAcquireShared} until success or the thread
 * is interrupted.
 * @param arg the acquire argument.
 * This value is conveyed to {@link #tryAcquireShared} but is
 * otherwise uninterpreted and can represent anything
 * you like.
 * @throws InterruptedException if the current thread is interrupted
 */
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted()) //判断是否是被中断,如果是被中断,那么直接抛出中断异常
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0) //如果获取共享锁失败,则线程进入等待队列,可能会重复的发生阻塞,解开阻塞。一直到有线程唤醒他或者有中断发生
        doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) { //CountDownLatch内部实现的Sync,实现了AQS的这个方法,如果state是0表示获取锁成功返回1,否则返回-1。开始时候如果子线程都没有执行的情况,这个值肯定是大于0的。
    return (getState() == 0) ? 1 : -1;
}

/**
 * Acquires in shared interruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED); //添加到等待队列,见下面方法
    boolean failed = true;
    try {
        for (;;) { //节点被加入到等待队列以后,需要在for循环中不断的尝试去获取锁。
            final Node p = node.predecessor();//获取当前节点的前驱节点
            if (p == head) {//如果是头节点
                int r = tryAcquireShared(arg);//直接尝试是否能获取锁
                if (r >= 0) {//如果尝试成功,
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode); //建立{共享}的节点,以为是CountDownLatch,如果是ReentrantLock则建立的是排他模式的节点。
    // Try the fast path of enq; backup to full enq on failure //先通过快速路径添加试一下,加到队尾,如果添加成功就返回当前节点;如果失败则进入enq(node)
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) { //在for循环中不断的去尝试。如果队列是空,则他就是头节点,用compareAndSetHead加入到头部;如果非空,则通过compareAndSetTail加入到尾部。
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

/**
 * Sets head of queue, and checks if successor may be waiting
 * in shared mode, if so propagating if either propagate > 0 or
 * PROPAGATE status was set.
 *
 * @param node the node
 * @param propagate the return value from a tryAcquireShared
 */
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 || //如果当前AQS的信号量>0,或者是头节点为空,或者头节点等待状态<0;
        (h = head) == null || h.waitStatus < 0) { //或者在setHead之后的head((h=head)==null)为空或事等待状态<0,则表示当前节点的后继节点可以被唤醒。

        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

/**
 * Release action for shared mode -- signals successor and ensures
 * propagation. (Note: For exclusive mode, release just amounts
 * to calling unparkSuccessor of head if it needs signal.)
 */
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {//在for循环中,当前节点如果是SIGNAL状态,说明后继节点需要被唤醒。则设置节点状态是0,同时唤醒后继节点
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h); //后继节点被释放,然后在获取锁的地方继续执行,回到doAcquireSharedInterruptibly,回在里面的for循环继续看能否获取到锁。然后释放它后面的Shared模式的线程。
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //否则如果节点是0状态,则循环设置直到他状态为PROPAGATE模式。保障后续节点可以被释放
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
 
运行流程继续:
4. 子线程运行,在countDownLatch.countDown(),进入

/**
 * Decrements the count of the latch, releasing all waiting threads if
 * the count reaches zero.
 *
 * <p>If the current count is greater than zero then it is decremented.
 * If the new count is zero then all waiting threads are re-enabled for
 * thread scheduling purposes.
 *
 * <p>If the current count equals zero then nothing happens.
 */
public void countDown() {
    sync.releaseShared(1);
}

/**
 * Releases in shared mode.  Implemented by unblocking one or more
 * threads if {@link #tryReleaseShared} returns true.
 *
 * @param arg the release argument.  This value is conveyed to
 *        {@link #tryReleaseShared} but is otherwise uninterpreted
 *        and can represent anything you like.
 * @return the value returned from {@link #tryReleaseShared}
 */
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { //可以释放锁
        doReleaseShared();//调用上面提到的doReleaseShared
        return true;
    }
    return false;
}

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false; //如果减到0,说明锁已经释放,返回false
        int nextc = c-1;
        if (compareAndSetState(c, nextc))//CAS设置-1
            return nextc == 0;//一直减到0返回true,否则返回false。countDownlatch的线程只有最后一个运行完的线程在这里可以把状态减到0可以去释放锁。其他的都不可以。
    }
}


 
 
参考文章:

https://www.cnblogs.com/waterystone/p/4920797.html

https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html

https://zhuanlan.zhihu.com/p/268364895


这篇关于java基础----AQS---CountDownLatch的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程