007-Golang1.17源码分析之mutex
2022/2/21 20:57:45
本文主要是介绍007-Golang1.17源码分析之mutex,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
Golang1.17源码分析之mutex-007
Golang1.17 学习笔记007
源代码:sync/mutex.go
数据结构:
const ( // 锁标识位(state的最后一位) // Mutex.state & mutexLocked==1表示已经上锁;Mutex.state & mutexLocked==0表示已经未锁 mutexLocked = 1 << iota // mutex is locked // 是否存在运行中的协程(state的倒数第二位) // Mutex.state & mutexWoken==1表示存在运行中的协程;Mutex.state & mutexWoken==0表示不存在运行中的协程 mutexWoken // 饥饿状态位(state的倒数第三位) // Mutex.state & mutexStarving==1表示饥饿;Mutex.state & mutexStarving==0表示不饥饿 mutexStarving // 等待者数量偏移量(值为3) // 根据 mutex.state >> mutexWaiterShift 得到当前阻塞的goroutine数目,最多可以阻塞2^29个goroutine mutexWaiterShift = iota //值为1e6纳秒,也就是1毫秒,当等待队列中队首goroutine等待时间超过starvationThresholdNs也就是1毫秒,mutex进入饥饿模式 starvationThresholdNs = 1e6 ) type Mutex struct { //锁有两种模式:正常模式和饥饿模式 // 如果一个goroutine获取到了锁之后,它会判断以下两种情况: // 1. 它是队列中最后一个goroutine; // 2. 它拿到锁所花的时间小于1ms; // 以上只要有一个成立,它就会把锁转变回正常模式。 state int32 //表示互斥锁的状态,比如是否被锁定等。 sema uint32 //表示信号量,协程阻塞等待该信号量,解锁的协程释放信号量从而唤醒等待信号量的协程 }
加锁:
func (m *Mutex) Lock() { // 如果 m.state == 0 那么 m.state = 1 && return true // 如果当前没有被锁,那么锁上并返回true if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } // 已经被其他协程持有,进入自旋阶段 m.lockSlow() } func (m *Mutex) lockSlow() { // 用来存当前goroutine等待的时间 var waitStartTime int64 // 用来存当前goroutine是否饥饿 starving := false // 用来存当前goroutine是否已唤醒 awoke := false // 用来存当前goroutine的循环次数(想一想一个goroutine如果循环了2147483648次咋办……) iter := 0 // 复制一下当前锁的状态 old := m.state // 自旋 for { // 如果是饥饿情况之下,就不要自旋了,因为锁会直接交给队列头部的goroutine // 如果锁是被获取状态,并且满足自旋条件(canSpin见后文分析),那么就自旋等锁 // 伪代码:if isLocked() and isNotStarving() and canSpin() if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // 自旋的过程中如果发现state还没有设置woken标识,则设置它的woken标识,并标记自己为被唤醒 // 这样当Unlock的时候就不会去唤醒其它被阻塞的goroutine了 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } // 进行自旋(分析见后文) runtime_doSpin() iter++ // 更新锁的状态(有可能在自旋的这段时间之内锁的状态已经被其它goroutine改变) old = m.state continue } // 当走到这一步的时候,可能会有以下的情况: // 1. 锁被获取+饥饿 // 2. 锁被获取+正常 // 3. 锁空闲+饥饿 // 4. 锁空闲+正常 // goroutine的状态可能是唤醒以及非唤醒 // 复制一份当前的状态,目的是根据当前状态设置出期望的状态,存在new里面, // 并且通过CAS来比较以及更新锁的状态 // old用来存锁的当前状态 new := old // 如果说锁不是饥饿状态,就把期望状态设置为被获取(获取锁) // 也就是说,如果是饥饿状态,就不要把期望状态设置为被获取 // 新到的goroutine乖乖排队去 // 伪代码:if isNotStarving() if old&mutexStarving == 0 { // 伪代码:newState = locked new |= mutexLocked } // 如果锁是被获取状态,或者饥饿状态 // 就把期望状态中的等待队列的等待者数量+1(实际上是new + 8) // (会不会可能有三亿个goroutine等待拿锁……) if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } // 如果当前goroutine已经处于饥饿状态, 并且old state的已被加锁, // 将new state的状态标记为饥饿状态, 将锁转变为饥饿状态. if starving && old&mutexLocked != 0 { new |= mutexStarving } // 如果说当前goroutine是被唤醒状态,我们需要reset这个状态 // 因为goroutine要么是拿到锁了,要么是进入sleep了 if awoke { // The goroutine has been woken from sleep, // so we need to reset the flag in either case. if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } // 这句就是把new设置为非唤醒状态 // &^的意思是and not new &^= mutexWoken } if atomic.CompareAndSwapInt32(&m.state, old, new) { // 如果说old状态不是饥饿状态也不是被获取状态 // 那么代表当前goroutine已经通过CAS成功获取了锁 // (能进入这个代码块表示状态已改变,也就是说状态是从空闲到被获取) if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // If we were already waiting before, queue at the front of the queue. queueLifo := waitStartTime != 0 // 如果说之前没有等待过,就初始化设置现在的等待时间 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // 既然获取锁失败了,就使用sleep原语来阻塞当前goroutine // 通过信号量来排队获取锁 // 如果是新来的goroutine,就放到队列尾部 // 如果是被唤醒的等待锁的goroutine,就放到队列头部 runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 这里sleep完了,被唤醒 // 如果当前goroutine已经是饥饿状态了 // 或者当前goroutine已经等待了1ms(在上面定义常量)以上 // 就把当前goroutine的状态设置为饥饿 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state // 如果说锁现在是饥饿状态,就代表现在锁是被释放的状态,当前goroutine是被信号量所唤醒的 // 也就是说,锁被直接交给了当前goroutine if old&mutexStarving != 0 { // 如果说当前锁的状态是被唤醒状态或者被获取状态,或者说等待的队列为空 // 那么是不可能的,肯定是出问题了,因为当前状态肯定应该有等待的队列,锁也一定是被释放状态且未唤醒 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } // 当前goroutine获取锁,waiter数量-1 delta := int32(mutexLocked - 1<<mutexWaiterShift) // 如果当前goroutine非饥饿状态,或者说当前goroutine是队列中最后一个goroutine // 那么就退出饥饿模式,把状态设置为正常 if !starving || old>>mutexWaiterShift == 1 { // 在这里这么做至关重要,还要考虑等待时间。 // 饥饿模式是非常低效率的,一旦两个goroutine将互斥锁切换为饥饿模式,它们便可以无限锁 delta -= mutexStarving } // 原子性地加上改动的状态 atomic.AddInt32(&m.state, delta) break } // 如果锁不是饥饿模式,就把当前的goroutine设为被唤醒 // 并且重置iter(重置spin) awoke = true iter = 0 } else { // 如果CAS不成功,也就是说没能成功获得锁,锁被别的goroutine获得了或者锁一直没被释放 // 那么就更新状态,重新开始循环尝试拿锁 old = m.state } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } }
判断是否可以自旋:runtime/proc.go: sync_runtime_canSpin
// Active spinning for sync.Mutex. //go:linkname sync_runtime_canSpin sync.runtime_canSpin //go:nosplit func sync_runtime_canSpin(i int) bool { // 这里的active_spin是个常量,值为4 // 简单来说,sync.Mutex是有可能被多个goroutine竞争的,所以不应该大量自旋(消耗CPU) // 自旋的条件如下: // 1. 自旋次数小于active_spin(这里是4)次; // 2. 在多核机器上; // 3. GOMAXPROCS > 1并且至少有一个其它的处于运行状态的P; // 4. 当前P没有其它等待运行的G; // 满足以上四个条件才可以进行自旋。 if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 { return false } if p := getg().m.p.ptr(); !runqempty(p) { return false } return true }
解锁:
func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } // 这里获取到锁的状态,然后将状态减去被获取的状态(也就是解锁),称为new(期望)状态 // 注意以上两个操作是原子的,所以不用担心多个goroutine并发的问题 new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { // Outlined slow path to allow inlining the fast path. // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock. m.unlockSlow(new) } } func (m *Mutex) unlockSlow(new int32) { // 如果说,期望状态加上被获取的状态,不是被获取的话 // 那么就panic if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } // 如果说new状态(也就是锁的状态)不是饥饿状态 if new&mutexStarving == 0 { old := new for { // 如果说锁没有等待拿锁的goroutine // 或者锁被获取了(在循环的过程中被其它goroutine获取了) // 或者锁是被唤醒状态(表示有goroutine被唤醒,不需要再去尝试唤醒其它goroutine) // 或者锁是饥饿模式(会直接转交给队列头的goroutine) // 那么就直接返回,啥都不用做了 // 也就是没有等待的goroutine, 或者锁不处于空闲的状态,直接返回. if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } / // 走到这一步的时候,说明锁目前还是空闲状态,并且没有goroutine被唤醒且队列中有goroutine等待拿锁 // 将等待的goroutine数减一,并设置woken标识 new = (old - 1<<mutexWaiterShift) | mutexWoken // 设置新的state, 这里通过信号量会唤醒一个阻塞的goroutine去获取锁. if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) return } old = m.state } } else { // 饥饿模式下, 直接将锁的拥有权传给等待队列中的第一个. // 注意此时state的mutexLocked还没有加锁,唤醒的goroutine会设置它。 // 在此期间,如果有新的goroutine来请求锁, 因为mutex处于饥饿状态, mutex还是被认为处于锁状态, // 新来的goroutine不会把锁抢过去. runtime_Semrelease(&m.sema, true, 1) } }
mutex 模式
-
normal:协程如果加锁不成功不会立即转入阻塞排队,而是判断是否满足自旋的条件,如果满足则会启动自旋过程,尝试抢锁
-
starvation:自旋过程中能抢到锁,一定意味着同一时刻有协程释放了锁,我们知道释放锁时如果发现有阻塞等待的协程,
还会释放一个信号量来唤醒一个等待协程,被唤醒的协程得到CPU后开始运行,此时发现锁已被抢占了,自己只好再次阻塞,
不过阻塞前会判断自上次阻塞到本次阻塞经过了多长时间,如果超过1ms的话,会将Mutex标记为”饥饿”模式,然后再阻塞。
处于饥饿模式下,不会启动自旋过程,也即一旦有协程释放了锁,那么一定会唤醒协程,被唤醒的协程将会成功获取锁,
同时也会把等待计数减1
Woken状态
Woken状态用于加锁和解锁过程的通信,举个例子,同一时刻,两个协程一个在加锁,一个在解锁,在加锁的协程可能在自旋过程中,
此时把Woken标记为1,用于通知解锁协程不必释放信号量了,好比在说:你只管解锁好了,不必释放信号量,我马上就拿到锁了
参考文献:
《Go专家编程》之mutex
https://www.purewhite.io/2019/03/28/golang-mutex-source/
https://www.cnblogs.com/ricklz/p/14535653.html
这篇关于007-Golang1.17源码分析之mutex的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-12-24MongoDB资料:新手入门完全指南
- 2024-12-20go-zero 框架的 RPC 服务 启动start和停止 底层是怎么实现的?-icode9专业技术文章分享
- 2024-12-19Go-Zero 框架的 RPC 服务启动和停止的基本机制和过程是怎么实现的?-icode9专业技术文章分享
- 2024-12-18怎么在golang中使用gRPC测试mock数据?-icode9专业技术文章分享
- 2024-12-15掌握PageRank算法核心!你离Google优化高手只差一步!
- 2024-12-15GORM 中的标签 gorm:"index"是什么?-icode9专业技术文章分享
- 2024-12-11怎么在 Go 语言中获取 Open vSwitch (OVS) 的桥接信息(Bridge)?-icode9专业技术文章分享
- 2024-12-11怎么用Go 语言的库来与 Open vSwitch 进行交互?-icode9专业技术文章分享
- 2024-12-11怎么在 go-zero 项目中发送阿里云短信?-icode9专业技术文章分享
- 2024-12-11怎么使用阿里云 Go SDK (alibaba-cloud-sdk-go) 发送短信?-icode9专业技术文章分享