go并发编程

2022/7/1 14:24:40

本文主要是介绍go并发编程,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

go 并发编程之协程

一个goroutine会以一个很小的栈开始其生命周期,一般只需要2KB。区别于操作系统线程由系统内核进行调度, goroutine 是由Go运行时(runtime)负责调度。Goroutine 是 Go 程序中最基本的并发执行单元。每一个 Go 程序都至少包含一个 goroutine——main goroutine

创建一个协程非常简单,就是在一个任务函数前面加一个go关键字:

go task()

// 匿名函数
go func(){
    
}()

示例:

package main

import (
	"fmt"
	"time"
)

func show(msg string) {
	for i := 0; i < 5; i++ {
		fmt.Printf("msg: %v\n", msg)
		time.Sleep(time.Millisecond * 100)
	}
}

func main() {
	go show("java")           // 启动了一个协程。  第一个协程
	show("golang")            // 主协程
	fmt.Println("main end..") // 主函数退出,程序就结束了,第一个协程没有执行结束。
}

注意:

  • 通过go show("java") 启动一个协程。
  • 启动goroutine,当对应的函数执行结束了,goroutine就结束了。
  • main函数结束了,所有开启的goroutine也就结束了。

示例2:

package main

import (
	"fmt"
    "io/ioutil"
    "log"
    "net/http"
    "time"
)
func responseSize(url string) {
	fmt.Println("step1:", url)
	resp, err := http.Get(url)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	fmt.Println("step2:", url)
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("setp3:", url, " len:", len(body))

}
func main() {
	go responseSize("http://www.baidu.com")
	go responseSize("http://jd.com")
	go responseSize("http://taobao.com")

	time.Sleep(time.Second * 10)
	fmt.Println("end...")
}

动态栈

操作系统的线程一般都固定的栈内存(通常为2MB),而go语言中的goroutine非常轻量级,一个goroutine的初始栈空间很小(一般为2kb),并且goroutine的栈大小不是很固定的,可以根据动态的增大或缩小,go的runtime会自动为goroutine分配合适的栈空间

goroutine的调度

目前 Go 语言的调度器采用的是 GMP 调度模型。 GMP调度模型主要是由G、M、P三元素构成,除此之外,还有全局队列、本地队列等元素。1656321003561.png

  • G:表示 goroutine,每执行一次go f()就创建一个 G,包含要执行的函数和上下文信息
  • 全局队列(Global Queue):存放等待运行的 G。
  • P:代表调度器,负责调度goroutine,维护一个本地goroutine队列,M从P上获得goroutine并执行。最多有GOMAXPROCS 个P。
  • P 的本地队列:同全局队列类似,存放的也是等待运行的G,存的数量有限,不超过256个。新建 G 时,G 优先加入到 P 的本地队列,如果本地队列满了会批量移动部分 G 到全局队列。
  • M:每个M都代表了1个内核线程,OS调度器负责把内核线程分配到CPU的核上执行。线程想运行任务就得获取 P,从 P 的本地队列获取 G,当 P 的本地队列为空时,M 也会尝试从全局队列或其他 P 的本地队列获取 G。M 运行 G,G 执行之后,M 会从 P 获取下一个 G,不断重复下去。

优点:

单从线程调度讲,Go语言相比起其他语言的优势在于OS线程是由OS内核来调度的, goroutine 则是由Go运行时(runtime)自己的调度器调度的,完全是在用户态下完成的, 不涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,都是在用户态维护着一块大的内存池, 不直接调用系统的malloc函数(除非内存池需要改变),成本比调度OS线程低很多。 另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上, 再加上本身 goroutine 的超轻量级,以上种种特性保证了 goroutine 调度方面的性能。

GOMAXPROCS

默认CPU的逻辑核心数。

Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个 OS 线程来同时执行 Go 代码。默认值是机器上的 CPU 核心数。例如在一个 8 核心的机器上,GOMAXPROCS 默认为 8。Go语言中可以通过runtime.GOMAXPROCS函数设置当前程序并发时占用的 CPU逻辑核心。

示例:

package main

import (
	"fmt"
	"runtime"
	"sync"
)

var wg sync.WaitGroup

func a() {
	defer wg.Done()
	for i := 0; i < 10; i++ {
		fmt.Println("A:", i)
	}
}

func b() {
	defer wg.Done()

	for i := 0; i < 10; i++ {
		fmt.Println("B:", i)
	}
}

func main() {
	runtime.GOMAXPROCS(1)  // 如果指定1,则只有1个核心,但有两个goroutune。a()执行完,在执行b,如果设置为2,则a和b交替执行
	wg.Add(2)
	go a()
	go b()
	wg.Wait()
}

练习:

func test(){
    for i:=0; i<5; i++{
        go func(){
            fmt.Println(i)  // 此时的i引用协程外的外部变量。 原因:主协程的循环很快就跑完了,各个协程才开始跑,当协程中访问i的值的时候,i可以已经执行到5了
        }()
    }
}

通道channel

go语言中的通道(channel)是一种特殊类型,是一种引用类型。提供一种称为通道的机制,用于在goroutine之间共享数据。遵循先进先出的规则。

数据在通道chan上传递,在任何给定时间只有一个goroutine可以访问数据项,因此不会发生数据竞争。

有两种类型的通道无缓冲通道和缓冲通道。无缓冲通道用于执行goroutine之间的同步信息,而缓冲通道用于执行异步通信

注意:

  • 通道的零值为nil,因此需要初始化。
  • 在声明通道时需要指定数据类型

声明:

var 变量名称 chan 元素类型

// 示例:
var ch1 chan int     // 声明一个传递整形的通道
var ch2 chan bool    
var ch3 chan []int   // 声明一个int切片的通道

初始化:

ch := make(chan 类型, [缓冲大小])

// 示例:
unbuffered := make(chan int)  // 定义整形的无缓冲通道
buffered := make(chan int, 10)  // 整形的有缓冲的通道, 第一个参数为chan + 元素类型,第二个参数为缓冲区大小。

channel的操作

通道共有发送、接收、和关闭操作。发送操作接收操作都使用<-符号。

1)发送数据

将值发送到通道中。

ch := make(chan int)  // 初始化

ch = <- 10           // 将10发送到通道
2)接收数据

从通道接收值。

// 1. 基本使用
x := <- ch          // 从通道接收值,赋值给x
<- ch               // 接收值,并忽略结果


// 2. 执行接收操作的时候,可以判断是否channel已经关闭
value, ok := <- ch      // ok表示成功接收到了值为true,如果接收到零值为false.

func main() {
	var ch chan int
	ch = make(chan int, 10)
	ch <- 10
	ch <- 20
	ch <- 30
	close(ch)
	for {
		data, ok := <-ch
		if !ok {
			break
		}
		fmt.Printf("data: %v\n", data)
		fmt.Printf("ok: %v\n", ok)
	}
}

// 3. 通过for range 接收值
// 当通道被关闭后,会在通道内的所有值被接收完毕后会自动退出循环
func main() {
	var ch chan int
	ch = make(chan int, 10)
	ch <- 10
	ch <- 20 
	close(ch)              // 如果没有close,执行for range时,两次迭代后,会报deadlock
	for v := range ch {   
		fmt.Println("接收到的值:", v)
	}
}

/*
接收到的值: 10
接收到的值: 20
*/

注意:

  • 对于同一个通道,发送操作之间是互斥的,接收操作之间也是互斥的
  • 发送操作和接收操作中对元素值的处理都是不可分割的。
3)关闭操作

通过内置函数close来关闭通道。

关闭操作,由发送方完成。

close(ch)

示例:

// 示例1:
package main

import "fmt"

func main() {
	var ch chan int
	ch = make(chan int, 10)
	ch <- 10            // ok
	close(ch)           // 关闭通道
	ch <- 20            // panic: 不能给已关闭的通道发送值
	fmt.Println("end.")

}

// 示例2:接收已经关闭的channel, 会接收通道的值,直到通道为空之后,返回零值。
package main

import "fmt"

func main() {
	var ch chan int
	ch = make(chan int, 10)
	ch <- 10                       // ok
	close(ch)                      // 关闭通道
	data := <-ch                   // 可以接收值
	fmt.Printf("data: %v\n", data) // ok 10
	data = <-ch
	fmt.Printf("data: %v\n", data) // ok 0 现在ch已经为空了,再次获取,会获取对应的零值。
	data = <-ch
	fmt.Printf("data: %v\n", data) // ok 0
	fmt.Println("end.")
}

// 示例3:关闭已经关闭的channel,会导致panic
func main(){
	var ch chan int
    ch = make(chan int, 10)
    close(ch) 
    close(ch)   // panic
}

注意:

  • 对一个关闭的通道再发送值就会导致panic
  • 对一个关闭的通道进行接收会一直获取值直到通道为空。
  • 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值
  • 关闭一个已经关闭的通道会导致 panic。

channel的遍历

方法一:

package main
import "fmt"

var c = make(chan int, 10)

func main11() {
    c <- 10
    c <- 20
   	c <- 30
	// for循环遍历
	for i := 0; i < 3; i++ {  // 结果为 10 20 30
		data := <-c
		fmt.Printf("data: %v\n", data)
	}
}

func main22(){
    // for range遍历
	for v := range c {        // 结果为 10 20 30
		fmt.Printf("v: %v\n", v)
	}
    r1 := <-c 
    fmt.Println(r1)  // 遍历过后,再接收值,如果close()了,此时输出为0 【类型的零值】,如果没有close(),deadlock错误。
}

注意:

  • 当在chan中写入两个,但读的时候读取三次,会报deadlock错误。解决办法:close(c) 关闭channel,这样会读取到“零值”。

**方式2: for循环 + if判断 **

// for 循环
for {
    c<-10
    c<-20 
    
    v, ok := <-c      // ok表示是否成功获取到发送的值,如果获取为true, 如果过去到零值为false。 如果chan未关闭,第三次读取会deadlock
    if ok {
        fmt.Printf("v: %v\n", v)
    } else {
        break
    }
}

无缓冲通道

无缓冲的通道又称为阻塞的通道,或同步通道

无缓冲的通道发送操作时,只有在有接收方能够接收值的时候才能发送成功,否则会一直处于等待发送的阶段。同理,如果对一个无缓冲通道执行接收操作时,没有任何向通道中发送值的操作那么也会导致接收操作阻塞。

示例:

// 示例1:
func main() {
	var ch chan int
    ch = make(chan int)
	ch <- 10             // 只执行了发送操作,会报错deadlock。
	fmt.Println("end.")
}

func main() {
	var ch chan int
	ch = make(chan int)    
	data := <-ch        // 只有接收操作,会deadlock
	fmt.Printf("data: %v\n", data)
}


/*
fatal error: all goroutines are asleep - deadlock!
*/

// 示例2:通过另一个goroutine接收值。

func main() {
	var ch chan int
	ch = make(chan int)

	go func(c chan int) {          // 启动新的goroutine接收值
		data := <-c
		fmt.Println("接收到的值:", data)
	}(ch)

	ch <- 10
	fmt.Println("发送数据结束.")
}

有缓冲通道

可以在使用 make 函数初始化通道时,可以为其指定通道的容量。

// 示例1:通道发送值超过chan容量,再次发送数据会报deadlock
func main() {
	var ch chan int
	ch = make(chan int, 10)

	for i := 0; i < 10; i++ {
		ch <- i
	}

	ch <- 10                              // 超过channel容量,报deadlock错误
	fmt.Printf("len(ch): %v\n", len(ch))
}

// 示例2: 通道没有值,接收会deadlock
func main() {
	var ch chan int
	ch = make(chan int, 10)
	data := <-ch                    // 通道没有值,接收会报deadlock
	fmt.Printf("data: %v\n", data)
}

注意:

  • 只要通道的容量大于零,那么该通道就属于有缓冲的通道,通道的容量表示通道中最大能存放的元素数量。当通道内已有元素数达到最大容量后,再向通道执行发送操作就会阻塞,除非有从通道执行接收操作。

单向通道

在某些场景下我们可能会将通道作为参数在多个任务函数间进行传递,通常我们会选择在不同的任务函数中对通道的使用进行限制,比如限制通道在某个函数中只能执行发送或只能执行接收操作

Go语言中提供了单向通道来处理这种需要限制通道只能进行某种操作的情况。箭头<-和关键字chan的相对位置表明了当前通道允许的操作。

格式:

chan <- 类型   // 只发送通道,不能接收
<- chan 类型    // 只接收通道,不能发送

// 示例:
chan <- int   // 只能发送int类型的通道
<- chan int   // 只接收int类型的通道

注意:

  • 只能对发送通道执行关闭操作。

总结: 通道异常情况

通道操作 nil 非空 空的 满了 未满
发送操作 deadlock 发送值 发送值 deadlock 发送值
接收操作 deadlock 接收值 deadlock 接收值 接收值
关闭 panic 关闭,接收所有值后,返回零值 关闭,返回零值 关闭,接收所有值后,返回零值 关闭,接收所有值后,返回零值

练习

// work-pool(goroitine池)
package main

import (
	"fmt"
	"time"
)

func worker(id int, jobs <-chan int, results chan<- int) {
	for v := range jobs {
		fmt.Printf("worker:%v start job :%d\n", id, v)
		time.Sleep(time.Second)
		fmt.Printf("worker:%v end job :%d\n", id, v)
		results <- v * 2
	}
}

func main() {
	var jobs = make(chan int, 100)
	var results = make(chan int, 100)

	// 开启3个goroutine
	for i := 1; i < 4; i++ {
		go worker(i, jobs, results)
	}

	// 5个任务
	for i := 1; i < 6; i++ {
		jobs <- i
	}
	close(jobs)

	// 输出结果
	for i := 1; i < 6; i++ {
		data := <-results
		fmt.Printf("data: %v\n", data)
	}
}


// 示例2
/*
	使用goroutine和channel实现一个计算int64随机数各位和。
	1. 开启goroutine循环生成int64类型的随机数,发送到jobChan
	2. 开启24个goroutine从jobChan中取出随机数并计算各位数的和,将结果保存到resultChan中
	3. 主goroutine从resultChan中取出结果并打印
*/

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

var wg sync.WaitGroup

type Job struct {
	value int64
}

type Result struct {
	Job
	result int64
}

func generateRandom(jobsChan chan<- *Job) {
	for {
		newJob := &Job{
			value: rand.Int63(),
		}
		jobsChan <- newJob
		time.Sleep(time.Second)
	}
	close(jobsChan)
}

func computeDigitsSum(jobsChan <-chan *Job, result chan<- *Result) {
	for {
		v := <-jobsChan
		n := v.value
		var sum int64
		for n > 0 {
			sum += n % 10
			n /= 10
		}
		newResult := &Result{
			Job:    *v,
			result: sum,
		}
		result <- newResult
	}
	close(result)
}

func main() {
	var jobsChan = make(chan *Job, 100)
	var resultChan = make(chan *Result, 100)
	wg.Add(1)
	go generateRandom(jobsChan)
	wg.Add(24)
	for i := 0; i < 24; i++ {
		go computeDigitsSum(jobsChan, resultChan)
	}
	// 打印结果
	for v := range resultChan {
		fmt.Printf("%v-%v\n", v.value, v.result)
	}
	wg.Wait()
}

select语句

select是go中的一个控制结构,类似switch语句,用于处理异步IO操作

select会监听case语句中channel的读写操作,当case中的channel读写操作为非阻塞状态(即能读写)时,将触发相应的动作。

注意:

  • select中的case语句必须是一个channel操作,要么是发送要么是接收

    case r == 2 // error

  • 如果多个case都可以运行,select随机公平的选出一个执行,其他不会执行

  • 如果没有可运行的case,且有default语句,那么就会执行default的动作。

  • 如果没有可运行的case语句,且没有default语句,会deadlock

示例:

package main

import "fmt"

func main() {
	var ch chan int
	ch = make(chan int, 1)
	for i := 0; i < 10; i++ {
		select {
		case <-ch:
			// fmt.Println("receive", d)
		case ch <- i:
			fmt.Println("send ok:", i)
		}
	}
}

/*
send ok: 0
send ok: 2
send ok: 4
send ok: 6
send ok: 8
*/

示例2:

func main() {
	go func() {
		fmt.Println("走了...")
		r := <-chanInt
		fmt.Printf("receive: %v\n", r)
		close(chanInt)
	}()

	fmt.Println("start..")
	select {
	case chanInt <- 1:         // 两个case都可以执行,这里会随机执行一个
		fmt.Println("send 1")
	case chanInt <- 2:
		fmt.Println("send 2")
	}
	fmt.Println("end..")
}

runtime包

runtime包中定义了一些协程管理相关的api。

runtime.Gosched()

让出时间片,让给其他子协程来执行。

package main

import (
	"fmt"
	"runtime"
)

func show(msg string) {
	for i := 0; i < 2; i++ {
		fmt.Printf("msg: %v\n", msg)
	}
}
func main() {
	go show("golang") // 启动子协程来运行

	for i := 0; i < 2; i++ {
		runtime.Gosched() // 让出时间片,让其他协程来执行
		fmt.Printf("i: %v\n", i)
	}
	fmt.Println("end...")
}

// msg: golang    // 先让其他协程执行 
// mgs: golang
// i: 0           // 最后执行主协程任务
// i: 1

runtime.Goexit()

退出当前协程。

func show2() {
	for i := 0; i < 10; i++ {
		if i > 5 {
			runtime.Goexit() // 执行到i为6时候,协程结束
		}
		fmt.Printf("i: %v\n", i)
	}
}

func main() {
	// 示例2
	go show2()

	for i := 0; i < 10; i++ {
		runtime.Gosched()
		fmt.Printf("\"golang\": %v\n", "golang")
	}
	fmt.Println("end...")
}

**runtime.NumCPU() **

返回cpu的核心数

runtime.GOMAXPROCS(n int)

设置执行子协程时候,cpu核心数。

  • =1:单核心执行。
  • >1:多核并发执行。
func a() {
	for i := 0; i < 20; i++ {
		fmt.Printf("A i: %v\n", i)
	}
}

func b() {
	for i := 0; i < 20; i++ {
		fmt.Printf("B i: %v\n", i)
	}
}

func main() {
	fmt.Printf("runtime.NumCPU(): %v\n", runtime.NumCPU())
	runtime.GOMAXPROCS(1)  // 设置cpu数量
	go a()
	go b()

	time.Sleep(time.Second)
}

并发安全和锁

sync.WaitGroup

Go语言中可以使用sync.WaitGroup来实现并发中多个任务的同步。waitGroup可以保证在并发环境中完成指定数量的任务。

原理:

sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。例如:当我们启动了 N 个并发任务时,就将计数器值增加N。每个任务完成时通过调用 Done 方法将计数器减1。通过调用 Wait 来等待并发任务执行完,当计数器值为 0 时,表示所有并发任务已经完成。

方法

函数名 描述
func (wg * WaitGroup) Add(delta int) 计数器加 delta
(wg *WaitGroup) Done() 计数器减1
(wg *WaitGroup) Wait() 阻塞直到计数器为0

示例:

func main() {  // 两个协程执行结束后,主协程才结束
	var wg sync.WaitGroup

	wg.Add(2)

	go func() {
		time.Sleep(time.Second * 1)
		fmt.Println("goroutine 1 finished")
		wg.Done()
	}()

	go func() {
		time.Sleep(2 * time.Second)
		fmt.Println("goroutine 2 finished")
		wg.Done()
	}()

	wg.Wait()
	fmt.Println("end..")
}

// goroutine 1 finished
// goroutine 2 finished
// end...

示例2:

package main

import (
	"fmt"
	"sync"
	"time"
)

var wp sync.WaitGroup

func showmsg(i int) {
	defer wp.Add(-1) // 等同与wp.Done()
	fmt.Printf("i: %v\n", i)
}

func main() {
	for i := 0; i < 10; i++ {
		// 启动一个协程来执行
		go showmsg(i)
		wp.Add(1)
	}
	// 主协程
	wp.Wait()
	fmt.Println("end...")
}

Mutex互斥锁实现同步

互斥锁是一种常用的控制共享资源访问的方法,它能够保证同一时间只有一个 goroutine 可以访问共享资源。

常用方法:

方法名 描述
func (m *Mutex)Lock() 获取互斥锁
func(m *Mutex)Unlock() 释放互斥锁

示例:

package main

import (
	"fmt"
	"sync"
)

var (
	x    int
	wg   sync.WaitGroup
	lock sync.Mutex
)

func add() {  // 
	defer wg.Done()

	for i := 0; i < 10000; i++ {
		lock.Lock()              // 加锁
		x += 1                   // 两个goroutine同时对x进行访问。对共享资源访问控制。
		lock.Unlock()            // 释放锁
	}
}

func main() {
	wg.Add(2)
	go add()
	go add()

	wg.Wait()
	fmt.Printf("x: %v\n", x)
}

RWMutex读写互斥锁

RWMutex读写互斥锁不限制资源的并发读,但是读写、写写操作无法并行执行。在读多写少的场景下,当我们并发的去读取一个资源,而不涉及资源修改的时候是没有必要加互斥锁的。

读写锁分为两种:读锁和写锁。当一个 goroutine 获取到读锁之后,其他的 goroutine 如果是获取读锁会继续获得锁,如果是获取写锁就会等待。而当一个 goroutine 获取写锁之后,其他的 goroutine 无论是获取读锁还是写锁都会等待。

常用方法:

方法名 描述
func (rw *RWMutex)Lock() 获取写锁
func (rw *RWMutex)Unlock() 释放写锁
func (rw *RWMutex)RLock() 获取读锁
func (rw *RWMutex)RUnlock() 释放读锁

注意:

  • 获取写锁后,读和写都要阻塞。
  • 获取读锁后,读可以,写要阻塞。

示例:

// 1. 并发读
package main

import (
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup
var rwlock sync.RWMutex

func read(id int) {
	defer wg.Done()

	rwlock.RLock()  
	fmt.Println("read start...", id)
	time.Sleep(time.Millisecond * 200)
	fmt.Println("read end...", id)
	rwlock.RUnlock()
}

func main() {
	wg.Add(3)

	go read(1)
	go read(2)
	go read(3)

	wg.Wait()
}
/*
read start... 3
read start... 2
read start... 1
read end... 1
read end... 3
read end... 2
*/


// 2. 并发读写
package main

import (
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup
var rwlock sync.RWMutex

func read(id int) {
	defer wg.Done()

	rwlock.RLock()
	fmt.Println("read start...", id)
	time.Sleep(time.Millisecond * 200)
	fmt.Println("read end...", id)
	rwlock.RUnlock()
}

func write(id int) {
	defer wg.Done()

	rwlock.Lock()
	fmt.Println("write start...", id)
	time.Sleep(time.Millisecond * 400)
	fmt.Println("write end..", id)
	rwlock.Unlock()
}

func main() {
	wg.Add(5)

	for i := 1; i < 4; i++ {
		go read(i)
	}

	go write(10)
	go write(11)

	wg.Wait()
}

/*
read start... 1
read end... 1
write start... 11
write end.. 11
read start... 2
read start... 3
read end... 3
read end... 2
write start... 10
write end.. 10
*/

sync.Once

保证某些操作在高并发的场景下只执行一次。例如只加载一次配置文件等。

sync.Once只有一个Do方法。

函数:

方法名 描述
func (o *Once) Do(f func()) 如果要执行的函数f需要传递参数就需要搭配闭包来使用

示例:

package main

import (
	"fmt"
	"sync"
)

var once sync.Once
var wg sync.WaitGroup

func loadConfig() {
	defer wg.Done()

	load := func() {
		fmt.Println("加载配置文件...")
	}
	once.Do(load)      // 只执行一次
}
func main() {
	wg.Add(5)
	for i := 0; i < 5; i++ {
		go loadConfig()
	}

	wg.Wait()
}

sync.Map

go中内置的map不是并发安全的。其不用像内置的 map 一样使用 make 函数初始化就能直接使用。

常用函数:

方法名 描述
func (m *Map)Store(key, value interface{}) 存储key-value数据
func(m * Map)Load(key interface{}) (value interface{}, ok bool) 查询key对应的值
func(m * Map)LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) 查询或存储对应的key,如果存在返回存在的值,否则store并返回这个值
func(m *Map)LoadAndDelete(key interface{}) (value interface{}, loaded bool) 查询并删除
func(m *Map)Delete(key interface{}) 删除key
func(m *Map)Range(f func(key, value interface{}) bool ) 对map中的key-value依次调用f函数。

示例:

// 1. 示例
package main

import (
	"fmt"
	"strconv"
	"sync"
)

var m sync.Map

func main() {
	wg := sync.WaitGroup{}
	for i := 0; i < 21; i++ {
		wg.Add(1)

		go func(index int) {
			key := strconv.Itoa(index)
			value := "value - " + key
			// 存储值
			m.Store(key, value)

			// 获取值
			value2, ok := m.Load(key)
			fmt.Printf("value: %v, ok:%v\n", value2, ok)
			wg.Done()
		}(i)
	}
	wg.Wait()
}

// 2. Range
package main

import (
	"fmt"
	"sync"
)

var m sync.Map

func f(key, value interface{}) bool {
	fmt.Println("key: ", key, "value:", value)
	return true
}

func main() {
	m.Store("name", "zs")
	m.Store("age", 19)
	m.Store("gender", "男")
	m.Range(f)
	fmt.Println("---------")
	m.Delete("name")         // 删除key
	m.Range(f)
}

/*
key:  name value: zs
key:  age value: 19
key:  gender value: 男
---------
key:  age value: 19
key:  gender value: 男
*/

原子变量的引入

引用sync/atomic提供了对原子操作的支持。

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// atomic原子变量
var i int32 = 100

func add() {
	atomic.AddInt32(&i, 1) // 加1
}

func sub() {
	atomic.AddInt32(&i, -1) // 减1
}

func main() {
	for i := 0; i < 100; i++ {
		go add()
		go sub()
	}
	time.Sleep(time.Second * 2)
	fmt.Println("end:", i)
}

原子操作详解

atomic提供的原子操作,能够确保在同一时刻只有一个goroutine对变量进行操作,善用atomic能够避免程序中出现大量的锁操作。

atomic常见的操作:

  • 增减
  • 载入read
  • 比较并交换cas
  • 交换
  • 存储write
增减操作

增减,操作的方法名方式为AddXXXType,保证对操作数进行原子的增减,支持的类型为int32int64uint32uint64uintptr

// 加减操作
var i int32 = 100
atomic.AddInt32(&i, 1)   // 加法操作
fmt.Printf("i: %v\n", i) // 101
atomic.AddInt32(&i, -1)  // 减法操作
fmt.Printf("i: %v\n", i) // 100

var j int64 = 200
atomic.AddInt64(&j, 10)
fmt.Printf("j: %v\n", j)

载入操作和存储操作

读和取操作。保证了读取到操作数前没有其他任务对它进行变更,操作方法的命名方式为LoadXXXType,支持的类型除了int32int64uint32uint64uintptr等类型外还支持Pointer,也就是支持载入任何类型的指针。

存储,有载入了就必然有存储操作,这类操作的方法名以Store开头。

var i int32 = 100

atomic.LoadInt32(&i) // read读取
fmt.Printf("i: %v\n", i)

atomic.StoreInt32(&i, 200) // store写入
fmt.Printf("i: %v\n", i)
比较并交换操作

该操作再进行交换之前首先确保变量的值未被修改,即仍然为old所记录的值,如果等于old值,才进行交换。 操作方法命名方式为CompareAndSwapXXXType(*type, old, new)

// 比较并交换操作
var i int32 = 100

// 在修改之前先比较,在交换
// 与old值进行比较,如果等于old,交换为200
b := atomic.CompareAndSwapInt32(&i, 100, 200)
fmt.Printf("b: %v\n", b)
fmt.Printf("i: %v\n", i)
交换操作

不比较直接交换,操作很少使用。

互斥锁和原子操作的区别
  • 使用目的:互斥锁是用来保护一段逻辑,原子操作用于对一个变量的更新保护
  • 底层实现:Mutex操作系统的调度器实现,而atomic包中的原子操作则由底层硬件指令直接提供支持,这些指令在执行的过程中是不允许中断的,因此原子操作可以在lock-free的情况下保证并发安全,并且它的性能也能做到随CPU个数的增多而线性扩展。

Timer定时器

timer可以实现定时的操作,内部也是通过channel来实现的。

通过time.NewTimer()time.After()两种方法都可以实现。

timer只执行一次。

time.NewTimer(d duration) 创建定时器

func main() {
	timer := time.NewTimer(time.Second * 2)    // 2秒的计时器
	fmt.Printf("time.Now(): %v\n", time.Now()) // 返回当前事件

	t1 := <-timer.C // 阻塞,直到时间到了,返回channel,t1为两秒后时间
	fmt.Printf("t1: %v\n", t1)

	// 方式2
	timer2 := time.NewTimer(time.Second * 2)
	<-timer2.C // 阻塞,等待时间到
    fmt.Printf("time.Now(): %v\n", time.Now())   
}

time.After(d duration) 创建定时器, 返回的chan Time

// 方式3
<-time.After(time.Second * 2)  // 阻塞,两秒之后打印
fmt.Printf("time.Now(): %v\n", time.Now()) 

time.Stop() 停止定时器

func main(){
    timer := time.NewTimer(time.Second)
	go func() {
		<-timer.C  // 定时器阻塞
		fmt.Println("func....")         // 停止后,不会执行了
	}()

	isStopped := timer.Stop()          // 停止之前的定时器
	fmt.Printf("isStopped: %v\n", isStopped)
	if isStopped {
		fmt.Println("stopped...")
	}

	time.Sleep(time.Second * 3)
	fmt.Println("main end...")
}

time.Reset(d duration) 重新设置时间,即修改NewTimer时间

func main() {
	fmt.Println("before")

	timer4 := time.NewTimer(time.Second * 5) // 设置5s
	timer4.Reset(time.Second * 1)            // 重新设置时间,修改原来NewTimer的时间
	<-timer4.C
	fmt.Println("after")
}

Ticker定时器

timer只执行一次,ticker可以周期执行,即每隔固定时间执行,除非Stop()

示例:

func main() {
	ticker := time.NewTicker(time.Second)

	counter := 1
    for _ = range ticker.C { // 这里使用=,    也可以   for t1 := range ticker.C{}
		fmt.Println("ticker...")
		if counter >= 5 {
			ticker.Stop() // 停止
			break
		}
		counter++
	}
}

示例2:

for i := range ticker.C {
  fmt.Printf("i: %v\n", i) 
}

// i: 2022-04-28 10:10:57.714142132 +0800 CST m=+1.000508099
// i: 2022-04-28 10:10:58.714142132 +0800 CST m=+1.000508099
// i: 2022-04-28 10:10:59.714142132 +0800 CST m=+1.000508099
...

示例3:

// 每秒发送1,2,3中随机数字到 ch, 开启goroutine读取ch数据到data
package main

import (
	"fmt"
	"time"
)

func main() {
	timer := time.NewTicker(time.Second)
	var ch = make(chan int)

	go func() {
		for {
			data := <-ch
			fmt.Printf("data: %v\n", data)
		}
	}()

	for _ = range timer.C {
		select {
		case ch <- 1:      
		case ch <- 2:
		case ch <- 3:
		}
		fmt.Println("send ok")
	}
}



这篇关于go并发编程的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程