JUC源码学习笔记3——AQS等待队列和CyclicBarrier,BlockingQueue
2022/7/25 14:28:00
本文主要是介绍JUC源码学习笔记3——AQS等待队列和CyclicBarrier,BlockingQueue,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
一丶Condition
1.概述
任何一个java对象都拥有一组定义在Object中的监视器方法——wait(),wait(long timeout),notify(),和notifyAll()方法,这些方法配合sync hronized同步关键字,可以实现等待/通知模式。Condition接口也提供了类似于Object的监视器方法,可以和Lock接口的实现配合实现等待/通知模式
2.Object监视器 和 Condition的对比
Object监视器 | Condition | |
---|---|---|
前置条件 | 获取对象的锁 | Lock.lock()获取,并且通过当前Lock.newCondtion()获取Condition对象 |
等待队列 | 一个 | 多个,一个lock 可以调用多次newCondtion()生成多个等待队列 |
调用等待方法使当前线程放弃锁进入等待队列 | 是 | 是 |
等待时可以不响应中断 | 不支持 | 支持 |
超时等待 | 支持 | 支持 |
等待到指定的绝对时间 | 不支持 | 不支持 |
唤醒等待队列中一个线程 | 支持 | 支持 |
唤醒所有等待队列中的线程 | 支持 | 支持 |
3.Condition中常用方法
方法名称 | 方法描述 |
---|---|
void await() throws InterruptedException | 当前线程进入等待状态直到被通知or中断,当前线程结束await()时必然获取到了锁,那怕是在等待的时候被中断也必须获取锁后响应中断才可以返回 |
void awaitUninterruptibly() | 对中断不敏感的等待,即使被中断,该方法返回之前也只是补上中断标志位,同样返回的时候必须获取到锁 |
long awaitNanos(long nanosTimeout) throws InterruptedException | 超时等待nanosTimeout 纳秒,返回值是剩余的时间,如果耗时h纳秒被唤醒 那么返回nanosTimeout-n,返回0 or 负数表示是超时退出,对中断敏感 |
boolean await(long time, TimeUnit unit) throws InterruptedException | 使当前线程一直等待,直到它发出信号或中断,或者指定的等待时间过去。此方法在行为上等价于: awaitNanos(unit.toNanos(time)) > 0,支持指定时间单位 |
boolean awaitUntil(Date deadline) throws InterruptedException | 当前线程进入等待直到被唤醒,or中断or到了指定的截至日期,在指定截至日期前被通知返回true,反之false |
void signal() | 唤醒一个等待线程。如果有任何线程在此条件下等待,则选择一个用于唤醒。然后,该线程必须在从等待返回之前重新获取锁。实现注意事项 当调用此方法时,通常要求当前线程持有与此条件关联的锁 |
void signalAll() | 唤醒所有等待的线程。如果有任何线程正在等待这种情况,那么它们都会被唤醒。每个线程必须重新获取锁才能从等待返回。实现注意事项 当调用此方法时,通常要求当前线程持有与此条件关联的锁。 |
4.Condition的实现
Condition的实现一般都是AQS中的内部类ConditionObject,下面Condition都是指AQS中的ConditionObject
1.await()中断敏感的进行等待
1.1源码初步解析
await方法会当前获取锁的线程从同步队列移动到等待队列,并且完全释放到锁,挂起当前线程直到被signal或者被中断,并且必须获取到锁才可以返回,如果在等待的过程中被中断还会根据中断在signal之前或者signal选择是补上中断标识还是抛出中断异常
1.2 源码详细学习
中断对此方法十分关键,也是最难理解的部分,后续会解释doug lea是如何实现
public final void await() throws InterruptedException { //如果进入方法就已经被中断那么复位中断标识并且抛出中断异常 if (Thread.interrupted()) throw new InterruptedException(); //把当前线程假如到等待队列 Node node = addConditionWaiter(); //完全释放掉锁 int savedState = fullyRelease(node); //中断标识 0 表示在等待的过程中从没有被中断过,1表示抛出中断异常,2表示重新中断“伪造”一个中断表示 int interruptMode = 0; //如果不在同步队列 那么一致挂起,直到被signal唤醒后移动到同步队列 或者被中断 while (!isOnSyncQueue(node)) { LockSupport.park(this); //中断可以让线程从park中返回 //检查是否因为中断而从park中返回,如果是由于中断那么还要判断中断在sigal之前还是之后,只要是中断都会从打断循环 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //acquireQueued是尝试获取锁,返回是否中断 //如果获取锁的时候被中断 且 中断发生在signal之后或者 上面等待的过程没有被中断过 //那么中断表示置为 重新中断 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; //清理放弃等待了的节点 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); //如果发生了中断 if (interruptMode != 0) //选择是抛出中断异常 还是重新中断自己 reportInterruptAfterWait(interruptMode); }
1.2.1 addConditionWaiter 假如到Condition队列中
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
和同步队列不同的是,没有哑节点作为头节点的操作,也不需要自旋,因为当前节点必然是获取到锁的节点(这里的锁指的是独占锁)并且节点的状态初始的时候默认是CONDITION,并且注意等待队列中的节点是使用nextWaiter属性串联起来的,是一个单向链表,当前节点是独占模式中的头节点,无需设置pre next指针,释放锁后自然会有其他节点获取到锁设置自己为头节点
1.2.2 fullyRelease 完全释放锁
final int fullyRelease(Node node) { boolean failed = true; try { //对于ReentrantLock state 表示的是重入了几次, int savedState = getState(); //完全释放 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
这里把原来的state返回了方便后续,等待结束后再次占用恢复到等待钱的状态
1.2.3 isOnSyncQueue 当前节点是否在同步队列
目前节点已经进入到了等待队列,来到同步队列存在两个可能——其他线程Signal到当前节点,or当前线程被中断后自己入队,这部分源码最好结合唤醒操作源码一起看
final boolean isOnSyncQueue(Node node) { //这里node.waitStatus == Node.CONDITION 成立 说明还没被singal,唤醒调用transferForSignal第一行代码就是cas状态从CONDITION到0 //需要判断node.prev==null(前面的成立那就短路了) 说明transferForSignal第一行代码CAS改为0结束 但是没还没来得及执行后续的enq,也就是没来得及接到同步队列后 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; //如果后面有节点 那么一定是在同步队列的 if (node.next != null) return true; //说明上面第一个if没有成立 说明执行的时候来到同步队列, //第二个if没有成立说明 执行的时候没有没有节点 //说明刚来到共享队列 //可能出现尾分叉 pre指向尾部,但是自己还没能CAS为尾,这种情况进入这个方法的一瞬间如果可以成功CAS为尾巴,或者在之前CAS为尾巴, 那么视为已经在同步队列,反之视为不在同步队列 return findNodeFromTail(node); }
-
node.waitStatus == Node.CONDITION
假如到等待队列的时候节点初始状态为CONDITION,CONDITION表示当前节点处于等待状态,唤醒当前节点的第一件事就是让CAS设置当前节点状态从CONDITION为0,等待线程被中断后检查中断也是先CAS设置当前节点状态从CONDITION为0,所以如果节点状态为CONDITION那么一定是在等待队列
-
node.prev == null
唤醒当前节点后续会让当前节点进入同步队列(
enq
方法)enq首先设置自己的prev为尾巴,并且在等待队列是nextWaiter串联起来的不存在前置指针,所以如果前置指针指向的是null那么一定是在等待队列 -
node.next != null
等待队列是nextWaiter串联起来的不存在next指针,那么如果next指向的不是null,说明一定在同步队列,并且这是进入同步成功且后续有其他节点排在当前节点后面
-
findNodeFromTail
从尾部开始查找当前节点这里需要复习下
enq
完整入队的做法,先设置自己的pre指向尾巴,然后cas设置自己为尾巴,后改变前置节点的next,一瞬间只可能存在一个线程CAS设置自己为尾成功,就出现了尾部分叉的情况,从尾部开始搜索主要原因是——效率,节点都是从尾部入队的,从尾部开始搜索肯定是大于从头部开始搜索的
1.2.4检查中断 checkInterruptWhileWaiting
上面我们看到,如果没有其他线程唤醒,当前线程无法移动到同步队列,isOnSyncQueue
一定为真,为真就会被park节省CPU资源。退出while循环的另外一种方法就是当前线程被中断。但是
- 如果中断发生在唤醒之前,说明是在等待中被中断,
await
是对中断敏感的,唤醒前中断的话当前线程在获取到锁后需要抛出中断异常, - 如果中断发生在唤醒之后,后续获取到锁只需要重新自己中断,“伪造”一个中断标识就好像中断是在await方法结束发生的一样
checkInterruptWhileWaiting
的返回值有三种
- 0 表示没有发生中断,所有不会break掉循环,线程会继续判断自己在不在同步队列,这种情况从park中返回是由于其他线程的唤醒,所以后续判断自己是否在同步队列大概率是true,从循环中退出,后续去排队获取锁即可
- THROW_IE (-1)表示当前线程被中断,且中断在唤醒之前
- REINTERRUPT (1)表示当前线程被中断,且中断在唤醒之后
private int checkInterruptWhileWaiting(Node node) { //如果发生了中断,这种是await中被挂起的是被中断打断 return Thread.interrupted() ? //检查中断在signal 之前还是signal 之后 如果之前 那么后续抛出异常 反之只需要补上中断,后续补上的中断是在当前线程拿到锁后的中断 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
这里可以看到Thread.interrupted()
重置中断标志,并且返回是否中断,如果是false 那么直接返回0,表示没有被中断。反之需要调用transferAfterCancelledWait
进行进一步的判断
1.2.5 transferAfterCancelledWait 选择放弃等待还是让步
final boolean transferAfterCancelledWait(Node node) { //如果这里设置成功 那么后续存在线程signal当前节点 那么会失败 //这里设置成功 说明我们由于中断被唤醒,且这个唤醒在singal之前 //说明我们成功放弃其他线程的signal if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { //入队 enq(node); //返回true 代表中断在signal 之前 return true; } //反之如果上面CAS失败,说明中断发生在Signal之后 那么当前线程让步 等待signal的线程把我们移动到同步队列 while (!isOnSyncQueue(node)) Thread.yield(); //中断在signal 之后返回false return false; }
之前我们说过,唤醒方法的第一步就是CAS设置等待状态从CONDITION到0,所以
- 如果本方法
compareAndSetWaitStatus(node, Node.CONDITION, 0)
成功,说明当前节点没有被其他线程唤醒,那么当前线程自己调用enq
入队,返回true(true表示放弃等待,在checkInterruptWhileWaiting中的作用就是让checkInterruptWhileWaiting返回THROW_IE,因为这意味着中断在唤醒之前) - 如果本方法
compareAndSetWaitStatus(node, Node.CONDITION, 0)
失败,说明当前节点已经被唤醒,唤醒发出的线程会把当前节点移动到同步队列的,所以后续只需要判断自己是否在同步队列,如果不在那么就让出CPU资源,直到唤醒线程移动当前节点到同步队列,返回false(在checkInterruptWhileWaiting中的作用就是让checkInterruptWhileWaiting返回REINTERRUPT,因为这意味着中断在唤醒之后)
//所以只要当前线程在等待的途中被中断,都会让他接续掉 这个while循环 //只是被中断在唤醒之前的时候自己入队,中断在唤醒之后的话让唤醒发出的线程移动自己到同步队列 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
1.2.6 尝试获取锁acquireQueued
acquireQueued
在AQS独占模式已经讲解了,该方法返回一个布尔值,true表示获取的的途中被中断了,false则反之
//如果获取的途中被中断了 且 interruptMode != THROW_IE 前面我们说过THROW_IE表示中断在唤醒之前,所以说这里两种情况下会设置interruptMode=REINTERRUPT //1 等待的时候没有被中断 interruptMode之前一直为0 //2 等待的时候被中断 但是中断在 唤醒之后 interruptMode之前一直为REINTERRUPT //也就是说,interruptMode = REINTERRUPT有两种情况中断在 唤醒之后和 从等待队列移动到同步队列获取锁的途中被中断 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT;
还需要注意的是acquireQueued是在获取到锁之后才会返回,没有获取锁会一直自旋挂起的
1.2.7 删除放弃的节点unlinkCancelledWaiters
常规的链表操作,这里不需要保证线程安全,因为当前节点执行此方法的时候必然已经获取到了独占锁
if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters();
private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
1.2.8 处理中断reportInterruptAfterWait
//0 表示从始至终没被中断过,即等待唤醒没被中断,获取锁也没有被中断,这种情况下不需要响应中断 if (interruptMode != 0) //处理中断 reportInterruptAfterWait(interruptMode);
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { //THROW_IE——抛出中断异常 if (interruptMode == THROW_IE) throw new InterruptedException(); //REINTERRUPT——自我中断补上中断标识 else if (interruptMode == REINTERRUPT) selfInterrupt(); }
也就是说
- 如果中断在唤醒之前,
当前线程拿到锁之后
抛出中断异常 - 如果中断在唤醒之后或者唤醒后获取锁的途中被中断,
当前线程拿到锁之后
自我中断(避免影响到线程中根据中断标识进行操作的代码逻辑)
2.awaitUninterruptibly中断不敏感的等待
public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); //获取中断标识,并且重置中断标识 if (Thread.interrupted()) //等待的时候被中断过 interrupted = true; } //获取锁 or 等待的时候被中断 都只是自我中断补上中断标志 if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
代码和await
套路差不多,区别在于awaitUninterruptibly
无论是等待中被中断(无论是在唤醒前还是唤醒后),亦或者是在获取锁的过程中被中断,都只是补上中断标志位
3.超时等待
超时等待代码逻辑都差不多,我们挑取一个看一看
public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0; while (!isOnSyncQueue(node)) { //超时了 if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } //剩余时间 大于阈值才进行挂起, if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); }
和await()
一样的套路,只是在挂起上加了一层判断——只有大于阈值的时候才挂起,以及如果等待超时了会调用transferAfterCancelledWait
方法,这个方法上面说过:尝试自己回到同步队列,但是如果进入方法的一瞬间被唤醒了,会让当前线程让步,等待唤醒线程将当前线程移动到同步队列
4.signalAll唤醒所以等待队列中节点
public final void signalAll() { //要求唤醒的线程必须独占锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); }
private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
代码并不复杂,就是循环调用transferForSignal
唤醒所有等待队列中的节点,transferForSignal
返回true 表示成功唤醒,返回false表示唤醒之前当前节点已经放弃
final boolean transferForSignal(Node node) { //首先CAS设置状态从Condition到0 //这个和放弃等待具有一个竞争的关系,可以判断是放弃在前 还是唤醒在前 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //到这说明没有取消等待 那么让当前节点进入到同步队列 Node p = enq(node); //enq返回的前置节点 这里是前置节点的状态 int ws = p.waitStatus; //前置节点取消了,或者CAS修改前置节点SINGNAL失败,那么唤醒当前线程让他自己去入队, //这里有个小细节,如果前置节点在当前节点入队之前就放弃了,那么会cancelAquire方法没办法唤醒到当前节点的线程 //如果前置节点在CAS操作的时候放弃了,唤醒当前线程,前置线程的放弃和当前线程自己主动入队,双线程一起操作,提高效率 //如果前置节点一直没有放弃,cas成功了,那么当前线程等着就好 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
5.signal唤醒单个
public final void signal() { //同样要求独占 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; //transferForSignal返回true 表示成功唤醒,返回false表示唤醒之前当前节点已经放弃 //所有这里while表示必须成功唤醒一个直到队列无节点唤醒 } while (!transferForSignal(first) && (first = firstWaiter) != null); }
6.transferForSignal 和 transferAfterCancelledWait
- transferAfterCancelledWait 调用时机
- 线程在等待队列中等待的时候被中断
- 等待超时当前线程放弃等待
下面我们研究下,doug lea是如何判断中断在唤醒前还是唤醒后的
final boolean transferAfterCancelledWait(Node node) { //1 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } //3 while (!isOnSyncQueue(node)) Thread.yield(); return false; } final boolean transferForSignal(Node node) { //2 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
假如同一瞬间,线程B在等待的时候被中断,线程A且在此时企图唤醒B,这时候B运行到1这一行,A运行到2这一行,它们两个线程都尝试把B线程代表的节点状态从CONDITION修改位0,这两个CAS操作必然有一个前有一个后
- B线程先执行完CAS操作
- B会自己进入到同步队列,返回true 表示成功放弃等待(可以是超时放弃or中断,对于中断来说就是中断在唤醒之前)
- A执行CAS操作失败,返回false表示唤醒失败,如果这时候A是唤醒所有,那么A继续唤醒下一个,如果A唤醒一个当前唤醒B失败,A还会唤醒下一个节点(如果存在的话)
- A线程先执行完CAS操作
- 那么A不会进入到2的if中,继续负责对B执行enq,把B移动到同步队列,需要的话就唤醒B,返回true表示成功唤醒
- 由于A线程CAS成功,B就会失败,进入3中的while循环判断是否在同步队列,如果不在那么让步,为什么让出CPU,因为当前有线程A移动B到同步队列,B乖乖等着就好,返回false表示放弃失败——中断在唤醒之后,亦或者是超时放弃在唤醒之后
二丶CyclicBarrier
1.概述
循环栅栏,栅栏的作用是拦住一系列线程,循环意味着,这个栅栏可以循环使用 下文中的栅栏和屏障都是指的CyclicBarrier
一种同步辅助工具,它允许一组线程相互等待以达到共同的障碍点。 CyclicBarriers 在涉及固定大小的线程组的程序中很有用,这些线程组必须偶尔相互等待。屏障被称为循环的,因为它可以在等待线程被释放后重新使用。 CyclicBarrier 支持一个可选的 Runnable 命令,该命令在每个屏障点运行一次,且在在队伍中的最后一个线程到达之后,但在任何线程被释放之前之前。
2.和CountDownLatch对比
CountDownLatch只可以使用一次,但是CyclicBarrier 可以使用多次(使用reset()
方法重置)CountDownLatch是一组线程等待另外一组线程释放共享锁,CyclicBarrier 是一组线程中每一个线程都等待其他线程一起执行到一个点
CountDownLatch 可以理解moba游戏开始游戏的确认,需要等待五个人都同意后才可以进入选英雄界面(选择英雄的线程等待未确认人数到达0后运行) CyclicBarrier 是等待五个人都选完英雄点击确定后,五个人才能进入读条环节,五个人都读条完才能一起进入游戏(五个人中任何一个人都要等其他人选完英雄后才能一起进入读条,五个人中每一个人都要等其他人读条完才能一起进入峡谷)
3.CyclicBarrier中常用的方法
方法 | 描述 |
---|---|
public CyclicBarrier(int parties, Runnable barrierAction) | 表示一共有多少个线程(parties个)一起到达后一起执行下一个操作(barrierAction)barrierAction由最后到达的线程执行 |
public CyclicBarrier(int parties) | 表示一共有多少个线程(parties个)一起到达后才能继续运行 |
public int getParties() | 返回触发此障碍所需的参与方数量,派对需要多少人才能开始 |
public int await() | 当前线程等待所有线程都在屏障上调用此方法,如果当前线程不是最后一个到达的线程那么会处于挂起状态,除非出现最后一个线程到达,或者其他线程中断挂起的线程,或者其他线程使用的是超时await方法并且超时,或者其他线程调用屏障的重置方法 |
int await(long timeout, TimeUnit unit) | 比上面的await多一个当前线程超时的机制 |
public boolean isBroken() | 查询此屏障是否处于损坏状态,线程的中断,超时,出现异常都将导致屏障破碎 |
public void reset() | 将屏障重置为其初始状态。如果任何线程当前在屏障处等待那么将返回一个 BrokenBarrierException。 |
public int getNumberWaiting() | 等待的线程的数量 |
4.源码解读
4.1 CyclicBarrier 的属性
/*保证CyclicBarrier线程安全的锁 */ private final ReentrantLock lock = new ReentrantLock(); /** 调用await方法线程将在此condition上面等待*/ private final Condition trip = lock.newCondition(); /** 这个“派对”需要多少线程都到达 */ private final int parties; /* 线程都到达之后执行此方法 */ private final Runnable barrierCommand; /** 代,每次 CyclicBarrier 构造或者重置的时候都会创建新的一代,其中只有一个属性记录栅栏是否破碎*/ private Generation generation = new Generation(); /** * 还需要等待多少个线程到达 */ private int count;
其中比较难以理解的就是Generation
好比一群好朋友一起去看电影,一开始我们约定好5一起去看(parties=5)后续陆续来了3人 (count=5-3)但是突然有人等待的途中被她的妈妈打电话叫回去了(被打断了)导致抛出中断异常并且使栅栏破碎,或者有人性急进行超时等待,还没有来就呼吁朋友们都别等了,已经到达的人决定不在等待一起去看电影 什么叫Generation: 一个过山车有10个座位,景区常常需要等够10个人了,才会去开动过山车。于是我们常常在栏杆(barrier)外面等,等凑够了10个人,工作人员就把栏杆打开,让10个人通过;然后再将栏杆归位,后面新来的人还是要在栏杆外等待。这里,前面已经通过的人就是一“代”,后面再继续等待的一波人就是另外一“代”,栏杆每打开关闭一次,就产生新一的“代”。
4.2 构造方法
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); //parties 这是一个final 变量,构造后将不允许改变 this.parties = parties; //初始的时候还有多少个线程没有到达(count)就等于 parties this.count = parties; //线程到达后都之下此方法 this.barrierCommand = barrierAction; }
public CyclicBarrier(int parties) { this(parties, null); }
4.3 breakBarrier 打破栅栏
private void breakBarrier() { //设置当前这一代打破标记为true generation.broken = true; count = parties; //并且唤醒所有等待的线程 trip.signalAll(); }
相当于存在一个人无法赴约了,为了不让其他人无限制的等下去(其他人都等待在trip的等待队列上)选择打破栅栏,意味着唤醒其他人说别等了,我有事情来不了
显然breakBarrier的调用线程必须是当前已经持有锁的线程,所以上面代码线程安全,后续的代码我们将看到什么情况下栅栏会被打破
4.4 nextGeneration 开启下一代
private void nextGeneration() { //唤醒当前等待队列上的线程 trip.signalAll(); //重置 count = parties; generation = new Generation(); }
相当于过山车凑齐了一批次人,或者说工作人员发现人流量不行,一直等下去也不行,工作人员让之前准备玩的人发车玩(唤醒等待的线程)并且重置,开启下一代,等待下一批次的人
这也是CyclicBarrier可以循环使用的关键
4.5 reset 重置
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } }
这相当于工作人员看人流量不行,等下去顾客得抱怨了,直接让等待的人玩(唤醒)并且开启下一代,这里首先获取了下锁,因为可能存在多个工作人员,其中任何一个觉得不行都可以选择这么做。
4.5 await
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
直接调用的了dowait(boolean timed, long nanos)
方法包含两个参数,第一个是否超时等待,第二个等待多久
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { //当前这一代 final Generation g = generation; //如果栅栏已经被打破 那么抛出BrokenBarrierException //意味着我们使用的时候在外层catch住,进行应对栅栏被打破的业务逻辑即可 if (g.broken) throw new BrokenBarrierException(); //如果线程已经被中断,那么复位中断表示,并且打破栅栏,抛出中断异常 //意味着我们使用的时候在外层catch住,进行应对中断后的业务逻辑即可 if (Thread.interrupted()) { //打破栅栏意味着后续的线程指向await都会抛出BrokenBarrierException,而不是等待所有线程就位后重获自由 breakBarrier(); throw new InterruptedException(); } //至此意味着已经有一个线程到达了 count自减 int index = --count; //如果全部都到了,并且说明当前线程是最后一个到达的 if (index == 0) { boolean ranAction = false; try { //构造时候指定需要执行的 final Runnable command = barrierCommand; //直接调用的run 而不是启动一个线程去执行 if (command != null) command.run(); ranAction = true; //开启下一代,相当于这一批过上车的人凑齐了 发车了,那么开启等待下一批次人来 nextGeneration(); return 0; } finally { //说明barrierCommand执行的时候抛出异常了 //直接打破栅栏 if (!ranAction) breakBarrier(); } } //到达这里说明,还有线程没有到达,当前线程需要等待其他线程到达 for (;;) { try { //非超时等待 if (!timed) //那么让当前线程放弃锁,调用Condition.await() trip.await(); else if (nanos > 0L)//超时等待,并且等待时间大于0 //调用Condition的超时等待 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { //如果还是当前这一代,并且栅栏没有被打破,那么打破栅栏 if (g == generation && ! g.broken) { breakBarrier(); //抛出中断 throw ie; } else { //如果换代了说明有线程A调用了nextGeneration,如果被栅栏被打破了说有有线程A调用了breakBarrier 这两个方法都会唤醒所以等待的线程 //自我中断 重置中断标志(也许外层线程方法中根据判断中断标志进行不同的业务逻辑,所以要重置中断标志) Thread.currentThread().interrupt(); } } //等待中被唤醒,或者超时等待时间过了 执行到此 发现栅栏被打破了 抛出异常 if (g.broken) throw new BrokenBarrierException(); //如果开启了新的一代 if (g != generation) return index; //超时等待但是甚于等待的时间小于等于0,意味着不等了 直接打破栅栏 //nanos是上面trip.awaitNanos(nanos)的返回值,指剩余等待时间 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
4.5.1 前置判断
- 首先判断栅栏是否被打破,如果被打破了那么直接同行即可,类似于ABCDE约定好一起看电影,AB在10:10就到达等待了,C说我不能来了,这时候栅栏被打破了,DE后续来到等待地点的时候发现已经被打破了那么不会等待,抛出BrokenBarrierException异常,在catch中执行自定义的业务逻辑即可
- 然后看当前线程是否被中断,如果被中断那么直接复位中断标志,打破栅栏,抛出中断异常,相当于D在前往约定好的地点的时候,途中被他妈打电话喊回家了,这时候D首先通知好朋友们自己来不了(打断栅栏)然后抛出中断异常,在catch中执行自定义的业务逻辑即可
4.5.1当前线程是最后一个到达的线程
如果当前线程是最后一个线程,它会负责调用barrierCommand的run方法(如果barrierCommand不为空的话)如果运行run失败(run抛出异常)那么会打破栅栏,反之如果允许成功并且开启下一代,开启下一代会唤醒所有等待的线程,相当于最开始约定好5人一起看电影,最后来的买奶茶,E是最后来的先卖奶茶,然后到达目的地跟朋友们打招呼:不好意思我来了,我们走呗(唤醒其他等待的线程),如果卖奶茶的途中出现意外,那么告知朋友们你们先去看(打破栅栏)
4.5.2 当前线程不是最后一个到达的线程
如果当前线程不是最后一个,那么需要等待其他线程一起到达,在等待的途中会出现以下情况:
(1)情况1:
当前线程被唤醒,这种情况说明是有线程执行了breakBarrier 或者 nextGeneration,那么condition.await 或者awaitNanos 执行完后需要判断是栅栏被打破还是正常所以线程都到达了
//是栅栏被打破 那么抛出BrokenBarrierException if (g.broken) throw new BrokenBarrierException(); //如果是正常所以线程都到达了 那么继续 if (g != generation) return index;
类似于,五个人看电影,ABCD被E通知不要继续等待了,ABCD要问下E:“你是马上到了,还是来不了了,如果到了我们团购票,如果来不了我们只能单独买票”这里根据E的不同情况进行不同买票其实就是catch住 BrokenBarrierException和正常返回中不同代码逻辑
try{ cyclicBarrier.await(); //买团体票 }cath(BrokenBarrierException e){ //单独买票 }
(2)情况2
当前线程被中断了,这里有需要分情况讨论下:
-
ABCD都正常等待,但是E被中断了
这时候E要负责去通知ABCD别等待了(打破栅栏)随后自己抛出中断异常
//对于E来说 可能是这样的 try{ cyclicBarrier.await(); //开心看电影 }catch(InterruptedException e){ //如果妈妈打电话让我回家 那么就回家 } //对于ABCD看来说 E打破栅栏并且唤醒了他们,后续就会来到 (1)情况1 try{ cyclicBarrier.await(); //买团体票 }cath(BrokenBarrierException e){ //单独买票 }
-
ABC都正常等待,D在E之前中断,E随后中断
D会先打破栅栏抛出中断异常,E随后被中断从等待中唤醒只需要复位中断标志,继续运行到情况1,就好像E的中断是在D的唤醒之后一样(就好像妈妈打电话让回家,是在D之后,这时候已经去买票的路上了)
//对于E来说代码是这样的 try{ cyclicBarrier.await(); while(Thread.currentThread.isInterrupted()){ //如果没有中断 那么一直看电影 } //反之 throw new InterruptedException() }catch(InterruptedException e){ }
-
ABCE都正常等待,D来了,但是E在D来了之后才被中断,相当于是在买票的途中被中断
//对于E来说代码是这样的 try{ cyclicBarrier.await(); //买票 while(Thread.currentThread.isInterrupted()){ //如果没有中断 那么一直看电影 } //反之 throw new InterruptedException() }catch(InterruptedException e){ }
这中情况下,D执行了
nextGeneration
后释放了锁,但是E需要拿到锁才能继续运行for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { //D执行完nextGeneration 那么这里就不成立了 //也就是唤醒是在中断之前的 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { //进入此,说明是在一起买票的时候被中断的 Thread.currentThread().interrupt(); } } //省略 }
-
等待超时了
五个人中存在一个人是急性子,他说爷只能等10分钟,10分钟过去了,他会说跟爷走,都别等了,随后自己抛出中断异常
if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); }
//对于急性子的人来说代码是这样的 try{ cyclicBarrier.await(10分钟); }catch(TimeoutException e){ //等了十分钟 火气很大地看电影,或者火气很大的发脾气中 }
4.5.3 打破栅栏的情况
- 一开始就被中断
- 等待的途中线程被中断,并且是在最后一个人来到执行
nextGeneration
之前被中断且顺利拿到锁 - 最后一个人到达后执行
barrierCommand
.run()(如果有barrierCommand)失败 - 等待超时了
- 重置栅栏
4.5.4 开启下一代的情况
- 最后一个人来了,成功执行完
barrierCommand
.run()(如果有barrierCommand)后 - 重置栅栏
三丶BlockingQueue
1.概述
一个支持在获取元素但是队列中不存在会让当前线程等待,支持在线程添加元素但队列已满的时候让当前线程等待的队列。等待队列中的入队和出队,都有四种特性方法应对容量不足或者没有元素——抛出异常,返回特殊值(入队对于false,出队就是null),阻塞当前线程,超时等待。由于null值对于阻塞队列来说是一种特殊值(用于获取元素但是队列中,直接返回null)所以等待队列是不接受null值的(抛出NPE),等待队列中的大部分操作是线程安全,但是例如addAll,removeAll等方法也许不是线程安全的(这取决于具体实现),等待队列就是生产者消费者模型的绝佳帮手,生产者只需要生产完向队列中塞(一般是空间不足挂起生产线程)消费者就从队列无脑取(一般是无元素的时候进行等待)
2.BlockingQueue中常用的方法
方法 | 描述 |
---|---|
boolean add(E e) | 如果可以在不违反容量限制的情况下立即将指定元素插入此队列,则返回 true,如果当前没有可用空间则抛出 IllegalStateException |
boolean offer(E e) | 如果可以在不违反容量限制的情况下立即将指定元素插入此队列,则返回 true,如果当前没有可用空间则返回 false。在使用容量受限的队列时,这种方法一般更可取 |
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException | boolean offer(E e)的超时等待版本,如果有如果可以在不违反容量限制的情况下立即将指定元素插入此队列,则返回 true,如果等待超时了还不能入队那么返回false,在等待的中途被中断抛出中断异常 |
void put(E e) throws InterruptedException | 将元素入队,如果没有空间进行无限等待,在等待的中途被中断抛出中断异常 |
E take() throws InterruptedException | 检索并删除此队列的头部,如果没有元素可以删除那么一直等待,在等待的中途被中断抛出中断异常 |
boolean remove(Object o) | 如果该队列包含一个或多个这样的元素,则删除一个满足 o.equals(e) 的元素 ,则返回 true。反之返回false |
E poll(long timeout, TimeUnit unit) throws InterruptedException | 超时等待获取并移除队列头部,如果超时但是没有元素那么返回null,在等待的中途被中断抛出中断异常 |
E take() throws InterruptedException | 无限期等待的获取并移除队列头部,在等待的中途被中断抛出中断异常 |
E element() | 检索但不删除此队列的头部。如果队列为空那么抛出NoSuchElementException |
E peek() | 检索但不删除此队列的头部,如果此队列为空,则返回 null |
int remainingCapacity() | 返回此队列理想情况下(在没有内存或资源限制的情况下)可以不阻塞地接受的附加元素的数量,如果没有内在限制,则返回 Integer.MAX_VALUE。不能总是通过检查剩余容量来判断插入元素的尝试是否会成功,因为可能存在另一个线程即将插入或删除元素的情况。返回:剩余容量 |
3.Java中的阻塞队列
- ArrayBlockingQueue 基于数组的有界队列
- LinkedBlockingQeque 基于链表的有界队列(默认容量是int类型最大)
- PriorityBlockingQueue 优先阻塞队列,基于数组实现的堆,并且具有阻塞队列的特性
- DelayQueue 基于优先队列实现的无界阻塞队列,元素只有在其延迟到期时才能被取出
- SynchronousQueue 不存储元素的阻塞队列,每个插入操作都必须等待另一个线程的相应删除操作
- LinkedTransferQueue 基于链表的无界阻塞队列,生产者可以在其中等待消费者接收元素
- LinkedBlockingDeque 基于链表的双向阻塞队列
4.从ArrayBlockingQueue看阻塞队列的实现原理
为了实现,生产者在容量不足的时候阻塞,消费者在没有元素的时候阻塞,阻塞队列都基于等待通知模式
final ReentrantLock lock; /**当队列为空,并且当前线程尝试等待地获取元素时,将在此等待队列上等待*/ private final Condition notEmpty; /**当队列满,并且当前线程尝试等待地插入元素时,将在此等待队列上等待*/ private final Condition notFull;
-
等待地获取元素
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //响应中断的获取锁 如果获取锁的途中被中断那么抛出中断异常 lock.lockInterruptibly(); try { //队列为空 那么一直等待 while (count == 0) notEmpty.await(); //反之返回队列头并且会唤醒等待地插入元素的线程,唤醒等待的生产者 return dequeue(); } finally { lock.unlock(); } }
public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { //如果超时 返回null if (nanos <= 0) return null; //不同的超时等待,此方法返回的是等待的剩余时间 nanos = notEmpty.awaitNanos(nanos); } //类似 return dequeue(); } finally { lock.unlock(); } }
-
等待地插入元素
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); //入队 并且这里会唤醒等待获取元素的线程 enqueue(e); } finally { lock.unlock(); } }
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //队列满 while (count == items.length) { //超时返回fasle 表示入队失败 if (nanos <= 0) return false; //超时等待,返回的是剩余等待时间 nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } }
这篇关于JUC源码学习笔记3——AQS等待队列和CyclicBarrier,BlockingQueue的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23增量更新怎么做?-icode9专业技术文章分享
- 2024-11-23压缩包加密方案有哪些?-icode9专业技术文章分享
- 2024-11-23用shell怎么写一个开机时自动同步远程仓库的代码?-icode9专业技术文章分享
- 2024-11-23webman可以同步自己的仓库吗?-icode9专业技术文章分享
- 2024-11-23在 Webman 中怎么判断是否有某命令进程正在运行?-icode9专业技术文章分享
- 2024-11-23如何重置new Swiper?-icode9专业技术文章分享
- 2024-11-23oss直传有什么好处?-icode9专业技术文章分享
- 2024-11-23如何将oss直传封装成一个组件在其他页面调用时都可以使用?-icode9专业技术文章分享
- 2024-11-23怎么使用laravel 11在代码里获取路由列表?-icode9专业技术文章分享
- 2024-11-22怎么实现ansible playbook 备份代码中命名包含时间戳功能?-icode9专业技术文章分享