Goroutine 生命周期

管好goroutine的生命周期

每次起一个goroutine 都该问自己两个问题,1.它什么时候该结束,2. 我怎么结束它

一般我们起程序的时候都会在监听应用程序端口的同时,起一个debug的监听比如pprof,这个但是有没有想过,如果go出去的debug监听挂了我们如何知道以及做些什么操作呢。下面这个例子可以在某个监听挂掉后同时结束另一个监听,从而实现go程生命周期的处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

// debug 服务
func serveDebug(stop <-chan struct{})error{
return serve(":6060",nil,stop)
}

// app 服务
func serveAPP(stop <-chan struct{})error{
return serve(":8080",nil,stop)
}

// 启动一个服务
func serve(addr string,handler http.Handler,stop <-chan struct{})error{
s := http.Server{
Addr: addr,
Handler: handler,

}

go func() {
<- stop
s.Shutdown(context.Background())
}()

return s.ListenAndServe()
}

// main
func main(){
done := make(chan error,2)
stop := make(chan struct{})

go func() {
done <- serveDebug(stop)
}()

go func() {
done <- serveAPP(stop)
}()

var stopped bool
for i:=0; i<2;i++{
if err := <- done ;err != nil{
fmt.Printf("error %v \n",err)
}

if ! stopped{
stopped = true
close(stop)
}
}

}

追踪goroutine 的生命周期

使用WaitGroup

看下面一个例子:

我们使用服务端埋点来跟踪记录一些事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type Tracker struct {

}

type App struct {
track Tracker
}

func (a *App)Handle(w http.ResponseWriter,r *http.Request){
w.WriteHeader(http.StatusCreated)

go a.track.Event("this event")
}

func (t *Tracker)Event(data string){

log.Println(data)
}

这个例子,无法保证创建的 goroutine 生命周期管理,会导致最常见的问题,就是在服务关闭时候,有一些事件丢失

使用 sync.WaitGroup 来追踪每一个创建的 goroutine,下面使用waitgroup 改进一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
type Tracker struct {
wg sync.WaitGroup
}

type App struct {
track Tracker
}

func (a *App)Handle(w http.ResponseWriter,r *http.Request){
w.WriteHeader(http.StatusCreated)

go a.track.Event("this event")
}

func (t *Tracker)Event(data string){
t.wg.Add(1)

go func() {
defer t.wg.Done()

time.Sleep(time.Millisecond)
log.Println(data)

}()

}

func (t *Tracker)Shutdown(){
t.wg.Wait()
}

func main(){
var a App

a.track.Shutdown()
}

这样改进之后所有gorotuine结束后主程才会退出,可以说是追踪了每一个go的生命。

但是,你仍然不能确定go程会不会阻塞从而导致主程序无法退出

使用context做超时处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
type Tracker struct {
wg sync.WaitGroup
}

type App struct {
track Tracker
}

func (a *App)Handle(w http.ResponseWriter,r *http.Request){
w.WriteHeader(http.StatusCreated)

go a.track.Event("this event")
}

func (t *Tracker)Event(data string){
t.wg.Add(1)

go func() {
defer t.wg.Done()

time.Sleep(time.Millisecond)
log.Println(data)

}()

}

func (t *Tracker)Shutdown(ctx context.Context)error{
ch := make(chan struct{})


go func() {
t.wg.Wait()
close(ch)
}()

select {
case <-ch:
return nil
case <-ctx.Done():
return errors.New("timeout")

}
}

func main(){
var a App

ctx,cancel := context.WithTimeout(context.Background(),5*time.Second)
defer cancel()

err := a.track.Shutdown(ctx)
if err != nil{
log.Println("shutdown err :",err)
}
}

这个例子虽然完美的去最终了生命周期,也确认了何时结束,但大量创建goroutine 来处理任务,代价高。

使用chanel

使用chanel来同步处理事件,同时控制事件处理的goroutine会是更好的选择:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
type Tracker2 struct {
ch chan string
stop chan struct{}
}

func NewTracker()*Tracker2{
return &Tracker2{
ch: make(chan string, 100),
}
}

func main(){
tr := NewTracker()
go tr.Run()
_ = tr.Event(context.Background(),"test1")
_ = tr.Event(context.Background(),"test2")
_ = tr.Event(context.Background(),"test3")
ctx ,cancel := context.WithTimeout(context.Background(),5*time.Second)
defer cancel()

tr.Shutdown(ctx)
}

func (t *Tracker2)Event(ctx context.Context,data string)error{
select {
case t.ch <- data:
return nil
case <- ctx.Done():
return ctx.Err()
}
}

func (t *Tracker2)Run(){
for data := range t.ch{
time.Sleep(time.Second)
fmt.Println(data)
}

t.stop <- struct{}{}
}


func (t *Tracker2)Shutdown(ctx context.Context) {
close(t.ch)
select {
case <-t.stop:
case <-ctx.Done():

}
}