Go并发编程(三)协程池
2021/5/2 1:25:10
本文主要是介绍Go并发编程(三)协程池,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
文章目录
- Go并发编程(三)协程池
- 为什么需要协程池
- 实现
- 数据结构定义
- 新增任务&执行任务
- goroutine异常处理
- 关闭协程池
- 使用
Go并发编程(三)协程池
本文参考如下博客实现了一个简易的协程池
- 100 行写一个 go 的协程池 (任务池)
为什么需要协程池
goroutine 太多仍会导致调度性能下降、GC 频繁、内存暴涨, 引发一系列问题。在面临这样的场景时, 限制 goroutine 的数量、重用 goroutine
实现
实现的基本思路是采用生产者-消费者模型,用来执行任务的goroutine作为消费者,操作任务队列的goroutine是生产者,任务队列使用的是go中的buffer channel
数据结构定义
任务定义:
// 任务定义 type Task struct { Handler func(v ...interface{}) // 任务处理函数 Params []interface{} // 处理函数参数列表 }
协程池定义:
// 任务池定义 type TaskPool struct { Capacity int64 // 任务池容量 RunningGoroutine int64 // 运行中的goroutine数量 TaskQueue chan *Task // 任务队列 Status int64 // 任务池状态 sync.Mutex PanicHandler func(interface{}) // goroutine异常处理机制 }
协程池状态常量定义
// 协程池状态 const( RUNNING = iota STOP )
全局异常定义:
// 池容量非法异常 var ErrInvalidPoolCap = errors.New("task pool capacity invaild") var ErrPoolAlreadyClosed = errors.New("pool is already go")
新增任务&执行任务
新增任务本质就是做goroutine数量检查,小于协程池容量则新启协程,超过就复用原有协程,协程的回收依赖于GC,任务是直接丢进管道,等待消费的goroutine执行
// 新增任务 func (p *TaskPool) Put(t *Task) error{ p.Lock() defer p.Unlock() if p.Status == STOP{ return ErrPoolAlreadyClosed } // 如果协程池未满则新启协程 if p.RunningGoroutine < p.Capacity{ // 协程池未满,则产生协程 p.run() } // 任务入队 p.TaskQueue <- t return nil }
执行任务其实就是监听channel消费具体的任务,这里采用的是带缓冲区的channel,所以消费生产是非阻塞的
// 从任务队列中取出任务执行 func (pool *TaskPool)run() { // 新增运行中的goroutine incRunning(pool) go func() { // 执行完成后运行中的goroutine-- defer func() { decRunning(pool) // goroutine panic if r := recover();r != nil{ if pool.PanicHandler != nil{ pool.PanicHandler(r); } else { // 默认处理 log.Printf("Worker panic: %s\n", r) } } pool.checkRunningWork() }() // 具体goroutine执行策略 for{ select { case task,ok := <- pool.TaskQueue:{ if !ok{ // 任务从管道消费失败 return } // 执行任务 task.Handler(task.Params) } } } }() }
goroutine异常处理
如果某一个goroutine抛出panic就会导致整个程序崩溃退出,为了保证程序安全执行,需要对panic进行recover,进行异常处理,异常处理函数用户自定义
defer func() { decRunning(pool) // goroutine panic if r := recover();r != nil{ if pool.PanicHandler != nil{ pool.PanicHandler(r); } else { // 默认处理 log.Printf("Worker panic: %s\n", r) } } pool.checkRunningWork() }()
关闭协程池
关闭协程池需要做两个步骤:
- 关闭任务进入队列的入口
- 执行完任务队列中剩余的任务
// 安全关闭协程池 func (p *TaskPool) CloseTask() error{ p.Lock() defer p.Unlock() if p.Status == STOP{ return ErrPoolAlreadyClosed } atomic.CompareAndSwapInt64(&p.Status,RUNNING,STOP) // 清空任务队列 for len(p.TaskQueue) > 0 { // 阻塞等待所有任务被 worker 消费 time.Sleep(1e6) // 防止等待任务清空 cpu 负载突然变大, 这里小睡一下 } return nil }
使用
func TestMyPool() { pool,err := InitTaskPool(10) if err != nil{ panic(err) } for i := 0;i < 20;i++{ time.Sleep(1e6) pool.Put(&Task{Handler: func(v ...interface{}) { fmt.Print("i = ",i," ") },Params: []interface{}{i}}) fmt.Println("pool running goroutine size: ",pool.GetPoolRunningGSize()) } }
这篇关于Go并发编程(三)协程池的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-04-01got an unexpected keyword argument
- 2024-03-30维多利亚的秘密 golang入坑系统
- 2024-03-29mongodb sort by date
- 2024-03-29go swagger
- 2024-03-25mongodb cdc
- 2024-03-25how to use go in vscode
- 2024-03-22mongooseserverselectionerror: connect econnrefused ::1:27017
- 2024-03-21pymongo insert_many
- 2024-03-18projection mongodb
- 2024-03-14clickhouse-go