两种全表扫描的思路
2022/4/13 6:16:53
本文主要是介绍两种全表扫描的思路,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
日常的工作中,可能需要对分片表进行全表扫描,这里介绍两种并发全表扫描的方法:
思路1:分片分页读取+并发请求
两个for循环,外层for循环遍历每个分片,内层for循环并发处理这些数据。整个处理过程可分为数据获取和并发请求两部分,两部分串行执行,先获取数据,再并发处理这些数据。数据获取
a、外层循环遍历每个分片
b、每个分片内分页读取数据
// 初试化分片为0 minId := int64(0) // 遍历分片 for shardId := int64(0); shardId < info.getShardingCount(); { // 获取数据 data, err := info.PageQueryData(shardId, PageSize, minId) // 并发处理 此处省略 // ... // 如果获取到的数据长度小于每页数量,则表明数据读取完,分片数+1,否则继续读取该分片数据 if len(data) < PageSize { shardId++ minId = 0 } }
并发处理
a、设置最大并发goroutine数量n
b、通过buffersize = n的channel控制并发goroutine数量:每开启一个goroutine向channel中插入一条数据,每个goroutine结束后从channel取出一条数据
c、通过sync.WaitGroup等待所有goroutine执行完后才读取下一批数据
// 初试化分片为0 minId := int64(0) // 遍历分片 for shardId := int64(0); shardId < info.getShardingCount(); { // 获取数据 data, err := info.BatchGetData(shardId, PageSize, minId) // 并发处理 for _, item := range data { info.waitGroup.Add(1) // 向channel插入数据 info.concurrencyCtlChannel <- 1 if item.Id > minId { minId = item.Id } go func(ctx context.Context, data *Data) { defer func() { if err := recover(); err != nil { logs.CtxError(ctx, "Task: task execute error. err: %v", err) } // 运行完后从channel读取数据 <-info.concurrencyCtlChannel info.waitGroup.Done() }() // 业务处理 此处省略 // ... }(ctx, item) } // 等待data全部处理完 info.waitGroup.Wait() // 如果获取到的数据长度小于每页数量,则表明数据读取完,分片数+1,否则继续读取该分片数据 if len(data) < PageSize { shardId++ minId = 0 } }
思路2:并发读取+并发处理(生产者-消费者模型)
a、创建一组生产者goroutine和消费者goroutine,以及一组传递消息的channel,不同的生产者goroutine从不同的分片中分页获取数据,向channel中插入数据,消费者goroutine从channel中获取数据进行消费。
b、另外通过一个channel控制生产者goroutine的数量,用一个输入参数控制消费者goroutine的数量。
c、使用sync.WaitGroup等待生产者生产完数据后关闭管道,等待消费者消费完数据后结束任务。
并发读取
func ProduceData() { for i = minShardingKey; i <= maxShardingKey; i++ { channel := channelGroup[i % channelSize] pwg.Add(1) // 控制produce goroutine数量 produceChannel <- 1 go func(ch chan *db.Data) { defer func() { t.pwg.Done() <- produceChannel } minId := int64(1) for { // 分页从数据库读取数据 data, err := db.GetData(shardingKey, minId, PageSize) // 向channel生产数据 for _, order := range data { if order.Id > minId { minId = order.Id } // 如果channel已满则阻塞在此等待消费goroutine消费channel中的数据 ch <- order } minId++ if len(orderInfos) < PageSize { break } } }(channel) } }
并发处理
func ConsumeData() { for i = 0; i < consumerGouroutineSize; i++ { cwg.Add(1); ch := channelGroup[i & channelSize] go func(ch chan *db.Data) { defer func() { cwg.Done() } for order := range ch { // do business } } }(channel) } }
管道关闭
func WaidAndClose() { // 等所有生产者goroutine结束 pwg.Wait() // 关闭管道 for { //... } // 等到消费完毕 cwg.wait() // 结束任务 return nil }
这篇关于两种全表扫描的思路的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-05-31全网首发第二弹!软考2024年5月《软件设计师》真题+解析+答案!(11-20题)
- 2024-05-31全网首发!软考2024年5月《软件设计师》真题+解析+答案!(21-30题)
- 2024-05-30【Java】百万数据excel导出功能如何实现
- 2024-05-30我们小公司,哪像华为一样,用得上IPD(集成产品开发)?
- 2024-05-30java excel上传--poi
- 2024-05-30安装笔记本应用商店的pycharm,再安排pandas等模块,说是没有打包工具?
- 2024-05-29java11新特性
- 2024-05-29哪些无用敏捷指标正在破坏敏捷转型?
- 2024-05-29鸿蒙原生应用再新丁!新华社 入局鸿蒙
- 2024-05-29设计模式 之 迭代器模式(Iterator)