Go语言基础(十二):并发编程

2021/12/25 20:08:38

本文主要是介绍Go语言基础(十二):并发编程,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

文章目录

    • 一、前言
    • 二、goroutine
        • 1、使用goroutine
        • 2、启动goroutine示例
        • 3、main优雅谢幕
    • 三、goroutine与线程
        • 1、可增长的栈
        • 2、goroutine调度
        • 3、GOMAXPROCS
    • 四、channel
        • 1、channel类型
        • 2、创建channel
        • 3、channel操作
            • (1)发送
            • (2)接收
            • (3)关闭
        • 4、无缓冲的通道
        • 5、有缓冲的通道
        • 6、for range从通道循环取值
        • 7、单向通道
        • 8、通道总结
    • 五、worker pool(goroutine池)
    • 六、select多路复用

一、前言

Go语言的并发通过goroutine实现。goroutine类似于线程,属于用户态的线程,我们可以根据需要创建成千上万个goroutine并发工作。goroutine是由Go语言的运行时(runtime)调度完成,而线程是由操作系统调度完成。

Go语言还提供channel在多个goroutine间进行通信。goroutine和channel是 Go 语言秉承的 CSP(Communicating Sequential Process)并发模式的重要实现基础。

由于其是用户态线程,没有从用户态到核心态的切换的开销,因此goroutine是非常轻量级的线程。其实goroutine和channel之间的关系,相当于进程与队列之间的关系

Go语言中的goroutine就是这样一种机制,goroutine的概念类似于线程,但 goroutine是由Go的运行时(runtime)调度和管理的。Go程序会智能地将 goroutine 中的任务合理地分配给每个CPU。Go语言之所以被称为现代化的编程语言,就是因为它在语言层面已经内置了调度和上下文切换的机制。

二、goroutine

在Go语言编程中你不需要去自己写进程、线程、协程,你的技能包里只有一个技能–goroutine,当你需要让某个任务并发执行的时候,你只需要把这个任务包装成一个函数,开启一个goroutine去执行这个函数就可以了,就是这么简单粗暴。

1、使用goroutine

Go语言中使用goroutine非常简单,只需要在调用函数的时候在前面加上go关键字,就可以为一个函数创建一个goroutine。

一个goroutine必定对应一个函数,可以创建多个goroutine去执行相同的函数。

一个函数,一般定义成一个要做的任务。

2、启动goroutine示例

func hello() {
	fmt.Println("Hello Goroutine!")
}
func main() {
	hello() //这是我们一般执行代码的逻辑
	fmt.Println("main goroutine done!")
}

接下来我们需要,用goroutine启动一个线程去完成hello任务

func main() {
	go hello() // 加go关键字,启动另外一个goroutine线程去执行hello函数
	fmt.Println("main goroutine done!")
}

如果执行了上述代码你就会发现,执行结果只打印了main goroutine done!,并没有打印Hello Goroutine!。为什么呢?
答: Go开启一个线程去执行hello任务之后,然后直接就print输出main goroutine done!,主进程就结束了,然后所属它的子线程也会被杀死。因为创建线程有时间开销,代码执行速度是非常快的。所以子线程还没来得及打印Hello Goroutine!就被杀死了。

所以我们要想办法让main函数等一等hello函数,最简单粗暴的方式就是time.Sleep了。

func main() {
	go hello() // 启动另外一个goroutine去执行hello函数
	fmt.Println("main goroutine done!")
	time.Sleep(time.Second)
}

执行上面的代码你会发现,这一次先打印main goroutine done!,然后紧接着打印Hello Goroutine!。

3、main优雅谢幕

上面的sleep去等子线程结束之后,主进程再退出,存在可能等多了,可能等少了这种情况,因此需要有一种优雅的方式去结束主进程(因此下面使用了 sync.WaitGroup 实现main优雅谢幕)。

var wg sync.WaitGroup //声明一个变量wg

func hello(i int) {
	defer wg.Done() // goroutine结束,wg-=1
	fmt.Println("Hello Goroutine!", i)
}
func main() {

	for i := 0; i < 10; i++ {
		wg.Add(1) // 启动一个goroutine,wg+=1
		go hello(i)
	}
	wg.Wait() // 当监听到wg为0(即所有goroutine结束了),则执行后面的逻辑
}

除此之外,上面还利用for循环,生成了多个goroutine。值得注意的是:上面的10个goroutine只要有一个线程发生了阻塞,这个主进程都不会退出,这样会浪费大量资源,一定要确保goroutine不会发生阻塞

三、goroutine与线程

1、可增长的栈

OS线程栈内存(操作系统线程)一般都是固定的(通常为2MB),一个goroutine栈内存在其生命周期开始时只有很小的栈(典型情况下2KB),goroutine的栈内存不是固定的,他可以按需增大和缩小,goroutine的栈大小限制可以达到1GB,虽然极少会用到这么大。所以在Go语言中一次创建十万左右的goroutine也是可以的。

2、goroutine调度

GPM是Go语言运行时(runtime)层面的实现,是go语言自己实现的一套调度系统。区别于操作系统调度OS线程。

  • G很好理解,就是个goroutine的,里面除了存放本goroutine信息外 还有与所在P的绑定等信息。
  • P管理着一组goroutine队列,P里面会存储当前goroutine运行的上下文环境(函数指针,堆栈地址及地址边界),P会对自己管理的goroutine队列做一些调度(比如把占用CPU时间较长的goroutine暂停、运行后续的goroutine等等)当自己的队列消费完了就去全局队列里取,如果全局队列里也消费完了会去其他P的队列里抢任务。
  • M(machine)是Go运行时(runtime)对操作系统内核线程的虚拟, M与内核线程一般是一一映射的关系,一个groutine最终是要放到M上执行的;

P与M一般也是一一对应的。他们关系是: P管理着一组G挂载在M上运行。当一个G长久阻塞在一个M上时,runtime会新建一个M,阻塞G所在的P会把其他的G 挂载在新建的M上。当旧的G阻塞完成或者认为其已经死掉时 回收旧的M。

P的个数是通过runtime.GOMAXPROCS设定(最大256),Go1.5版本之后默认为物理线程数。 在并发量大的时候会增加一些P和M,但不会太多,切换太频繁的话得不偿失。

单从线程调度讲,Go语言相比起其他语言的优势在于OS线程是由OS内核来调度的,goroutine则是由Go运行时(runtime)自己的调度器调度的,这个调度器使用一个称为m:n调度的技术(复用/调度m个goroutine到n个OS线程)。 其一大特点是goroutine的调度是在用户态下完成的, 不涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,都是在用户态维护着一块大的内存池不直接调用系统的malloc函数(除非内存池需要改变),成本比调度OS线程低很多。 另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上, 再加上本身goroutine的超轻量,以上种种保证了go调度方面的性能。

点我了解更多哦

3、GOMAXPROCS

Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码。默认值是机器上的CPU核心数。例如在一个8核心的机器上,调度器会把Go代码同时调度到8个OS线程上(GOMAXPROCS是m:n调度中的n)。

Go语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。

Go1.5版本之前,默认使用的是单核心执行。Go1.5版本之后,默认使用全部的CPU逻辑核心数。

我们可以通过将任务分配到不同的CPU逻辑核心上实现并行的效果,这里举个例子:

func main() {
	runtime.GOMAXPROCS(1)
	go a()
	go b()
	time.Sleep(time.Second)
}

两个任务只有一个逻辑核心,此时是做完一个任务再做另一个任务。 将逻辑核心数设为2,此时两个任务并行执行,代码如下。

func main() {
	runtime.GOMAXPROCS(2)
	go a()
	go b()
	time.Sleep(time.Second)
}

Go语言中的操作系统线程和goroutine的关系

  1. 一个操作系统线程对应用户态多个goroutine。
  2. go程序可以同时使用多个操作系统线程。
  3. goroutine和OS线程是多对多的关系,即m:n

四、channel

单纯地将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行函数的意义。

虽然可以使用共享内存进行数据交换,但是共享内存在不同的goroutine中容易发生竞态问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。

Go语言的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信

如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。goroutine之间通信通过channel,类似进程之间通信通过队列

Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则(队列?很像!),保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

1、channel类型

channel是一种类型,一种引用类型。声明通道类型的格式如下:

var 变量 chan 元素类型

举栗:

var ch1 chan int   // 声明一个传递整型的通道
var ch2 chan bool  // 声明一个传递布尔型的通道
var ch3 chan []int // 声明一个传递int切片的通道

2、创建channel

通道是引用类型,通道类型的空值是nil

var ch chan int
fmt.Println(ch) // <nil>

声明的通道后需要使用make函数初始化之后才能使用

创建channel的格式如下:

make(chan 元素类型, [缓冲大小])

举栗:

ch4 := make(chan int)
ch5 := make(chan bool)
ch6 := make(chan []int)

3、channel操作

通道有发送(send)接收(receive)关闭(close)三种操作。
通道定义如下:

ch := make(chan int)
(1)发送

将一个值发送到通道中。

ch <- 10 // 把10发送到ch中
(2)接收

从一个通道中接收值。

x := <- ch // 从ch中接收值并赋值给变量x
<-ch       // 从ch中接收值,忽略结果
(3)关闭

我们通过调用内置的close函数来关闭通道。

close(ch)

关于关闭通道需要注意的事情是,只有在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。

关闭后的通道有以下特点:

  1. 对一个关闭的通道再发送值就会导致panic。
  2. 对一个关闭的通道进行接收会一直获取值直到通道为空。
  3. 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
  4. 关闭一个已经关闭的通道会导致panic。

4、无缓冲的通道

无缓冲的通道又称为阻塞的通道。我们来看一下下面的代码:

func main() {
	ch := make(chan int)
	ch <- 10
	fmt.Println("发送成功")
}

上面这段代码能够通过编译,但是执行的时候会出现以下错误:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        .../src/github.com/Q1mi/studygo/day06/channel02/main.go:8 +0x54

为什么会出现deadlock错误呢?

因为我们使用ch := make(chan int)创建的是无缓冲的通道,无缓冲的通道只有在有人接收值的时候才能发送值。就像你住的小区没有快递柜和代收点,快递员给你打电话必须要把这个物品送到你的手中,简单来说就是无缓冲的通道必须有接收才能发送。

上面的代码会阻塞在ch <- 10这一行代码形成死锁,那如何解决这个问题呢?

一种解决方法是启用一个goroutine去接收值,例如:

func recv(c chan int) {
	ret := <-c
	fmt.Println("接收成功", ret)
}
func main() {
	ch := make(chan int)
	go recv(ch) // 启用goroutine从通道接收值
	ch <- 10
	fmt.Println("发送成功")
}

无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功,两个goroutine将继续执行。相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值

使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道

5、有缓冲的通道

解决上面问题的方法还有一种就是使用有缓冲区的通道。我们可以在使用make函数初始化通道的时候为其指定通道的容量,例如:

func main() {
	ch := make(chan int, 1) // 创建一个容量为1的有缓冲区通道
	ch <- 10
	fmt.Println("发送成功")
}

只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量。就像你小区的快递柜只有那么个多格子,格子满了就装不下了,就阻塞了,等到别人取走一个快递员就能往里面放一个。

我们可以使用内置的len函数获取通道内元素的数量,使用cap函数获取通道的容量,虽然我们很少会这么做。

6、for range从通道循环取值

当向通道中发送完数据时,我们可以通过close函数来关闭通道
通道被关闭时,再往该通道发送值会引发panic,从该通道取值会先取完通道中的值,再然后取到一直为零值。那如何判断通道是否被关闭了呢?

// channel 练习
func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)
	// 开启goroutine将0~100的数发送到ch1中
	go func() {
		for i := 0; i < 100; i++ {
			ch1 <- i
		}
		close(ch1)
	}()
	// 开启goroutine从ch1中接收值,并将该值的平方发送到ch2中
	go func() {
		for {
			i, ok := <-ch1 // 第一种:通道关闭后再取值ok=false
			if !ok {
				break
			}
			ch2 <- i * i
		}
		close(ch2)
	}()
	// 在主goroutine中从ch2中接收值打印
	for i := range ch2 { // 第二种:通道关闭后会退出for range循环
		fmt.Println(i)
	}
}

7、单向通道

有的时候我们会将通道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中使用通道都会对其进行限制,比如限制通道在函数中只能发送或只能接收。

Go语言中提供了单向通道来处理这种情况。如下:

func counter(out chan<- int) {
	for i := 0; i < 100; i++ {
		out <- i
	}
	close(out)
}

func squarer(out chan<- int, in <-chan int) {
	for i := range in {
		out <- i * i
	}
	close(out)
}
func printer(in <-chan int) {
	for i := range in {
		fmt.Println(i)
	}
}

func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)
	go counter(ch1)
	go squarer(ch2, ch1)
	printer(ch2)
}

其中,

  • chan<- int是一个只写单向通道(发送进)(只能对其写入int类型值),可以对其执行发送操作但是不能执行接收操作;
  • <-chan int是一个只读单向通道(接收)(只能从其读取int类型值),可以对其执行接收操作但是不能执行发送操作。

函数传参及任何赋值操作中可以将双向通道转换为单向通道(类似自动类型转换),但反过来是不可以的。

8、通道总结

channel常见的异常总结,如下图:
在这里插入图片描述
关闭已经关闭的channel也会引发panic

五、worker pool(goroutine池)

在工作中我们通常会使用可以指定启动的goroutine数量–worker pool模式,控制goroutine的数量,防止goroutine泄漏和暴涨

一个简易的work pool示例代码如下:

func worker(id int, jobs <-chan int, results chan<- int) {
	for j := range jobs { // 当某个goroutine先做完一个任务时,这个for循环让这个goroutine又去通道接收任务参数,进行下一个任务
		fmt.Printf("worker:%d start job:%d\n", id, j)
		time.Sleep(time.Second)
		fmt.Printf("worker:%d end job:%d\n", id, j)
		results <- j * 2
	}
}


func main() {
	jobs := make(chan int, 100)
	results := make(chan int, 100)
	// 开启了一个包含3个goroutine的池子
	// 池子等于两个for循环+一个channel,外部循环控制池子大小,内部循环保证线程一直在寻找新的任务做,channel是任务发布中心
	for w := 1; w <= 3; w++ {
		go worker(w, jobs, results)
	}
	// 5个任务
	for j := 1; j <= 5; j++ {
		jobs <- j
	}
	close(jobs)
	// 输出结果
	for a := 1; a <= 5; a++ {
		<-results
	}
}

六、select多路复用

在某些场景下我们需要同时从多个通道接收数据。通道在接收数据时,如果没有数据可以接收将会发生阻塞。你也许会写出如下代码使用遍历的方式来实现:

for{
    // 尝试从ch1接收值
    data, ok := <-ch1
    // 尝试从ch2接收值
    data, ok := <-ch2
    …
}

这种方式虽然可以实现从多个通道接收值的需求,但是运行性能会差很多。为了应对这种场景,Go内置了select关键字,可以同时响应多个通道的操作

select{
    case <-ch1:
        ...
    case data := <-ch2:
        ...
    case ch3<-data:
        ...
    default:
        默认操作
}

举个小例子来演示下select的使用:

func main() {
	ch := make(chan int, 1)
	for i := 0; i < 10; i++ {
		select {
		case x := <-ch: // 有东西能取出就取出打印
			fmt.Println(x)
		case ch <- i: // 没满能写就写
		}
	}
}

使用select语句能提高代码的可读性

  • 可处理一个或多个channel的发送/接收操作。
  • 如果多个case同时满足,select会随机选择一个
  • 对于没有case的select{}会一直等待,可用于阻塞main函数。


这篇关于Go语言基础(十二):并发编程的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程