Object 的Wait Notify NotifyAll 源码解析
2022/2/27 17:52:33
本文主要是介绍Object 的Wait Notify NotifyAll 源码解析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
Java 中Object 类中Wait Notify NotifyAll 源码如下:
/** * 线程等待 * @param var1 毫秒 * @param var3 纳秒 */ public final void wait(long var1, int var3) throws InterruptedException { if (var1 < 0L) { throw new IllegalArgumentException("timeout value is negative"); } else if (var3 >= 0 && var3 <= 999999) { //纳秒>0,毫秒直接++ if (var3 > 0) { ++var1; } //调用native方法 this.wait(var1); } else { throw new IllegalArgumentException("nanosecond timeout value out of range"); } } /** * native方法线程等待 */ public final native void wait(long var1) throws InterruptedException; /** * native方法线程单个唤醒 */ public final native void notify(); /** * native方法线程唤醒等待池中所有线程 */ public final native void notifyAll();
解析源码之前的先具备的条件:
对象锁ObjectMonitor拥有等待队列和同步队列两种队列
wait 方法:
线程等待,让出对象锁,加入等待队列,然后进入park,等待其他线程释放锁unpark
synchronized (a) { a.wait(); }
等价于
moniter.enter //获取对象锁 { 1.判断锁是否存在 2.判断中断状态 3.创建node 加入 等待队列 4.moniter.exit(根据不同策略,从同步队列获取头节点线程a,然后执行线程a的event.unpark 唤醒机制) 5.本线程执行event.park 等待其他线程唤醒 6.判断唤醒是不是被中断唤醒的,需不需要抛出异常 } moniter.exit //释放锁,唤醒同步队列下一个对象
- CHECK_OWNER 判断锁是否存在,不存在就抛异常。没有加Synchronize的话,会抛出IllegalMonitorStateException
#define CHECK_OWNER() do { if (THREAD != _owner) { if (THREAD->is_lock_owned((address) _owner)) { _owner = THREAD ; /* Convert from basiclock addr to Thread addr */ _recursions = 0; OwnerIsThread = 1 ; } else { TEVENT (Throw IMSX) ; THROW(vmSymbols::java_lang_IllegalMonitorStateException()); } } } while (false)
- 调用is_interrupted()判断并清除线程中断状态,如果中断状态为true,抛出中断异常并结束
//调用is_interrupted()判断并清除线程中断状态,如果中断状态为true,抛出中断异常并结束 if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) { ... TEVENT (Wait - Throw IEX) ; THROW(vmSymbols::java_lang_InterruptedException()); return ; }
- 利用自旋锁创建一个node 放入队列
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") AddWaiter (&node) Thread::SpinRelease (&_WaitSetLock)
- 退出监视器 exit (Self)
intptr_t save = _recursions; // 记录旧的递归次数 _waiters++; // waiters 自增 _recursions = 0; // 设置 recursion level to be 1 exit (Self) ; // 退出监视器
- 利用parkEvent.park 方法阻塞等待信号提醒
if (millis <= 0) { // 调用park()方法阻塞线程 Self->_ParkEvent->park () ; } else { // 调用park()方法在超时时间内阻塞线程 ret = Self->_ParkEvent->park (millis) ; }
- 判断是否需要中断,被parkEvent.unpark 唤醒判断一下interrupt 发起的,还是notify发起的
if (!WasNotified) { if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) { TEVENT (Wait - throw IEX from epilog) ; THROW(vmSymbols::java_lang_InterruptedException()); } }
wait 本质是调用了ObjectMonitor 的wait 方法
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) { Thread * const Self = THREAD ; assert(Self->is_Java_thread(), "Must be Java thread!"); JavaThread *jt = (JavaThread *)THREAD; DeferredInitialize () ; // Throw IMSX or IEX. CHECK_OWNER(); //调用is_interrupted()判断并清除线程中断状态,如果中断状态为true,抛出中断异常并结束 if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) { //post monitor waited event //注意这是过去式,已经等待完了 if (JvmtiExport::should_post_monitor_waited()) { //注意:这里传递参数'false',这是因为由于线程中断,等待不会超时 JvmtiExport::post_monitor_waited(jt, this, false); } TEVENT (Wait - Throw IEX) ; THROW(vmSymbols::java_lang_InterruptedException()); return ; } TEVENT (Wait) ; assert (Self->_Stalled == 0, "invariant") ; Self->_Stalled = intptr_t(this) ; jt->set_current_waiting_monitor(this); // create a node to be put into the queue // Critically, after we reset() the event but prior to park(), we must check // for a pending interrupt. //创建一个node放入队列 //关键是,在reset()之后,但在park()之前,必须检查是否有挂起的中断 ObjectWaiter node(Self); node.TState = ObjectWaiter::TS_WAIT ; Self->_ParkEvent->reset() ; OrderAccess::fence(); //在本例中等待队列是一个循环的双向链表,但它也可以是一个优先级队列或任何数据结构。 //_WaitSetLock保护着等待队列. //通常,等待队列只能由监视器*except*的所有者访问,但在park()因中断超时而返回的情况下也是可以。 //竞争非常小,所以使用一个自旋锁而不是重量级的阻塞锁。 Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ; AddWaiter (&node) ; Thread::SpinRelease (&_WaitSetLock) ; if ((SyncFlags & 4) == 0) { _Responsible = NULL ; } intptr_t save = _recursions; // 记录旧的递归次数 _waiters++; // waiters 自增 _recursions = 0; // 设置 recursion level to be 1 exit (Self) ; // 退出监视器 guarantee (_owner != Self, "invariant") ; //一旦在上面的exit()调用中删除了ObjectMonitor的所有权, //另一个线程就可以进入ObjectMonitor,执行notify()和exit()对象监视器。 //如果另一个线程的exit()调用选择此线程作为后继者,并且此线程在发布MONITOR_CONTENDED_EXIT时发生unpark()调用, //则我们使用RawMonitors运行事件风险处理,并使用unpark(). //为了避免这个问题,我们重新发布事件,即使未使用原来的unpark(), //这也不会造成任何伤害,因为已经为此监视器选好了继任者。 if (node._notified != 0 && _succ == Self) { node._event->unpark(); } // The thread is on the WaitSet list - now park() it. // On MP systems it's conceivable that a brief spin before we park // could be profitable. // // TODO-FIXME: change the following logic to a loop of the form // while (!timeout && !interrupted && _notified == 0) park() int ret = OS_OK ; int WasNotified = 0 ; { // State transition wrappers OSThread* osthread = Self->osthread(); OSThreadWaitState osts(osthread, true); { ThreadBlockInVM tbivm(jt); // Thread is in thread_blocked state and oop access is unsafe. //线程处于阻塞状态,并且oop访问是不安全的 jt->set_suspend_equivalent(); if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) { // Intentionally empty 空处理 } else if (node._notified == 0) { if (millis <= 0) { // 调用park()方法阻塞线程 Self->_ParkEvent->park () ; } else { // 调用park()方法在超时时间内阻塞线程 ret = Self->_ParkEvent->park (millis) ; } } // were we externally suspended while we were waiting? if (ExitSuspendEquivalent (jt)) { // TODO-FIXME: add -- if succ == Self then succ = null. jt->java_suspend_self(); } } // Exit thread safepoint: transition _thread_blocked -> _thread_in_vm //当线程不在等待队列时,使用双重检查锁定避免获取_WaitSetLock if (node.TState == ObjectWaiter::TS_WAIT) { Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ; if (node.TState == ObjectWaiter::TS_WAIT) { DequeueSpecificWaiter (&node) ; // unlink from WaitSet assert(node._notified == 0, "invariant"); node.TState = ObjectWaiter::TS_RUN ; } Thread::SpinRelease (&_WaitSetLock) ; } //从这个线程的角度来看,Node's TState是稳定的, //没有其他线程能够异步修改TState guarantee (node.TState != ObjectWaiter::TS_WAIT, "invariant") ; OrderAccess::loadload() ; if (_succ == Self) _succ = NULL ; WasNotified = node._notified ; // Reentry phase -- reacquire the monitor. // re-enter contended(竞争) monitor after object.wait(). // retain OBJECT_WAIT state until re-enter successfully completes // Thread state is thread_in_vm and oop access is again safe, // although the raw address of the object may have changed. // (Don't cache naked oops over safepoints, of course). // post monitor waited event. //注意这是过去式,已经等待完了 if (JvmtiExport::should_post_monitor_waited()) { JvmtiExport::post_monitor_waited(jt, this, ret == OS_TIMEOUT); } OrderAccess::fence() ; assert (Self->_Stalled != 0, "invariant") ; Self->_Stalled = 0 ; assert (_owner != Self, "invariant") ; ObjectWaiter::TStates v = node.TState ; if (v == ObjectWaiter::TS_RUN) { enter (Self) ; } else { guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ; ReenterI (Self, &node) ; node.wait_reenter_end(this); } // Self has reacquired the lock. // Lifecycle - the node representing Self must not appear on any queues. // Node is about to go out-of-scope, but even if it were immortal(长久的) we wouldn't // want residual(残留的) elements associated with this thread left on any lists. guarantee (node.TState == ObjectWaiter::TS_RUN, "invariant") ; assert (_owner == Self, "invariant") ; assert (_succ != Self , "invariant") ; } // OSThreadWaitState() jt->set_current_waiting_monitor(NULL); guarantee (_recursions == 0, "invariant") ; _recursions = save; // restore the old recursion count _waiters--; // decrement the number of waiters // Verify a few postconditions assert (_owner == Self , "invariant") ; assert (_succ != Self , "invariant") ; assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ; if (SyncFlags & 32) { OrderAccess::fence() ; } //检查是否有通知notify发生 // 从park()方法返回后,判断是否是因为中断返回,再次调用 // thread::is_interrupted(Self, true)判断并清除线程中断状态 // 如果中断状态为true,抛出中断异常并结束。 if (!WasNotified) { // no, it could be timeout or Thread.interrupt() or both // check for interrupt event, otherwise it is timeout if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) { TEVENT (Wait - throw IEX from epilog) ; THROW(vmSymbols::java_lang_InterruptedException()); } } //注意:虚假唤醒将被视为超时;监视器通知优先于线程中断。 }
notify 方法 :从等待队列中获取第一个节点,然后加入同步队列,本身没有释放锁的功能,是Synchroinzed 自己提供的(重要)
synchronized (a) { a.notify(); }
相当于
moniter.enter //获取对象锁 { 1.判断锁是否存在 2.从等待队列中获取第一个节点 3.根据不同的policy策略加入到cxq 或者entryList 同步队列 } moniter.exit //释放锁,唤醒同步队列下一个对象
- CHECK_OWNER 判断锁是否存在,不存在就抛异常。没有加Synchronize的话,会抛出IllegalMonitorStateException
#define CHECK_OWNER() do { if (THREAD != _owner) { if (THREAD->is_lock_owned((address) _owner)) { _owner = THREAD ; /* Convert from basiclock addr to Thread addr */ _recursions = 0; OwnerIsThread = 1 ; } else { TEVENT (Throw IMSX) ; THROW(vmSymbols::java_lang_IllegalMonitorStateException()); } } } while (false)
- 从等待队列的取出第一个节点
ObjectWaiter * iterator = DequeueWaiter() ;
- 根据不同policy,将等待对列的节点加入到同步队列中
if (Policy == 0) { // prepend(预追加) to EntryList if (List == NULL) { iterator->_next = iterator->_prev = NULL ; _EntryList = iterator ; } else { List->_prev = iterator ; iterator->_next = List ; iterator->_prev = NULL ; _EntryList = iterator ; } }......
Notify 本质是调用了ObjectMonitor 的notify 方法
void ObjectMonitor::notify(TRAPS) { CHECK_OWNER(); if (_WaitSet == NULL) { TEVENT (Empty-Notify) ; return ; } DTRACE_MONITOR_PROBE(notify, this, object(), THREAD); int Policy = Knob_MoveNotifyee ; Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ; ObjectWaiter * iterator = DequeueWaiter() ; if (iterator != NULL) { TEVENT (Notify1 - Transfer) ; guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ; guarantee (iterator->_notified == 0, "invariant") ; if (Policy != 4) { iterator->TState = ObjectWaiter::TS_ENTER ; } iterator->_notified = 1 ; ObjectWaiter * List = _EntryList ; if (List != NULL) { assert (List->_prev == NULL, "invariant") ; assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ; assert (List != iterator, "invariant") ; } if (Policy == 0) { // prepend(预追加) to EntryList if (List == NULL) { iterator->_next = iterator->_prev = NULL ; _EntryList = iterator ; } else { List->_prev = iterator ; iterator->_next = List ; iterator->_prev = NULL ; _EntryList = iterator ; } } else if (Policy == 1) { // append(真正追加) to EntryList if (List == NULL) { iterator->_next = iterator->_prev = NULL ; _EntryList = iterator ; } else { //考虑:当前获取EntryList的tail需要遍历整个链表 //将tail访问转换为CDLL而不是使用当前的DLL,从而使访问时间固定。 ObjectWaiter * Tail ; for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ; assert (Tail != NULL && Tail->_next == NULL, "invariant") ; Tail->_next = iterator ; iterator->_prev = Tail ; iterator->_next = NULL ; } } else if (Policy == 2) { // prepend to cxq // prepend(预追加) to cxq if (List == NULL) { iterator->_next = iterator->_prev = NULL ; _EntryList = iterator ; } else { iterator->TState = ObjectWaiter::TS_CXQ ; for (;;) { ObjectWaiter * Front = _cxq ; iterator->_next = Front ; if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) { break ; } } } } else if (Policy == 3) { // append(真正追加) to cxq iterator->TState = ObjectWaiter::TS_CXQ ; for (;;) { ObjectWaiter * Tail ; Tail = _cxq ; if (Tail == NULL) { iterator->_next = NULL ; if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) { break ; } } else { while (Tail->_next != NULL) Tail = Tail->_next ; Tail->_next = iterator ; iterator->_prev = Tail ; iterator->_next = NULL ; break ; } } } else { ParkEvent * ev = iterator->_event ; iterator->TState = ObjectWaiter::TS_RUN ; OrderAccess::fence() ; ev->unpark() ; } if (Policy < 4) { iterator->wait_reenter_begin(this); } // _WaitSetLock protects the wait queue, not the EntryList. We could // move the add-to-EntryList operation, above, outside the critical section // protected by _WaitSetLock. In practice that's not useful. With the // exception of wait() timeouts and interrupts the monitor owner // is the only thread that grabs _WaitSetLock. There's almost no contention // on _WaitSetLock so it's not profitable to reduce the length of the // critical section. } Thread::SpinRelease (&_WaitSetLock) ; if (iterator != NULL && ObjectMonitor::_sync_Notifications != NULL) { ObjectMonitor::_sync_Notifications->inc() ; } }
notifyAll方法 :跟Notify 方法类似,只是利用for循环 将等待队列的全部节点,加入到同步队列中,本身没有释放锁的功能,是Synchroinzed 自己提供的
void ObjectMonitor::notifyAll(TRAPS) { CHECK_OWNER(); ObjectWaiter* iterator; if (_WaitSet == NULL) { TEVENT (Empty-NotifyAll) ; return ; } DTRACE_MONITOR_PROBE(notifyAll, this, object(), THREAD); int Policy = Knob_MoveNotifyee ; int Tally = 0 ; Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notifyall") ; for (;;) { iterator = DequeueWaiter () ; if (iterator == NULL) break ; TEVENT (NotifyAll - Transfer1) ; ++Tally ; // Disposition - what might we do with iterator ? // a. add it directly to the EntryList - either tail or head. // b. push it onto the front of the _cxq. // For now we use (a). guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ; guarantee (iterator->_notified == 0, "invariant") ; iterator->_notified = 1 ; if (Policy != 4) { iterator->TState = ObjectWaiter::TS_ENTER ; } ObjectWaiter * List = _EntryList ; if (List != NULL) { assert (List->_prev == NULL, "invariant") ; assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ; assert (List != iterator, "invariant") ; } if (Policy == 0) { // prepend to EntryList if (List == NULL) { iterator->_next = iterator->_prev = NULL ; _EntryList = iterator ; } else { List->_prev = iterator ; iterator->_next = List ; iterator->_prev = NULL ; _EntryList = iterator ; } } else if (Policy == 1) { // append to EntryList if (List == NULL) { iterator->_next = iterator->_prev = NULL ; _EntryList = iterator ; } else { // CONSIDER: finding the tail currently requires a linear-time walk of // the EntryList. We can make tail access constant-time by converting to // a CDLL instead of using our current DLL. ObjectWaiter * Tail ; for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ; assert (Tail != NULL && Tail->_next == NULL, "invariant") ; Tail->_next = iterator ; iterator->_prev = Tail ; iterator->_next = NULL ; } } else if (Policy == 2) { // prepend to cxq // prepend to cxq iterator->TState = ObjectWaiter::TS_CXQ ; for (;;) { ObjectWaiter * Front = _cxq ; iterator->_next = Front ; if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) { break ; } } } else if (Policy == 3) { // append to cxq iterator->TState = ObjectWaiter::TS_CXQ ; for (;;) { ObjectWaiter * Tail ; Tail = _cxq ; if (Tail == NULL) { iterator->_next = NULL ; if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) { break ; } } else { while (Tail->_next != NULL) Tail = Tail->_next ; Tail->_next = iterator ; iterator->_prev = Tail ; iterator->_next = NULL ; break ; } } } else { ParkEvent * ev = iterator->_event ; iterator->TState = ObjectWaiter::TS_RUN ; OrderAccess::fence() ; ev->unpark() ; } if (Policy < 4) { iterator->wait_reenter_begin(this); } // _WaitSetLock protects the wait queue, not the EntryList. We could // move the add-to-EntryList operation, above, outside the critical section // protected by _WaitSetLock. In practice that's not useful. With the // exception of wait() timeouts and interrupts the monitor owner // is the only thread that grabs _WaitSetLock. There's almost no contention // on _WaitSetLock so it's not profitable to reduce the length of the // critical section. } Thread::SpinRelease (&_WaitSetLock) ; if (Tally != 0 && ObjectMonitor::_sync_Notifications != NULL) { ObjectMonitor::_sync_Notifications->inc(Tally) ; } }
问题1:wait 在前面环节存在ParkEvent.park 阻塞等待唤醒,但是notify 本质只是将等待队列中的节点加入到了同步队列节点了,但是同步队列中有很多的节点,谁会拿出来用,在哪里调用了ParkEvent.unpark 唤醒线程继续往下走呢?
问题2:wait 方法只是退出对象锁。它是怎么将对象锁让给其他线程的,因为这个对象锁的转移只发生在wait 和notify 这个两个线程里面,没有第三者进行协调的,对象锁是怎么流转的。
其实本质都是一个问题:对象锁是怎么转让的?
关键点:wait 方法本身调用了一次ObjectMonitor.exit 方法,Synchronized 关键字本身也有一次ObjectMonitor.exit 方法。
void ATTR ObjectMonitor::exit(TRAPS) { ...... //根据QMode 策略从同步队列 取出节点 if (QMode == 2 && _cxq != NULL) { // QMode == 2 : cxq has precedence over EntryList. // Try to directly wake a successor from the cxq. // If successful, the successor will need to unlink itself from cxq. w = _cxq ; assert (w != NULL, "invariant") ; assert (w->TState == ObjectWaiter::TS_CXQ, "Invariant") ; ExitEpilog (Self, w) ; return ; } if (QMode == 3 && _cxq != NULL) { // Aggressively drain cxq into EntryList at the first opportunity. // This policy ensure that recently-run threads live at the head of EntryList. // Drain _cxq into EntryList - bulk transfer. // First, detach _cxq. // The following loop is tantamount to: w = swap (&cxq, NULL) w = _cxq ; for (;;) { assert (w != NULL, "Invariant") ; ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ; if (u == w) break ; w = u ; } assert (w != NULL , "invariant") ; ObjectWaiter * q = NULL ; ObjectWaiter * p ; for (p = w ; p != NULL ; p = p->_next) { guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ; p->TState = ObjectWaiter::TS_ENTER ; p->_prev = q ; q = p ; } // Append the RATs to the EntryList // TODO: organize EntryList as a CDLL so we can locate the tail in constant-time. ObjectWaiter * Tail ; for (Tail = _EntryList ; Tail != NULL && Tail->_next != NULL ; Tail = Tail->_next) ; if (Tail == NULL) { _EntryList = w ; } else { Tail->_next = w ; w->_prev = Tail ; } // Fall thru into code that tries to wake a successor from EntryList } ...... w = _EntryList ; if (w != NULL) { guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ; ExitEpilog (Self, w) ; return ; } } }
重点是拿到对应的节点执行了ExitEpilog 方法,唤醒这个正在wait 的节点
void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) { { assert (_owner == Self, "invariant") ; ParkEvent * Trigger = Wakee->_event ; .... //这里对应wait 方法使用的ParkEvent.park Trigger->unpark() ; //unpark唤醒wait线程 ..... if (ObjectMonitor::_sync_Parks != NULL) { ObjectMonitor::_sync_Parks->inc() ; } }
wait 方法过程
Notify 方法过程
这篇关于Object 的Wait Notify NotifyAll 源码解析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2025-01-10Rakuten 乐天积分系统从 Cassandra 到 TiDB 的选型与实战
- 2025-01-09CMS内容管理系统是什么?如何选择适合你的平台?
- 2025-01-08CCPM如何缩短项目周期并降低风险?
- 2025-01-08Omnivore 替代品 Readeck 安装与使用教程
- 2025-01-07Cursor 收费太贵?3分钟教你接入超低价 DeepSeek-V3,代码质量逼近 Claude 3.5
- 2025-01-06PingCAP 连续两年入选 Gartner 云数据库管理系统魔力象限“荣誉提及”
- 2025-01-05Easysearch 可搜索快照功能,看这篇就够了
- 2025-01-04BOT+EPC模式在基础设施项目中的应用与优势
- 2025-01-03用LangChain构建会检索和搜索的智能聊天机器人指南
- 2025-01-03图像文字理解,OCR、大模型还是多模态模型?PalliGema2在QLoRA技术上的微调与应用