最近业务上需要利用大量开goroutine去并发拉取数据,但是不能无限制的 for 数据长度来开启那么多的goroutine,就要写个协程池来协调,通过channel来限制goroutine的开启数量。
业务需求:
需要生产者去每日定时拉去第三方平台数据,放入到channel缓存通道中(类型存入消息队列),消费者需要开goroutine并发的进行拉取第三方平台数据整合后再插入表中,其中goroutine开启数与channel缓存接受到的数据之间形成了协程池,从而通过了channel控制goroutine开启的数据(原理就是带缓存channel不够时会阻塞,当缓存满时也会阻塞等待消费)
代码详解:
- 首先声明相关变量常量等数据:
假设拉取数据可达到10W条:
const (
channelBufferSize = 1000 // channel 缓存数
goSize = 100 // 协程开启 最大数
)
// 声明一个用于定义channel的结构体
type Job struct {
BucketName string // 存放 bucket 名称
Count int // 计算 重试次数
}
// 全局声明该 带缓存的 channel
var channlBucket = make(chan *Job, channelBufferSize)
构建生产者:
func Producer() (err error) { // 获取第三方业务相关数据,用于消费者函数调用的参数 bucketNameList, err := qiniu.GetV2sBucketNameList() if err != nil { return } ...... ...... ...... // 遍历拉取下来的[]string类型的数据 for _, v := range bucketNameList { channlBucket <- &Job{ BucketName: v, // 将具体某个数据放入到通道中缓存 } } // Consumer() 这里可以直接调用消费者 进行调式,实际中消费者一直保持运行中的 return }
最后开始我们消费者构建:
func Consumer() { var c = channlBucket var wg sync.WaitGroup for i := 0; i < goSize; i++ { wg.Add(1) go func() { defer wg.Done() // defer 在该函数结束时调用defer后面的函数执行 // 获取channel中的已经缓存的数据,如果没有,则会阻塞。 for job := range c { name := job.BucketName ...... ...... ...... params := &storage.StorageOssDay{ BucketName: name, Time: 0, Data: nil, } // 插入表中 err = storage.InsertOssDay(params) // 如果获取的这条channel中的数据插入失败了,则将其返还回通道中,反复进行5次重试。 if err != nil { if job.Count < 5 { // 重试次数 默认定为5次吧 job.Count++ c <- job } else { log.Errorf("%+v", err) } } } }() } wg.Wait() }
再来看一个简单的demo:
package main
import (
"fmt"
"runtime"
"time"
)
func main() {
runtime.GOMAXPROCS(runtime.NumCPU()) // 其实这里可以不用特意写,默认会取系统CPU最多时
c := make(chan bool, 100)
t := time.Tick(time.Second) // 一秒的心跳
go func() {
for {
select {
case <-t:
watching() // 一秒打印当前goroutine运行个数
}
}
}()
for i := 0; i < 10000000; i++ {
c <- true // c 只能缓存100个
go worker(i, c)
}
fmt.Println("Done")
}
func watching() {
fmt.Printf("NumGoroutine: %d\n", runtime.NumGoroutine())
}
func worker(i int, c chan bool) {
//fmt.Println("worker", i)
time.Sleep(100 * time.Microsecond) // 模拟其他代码运行消耗时间
<-c
}