2022/1/26 20:04:54
// AddRows adds the given mrs to s. func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error { if len(mrs) == 0 { return nil } // Limit the number of concurrent goroutines that may add rows to the storage. // This should prevent from out of memory errors and CPU trashing when too many // goroutines call AddRows. select { case addRowsConcurrencyCh <- struct{}{}: default: // Sleep for a while until giving up atomic.AddUint64(&s.addRowsConcurrencyLimitReached, 1) t := timerpool.Get(addRowsTimeout) // Prioritize data ingestion over concurrent searches. storagepacelimiter.Search.Inc() select { case addRowsConcurrencyCh <- struct{}{}: timerpool.Put(t) storagepacelimiter.Search.Dec() case <-t.C: timerpool.Put(t) storagepacelimiter.Search.Dec() atomic.AddUint64(&s.addRowsConcurrencyLimitTimeout, 1) atomic.AddUint64(&s.addRowsConcurrencyDroppedRows, uint64(len(mrs))) return fmt.Errorf("cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers; add more CPUs or reduce load", len(mrs), addRowsTimeout, cap(addRowsConcurrencyCh)) } }
- 物理核是性能真正的限制。无论你有多少协程,理论上N个核就最多只有N个协程处于执行状态。
- 协程调度并非没有成本,协程越多,就会有越多的CPU时间花在协程调度上。对于CPU密集型的业务,计算的协程数超过物理核的个数的部分都是白瞎。
- 假设写的协程数是读的协程数的2倍,概率上看调度到写的次数是读的次数的2倍;但是读和写的计算量并不是对等的,假设某个查询的数据量较大,就会导致读协程总体的CPU时间多于写协程,最终可能会导致写入超时失败。正确的办法是通过机制来让读协程主动让出CPU资源。
- 区分IO协程和计算协程。
- 计算协程的数量与核的数量相等。
- 处理insert操作的协程数等于CPU的核数,且接收任务的channel的长度也等于CPU核数。
- 处理query_range等查询操作的协程数是CPU核数的2倍。猜测这是因为部分读操作可能触发mmap区域的缺页中断,进而引发IO阻塞。但无论如何,对协程数量的控制仍然十分克制。
当insert操作因并发已满而需要排队等待时,等待计数器加一;排队成功(或等待超时)后,计数器减一。当计数器降为0时,通过条件变量发起 Broadcast(),唤醒正在等待的select操作。
select协程中,每扫描4096个block(即 s.loops 与掩码4095按位与的结果为0时)就会检查一次是否有insert操作在等待。如果有,调用条件变量 cond.Wait() 进入等待,把CPU让给insert协程。
2. insert操作源码分析
2.1 工作协程的创建
// StartUnmarshalWorkers starts unmarshal workers.
//
// It allocates the shared unmarshalWorkCh channel and launches one worker
// goroutine per CPU reported by cgroup.AvailableCPUs(). Each worker receives
// items from unmarshalWorkCh and calls Unmarshal on them until the channel
// is closed.
//
// If unmarshalWorkCh is already non-nil, the condition is reported through
// logger.Panicf as a bug (a repeated start without a matching stop).
func StartUnmarshalWorkers() {
	if unmarshalWorkCh != nil {
		logger.Panicf("BUG: it looks like startUnmarshalWorkers() has been alread called without stopUnmarshalWorkers()")
	}
	// Number of CPUs reported by cgroup.AvailableCPUs(); it sizes both the
	// channel buffer and the worker pool below.
	gomaxprocs := cgroup.AvailableCPUs()
	// Buffered channel whose capacity equals the CPU count.
	unmarshalWorkCh = make(chan UnmarshalWork, gomaxprocs)
	unmarshalWorkersWG.Add(gomaxprocs)
	for i := 0; i < gomaxprocs; i++ {
		// Start one worker goroutine per CPU.
		go func() {
			defer unmarshalWorkersWG.Done()
			for uw := range unmarshalWorkCh {
				// Run the work item's own processing logic.
				uw.Unmarshal()
			}
		}()
	}
}
// ScheduleUnmarshalWork schedules uw to run in the worker pool.
//
// It is expected that StartUnmarshalWorkers is already called.
//
// The call is a plain channel send, so it blocks while unmarshalWorkCh
// cannot accept another item.
func ScheduleUnmarshalWork(uw UnmarshalWork) {
	unmarshalWorkCh <- uw
}
2.2 insert协程的并发检查
var (
	// Limit the concurrency for data ingestion to GOMAXPROCS, since this operation
	// is CPU bound, so there is no sense in running more than GOMAXPROCS concurrent
	// goroutines on data ingestion path.
	//
	// The channel capacity is taken from cgroup.AvailableCPUs().
	addRowsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())

	// addRowsTimeout is the duration AddRows passes to its wait timer when
	// addRowsConcurrencyCh is full, before it gives up and returns an error.
	addRowsTimeout = 30 * time.Second
)
// AddRows adds the given mrs to s.
//
// An empty mrs is a no-op that returns nil. Otherwise the caller must first
// obtain a slot in addRowsConcurrencyCh. If no slot is free, the call waits up
// to addRowsTimeout for one and returns an error when the wait times out.
// The slot is released after the insertion logic (elided in this excerpt).
func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
	if len(mrs) == 0 {
		return nil
	}

	// Limit the number of concurrent goroutines that may add rows to the storage.
	// This should prevent from out of memory errors and CPU trashing when too many
	// goroutines call AddRows.
	select {
	case addRowsConcurrencyCh <- struct{}{}:
		// The non-blocking send succeeded, so the channel was not full;
		// proceed straight to the insertion logic.
	default:
		// The channel is full, so this insert has to wait for a slot.
		// Searches are asked to back off in the meantime (see Search.Inc below).

		// Sleep for a while until giving up
		atomic.AddUint64(&s.addRowsConcurrencyLimitReached, 1)
		t := timerpool.Get(addRowsTimeout)

		// Prioritize data ingestion over concurrent searches.
		// Inc is called once per waiting insert and is paired with a Dec on
		// both exits of the select below.
		storagepacelimiter.Search.Inc()
		select {
		case addRowsConcurrencyCh <- struct{}{}:
			// A slot became available before the timer fired.
			// Hand the timer back to timerpool for reuse.
			timerpool.Put(t)
			// This insert is no longer waiting.
			// NOTE(review): the article states that searches are woken via
			// cond.Broadcast() once the pending count reaches zero; that code
			// is not part of this excerpt.
			storagepacelimiter.Search.Dec()
		case <-t.C:
			// The addRowsTimeout timer fired without a slot becoming
			// available: record the timeout and the dropped rows, then fail.
			timerpool.Put(t)
			storagepacelimiter.Search.Dec()
			atomic.AddUint64(&s.addRowsConcurrencyLimitTimeout, 1)
			atomic.AddUint64(&s.addRowsConcurrencyDroppedRows, uint64(len(mrs)))
			return fmt.Errorf("cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers; add more CPUs or reduce load", len(mrs), addRowsTimeout, cap(addRowsConcurrencyCh))
		}
	}

	// The actual insertion logic follows here (elided in this excerpt)...

	// Release the slot once the insertion logic has finished.
	<-addRowsConcurrencyCh
	return firstErr
}
3. select操作源码分析
3.1 用于查询并发限制的channel
var (
	// Limit the concurrency for TSID searches to GOMAXPROCS*2, since this operation
	// is CPU bound and sometimes disk IO bound, so there is no sense in running more
	// than GOMAXPROCS*2 concurrent goroutines for TSID searches.
	//
	// The channel capacity is twice the value returned by cgroup.AvailableCPUs().
	searchTSIDsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs()*2)
)
// searchTSIDs returns sorted TSIDs for the given tfss and the given tr. func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]TSID, error) { // Do not cache tfss -> tsids here, since the caching is performed // on idb level. // Limit the number of concurrent goroutines that may search TSIDS in the storage. // This should prevent from out of memory errors and CPU trashing when too many // goroutines call searchTSIDs. select { case searchTSIDsConcurrencyCh <- struct{}{}: //处理思路上与insert并发限制一样。入队成功才允许进入查询逻辑 default: // Sleep for a while until giving up atomic.AddUint64(&s.searchTSIDsConcurrencyLimitReached, 1) currentTime := fasttime.UnixTimestamp() timeoutSecs := uint64(0) if currentTime < deadline { timeoutSecs = deadline - currentTime //与insert的超时处理不同,每个查询可能与不同的查询超时时间 } timeout := time.Second * time.Duration(timeoutSecs) t := timerpool.Get(timeout) select { case searchTSIDsConcurrencyCh <- struct{}{}: timerpool.Put(t) case <-t.C: timerpool.Put(t) atomic.AddUint64(&s.searchTSIDsConcurrencyLimitTimeout, 1) return nil, fmt.Errorf("cannot search for tsids, since more than %d concurrent searches are performed during %.3f secs; add more CPUs or reduce query load", cap(searchTSIDsConcurrencyCh), timeout.Seconds()) } }
3.2 select协程主动让出的实现
// NextMetricBlock proceeds to the next MetricBlockRef.
//
// It returns false immediately when s.err is already set. While iterating
// over s.ts it periodically calls checkSearchDeadlineAndPace; an error from
// that call is stored in s.err and ends the iteration with false.
// Parts of the body are elided in this excerpt.
func (s *Search) NextMetricBlock() bool {
	if s.err != nil {
		return false
	}
	for s.ts.NextBlock() {
		// Run the deadline/pace check only when the bits of s.loops selected by
		// paceLimiterSlowIterationsMask are all zero, i.e. once per period
		// rather than on every block.
		if s.loops&paceLimiterSlowIterationsMask == 0 {
			// NOTE(review): per the article, this is where a search yields to
			// pending inserts by blocking in PaceLimiter.WaitIfNeeded();
			// checkSearchDeadlineAndPace itself is not shown here — confirm.
			if err := checkSearchDeadlineAndPace(s.deadline); err != nil {
				s.err = err
				return false
			}
		}
		s.loops++
		// ... (rest of the loop body elided in this excerpt)
	}
	// ... (rest of the function elided in this excerpt)
}
// WaitIfNeeded blocks while the number of Inc calls is bigger than the number of Dec calls.
//
// pl.n is read atomically. A non-positive value returns without taking
// pl.mu; otherwise the caller waits on pl.cond under pl.mu until pl.n is no
// longer positive.
func (pl *PaceLimiter) WaitIfNeeded() {
	if atomic.LoadInt32(&pl.n) <= 0 {
		// Fast path - there is no need in lock.
		return
	}
	// Slow path - wait until Dec is called.
	pl.mu.Lock()
	// Per the article, pl.n is the number of higher-priority goroutines that
	// are currently waiting.
	for atomic.LoadInt32(&pl.n) > 0 {
		// One increment per pass through this wait loop, made under pl.mu.
		pl.delaysTotal++
		// NOTE(review): presumably Dec calls pl.cond.Broadcast() once pl.n
		// reaches zero, letting the lower-priority callers resume — Dec is not
		// shown here, confirm.
		pl.cond.Wait()
	}
	pl.mu.Unlock()
}
4. 总结
- 关键的计算协程的数量,围绕可用的物理CPU核的数量展开。超过物理核数的协程,CPU资源只会白白浪费在协程调度器上。
- 区分高优先级和低优先级的协程,低优先级的协程要能够主动让出。
- 用一个队列来代表被调度的关键协程的数量,队列被阻塞就证明有关键协程处于未被调度的状态,这时就需要触发对应的协调机制。感觉就像在golang调度器的基础上又封装了部分能力。
- 2024-12-24MongoDB资料:新手入门完全指南
- 2024-12-20go-zero 框架的 RPC 服务 启动start和停止 底层是怎么实现的?-icode9专业技术文章分享
- 2024-12-19Go-Zero 框架的 RPC 服务启动和停止的基本机制和过程是怎么实现的?-icode9专业技术文章分享
- 2024-12-18怎么在golang中使用gRPC测试mock数据?-icode9专业技术文章分享
- 2024-12-15掌握PageRank算法核心!你离Google优化高手只差一步!
- 2024-12-15GORM 中的标签 gorm:"index"是什么?-icode9专业技术文章分享
- 2024-12-11怎么在 Go 语言中获取 Open vSwitch (OVS) 的桥接信息(Bridge)?-icode9专业技术文章分享
- 2024-12-11怎么用Go 语言的库来与 Open vSwitch 进行交互?-icode9专业技术文章分享
- 2024-12-11怎么在 go-zero 项目中发送阿里云短信?-icode9专业技术文章分享
- 2024-12-11怎么使用阿里云 Go SDK (alibaba-cloud-sdk-go) 发送短信?-icode9专业技术文章分享