Goroutine 生命周期
管好goroutine的生命周期
每次起一个goroutine 都该问自己两个问题,1.它什么时候该结束,2. 我怎么结束它
一般我们起程序的时候都会在监听应用程序端口的同时,起一个debug的监听比如pprof,这个但是有没有想过,如果go出去的debug监听挂了我们如何知道以及做些什么操作呢。下面这个例子可以在某个监听挂掉后同时结束另一个监听,从而实现go程生命周期的处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
|
func serveDebug(stop <-chan struct{})error{ return serve(":6060",nil,stop) }
func serveAPP(stop <-chan struct{})error{ return serve(":8080",nil,stop) }
func serve(addr string,handler http.Handler,stop <-chan struct{})error{ s := http.Server{ Addr: addr, Handler: handler,
}
go func() { <- stop s.Shutdown(context.Background()) }()
return s.ListenAndServe() }
func main(){ done := make(chan error,2) stop := make(chan struct{})
go func() { done <- serveDebug(stop) }()
go func() { done <- serveAPP(stop) }()
var stopped bool for i:=0; i<2;i++{ if err := <- done ;err != nil{ fmt.Printf("error %v \n",err) }
if ! stopped{ stopped = true close(stop) } }
}
|
追踪goroutine 的生命周期
使用WaitGroup
看下面一个例子:
我们使用服务端埋点来跟踪记录一些事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| type Tracker struct {
}
type App struct { track Tracker }
func (a *App)Handle(w http.ResponseWriter,r *http.Request){ w.WriteHeader(http.StatusCreated)
go a.track.Event("this event") }
func (t *Tracker)Event(data string){ log.Println(data) }
|
这个例子,无法保证创建的 goroutine 生命周期管理,会导致最常见的问题,就是在服务关闭时候,有一些事件丢失
使用 sync.WaitGroup 来追踪每一个创建的 goroutine,下面使用waitgroup 改进一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| type Tracker struct { wg sync.WaitGroup }
type App struct { track Tracker }
func (a *App)Handle(w http.ResponseWriter,r *http.Request){ w.WriteHeader(http.StatusCreated)
go a.track.Event("this event") }
func (t *Tracker)Event(data string){ t.wg.Add(1)
go func() { defer t.wg.Done()
time.Sleep(time.Millisecond) log.Println(data)
}()
}
func (t *Tracker)Shutdown(){ t.wg.Wait() }
func main(){ var a App
a.track.Shutdown() }
|
这样改进之后所有gorotuine结束后主程才会退出,可以说是追踪了每一个go的生命。
但是,你仍然不能确定go程会不会阻塞从而导致主程序无法退出
使用context做超时处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| type Tracker struct { wg sync.WaitGroup }
type App struct { track Tracker }
func (a *App)Handle(w http.ResponseWriter,r *http.Request){ w.WriteHeader(http.StatusCreated)
go a.track.Event("this event") }
func (t *Tracker)Event(data string){ t.wg.Add(1)
go func() { defer t.wg.Done()
time.Sleep(time.Millisecond) log.Println(data)
}()
}
func (t *Tracker)Shutdown(ctx context.Context)error{ ch := make(chan struct{})
go func() { t.wg.Wait() close(ch) }()
select { case <-ch: return nil case <-ctx.Done(): return errors.New("timeout") } }
func main(){ var a App
ctx,cancel := context.WithTimeout(context.Background(),5*time.Second) defer cancel()
err := a.track.Shutdown(ctx) if err != nil{ log.Println("shutdown err :",err) } }
|
这个例子虽然完美的去最终了生命周期,也确认了何时结束,但大量创建goroutine 来处理任务,代价高。
使用chanel
使用chanel来同步处理事件,同时控制事件处理的goroutine会是更好的选择:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| type Tracker2 struct { ch chan string stop chan struct{} }
func NewTracker()*Tracker2{ return &Tracker2{ ch: make(chan string, 100), } }
func main(){ tr := NewTracker() go tr.Run() _ = tr.Event(context.Background(),"test1") _ = tr.Event(context.Background(),"test2") _ = tr.Event(context.Background(),"test3") ctx ,cancel := context.WithTimeout(context.Background(),5*time.Second) defer cancel()
tr.Shutdown(ctx) }
func (t *Tracker2)Event(ctx context.Context,data string)error{ select { case t.ch <- data: return nil case <- ctx.Done(): return ctx.Err() } }
func (t *Tracker2)Run(){ for data := range t.ch{ time.Sleep(time.Second) fmt.Println(data) }
t.stop <- struct{}{} }
func (t *Tracker2)Shutdown(ctx context.Context) { close(t.ch) select { case <-t.stop: case <-ctx.Done():
} }
|