【源码解析】ReentrantLock的重入锁分析AQS
2021/10/3 12:40:08
本文主要是介绍【源码解析】ReentrantLock的重入锁分析AQS,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
本文以公平锁的角度切入AQS
ReentrantLock
Synchronized | ReentrantLock | |
---|---|---|
锁实现机制 | 对象头监视器模式 | 依赖 AQS |
灵活性 | 不灵活 | 支持响应中断、超时、尝试获取锁 |
释放锁形式 | 自动释放锁 | 显示调用 unlock() |
支持锁类型 | 非公平锁 | 公平锁 & 非公平锁 |
条件队列 | 单条件队列 | 多个条件队列 |
是否支持可重入 | 支持 | 支持 |
写在前面:
分清楚:锁是对外界开放的API,而AQS同步器是对线程进行管理的容器,任务区分开来,研究的是AQS。
(Lock:lock、unlock;AQS:acquire、acquireQueue……)。
抽象同步队列最终的实现是:公平锁或者非公平锁
加锁过程:
加锁:
- 通过 ReentrantLock 的加锁方法 Lock 进行加锁操作。
- 会调用到内部类 Sync 的 Lock 方法,由于 Sync#lock 是抽象方法,根据 ReentrantLock 初始化选择的公平锁和非公平锁,执行相关内部类的 Lock 方法,本质上都会执行 AQS 的 Acquire 方法。
- AQS 的 Acquire 方法会执行 tryAcquire 方法,但是由于 tryAcquire 需要自定义同步器实现,因此执行了 ReentrantLock 中的 tryAcquire 方法,由于 ReentrantLock 是通过公平锁和非公平锁内部类实现的 tryAcquire 方法,因此会根据锁类型不同,执行不同的 tryAcquire。
- tryAcquire 是获取锁逻辑,获取失败后,会执行框架 AQS 的后续逻辑,跟 ReentrantLock 自定义同步器无关。
解锁:
- 通过 ReentrantLock 的解锁方法 Unlock 进行解锁。
- Unlock 会调用内部类 Sync 的 Release 方法,该方法继承于 AQS。
- Release 中会调用 tryRelease 方法,tryRelease 需要自定义同步器实现,tryRelease 只在 ReentrantLock 中的 Sync 实现,因此可以看出,释放锁的过程,并不区分是否为公平锁。
- 释放成功后,所有处理由 AQS 框架完成,与自定义同步器无关。
state
state为锁的状态:0~n state:volatile 、CAS 确保该变量对其他线程也是可见的。 无锁机制改变state的值 1)当state=0时,表示无锁状态 2)当state>0时,表示已经有线程获得了锁,也就是 state=1,但是因为 ReentrantLock 允许重入,所以同一个线程多次获得同步锁的时候,state 会递增,比如重入5次,那么state=5。 而在释放锁的时候,同样需要释放 5 次直到 state=0 其他线程才有资格获得锁
AOS:指定当前的独占锁的线程。
AQS中的队列
特点:
1、先进先出的双端队列
2、通过 Head、Tail 头尾两个节点来组成队列结构,通过 volatile 修饰保证可见性
3、Head 指向节点为已获得锁的节点,head指向的时得到锁的Node节点,是一个虚拟节点,节点本身不持有具体线程;更可以看作是 【有锁节点 + 阻塞队列】
4、获取不到同步状态,会将节点进行自旋获取锁,自旋一定次数失败后会将线程阻塞,相对于 CLH 队列性能较好【抢锁方式:自适应自旋锁】
节点的状态
关键方法与属性 | 对应含义 |
---|---|
waitStatus | 当前节点在队列中处于什么状态 |
thread | 表示节点对应的线程 |
prev | 前驱指针,指向本节点的上一个节点 |
next | 后继指针,指向本节点的下一个节点 |
predecessor | 返回前驱节点,没有的话抛出 NPE 异常 |
属性值 | 值含义 |
---|---|
0 | Node 被初始化后的默认值 |
CANCELLED | 值为1,由于中断或超时,节点被取消 |
SIGNAL | 值为-1,表示节点的后继节点即将被阻塞;当前节点需要唤醒他的后继节点 |
CONDITION | 值为-2,表示节点在等待队列中,节点线程等待唤醒 |
获取锁_核心方法:
- acquire:自旋获取锁的方式
- tryAcquire:会尝试再次通过 CAS 获取一次锁。
- addWaiter:① 将当前线程封装为Node节点,② 加入上面锁的双向链表(等待队列)中(enq)
- 如果为尾节点不为空,需要将新节点添加到 oldTail 的 next 节点,同时将新节点的 prev 节点指向 oldTail;
- 如果当前队列为空,需要进行初始化,此时 head 结点和 tail 节点都是 h = new Node () 实例;此时 oldTail = h 不为空,node 的 prev 为 oldTail, oldTail 的 next 是 node。(必成功,因为自旋)
- acquireQueued:通过自旋,判断当前队列前置节点是否可以获取锁
- 当前节点就是队列第一个(head.next),有抢锁的资格tryAcquire一次,返回interrupted = false,退出自旋;
- 如果 node 是第一个加入等待队列的,此时 node 的 prev 节点是 head ( new Node() ),node 会先去获取锁,失败后,因为 prev 的 waitStatus = 0,这时候将其 waitStatus 设置为 -1,然后再次循环,再获取锁失败就会调用 parkAndCheckInterrupt 阻塞当前线程;
- shouldParkAfterFailedAcquire 过程中会将队列中处于 CANCELLED = 1 的节点删除。也就是说每添加一个节点,获取锁失败后,都可能会对队列做一遍整理;判断当前节点是否需要park
tryAcquire
尝试拿锁,只有两种情况能够成功:
1)无锁
2)自己人(涉及到锁的重入)
protected final boolean tryAcquire(int acquires) { // 判断锁状态 和 线程能否重入 final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //因为fairSync是公平锁,任何时候都需要检查一下 队列中是否在当前线程之前有等待者.. if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
addWaiter
addWaiter:① 将当前线程封装为Node节点,② 加入上面锁的双向链表(等待队列)中(enq)
- 如果为尾节点不为空,需要将新节点添加到 oldTail 的 next 节点,同时将新节点的 prev 节点指向 oldTail;
- 如果当前队列为空,需要进行初始化,此时 head 结点和 tail 节点都是 h = new Node () 实例;此时 oldTail = h 不为空,node 的 prev 为 oldTail, oldTail 的 next 是 node。(必成功,因为自旋)
acquireQueued
获取锁(tryAcquire)->构造节点(addWaiter)->加入队列(addWaiter)->自旋获取锁(acquireQueued)
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; //当前线程是否被中断 = 标记等待过程中是否中断过 boolean interrupted = false; try { //自旋..前置节点..Node 要么获取锁,要么中断 for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; //当前线程 获取锁 过程中..没有异常 return interrupted; // 这里才是出口,唯一的出口 } // 挂起当前线程,并且唤醒之后 返回 当前线程的 中断标记 // 唤醒:1.正常唤醒 其它线程 unpark 2.其它线程给当前挂起的线程 一个中断信号.. if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //interrupted == true 表示当前node对应的线程是被 【中断信号唤醒的...】 interrupted = true; } } finally { //true 表示当前线程抢占锁成功,普通情况下【lock】 当前线程早晚会拿到锁.. //false 表示失败,需要执行出队的逻辑... (响应中断的lock方法时。) if (failed) cancelAcquire(node); } }
shouldParkAfterFailedAcquire
1、pred(waitState)为0,当前的结点不需要阻塞。;
2、pred(waitState)为1,跳过;
3、pred(waitState)为-1,park当前节点。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 1、前方节点是处于阻塞,挂起当前线程(申锁节点进行阻塞) if (ws == Node.SIGNAL) return true; // 2、队列包含了被解约的节点 if (ws > 0) { do { // 跳过所有的取消的Node == 将所有的CANCELLED节点出队 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; // 3、初始化的Node,默认状态为0。 } else { // 将前置节点强制设置为 SIGNAl,表示前置节点释放锁之后需要 喊醒我.. compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
取消任务
public void unlock() { sync.release(1); }
这里主要就是对资源的释放,更新同步状态,然后唤醒下一个等待线程。
release
//AQS#release方法 //ReentrantLock.unlock() -> sync.release()【AQS提供的release】 public final boolean release(int arg) { //尝试释放锁,tryRelease 返回true 表示当前线程已经完全释放锁 //返回false,说明当前线程尚未完全释放锁.. if (tryRelease(arg)) { //head什么情况下会被创建出来? //当持锁线程未释放线程时,且持锁期间 有其它线程想要获取锁时,其它线程发现获取不了锁,而且队列是空队列,此时后续线程会为当前持锁中的 //线程 构建出来一个head节点,然后后续线程 会追加到 head 节点后面。 Node h = head; //条件一:成立,说明队列中的head节点已经初始化过了,ReentrantLock 在使用期间 发生过 多线程竞争了... //条件二:条件成立,说明当前head后面一定插入过node节点。 if (h != null && h.waitStatus != 0) //唤醒后继节点.. unparkSuccessor(h); return true; } return false; }
tryRelease
//Sync#tryRelease() protected final boolean tryRelease(int releases) { //减去释放的值.. int c = getState() - releases; //条件成立:说明当前线程并未持锁..直接异常.,. if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); //当前线程持有锁.. //是否已经完全释放锁..默认false boolean free = false; //条件成立:说明当前线程已经达到完全释放锁的条件。 c == 0 if (c == 0) { free = true; setExclusiveOwnerThread(null); } //更新AQS.state值 setState(c); return free; }
unparkSuccessor
1)如果其下一个节点为空,或者其等待状态是取消状态,那么就从后往前找,找到一个等待状态 <=0 的,然后将其唤醒;
2)如果下一个节点不为空,且等待状态 <=0,将其唤醒。
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; //条件一: //s 什么时候等于null? //1.当前节点就是tail节点时 s == null。 //2.当新节点入队未完成时(1.设置新节点的prev 指向pred 2.cas设置新节点为tail 3.(未完成)pred.next -> 新节点 ) //需要找到可以被唤醒的节点.. //条件二:s.waitStatus > 0 前提:s != null //成立:说明 当前node节点的后继节点是 取消状态... 需要找一个合适的可以被唤醒的节点.. if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //如果找到合适的可以被唤醒的node,则唤醒.. 找不到 啥也不做。 if (s != null) LockSupport.unpark(s.thread); }
cancelAcquire
/** * 当前取消排队的node所在 队列的位置不同,执行的出队策略是不一样的,一共分为三种情况: * 1.当前node是队尾 tail -> node * 2.当前node 不是 head.next 节点,也不是 tail * 3.当前node 是 head.next节点。 */ private void cancelAcquire(Node node) { if (node == null) return; node.thread = null; Node pred = node.prev; //滤除所有的无效节点pred.waitStatus > 0 即:1 cancelled while (pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; //条件一:node == tail 成立:当前node是队尾 tail -> node //条件二:compareAndSetTail(node, pred) 成功的话,说明修改tail完成。 if (node == tail && compareAndSetTail(node, pred)) { //修改pred.next -> null. 完成node出队。 compareAndSetNext(pred, predNext, null); } else { int ws; //条件一:pred != head 成立, 说明当前node 不是 head.next 节点,也不是 tail【中间的节点】 if (pred != head && //条件2.1:成立:说明node的前驱状态是 Signal 状态 不成立:前驱状态可能是0, ((ws = pred.waitStatus) == Node.SIGNAL || // 假设前驱状态是 <= 0 则设置前驱状态为 Signal状态..表示要唤醒后继节点。 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) { Node next = node.next; //让pred.next -> node.next ,所以需要保证pred节点状态为 Signal状态。 if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { //head.next节点,直接unpark后继(相当于忽略现在的节点) unparkSuccessor(node); } node.next = node; // help GC,修改了 node 的next 指针,但是保证了 prev 指针的不变 } }
为什么所有的变化都是对 Next 指针进行了操作,而没有对 Prev 指针进行操作呢?
执行 cancelAcquire 的时候,当前节点的前置节点可能已经从队列中出去了(已经执行过 Try 代码块中的 shouldParkAfterFailedAcquire 方法了),如果此时修改 Prev指针,有可能会导致 Prev 指向另一个已经移除队列的 Node,因此这块变化 Prev 指针不安全。
什么情况下会对 Prev 指针进行操作?
shouldParkAfterFailedAcquire 方法中,会执行下面的代码,其实就是在处理 Prev 指针。shouldParkAfterFailedAcquire 是获取锁失败的情况下才会执行,进入该方法后,说明共享资源已被获取,当前节点之前的节点都不会出现变化,因此这个时候变更 Prev 指针比较安全。
do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0);
参考资料:
《Java并发编程实战》
《小刘源码课程》
https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html
这篇关于【源码解析】ReentrantLock的重入锁分析AQS的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23增量更新怎么做?-icode9专业技术文章分享
- 2024-11-23压缩包加密方案有哪些?-icode9专业技术文章分享
- 2024-11-23用shell怎么写一个开机时自动同步远程仓库的代码?-icode9专业技术文章分享
- 2024-11-23webman可以同步自己的仓库吗?-icode9专业技术文章分享
- 2024-11-23在 Webman 中怎么判断是否有某命令进程正在运行?-icode9专业技术文章分享
- 2024-11-23如何重置new Swiper?-icode9专业技术文章分享
- 2024-11-23oss直传有什么好处?-icode9专业技术文章分享
- 2024-11-23如何将oss直传封装成一个组件在其他页面调用时都可以使用?-icode9专业技术文章分享
- 2024-11-23怎么使用laravel 11在代码里获取路由列表?-icode9专业技术文章分享
- 2024-11-22怎么实现ansible playbook 备份代码中命名包含时间戳功能?-icode9专业技术文章分享