Goroutine并发编程注意事项

goroutine并发编程注意事项

总结在实际编写代码中,使用 goroutine 协程时的注意要点和一些编写规范。

在实践业务代码中,我们会经常看见启用 goroutine 方式: go func() {“这都是你业务代码,代码中没出现任何管控方式!”},然后就完事了,这里绝对不推荐这种方式,因为没有满足下面的三个条件:

1. 并发业务内容应该让调用者来开启go func(){} 或 go func_xxx() 显示的声明来开启协程。(有利于直观维护性)

2. 开启goroutine 协程跑业务代码,开启者你必须清楚的知道它的生命周期。(什么时候结束,考虑会不会长时间hang住,或者考虑有没有什么异常情况你无法控制)

3. 开启协程必定要加,超时控制退出 或 控制 channel 通知退出。(防止内存泄露出现 野G)

注意:也许有人会说,不好意思,我就没有控制生命周期线上依旧正常运行啊。

这里只能说,线上跑起来正常不表示你代码就没有BUG,只不过没有满足触发你BUG的条件摆了~

goroutine 开启的集几种姿势和控制方案

1. 直接协程go 匿名函数func()方式:

简单demo如下:

// 5秒超时控制避免 野G
// 5秒超时控制避免 野G
	ctxTime, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	errCtx := make(chan error)
	// 调用业务函数返回 error 一个参数,当然这里你也可以返回自己定义的response struct
	go func() {
		errCtx <- qwc.InsertCallbackContent(ctxTime, qwCallbackService.InsertCallbackContentReq{})
	}()

	select {
	case <-ctxTime.Done():
		log.Error("InsertCallbackContent:5秒超时")
		return
	case err := <-errCtx:
		if err != nil {
			logger.Errorf("TwEventCallback插入回调记录失败:%v", err.Error())
		}
		return
	}

2. 直接 go func_xx(params) 无返回结果方式:

首先这个方式与上面区别就很大了,没有返回结果,那你如何控制你的 goroutine 的生命周期呢?

代码demo如下:

案例场景描述1:

一个最基本没有返回结果的情况下,如何进行goroutine生命周期控制,如下:

比如 程序中有 EventBus 事件上报执行对应的事件逻辑,但我们希望它是在我主业务的逻辑的旁支异步去执行,不应该阻塞的主业务浪费时间。(其实这里会演变成场景2)

func main() {
		// 这里调用一个eventbus事件
		eventbus := &App{}
		eventbus.Event("this is eventbus")

		// do ...常驻进程挂起操作
	  c := make(chan os.Signal, 1)
		signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
		for {
			si := <-c
			switch si {
			case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
				return eventbus.shutdown()
			default:
				return nil
			}
		}
}

type App struct {
	wg *sync.WaitGroup
}

func(a *App) shutdown() {
	// 各种平滑退出接口调用
	// 这里去调用一个 等待所有goroutine执行完毕 
	eventbus.WaitAsync()
}

func(a *App) Event(data string) {
	a.wg.Add(1) // 这里一定要写在goroutine外部
	go func(){
		defer a.wg.Done()
		time.Sleep(xxx)
		fmt.Println(data)
	}()
}

func(a *App) WaitAsync() {
	a.wg.Wait()
}

上面这段代码,是一个功能完善但是并不完美的平滑退出goroutine,但是你得等待执行完毕。

所以它的缺陷就是,如果其中某event一直在挂起,难不成我一直等嘛?还不是要泄内存露。

所以我们来看看完善版demo:

func TestEvent(t *testing.T) {
	tx := NewApp(10)
	go tx.Run()
	_ = tx.Event(context.Background(), "test1")
	_ = tx.Event(context.Background(), "test2")
	_ = tx.Event(context.Background(), "test3")
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
	defer cancel()

	// 加入超时控制,而不是死等所有任务执行完毕
	tx.shutdown(ctx)
}

func NewApp(chNum int) *App {
	return &App{
		// 设置待缓存的 channel
		// 这里 直接选择使用通道到代替 新开 goroutine 降低资源消耗
		ch: make(chan string, chNum),
	}
}

type App struct {
	ch   chan string
	stop chan struct{}
}

func (a *App) Event(ctx context.Context, data string) error {
	select {
	case a.ch <- data:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (a *App) Run() {
	// 等待通道里的消息传入即可
	// 一旦 channel 被close后,就会跳出for range
	for data := range a.ch {
		// 执行你的注册 event 逻辑
		time.Sleep(1 * time.Second)
		fmt.Println(data)
	}
	// 执行完成后发送通道消息
	a.stop <- struct{}{}
}

func (a *App) shutdown(ctx context.Context) {
	close(a.ch)
	select {
	case <-a.stop:
	case <-ctx.Done():
	}
}

这段demo2 和上面的demo1 差别还是比较明显的。

利用 channel 代替了 goroutine,并且取消了 WaitGroup 用于的等待所有 goroutine 的运行。

代替方法就是在 Run() 函数中了。

在demo2中你得自己预估好你的超时时间具体设定的值,结合你的业务事件设置合理的超时控制时间即可。不然你就要一直等待~~