【双剑合璧】go channel源码剖析,goroutine数据通信,数据结构:环形队列+双向链表,channel设计思想,两个协程之间传数据,配合select使用,关闭channel,有无缓冲区,缓
2022/5/28 1:22:58
本文主要是介绍【双剑合璧】go channel源码剖析,goroutine数据通信,数据结构:环形队列+双向链表,channel设计思想,两个协程之间传数据,配合select使用,关闭channel,有无缓冲区,缓,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
go Channel
简介
channel
是用于goroutine
的数据通信,在Go
中通过goroutine+channel
的方式,可以简单、高效地解决并发问题。
channel设计思想
Go语言的并发模型基于CSP(Communicating Sequential Processes)理论。Go的并发哲学强调:
“Do not communicate by sharing memory; instead, share memory by communicating."
不要通过共享内存来通信,而要通过通信来实现内存共享。
什么是使用共享内存来通信?其实就是多个线程/协程使用同一块内存,通过加锁的方式来宣布使用某块内存,通过解锁来宣布不再使用某块内存。
什么是通过通信来实现共享内存?其实就是把一份内存的开销变成两份内存开销而已,再说的通俗一点就是,我们使用发送消息的方式来同步信息。
为什么鼓励使用通过通信来实现共享内存?使用发送消息来同步信息相比于直接使用共享内存和互斥锁是一种更高级的抽象,使用更高级的抽象能够为我们在程序设计上提供更好的封装,让程序的逻辑更加清晰;其次,消息发送在解耦方面与共享内存相比也有一定优势,我们可以将线程的职责分成生产者和消费者,并通过消息传递的方式将它们解耦,不需要再依赖共享内存。
对于这个理解更深的文章,建议读一下这篇文章:为什么使用通信来共享内存
goroutine和channel是Go语言CSP并发模型的两大核心概念,本文将从源码层面对channel进行深度解析。
版本说明
本文涉及到的源码解析部分来源于go官方的1.16.5版本,其中英文部分的注释也属于源码,中文部分的注释则属于笔者添加。
channel类型
channel
有三种类型的定义,分别是:chan
、chan <-
、<- chan
,可选的<-
代表channel
的方向,如果我们没有指定方向,那么channel
就是双向的,既可以接收数据,也可以发送数据。
chan T // 接收和发送类型为T的数据 chan<- T // 只可以用来发送 T 类型的数据 <- chan T // 只可以用来接收 T 类型的数据
channel使用姿势
1.两个协程之间传数据
func GoroutineOne(ch chan <-string) { fmt.Println("GoroutineOne running") ch <- "asong真帅" fmt.Println("GoroutineOne end of the run") } func GoroutineTwo(ch <- chan string) { fmt.Println("GoroutineTwo running") fmt.Printf("女朋友说:%s\n",<-ch) fmt.Println("GoroutineTwo end of the run") } func main() { ch := make(chan string) go GoroutineOne(ch) go GoroutineTwo(ch) time.Sleep(3 * time.Second) } // 运行结果 // GoroutineOne running // GoroutineTwo running // 女朋友说:asong真帅 // GoroutineTwo end of the run // GoroutineOne end of the run
这里我们运行了两个Goroutine
,在GoroutineOne
中我们向channel
中写入数据,在GoroutineTwo
中我们监听channel
,直到读取到"asong真帅"。我们可以画一个简单的图来表明一下这个顺序:
2.配合select使用
Go
语言中的select
能够让Goroutine
同时等待多个channel
读或者写,在channel
状态未改变之前,select
会一直阻塞当前线程或Goroutine
。先看一个例子:
func fibonacci(ch chan int, done chan struct{}) { x, y := 0, 1 for { select { case ch <- x: x, y = y, x+y case <-done: fmt.Println("over") return } } } func main() { ch := make(chan int) done := make(chan struct{}) go func() { for i := 0; i < 10; i++ { fmt.Println(<-ch) } done <- struct{}{} }() fibonacci(ch, done) }
select
与switch
具有相似的控制结构,与switch
不同的是,select
中的case
中的表达式必须是channel
的收发操作,当select
中的两个case
同时被触发时,会随机执行其中的一个。
为什么是随机执行的呢?随机的引入就是为了避免饥饿问题的发生,如果我们每次都是按照顺序依次执行的,若两个case
一直都是满足条件的,那么后面的case
永远都不会执行。
上面例子中的select
用法是阻塞式的收发操作,直到有一个channel
发生状态改变。我们也可以在select
中使用default
语句,那么select
语句在执行时会遇到这两种情况:
- 当存在可以收发的
Channel
时,直接处理该Channel
对应的case
; - 当不存在可以收发的
Channel
时,执行default
中的语句;
注意:nil channel
上的操作会一直被阻塞,如果没有default case
,只有nil channel
的select
会一直被阻塞。
3.关闭channel
内建的close
方法可以用来关闭channel
。如果channel
已经关闭,不可以继续发送数据了,否则会发生panic
,但是从这个关闭的channel
中不但可以读取出已发送的数据,还可以不断的读取零值。
//achan_test.go package achan import ( "fmt" "testing" ) func TestAMap11(t *testing.T) { ch := make(chan int, 10) ch <- 10 ch <- 20 close(ch) fmt.Println(<-ch) //10 fmt.Println(<-ch) //20 fmt.Println(<-ch) //0 fmt.Println(<-ch) //0 }
源码剖析
数据结构
type hchan struct { qcount uint // 循环数组(环形队列)中的元素数量,buffer 中已放入的元素个数 dataqsiz uint // 循环数组(环形队列)的长度,用户构造 channel 时指定的 buf 大小 buf unsafe.Pointer // 只针对有缓冲的channel,指向底层循环数组的指针,环形队列缓存的具体数据。 elemsize uint16 // 能够接收和发送的元素大小 closed uint32 // channel 是否关闭,== 0 代表未 closed elemtype *_type // channel 中元素的类型信息 sendx uint // 已发送元素在循环数组中的索引 send index recvx uint // 已接收元素在循环数组中的索引 receive index recvq waitq // 等待接收的 goroutine队列, // 当缓冲区已满且仍有send请求,send方将会阻塞,该双向链表存放阻塞sudog。 sendq waitq // 等待发送的 goroutine队列, // 当缓冲区已空且仍有recv请求,recv方将会阻塞,该双向链表存放阻塞sudog。 // sendq、recvq是一个双向链表结构,分别表示被阻塞的goroutine链表, // 这些 goroutine 由于尝试读取 channel 或向 channel 发送数据而被阻塞。 lock mutex //互斥锁(悲观锁),保护hchan中的字段,保证读写channel的操作都是原子的。。 }
channel的结构主要由以下几个部分组成:
- 环形队列缓存:当channel缓冲区未满且仍有send请求的时候,该队列将用于缓存此时收到的消息。
- 双向链表:当channel缓冲区已满且仍有send请求或者已空但仍有recv请求的时候,channel将进入阻塞状态。通过该双向链表存放阻塞的Goroutine信息,以sudog的形式包裹G。
- 锁:go语言的channel基于悲观锁实现。
channel的创建
我们可以通过make
函数创建chan
,分为不带缓冲和带缓冲两种:
package main import "fmt" func main() { ch1 := make(chan struct{}) // 不带缓冲 ch2 := make(chan struct{}, 10) // 带缓冲 fmt.Println(len(ch1), len(ch2)) }
不带缓冲和带缓冲的本质区别其实就在于底层的环形队列缓存。对于带缓冲的情况而言,消息可以被暂时存放进队列缓存中,不会导致发送方的G直接进入阻塞状态。
让我们通过go的官方工具来查看汇编结果:
$ go tool compile -S main.go "".main STEXT size=310 args=0x0 locals=0x80 funcid=0x0 0x0000 00000 (main.go:5) TEXT "".main(SB), ABIInternal, $128-0 0x0000 00000 (main.go:5) MOVQ (TLS), CX 0x0009 00009 (main.go:5) CMPQ SP, 16(CX) 0x000d 00013 (main.go:5) PCDATA $0, $-2 0x000d 00013 (main.go:5) JLS 300 0x0013 00019 (main.go:5) PCDATA $0, $-1 0x0013 00019 (main.go:5) ADDQ $-128, SP 0x0017 00023 (main.go:5) MOVQ BP, 120(SP) 0x001c 00028 (main.go:5) LEAQ 120(SP), BP 0x0021 00033 (main.go:5) FUNCDATA $0, gclocals·7d2d5fca80364273fb07d5820a76fef4(SB) 0x0021 00033 (main.go:5) FUNCDATA $1, gclocals·729b7dbc411005893d6fd941e5c100f6(SB) 0x0021 00033 (main.go:5) FUNCDATA $2, "".main.stkobj(SB) 0x0021 00033 (main.go:6) LEAQ type.chan struct {}(SB), AX 0x0028 00040 (main.go:6) MOVQ AX, (SP) 0x002c 00044 (main.go:6) MOVQ $0, 8(SP) 0x0035 00053 (main.go:6) PCDATA $1, $0 0x0035 00053 (main.go:6) CALL runtime.makechan(SB) # 创建不带缓冲的channel 0x003a 00058 (main.go:6) MOVQ 16(SP), AX 0x003f 00063 (main.go:6) MOVQ AX, "".ch1+72(SP) 0x0044 00068 (main.go:7) LEAQ type.chan struct {}(SB), CX 0x004b 00075 (main.go:7) MOVQ CX, (SP) 0x004f 00079 (main.go:7) MOVQ $10, 8(SP) 0x0058 00088 (main.go:7) PCDATA $1, $1 0x0058 00088 (main.go:7) CALL runtime.makechan(SB) # 创建带缓冲的channel 0x005d 00093 (main.go:7) MOVQ 16(SP), AX 0x0062 00098 (main.go:8) MOVQ "".ch1+72(SP), CX 0x0067 00103 (main.go:8) TESTQ CX, CX 0x006a 00106 (main.go:8) JEQ 293 0x0070 00112 (main.go:8) MOVQ (CX), CX 0x0073 00115 (main.go:8) TESTQ AX, AX 0x0076 00118 (main.go:8) JEQ 281 0x007c 00124 (main.go:8) MOVQ (AX), AX 0x007f 00127 (main.go:8) MOVQ AX, ""..autotmp_26+64(SP) 0x0084 00132 (main.go:8) MOVQ CX, (SP) 0x0088 00136 (main.go:8) PCDATA $1, $0 0x0088 00136 (main.go:8) CALL runtime.convT64(SB) 0x008d 00141 (main.go:8) MOVQ 8(SP), AX 0x0092 00146 (main.go:8) MOVQ AX, ""..autotmp_27+80(SP) 0x0097 00151 (main.go:8) MOVQ ""..autotmp_26+64(SP), CX 0x009c 00156 (main.go:8) MOVQ CX, (SP) 0x00a0 00160 (main.go:8) PCDATA $1, $2 0x00a0 00160 (main.go:8) CALL runtime.convT64(SB) 0x00a5 00165 (main.go:8) MOVQ 8(SP), AX 0x00aa 00170 (main.go:8) XORPS X0, X0 0x00ad 00173 (main.go:8) MOVUPS X0, ""..autotmp_18+88(SP) 0x00b2 00178 (main.go:8) MOVUPS X0, ""..autotmp_18+104(SP) 0x00b7 00183 (main.go:8) LEAQ type.int(SB), CX 0x00be 00190 (main.go:8) MOVQ CX, ""..autotmp_18+88(SP) 0x00c3 00195 (main.go:8) MOVQ ""..autotmp_27+80(SP), DX 0x00c8 00200 (main.go:8) MOVQ DX, ""..autotmp_18+96(SP) 0x00cd 00205 (main.go:8) MOVQ CX, ""..autotmp_18+104(SP) 0x00d2 00210 (main.go:8) MOVQ AX, ""..autotmp_18+112(SP) ... 省略不相关的部分
从汇编结果来看,无论是带缓冲还是不带缓冲,channel的创建底层实现都是基于runtime.makechan
函数,runtime.makechan64
本质也是调用的makechan
方法,只不过多了一个数值溢出的校验。runtime.makechan64
是用于处理缓冲区大于2的32方,所以这两个方法会根据传入的参数类型和缓冲区大小进行选择。大多数情况都是使用makechan
让我们进一步翻阅相关源码:
func makechan(t *chantype, size int) *hchan { elem := t.elem // compiler checks this but be safe. // 算术溢出判断 if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers. // buf points into the same allocation, elemtype is persistent. // SudoG's are referenced from their owning thread so they can't be collected. // TODO(dvyukov,rlh): Rethink when collector can move allocated objects. // 根据不同情况来分配内存: // 1. 不带缓冲:只给hchan分配内存 // 2. 带缓冲且不包括指针类型:给hchan和环形队列缓存分配一段连续的内存空间 // 3. 带缓冲且包括指针类型:给hchan和环形队列缓存分别分配内存空间 var c *hchan switch { case mem == 0: // Queue or element size is zero. c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.ptrdata == 0: // Elements do not contain pointers. // Allocate hchan and buf in one call. c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // Elements contain pointers. c = new(hchan) c.buf = mallocgc(mem, elem, true) } // 更新以下字段的值 c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan) if debugChan { print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n") } return c }
runtime.makechan
为新创建的channel分配内存空间,考虑三种情况:
- 不带缓冲区:只需要给
hchan
分配内存空间(调用一次mallocgc
分配一段连续的内存空间)。 - 带缓冲区且不包括指针类型:同时给
hchan
和环形队列缓存buf
分配一段连续的内存空间。 - 带缓冲区且包括指针类型:分别给
hchan
和环形队列缓存buf
分配不同的内存空间。
因为都是调用mallocgc
方法进行内存分配,所以channel
都是在堆上创建的,会进行垃圾回收,不关闭close
方法也是没有问题的(但是想写出漂亮的代码就不建议你这么做了)。
channel的发送和接收
基本原则
channel的核心作用在于为不同的goroutine传输数据,因此首先要声明强调的是:channel的发送和接收必须位于不同的goroutine内,在相同的goroutine内同时给channel发送和接收数据则会引发死锁问题:
package main func main() { ch := make(chan struct{}) ch <- struct{}{} <-ch }
$ go run main.go fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan send]: main.main() /root/go/src/gogogo/main.go:6 +0x53 exit status 2
三种情况
channel的发送和接收主要考虑三种情况:
- 移动缓冲区
- 阻塞
- 缓冲区复制 && 直接发送
第一种情况:移动缓冲区
当channel缓冲区 未满 的时候,底层环形队列缓存将会保存消息,此时发送方和接收方都从队列缓存中直接操作消息。
- 当channel新增发送方,
hchan
首先会加锁,其次将发送方的消息体存放进sendx
,接着移动sendx
指针,最后释放锁。
channel新增发送方
当channel新增接收方,hchan
首先会加锁,其次将recvx
的消息体取出,接着移动recvx
指针,最后释放锁。
channel新增接收方
第二种情况:阻塞
当channel缓冲区 已满且仍有发送方 或者 已空且仍有接收方 的时候,此时新增的发送方G/接收方G将进入阻塞状态。此时,阻塞的G将被包装进sudog结构体中,并以双向链表的形式保存下来。
- 当channel缓冲区已满且仍有发送方,新增的发送方G将进入阻塞状态,以sudog的形式加入
sendq
中。
channel缓冲区已满,新增发送方
当channel缓冲区已空且仍有接收方,新增的接收方G将进入阻塞状态,以sudog的形式加入recvq
中。
channel缓冲区已空,新增接收方
第三种情况:缓冲区复制 && 直接发送
在上述第二种情况的基础上,当channel 存在阻塞发送方且新增了接收方 的时候,channel会将缓冲区的数据返回给接收方,同时将阻塞在双向链表队头的发送方唤醒,并将其消息复制进缓冲区。
channel发送方阻塞,新增接收方
当channel 存在阻塞接收方且新增了发送方 的时候,channel会唤醒阻塞的接收方,然后直接把数据从发送方复制到接收方,无需再经过缓冲区,即直接发送。
channel接收方阻塞,新增发送方
发送和接收源码分析
接下来,我们通过源码的方式进一步分析channel的发送和接收。
package main func main() { ch := make(chan struct{}) go func() { ch <- struct{}{} }() <-ch }
"".main STEXT size=133 args=0x0 locals=0x28 funcid=0x0 0x0000 00000 (main.go:3) TEXT "".main(SB), ABIInternal, $40-0 0x0000 00000 (main.go:3) MOVQ (TLS), CX 0x0009 00009 (main.go:3) CMPQ SP, 16(CX) 0x000d 00013 (main.go:3) PCDATA $0, $-2 0x000d 00013 (main.go:3) JLS 121 0x000f 00015 (main.go:3) PCDATA $0, $-1 0x000f 00015 (main.go:3) SUBQ $40, SP 0x0013 00019 (main.go:3) MOVQ BP, 32(SP) 0x0018 00024 (main.go:3) LEAQ 32(SP), BP 0x001d 00029 (main.go:3) FUNCDATA $0, gclocals·69c1753bd5f81501d95132d08af04464(SB) 0x001d 00029 (main.go:3) FUNCDATA $1, gclocals·9fb7f0986f647f17cb53dda1484e0f7a(SB) 0x001d 00029 (main.go:4) LEAQ type.chan struct {}(SB), AX 0x0024 00036 (main.go:4) MOVQ AX, (SP) 0x0028 00040 (main.go:4) MOVQ $0, 8(SP) 0x0031 00049 (main.go:4) PCDATA $1, $0 0x0031 00049 (main.go:4) CALL runtime.makechan(SB) 0x0036 00054 (main.go:4) MOVQ 16(SP), AX 0x003b 00059 (main.go:4) MOVQ AX, "".ch+24(SP) 0x0040 00064 (main.go:6) MOVL $8, (SP) 0x0047 00071 (main.go:6) LEAQ "".main.func1·f(SB), CX 0x004e 00078 (main.go:6) MOVQ CX, 8(SP) 0x0053 00083 (main.go:6) PCDATA $1, $1 0x0053 00083 (main.go:6) CALL runtime.newproc(SB) 0x0058 00088 (main.go:10) MOVQ "".ch+24(SP), AX 0x005d 00093 (main.go:10) MOVQ AX, (SP) 0x0061 00097 (main.go:10) MOVQ $0, 8(SP) 0x006a 00106 (main.go:10) PCDATA $1, $0 0x006a 00106 (main.go:10) CALL runtime.chanrecv1(SB) # channel接收 0x006f 00111 (main.go:11) MOVQ 32(SP), BP 0x0074 00116 (main.go:11) ADDQ $40, SP 0x0078 00120 (main.go:11) RET 0x0079 00121 (main.go:11) NOP 0x0079 00121 (main.go:3) PCDATA $1, $-1 0x0079 00121 (main.go:3) PCDATA $0, $-2 0x0079 00121 (main.go:3) CALL runtime.morestack_noctxt(SB) 0x007e 00126 (main.go:3) PCDATA $0, $-1 0x007e 00126 (main.go:3) NOP 0x0080 00128 (main.go:3) JMP 0 ... 省略不相关的部分 "".main.func1 STEXT size=71 args=0x8 locals=0x18 funcid=0x0 0x0000 00000 (main.go:6) TEXT "".main.func1(SB), ABIInternal, $24-8 0x0000 00000 (main.go:6) MOVQ (TLS), CX 0x0009 00009 (main.go:6) CMPQ SP, 16(CX) 0x000d 00013 (main.go:6) PCDATA $0, $-2 0x000d 00013 (main.go:6) JLS 64 0x000f 00015 (main.go:6) PCDATA $0, $-1 0x000f 00015 (main.go:6) SUBQ $24, SP 0x0013 00019 (main.go:6) MOVQ BP, 16(SP) 0x0018 00024 (main.go:6) LEAQ 16(SP), BP 0x001d 00029 (main.go:6) FUNCDATA $0, gclocals·1a65e721a2ccc325b382662e7ffee780(SB) 0x001d 00029 (main.go:6) FUNCDATA $1, gclocals·69c1753bd5f81501d95132d08af04464(SB) 0x001d 00029 (main.go:7) MOVQ "".ch+32(SP), AX 0x0022 00034 (main.go:7) MOVQ AX, (SP) 0x0026 00038 (main.go:7) LEAQ ""..autotmp_1+16(SP), AX 0x002b 00043 (main.go:7) MOVQ AX, 8(SP) 0x0030 00048 (main.go:7) PCDATA $1, $1 0x0030 00048 (main.go:7) CALL runtime.chansend1(SB) # channel发送 0x0035 00053 (main.go:8) MOVQ 16(SP), BP 0x003a 00058 (main.go:8) ADDQ $24, SP 0x003e 00062 (main.go:8) RET 0x003f 00063 (main.go:8) NOP 0x003f 00063 (main.go:6) PCDATA $1, $-1 0x003f 00063 (main.go:6) PCDATA $0, $-2 0x003f 00063 (main.go:6) NOP 0x0040 00064 (main.go:6) CALL runtime.morestack_noctxt(SB) 0x0045 00069 (main.go:6) PCDATA $0, $-1 0x0045 00069 (main.go:6) JMP 0 ... 省略不相关的部分
从汇编结果来看,channel的发送底层实现是runtime.chansend1
函数,channel的接收底层实现是runtime.chanrecv1
函数。我们进一步查看相关源码:
func chansend1(c *hchan, elem unsafe.Pointer) { chansend(c, elem, true, getcallerpc()) } func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true) } func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { _, received = chanrecv(c, elem, true) return }
值得注意的是,runtime源码中包括2个recv函数:runtime.chanrecv1
和runtime.chanrecv2
。从实现上面可以看出,runtime.chanrecv2
是用于接收方返回两个参数的场景。
发送
channel
发送数据部分的代码经过编译器编译后对应的是runtime.chansend1
,其调用的也是runtime.chansend
方法。
chansend执行步骤:
- 前置检查
- 加锁/异常检查(加锁防止多个协程并发修改数据)
- channel直接发送数据
- channel发送数据缓冲区有可用空间
- channel发送数据缓冲区无可用空间
我们先来解析runtime.chansend
的源码部分(PS:由于源码分析比较复杂,因此主要阐述其中的关键环节):
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... 省略不相关的部分 // 直接发送:当存在阻塞的接收方,且新增了发送方,那么直接把发送方的消息复制给接收方 if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } // 缓冲区:当缓冲区未满,那么通过移动环形队列缓存的指针来存储消息 if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, c.sendx) if raceenabled { racenotify(c, c.sendx, nil) } typedmemmove(c.elemtype, qp, ep) c.sendx++ // 移动指针 if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } if !block { unlock(&c.lock) return false } // Block on the channel. Some receiver will complete our operation for us. // 阻塞:当缓冲区已满,此时当前的发送方需要进入阻塞状态。 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // 将goroutine阻塞 // Ensure the value being sent is kept alive until the // receiver copies it out. The sudog has a pointer to the // stack object, but sudogs aren't considered as roots of the // stack tracer. KeepAlive(ep) ... 省略不相关的部分 }
接收
channel发送步骤:
- 前置检查
- 加锁和提前返回(加锁防止多个协程并发修改数据)
channel
直接接收数据channel
缓冲区有数据channel
缓冲区无数据
最后我们来解析runtime.chanrecv
源码部分:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... 省略不相关的部分 // 缓冲区复制:当存在阻塞的发送方,会从缓冲区读取消息,并且唤醒发送方将消息放入缓冲区。 // 这里面同时存在一个临界状态,即当缓冲区长度是0的时候,则会将发送方的消息直接复制到接收方 if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } // 缓冲区:当缓冲区未空的时候,那么通过移动环形队列缓存的指针来发送消息 if c.qcount > 0 { // Receive directly from queue qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ // 移动指针 if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } if !block { unlock(&c.lock) return false, false } // no sender available: block on this channel. // 阻塞:当缓冲区已空,那么需要阻塞当前的接收方 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // 将goroutine阻塞 ... 省略不相关的部分 }
对照一下runtime.recv
源码实现:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 { // 缓冲区为0的情况,直接复制 if raceenabled { racesync(c, sg) } if ep != nil { // copy data from sender recvDirect(c.elemtype, sg, ep) } } else { // 缓冲区不为0的情况,需要从缓冲区去发送消息,并且把阻塞队列队头的发送方唤醒 // Queue is full. Take the item at the // head of the queue. Make the sender enqueue // its item at the tail of the queue. Since the // queue is full, those are both the same slot. qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) racenotify(c, c.recvx, sg) } // copy data from queue to receiver if ep != nil { typedmemmove(c.elemtype, ep, qp) } // copy data from sender to queue typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) }
channel的关闭
channel的关闭并不复杂。但由于channel本身是悲观锁实现,因此channel在关闭的时候,为了减少悲观锁的占用时间,channel会创建一个glist
队列,将当前仍然阻塞的sendq
和recvq
上面的sudog加入glist
,然后快速释放掉锁。紧接着,channel会将glist
上面阻塞的goroutine以此唤醒。
我们进一步分析channel的源码实现:
package main func main() { ch := make(chan struct{}) go func() { ch <- struct{}{} close(ch) }() <-ch }
$ go tool compile -S main.go "".main.func1 STEXT size=86 args=0x8 locals=0x18 funcid=0x0 0x0000 00000 (main.go:6) TEXT "".main.func1(SB), ABIInternal, $24-8 0x0000 00000 (main.go:6) MOVQ (TLS), CX 0x0009 00009 (main.go:6) CMPQ SP, 16(CX) 0x000d 00013 (main.go:6) PCDATA $0, $-2 0x000d 00013 (main.go:6) JLS 79 0x000f 00015 (main.go:6) PCDATA $0, $-1 0x000f 00015 (main.go:6) SUBQ $24, SP 0x0013 00019 (main.go:6) MOVQ BP, 16(SP) 0x0018 00024 (main.go:6) LEAQ 16(SP), BP 0x001d 00029 (main.go:6) FUNCDATA $0, gclocals·1a65e721a2ccc325b382662e7ffee780(SB) 0x001d 00029 (main.go:6) FUNCDATA $1, gclocals·69c1753bd5f81501d95132d08af04464(SB) 0x001d 00029 (main.go:7) MOVQ "".ch+32(SP), AX 0x0022 00034 (main.go:7) MOVQ AX, (SP) 0x0026 00038 (main.go:7) LEAQ ""..autotmp_1+16(SP), CX 0x002b 00043 (main.go:7) MOVQ CX, 8(SP) 0x0030 00048 (main.go:7) PCDATA $1, $0 0x0030 00048 (main.go:7) CALL runtime.chansend1(SB) 0x0035 00053 (main.go:9) MOVQ "".ch+32(SP), AX 0x003a 00058 (main.go:9) MOVQ AX, (SP) 0x003e 00062 (main.go:9) PCDATA $1, $1 0x003e 00062 (main.go:9) NOP 0x0040 00064 (main.go:9) CALL runtime.closechan(SB) # 关闭channel 0x0045 00069 (main.go:10) MOVQ 16(SP), BP 0x004a 00074 (main.go:10) ADDQ $24, SP 0x004e 00078 (main.go:10) RET 0x004f 00079 (main.go:10) NOP 0x004f 00079 (main.go:6) PCDATA $1, $-1 0x004f 00079 (main.go:6) PCDATA $0, $-2 0x004f 00079 (main.go:6) CALL runtime.morestack_noctxt(SB) 0x0054 00084 (main.go:6) PCDATA $0, $-1 0x0054 00084 (main.go:6) JMP 0 ... 省略不相关的部分
从汇编的结果可以看出,channel的关闭底层实现是runtime.closechan
,让我们进一步查看相关源码:
func closechan(c *hchan) { if c == nil { panic(plainError("close of nil channel")) } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) } if raceenabled { callerpc := getcallerpc() racewritepc(c.raceaddr(), callerpc, funcPC(closechan)) racerelease(c.raceaddr()) } c.closed = 1 var glist gList // 新建glist,是一个队列结构 // release all readers // 把所有仍然阻塞的接收方加入glist for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } // release all writers (they will panic) // 把所有仍然阻塞的发送方加入glist for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock) // Ready all Gs now that we've dropped the channel lock. // 将阻塞状态的goroutine唤醒 for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) // 唤醒goroutine } }
channel的关闭遵循两大原则:
- channel并不一定要关闭。Go语言本身可以通过GC来回收channel的内存空间,因此channel不关闭的情况也经常存在。但当不关闭channel引发goroutine无法正常退出的时候,我们需要考虑尽可能地关闭channel以解决goroutine泄露问题。
- 不要在接收方关闭channel,而要在发送方关闭channel,并发存在多个发送方的时候除外。
无锁channel
锁是一种常见的并发控制技术,我们通常将锁的类型分为乐观锁和悲观锁。而所谓的无锁channel更准确的说是基于乐观锁的实现,也即基于CAS(Compare And Swap)实现。
Go社区曾在2014年提出无锁channel的提案1,但是该提案虽然很早就提出,但至今仍然没有被接受。主要有如下原因:
- Go官方希望channel能够符合FIFO的顺序被唤醒,保证数据公平性,这是未使用无锁channel的最主要原因。
- 无锁channel并不是无等待算法,是否能够有效提高channel在大规模应用的性能并没有得到验证。一个社区的实现2甚至比基于futex的channel还要慢。
- 无锁channel的可维护性非常差。
总结
-
channel是用于goroutine的数据通信,设计思想CSP,通过通信来实现内存共享。
-
channel是安全的,加互斥锁(悲观锁),channel类型只读,只写,可读可写。
-
channel使用两个协程之间传递数据,配合select使用,close(chn)关闭。
-
channel select与switch相似,case是channel收发,随机执行顺序,避免饥饿问题,后面case无法执行。
-
channel数据结构:环形队列缓存+双向链表+互斥锁。
-
双向链表:当channel缓冲区已满且仍有send请求或者已空但仍有recv请求的时候,channel将进入阻塞状态。通过该双向链表存放阻塞的Goroutine信息,以sudog的形式包裹G。
-
channel创建:runtime.makechan,调用mallocgc方法进行内存分配,所以channel都是在堆上创建的,会进行垃圾回收;
不带缓冲区给hchan分配一段连续的内存;
带缓冲区且不包括指针类型:同时给hchan和环形队列缓存buf分配一段连续的内存空间;
带缓冲区且包括指针类型:分别给hchan和环形队列缓存buf分配不同的内存空间。
-
channel发送:runtime.chansend,
chansend执行步骤:
1.前置检查
2.加锁和异常检查
3.channel直接发送数据
4.channel发送数据缓冲区有可用空间
5.channel发送数据缓冲区无可用空间
直接发送数据是指 如果已经有阻塞的接收goroutines(即recvq中指向非空),那么数据将被直接发送给接收goroutine。
缓冲区有可用空间 调用chanbuf方法获取底层缓冲数组中sendx索引的元素指针值,调用typedmemmove方法将发送的值拷贝到缓冲区中
缓冲区无可用空间 有两种方式可以选择,一种是直接返回,另外一种是阻塞等待;阻塞的G将被包装进sudog结构体中,并以双向链表的形式保存下来。
-
channel接收:runtime.chansend,
chanrecv执行步骤:
1.前置检查
2.加锁和提前返回
3.channel直接接收数据
4.channel缓冲区有数据
5.channel缓冲区无数据
直接接收数据,当发现channel上有正在阻塞等待的发送方时,则直接进行接收。
缓冲区有数据,直接从循环队列中找到要接收的元素,接收数据完毕,释放锁。
缓冲区无数据,直接返回或阻塞接收;阻塞接受逻辑:获取当前的goroutine,然后构建sudog结构保存待接收数据的地址信息和goroutine信息,并将sudog加入等待接收队列,最后挂起当前goroutine,等待唤醒。
-
channel关闭:runtime.closechan,一个为nil的channel不允许进行关闭,不可重复关闭channel;
由于channel本身是悲观锁实现,因此channel在关闭的时候,为了减少悲观锁的占用时间,channel会创建一个glist队列,将当前仍然阻塞的sendq和recvq上面的sudog加入glist,然后快速释放掉锁。紧接着,channel会将glist上面阻塞的goroutine以此唤醒。
参考
https://www.kevinwu0904.top/blogs/golang-channel/#channel的发送和接收
https://asong.cloud/源码剖析channel设计与实现/#channel发送数据缓冲区有可用空间
https://www.cyhone.com/articles/analysis-of-golang-channel/
https://icebergu.com/archives/go-channel#aQYnzKRM
https://cloud.tencent.com/developer/article/1717414
http://legendtkl.com/2017/08/06/golang-channel-implement/
https://zhuanlan.zhihu.com/p/297053654
这篇关于【双剑合璧】go channel源码剖析,goroutine数据通信,数据结构:环形队列+双向链表,channel设计思想,两个协程之间传数据,配合select使用,关闭channel,有无缓冲区,缓的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-20MongoDB教程:从入门到实践详解
- 2024-11-17执行 Google Ads API 查询后返回的是空数组什么原因?-icode9专业技术文章分享
- 2024-11-17google广告数据不同经理账户下的凭证可以获取对方的api数据吗?-icode9专业技术文章分享
- 2024-11-15SendGrid 的 Go 客户端库怎么实现同时向多个邮箱发送邮件?-icode9专业技术文章分享
- 2024-11-15SendGrid 的 Go 客户端库怎么设置header 和 标签tag 呢?-icode9专业技术文章分享
- 2024-11-12Cargo deny安装指路
- 2024-11-02MongoDB项目实战:从入门到初级应用
- 2024-11-01随时随地一键转录,Google Cloud 新模型 Chirp 2 让语音识别更上一层楼
- 2024-10-25Google Cloud动手实验详解:如何在Cloud Run上开发无服务器应用
- 2024-10-24AI ?先驱齐聚 BAAI 2024,发布大规模语言、多模态、具身、生物计算以及 FlagOpen 2.0 等 AI 模型创新成果。