goroutine如何退出任务和goroutine生命周期的控制(runner并发模式)

关于goroutine的生命周期及其控制过程

开了goroutine如何才能退出呢?如何控制它并了解其生命周期?这里包括主动退出和外部的被动退出

一. 如何退出goroutine?

1. 主动自己退出:

  1. 经过多少时间后主动退出 (channel)
func main() {
    var wg sync.WaitGroup
    quit := time.Tick(time.Second * 2) // 给个定时器

    wg.Add(1)
    go func() {
        defer wg.Done()
        <-quit
        fmt.Println("over goroutine")
        return
    }()
    wg.Wait()

    fmt.Println("over all")
}
  1. 通过context通知goroutine退出:
func main() {
    var wg sync.WaitGroup
    ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
    defer cancel()

    wg.Add(1)
    go func() {
        defer wg.Done()
        <-ctx.Done()
        fmt.Println("over goroutine")
        return
    }()
    wg.Wait()

    fmt.Println("over all")
}

2. 外部被动退出:

我们可以通过外部操作接收信号方式来退出:

func main() {
   var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        ch := make(chan os.Signal)
        signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
        <-ch
        fmt.Println("over")
        return
    }()
    wg.Wait()
}

二. goroutine的生命周期之runner并发模式

ruuner 用于展示如何使用通道来监视程序的执行时间,如果程序运行时间太长,也可以用 runner 包来终止程序。

当开发者需要调度后台吹了任务的程序的时候,这种模式会很有用。这个程序可能会作为 cron 作业执行,或者在基于定时任务的云环境(iron.io)里执行。

在设计上,可以实现以下几点:

  • 程序可以在分配的时间内完成工作,正常终止;

  • 程序没有及时完成工作,“自杀”;

  • 接收到操作系统发送的中断事件,程序立刻试图清理状态并停止工作。

完整代码实现如下:

//统一错误处理
//超时错误信息,会在任务执行超时时返回
var ErrTimeOut = errors.New("received timeout")
//中断错误信号,会在接收到操作系统的事件时返回
var ErrInterrupt = errors.New("received interrupt")

// Runner 在给定的超时时间内执行一组任务
// 并且在操作系统发送中断信号时结束这些任务
type Runner struct {
   //从操作系统发送信号
   interrupt chan os.Signal
   //报告处理任务已完成
   complete chan error
   //报告处理任务已经超时
   timeout <- chan time.Time
   //持有一组以索引为顺序的依次执行的以int类型id为参数的函数
   tasks [] func(id int)
}

 // New 函数返回一个新的准备使用的Runner,d:自定义分配的时间
 // 初始化结构体参数
func New(d time.Duration) *Runner{
   return &Runner{
      interrupt:make(chan os.Signal,1),
      complete:make(chan error),
      //会在另一线程经过时间段d后向返回值发送当时的时间。
      // 因为 task 字段的零值是 nil,已经满足初始化的要求,所以没有被明确初始化。
      timeout:time.After(d),
   }
}

//将一个任务加入到Runner中
func (r *Runner)Add(tasks ...func(id int))  {
   r.tasks = append(r.tasks,tasks...)
}

//开始执行所有任务,并监控通道事件
func (r *Runner)Start()error  {
   //监控所有的中断信号
   signal.Notify(r.interrupt,os.Interrupt)
   //使用不同的goroutine执行不同的任务
   go func() {
      r.complete <- r.run()
   }()
   //使用 select 语句来监控goroutine的通信
   select {
      //等待任务完成
      case err := <- r.complete:
         return err
      //任务超时
      case <- r.timeout:
         return ErrTimeOut
   }
}

//执行每一个已注册的任务
func (r *Runner) run() error {
   for id, task := range r.tasks {
      //检测操作系统的中断信号
      if r.gotInterrupt() {
         return ErrInterrupt
      }
      //执行已注册的任务
      task(id)
   }
   return nil
}

//检测是否收到了中断信号
func (r *Runner)gotInterrupt()bool  {
   select {
   //当中断事件被触发时
      case <- r.interrupt:
         //停止接收后续的任何信号
         signal.Stop(r.interrupt)
      return true
      //继续执行
   default:
      return false
   }
}

main.go演示Runner测试

// timeout 规定了必须在多少秒内处理完成
const timeout = 3 * time.Second

func main() {
   log.Println("Starting work.")

   // 为本次执行分配超时时间
   r := New(timeout)

   // 加入要执行的任务
   r.Add(createTask(), createTask(), createTask())
   // 执行任务并处理结果
   if err := r.Start(); err != nil {
      switch err {
      case ErrTimeOut:
         log.Println("Terminating due to timeout.")
         os.Exit(1)
      case ErrInterrupt:
         log.Println("Terminating due to interrupt.")
         os.Exit(2)
      }
   }
   log.Println("Process ended.")
}

//createTask 返回一个根据 id 休眠指定秒数的示例任务
func createTask() func(int) {
   return func(id int) {
      log.Printf("Processor - Task #%d.", id)
      time.Sleep(time.Duration(id) * time.Second)
   }
}
Golang