通过channel控制goroutine数量形成协程池

还需要多加强并发编程能力

最近业务上需要利用大量开goroutine去并发拉取数据,但是不能无限制的 for 数据长度来开启那么多的goroutine,就要写个协程池来协调,通过channel来限制goroutine的开启数量。

业务需求:

需要生产者去每日定时拉去第三方平台数据,放入到channel缓存通道中(类型存入消息队列),消费者需要开goroutine并发的进行拉取第三方平台数据整合后再插入表中,其中goroutine开启数与channel缓存接受到的数据之间形成了协程池,从而通过了channel控制goroutine开启的数据(原理就是带缓存channel不够时会阻塞,当缓存满时也会阻塞等待消费)

代码详解:

  1. 首先声明相关变量常量等数据:

假设拉取数据可达到10W条:

const (
    channelBufferSize = 1000 // channel 缓存数
    goSize            = 100  // 协程开启 最大数

)

// 声明一个用于定义channel的结构体
type Job struct {
    BucketName string // 存放 bucket 名称
    Count      int    // 计算 重试次数
}

// 全局声明该 带缓存的 channel
var channlBucket = make(chan *Job, channelBufferSize)
  1. 构建生产者:

    func Producer() (err error) {
    // 获取第三方业务相关数据,用于消费者函数调用的参数
    bucketNameList, err := qiniu.GetV2sBucketNameList()
    if err != nil {
        return
    }
        
    ......
    ......
    ......
        
    // 遍历拉取下来的[]string类型的数据
    for _, v := range bucketNameList {
        channlBucket <- &Job{
            BucketName: v, // 将具体某个数据放入到通道中缓存
        }
    }
    
    // Consumer() 这里可以直接调用消费者 进行调式,实际中消费者一直保持运行中的
    return
    }
  2. 最后开始我们消费者构建:

    func Consumer() {
    var c = channlBucket
    var wg sync.WaitGroup
    for i := 0; i < goSize; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done() // defer 在该函数结束时调用defer后面的函数执行
            // 获取channel中的已经缓存的数据,如果没有,则会阻塞。
            for job := range c {
                name := job.BucketName
    
            ......
            ......
            ......
                
                params := &storage.StorageOssDay{
                    BucketName: name,
                    Time:       0,
                    Data:       nil,
                }
                // 插入表中
                err = storage.InsertOssDay(params)
                // 如果获取的这条channel中的数据插入失败了,则将其返还回通道中,反复进行5次重试。
                if err != nil {
                    if job.Count < 5 { // 重试次数 默认定为5次吧
                        job.Count++
                        c <- job
                    } else {
                        log.Errorf("%+v", err)
                    }
    
                }
            }
    
        }()
    }
    
    wg.Wait()
    }

再来看一个简单的demo:

package main
 
import (
    "fmt"
    "runtime"
    "time"
)
 
func main() {
    runtime.GOMAXPROCS(runtime.NumCPU()) // 其实这里可以不用特意写,默认会取系统CPU最多时
    c := make(chan bool, 100)
    t := time.Tick(time.Second) // 一秒的心跳
 
    go func() {
        for {
            select {
            case <-t:
                watching() // 一秒打印当前goroutine运行个数
            }
        }
    }()
 
    for i := 0; i < 10000000; i++ {
        c <- true // c 只能缓存100个
        go worker(i, c)
    }
 
    fmt.Println("Done")
}
 
func watching() {
    fmt.Printf("NumGoroutine: %d\n", runtime.NumGoroutine())
}
 
func worker(i int, c chan bool) {
    //fmt.Println("worker", i)
    time.Sleep(100 * time.Microsecond) // 模拟其他代码运行消耗时间
    <-c
}