java基础----AQS---CountDownLatch
2022/7/22 1:31:10
本文主要是介绍java基础----AQS---CountDownLatch,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
AQS---abstractQueuedSynchronizer java并发的主要实现类,内部实现了获取锁和释放锁的主要流程,自定义的同步器,如果是排他锁,实现tryAcquire、tryRelease;如果是共享锁,实现tryAcquireShared,tryReleaseShared。 排他锁和共享锁的区别是,排他锁在同一时刻只能有一个线程获取锁,而共享锁则可以运行多个线程获取到锁去执行。 1. 内部主要实现原理: volatile类型的state表示同步锁的状态,FIFO的双向链表用来存放未获取锁而进入等待区的线程。 (1)CountDownLatch 实现 Demo: CountDownLatch countDownLatch = new CountDownLatch(5); //建立state是5的AQS for (int i=0;i<5;i++) { threadPoolExecutor.submit(() -> { try { System.out.println("threadName:"+Thread.currentThread().getName()); }catch (Exception ex) { ex.printStackTrace(); }finally { countDownLatch.countDown();//每执行完一个线程,countdownlatch-1,到0以后会通知阻塞的主线程继续执行 } }); } countDownLatch.await();//主线程阻塞 运行流程: 1. 初始处设置 new CountDownLatch(5) 是建立state是5的AQS 2. Demo里有一个主线程和5个子线程,主线程中启动5个子线程提交到线程池中运行。 3. 主线程继续运行,在countDownLatch.await(),进入
/** * Causes the current thread to wait until the latch has counted down to * zero, unless the thread is {@linkplain Thread#interrupt interrupted}. * * <p>If the current count is zero then this method returns immediately. * * <p>If the current count is greater than zero then the current * thread becomes disabled for thread scheduling purposes and lies * dormant until one of two things happen: * <ul> * <li>The count reaches zero due to invocations of the * {@link #countDown} method; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * the current thread. * </ul> * * <p>If the current thread: * <ul> * <li>has its interrupted status set on entry to this method; or * <li>is {@linkplain Thread#interrupt interrupted} while waiting, * </ul> * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * * @throws InterruptedException if the current thread is interrupted * while waiting */ public void await() throws InterruptedException {//如果计数器(就是刚刚设置的Sync的state)是0,就返回;如果大于0,那么就会一直阻塞,直到计数器变为0唤醒他,或者是他被中断 sync.acquireSharedInterruptibly(1); } /** * Causes the current thread to wait until the latch has counted down to * zero, unless the thread is {@linkplain Thread#interrupt interrupted}, * or the specified waiting time elapses. * * <p>If the current count is zero then this method returns immediately * with the value {@code true}. * * <p>If the current count is greater than zero then the current * thread becomes disabled for thread scheduling purposes and lies * dormant until one of three things happen: * <ul> * <li>The count reaches zero due to invocations of the * {@link #countDown} method; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * the current thread; or * <li>The specified waiting time elapses. * </ul> * * <p>If the count reaches zero then the method returns with the * value {@code true}. * * <p>If the current thread: * <ul> * <li>has its interrupted status set on entry to this method; or * <li>is {@linkplain Thread#interrupt interrupted} while waiting, * </ul> * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * * <p>If the specified waiting time elapses then the value {@code false} * is returned. If the time is less than or equal to zero, the method * will not wait at all. * * @param timeout the maximum time to wait * @param unit the time unit of the {@code timeout} argument * @return {@code true} if the count reached zero and {@code false} * if the waiting time elapsed before the count reached zero * @throws InterruptedException if the current thread is interrupted * while waiting */ public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); //和上面方法类似,多了个阻塞的时间,如果在指定的时间内还未获取锁,那么就返回。 }
/** * Acquires in shared mode, aborting if interrupted. Implemented * by first checking interrupt status, then invoking at least once * {@link #tryAcquireShared}, returning on success. Otherwise the * thread is queued, possibly repeatedly blocking and unblocking, * invoking {@link #tryAcquireShared} until success or the thread * is interrupted. * @param arg the acquire argument. * This value is conveyed to {@link #tryAcquireShared} but is * otherwise uninterpreted and can represent anything * you like. * @throws InterruptedException if the current thread is interrupted */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) //判断是否是被中断,如果是被中断,那么直接抛出中断异常 throw new InterruptedException(); if (tryAcquireShared(arg) < 0) //如果获取共享锁失败,则线程进入等待队列,可能会重复的发生阻塞,解开阻塞。一直到有线程唤醒他或者有中断发生 doAcquireSharedInterruptibly(arg); }
protected int tryAcquireShared(int acquires) { //CountDownLatch内部实现的Sync,实现了AQS的这个方法,如果state是0表示获取锁成功返回1,否则返回-1。开始时候如果子线程都没有执行的情况,这个值肯定是大于0的。 return (getState() == 0) ? 1 : -1; }
/** * Acquires in shared interruptible mode. * @param arg the acquire argument */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); //添加到等待队列,见下面方法 boolean failed = true; try { for (;;) { //节点被加入到等待队列以后,需要在for循环中不断的尝试去获取锁。 final Node p = node.predecessor();//获取当前节点的前驱节点 if (p == head) {//如果是头节点 int r = tryAcquireShared(arg);//直接尝试是否能获取锁 if (r >= 0) {//如果尝试成功, setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); //建立{共享}的节点,以为是CountDownLatch,如果是ReentrantLock则建立的是排他模式的节点。 // Try the fast path of enq; backup to full enq on failure //先通过快速路径添加试一下,加到队尾,如果添加成功就返回当前节点;如果失败则进入enq(node) Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { //在for循环中不断的去尝试。如果队列是空,则他就是头节点,用compareAndSetHead加入到头部;如果非空,则通过compareAndSetTail加入到尾部。
for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
/** * Sets head of queue, and checks if successor may be waiting * in shared mode, if so propagating if either propagate > 0 or * PROPAGATE status was set. * * @param node the node * @param propagate the return value from a tryAcquireShared */ private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || //如果当前AQS的信号量>0,或者是头节点为空,或者头节点等待状态<0; (h = head) == null || h.waitStatus < 0) { //或者在setHead之后的head((h=head)==null)为空或事等待状态<0,则表示当前节点的后继节点可以被唤醒。 Node s = node.next;
if (s == null || s.isShared()) doReleaseShared(); } }
/** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) */ private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) {//在for循环中,当前节点如果是SIGNAL状态,说明后继节点需要被唤醒。则设置节点状态是0,同时唤醒后继节点 Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); //后继节点被释放,然后在获取锁的地方继续执行,回到doAcquireSharedInterruptibly,回在里面的for循环继续看能否获取到锁。然后释放它后面的Shared模式的线程。
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //否则如果节点是0状态,则循环设置直到他状态为PROPAGATE模式。保障后续节点可以被释放 continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
运行流程继续: 4. 子线程运行,在countDownLatch.countDown(),进入
/** * Decrements the count of the latch, releasing all waiting threads if * the count reaches zero. * * <p>If the current count is greater than zero then it is decremented. * If the new count is zero then all waiting threads are re-enabled for * thread scheduling purposes. * * <p>If the current count equals zero then nothing happens. */ public void countDown() { sync.releaseShared(1); }
/** * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true. * * @param arg the release argument. This value is conveyed to * {@link #tryReleaseShared} but is otherwise uninterpreted * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */ public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { //可以释放锁 doReleaseShared();//调用上面提到的doReleaseShared return true; } return false; }
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; //如果减到0,说明锁已经释放,返回false int nextc = c-1; if (compareAndSetState(c, nextc))//CAS设置-1 return nextc == 0;//一直减到0返回true,否则返回false。countDownlatch的线程只有最后一个运行完的线程在这里可以把状态减到0可以去释放锁。其他的都不可以。 } }
参考文章: https://www.cnblogs.com/waterystone/p/4920797.html https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html https://zhuanlan.zhihu.com/p/268364895
这篇关于java基础----AQS---CountDownLatch的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-05-15鸿蒙生态设备数量超8亿台
- 2024-05-13TiDB + ES:转转业财系统亿级数据存储优化实践
- 2024-05-09“2024鸿蒙零基础快速实战-仿抖音App开发(ArkTS版)”实战课程已上线
- 2024-05-09聊聊如何通过arthas-tunnel-server来远程管理所有需要arthas监控的应用
- 2024-05-09log4j2这么配就对了
- 2024-05-09nginx修改Content-Type
- 2024-05-09Redis多数据源,看这篇就够了
- 2024-05-09Google Chrome驱动程序 124.0.6367.62(正式版本)去哪下载?
- 2024-05-09有没有大佬知道这种数据应该怎么抓取呀?
- 2024-05-09这种运行结果里的10.100000001,怎么能最快改成10.1?