goroutine并发编程注意事项
总结在实际编写代码中,使用 goroutine 协程时的注意要点和一些编写规范。
在实践业务代码中,我们会经常看见启用 goroutine 方式: go func() {“这都是你业务代码,代码中没出现任何管控方式!”},然后就完事了,这里绝对不推荐这种方式,因为没有满足下面的三个条件:
1. 并发业务内容应该让调用者来开启go func(){} 或 go func_xxx() 显示的声明来开启协程。(有利于直观维护性)
2. 开启goroutine 协程跑业务代码,开启者你必须清楚的知道它的生命周期。(什么时候结束,考虑会不会长时间hang住,或者考虑有没有什么异常情况你无法控制)
3. 开启协程必定要加,超时控制退出 或 控制 channel 通知退出。(防止内存泄露出现 野G)
注意:也许有人会说,不好意思,我就没有控制生命周期线上依旧正常运行啊。
这里只能说,线上跑起来正常不表示你代码就没有BUG,只不过没有满足触发你BUG的条件摆了~
goroutine 开启的集几种姿势和控制方案
1. 直接协程go 匿名函数func()方式:
简单demo如下:
// 5秒超时控制避免 野G
// 5秒超时控制避免 野G
ctxTime, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
errCtx := make(chan error)
// 调用业务函数返回 error 一个参数,当然这里你也可以返回自己定义的response struct
go func() {
errCtx <- qwc.InsertCallbackContent(ctxTime, qwCallbackService.InsertCallbackContentReq{})
}()
select {
case <-ctxTime.Done():
log.Error("InsertCallbackContent:5秒超时")
return
case err := <-errCtx:
if err != nil {
logger.Errorf("TwEventCallback插入回调记录失败:%v", err.Error())
}
return
}
2. 直接 go func_xx(params) 无返回结果方式:
首先这个方式与上面区别就很大了,没有返回结果,那你如何控制你的 goroutine 的生命周期呢?
代码demo如下:
案例场景描述1:
一个最基本没有返回结果的情况下,如何进行goroutine生命周期控制,如下:
比如 程序中有 EventBus 事件上报执行对应的事件逻辑,但我们希望它是在我主业务的逻辑的旁支异步去执行,不应该阻塞的主业务浪费时间。(其实这里会演变成场景2)
func main() {
// 这里调用一个eventbus事件
eventbus := &App{}
eventbus.Event("this is eventbus")
// do ...常驻进程挂起操作
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
for {
si := <-c
switch si {
case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
return eventbus.shutdown()
default:
return nil
}
}
}
type App struct {
wg *sync.WaitGroup
}
func(a *App) shutdown() {
// 各种平滑退出接口调用
// 这里去调用一个 等待所有goroutine执行完毕
eventbus.WaitAsync()
}
func(a *App) Event(data string) {
a.wg.Add(1) // 这里一定要写在goroutine外部
go func(){
defer a.wg.Done()
time.Sleep(xxx)
fmt.Println(data)
}()
}
func(a *App) WaitAsync() {
a.wg.Wait()
}
上面这段代码,是一个功能完善但是并不完美的平滑退出goroutine,但是你得等待执行完毕。
所以它的缺陷就是,如果其中某event一直在挂起,难不成我一直等嘛?还不是要泄内存露。
所以我们来看看完善版demo:
func TestEvent(t *testing.T) {
tx := NewApp(10)
go tx.Run()
_ = tx.Event(context.Background(), "test1")
_ = tx.Event(context.Background(), "test2")
_ = tx.Event(context.Background(), "test3")
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
// 加入超时控制,而不是死等所有任务执行完毕
tx.shutdown(ctx)
}
func NewApp(chNum int) *App {
return &App{
// 设置待缓存的 channel
// 这里 直接选择使用通道到代替 新开 goroutine 降低资源消耗
ch: make(chan string, chNum),
}
}
type App struct {
ch chan string
stop chan struct{}
}
func (a *App) Event(ctx context.Context, data string) error {
select {
case a.ch <- data:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (a *App) Run() {
// 等待通道里的消息传入即可
// 一旦 channel 被close后,就会跳出for range
for data := range a.ch {
// 执行你的注册 event 逻辑
time.Sleep(1 * time.Second)
fmt.Println(data)
}
// 执行完成后发送通道消息
a.stop <- struct{}{}
}
func (a *App) shutdown(ctx context.Context) {
close(a.ch)
select {
case <-a.stop:
case <-ctx.Done():
}
}
这段demo2 和上面的demo1 差别还是比较明显的。
利用 channel 代替了 goroutine,并且取消了 WaitGroup 用于的等待所有 goroutine 的运行。
代替方法就是在 Run() 函数中了。
在demo2中你得自己预估好你的超时时间具体设定的值,结合你的业务事件设置合理的超时控制时间即可。不然你就要一直等待~~