用 go 处理1分钟百万请求

这是 Marcio Castilho 在 MBAM 时产生的一个需求,我在读后发现他们在处理百万请求时设计的并发池是非常巧妙的,在这里把这篇文章和代码做一个解析,原文在这里

这篇文章不是单纯的翻译,主要是对他们在处理问题时不同版本的代码进行一个分析,原文中有很多作者的心路历程,大家感兴趣的可以去看原文。

背景

Marcio Castilho 在处理他们的匿名数据统计和分析系统时遇到一个需求:我们的目标是能够处理来自数百万个端点的大量 POST 请求。Web 处理程序将接收一个 JSON 文档,该文档可能包含需要写入 Amazon S3 的许多有效负载的集合,以便我们的 map-reduce 系统稍后对这些数据进行操作。S3就是亚马逊的一个对象存储服务,其实他们的主要业务逻辑很简单,难就难在控制并发。最开始他们按照自己以往的经验想要建立一个worker层架构,使用:

  • Sidekiq
  • Resque
  • DelayedJob
  • Elasticbeanstalk Worker Tier
  • RabbitMQ
  • and so on…

建立两个集群,一个是前端集群进行请求消息收集,一个是worker层集群不停的进行请求处理,这样可以处理并hold住大量的请求。

但他们在讨论的时候发现这个架构很有可能变得复杂,庞大。用 golang 的天然支持并发的特点显然可以简单的进行并发处理。

代码版本一

请求结构

处理请求结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type PayloadCollection struct {
WindowsVersion string `json:"version"`
Token string `json:"token"`
Payloads []Payload `json:"data"`
}

type Payload struct {
// [redacted]
}

func (p *Payload) UploadToS3() error {
// the storageFolder method ensures that there are no name collision in
// case we get same timestamp in the key name
storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

bucket := S3Bucket

b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}

// Everything we post to the S3 bucket should be marked 'private'
var acl = s3.Private
var contentType = "application/octet-stream"

return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}

这个代码是业务逻辑不做过多解释,你知道这是一个请求处理就好。

最初版本 goroutine 处理

最初他们采用了一个非常简单的 POST 处理程序实现,只是尝试将作业处理并行化为一个简单的 goroutine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func payloadHandler(w http.ResponseWriter, r *http.Request) {

if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}

// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
go payload.UploadToS3() // <----- DON'T DO THIS
}

w.WriteHeader(http.StatusOK)
}

看来大佬考虑问题一开始也不会很周全,大家都是摸着石头过河,边做边改的。这个简单的做法的弱点是显而易见的,完全没有控制并发数量,而且实际的并发远超他们的想象,他们的机器很快就到达了处理极限,直接崩掉。

需要注意的是这只是一个伪代码,明显是有 bug 的,大家不要轻易的拿去用,比如经典的 for range循环问题,这个在之前的文章中我也说过 range 是形成一个新指针每次循环都将结构拷贝到新指针处,golang 在编译时,起 go 程是发生在函数调用之后的所以这个这个循环起 goroutine 将会使用最后一个数据集。建议用下标循环或是做一个新指针接收拷贝值再掉用

代码版本二

紧接着他们迅速开始了第二次尝试,他们觉得上一个的问题是没有进行请求的缓冲,直接并发导致程序迅速崩盘,这一次他们做了一个Buffered Channel就是有缓冲管道,将请求缓存进管道再起一个生产者来对请求进行后台处理。作者说了他们要解决问题的核心就是快速获取请求,让用户感知不到延迟,同时在后台默默的把请求做掉。

看一下版本二的代码:

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var Queue chan Payload

func init() {
Queue = make(chan Payload, MAX_QUEUE)
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
...
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
Queue <- payload
}
...
}

消费者

1
2
3
4
5
6
7
8
func StartProcessor() {
for {
select {
case job := <-Queue:
job.payload.UploadToS3() // <-- STILL NOT GOOD
}
}
}

好了,这个新版的处理方式也是让我惊掉下巴,这个一次进行一个请求处理,并把超出处理的请求缓存起来的方式实际上是个串行的程序,他们只是用了一个管道来缓冲,期待用管道完全消峰。但他们再一次的对请求量预估不足了。管道里的请求立刻就满了,随着请求的增加用户延迟不断飙升。一首凉凉。

最终版本

最终版本真的是完全解决了他们的问题,他们最终定位了解决问题的核心,一个是让并发请求始终在机器可以承受的最大范围,另一个是让用户请求可以快速通过没有延迟的感知。

他们最终使用双层管道实现了一个并发池,一层管道用于排队接收任务,一层管道用于控制并发的最大量。看代码:

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
type Job struct {
Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool)}
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel

select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}

case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}

这个消费者实际是个并发池设计,你可以根据你的程序进行最大数量的启动,消费者做的事情其实也很简单就是不断的生成管道放入并发管道池,并对该管道进行接收,收到任务就立刻去做,完成后在继续生成接收管道并放入并发管道池中。

任务列队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func payloadHandler(w http.ResponseWriter, r *http.Request) {

if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}

// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {

// let's create a job with the payload
work := Job{Payload: payload}

// Push the work onto the queue.
JobQueue <- work
}

w.WriteHeader(http.StatusOK)
}

生产者

调用

1
2
dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()

逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)

type Dispatcher struct {
// A pool of workers channels that are registered with the dispatcher
WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(d.pool)
worker.Start()
}

go d.dispatch()
}

func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool

// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
}

你是不是可以想象到生产者代码是怎么写的,其实就是通过从任务队列里获取任务

然后从任务并发池中获取空闲消费者的任务接收管道,一直等待到获取到,不然就会一直block在那里。获取到就把任务发送给他,让他去后台工作。

可以看到作者定义了一个 MaxWorker 这是并发的最大数量,起对应数量的消费者。

这个架构已经使用用户延迟立刻降到最低,并切程序稳定的运行处理请求,再也没有让机器崩盘。

代码的问题和我的疑问

问题:

这个大佬的代码还会有问题吗?不是实践中已经得到了验证吗?
其实我们应该带着批判的眼光来看每一条代码:

  • 代码没有控制协程的生命周期,无法优雅的优雅的退出。
  • 代码的退出 Stop 的方法仅仅是控制消费者退出,因为生产者也是并发的,所以一旦消费者完全退出。生产者将会在获取工作管道时阻塞并一直阻塞在哪里。

疑问:

  • 为什么要用双层管道,我们可以使用单管道实现类似的功能。

思路如下:

生产者

1
2
3
4
5
for _,job := range JobQueue{
go func (){
jobChannel <- job
}()
}

消费者

1
2
3
4
5
6
7
for i:=0;i<MaxCosumer;i++{
go func(){
for job := range jobChannel{
// 处理任务的逻辑
}
}()
}

是不是相同的功能呢?控制消费者的并发量同时让生产者也并发起来。

问题解决

我们就通过代码来依次解决提出的问题

首先基于前两个问题改造作者的代码:
具体代码可以点这里

工作者接口

做一个工作者的接口是为了做两个工作者,一个以作者的思路,一个以我刚刚列出的思路,接口需要实现以下方法:

1
2
3
4
5
type Work interface {
Dispatch(job string)
StartPool()
Stop()
}

新建工作者方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

const (
w1 = "w1"
w2 = "w2"
)

// NewWorker doc
func NewWorker(t string, max int) Work {
switch t {
case w1:
return &W1{
MaxNum: max,
Wg: &sync.WaitGroup{},
Ch: make(chan string),
}

case w2:
return &W2{
Wg: &sync.WaitGroup{},
MaxNum: max,
ChPool: make(chan chan string, max),
QuitChan: make(chan struct{}),
}

}

return nil
}

W1 是使用我的思路进行编码的,将在后面解析代码。

作者思路工作者 W2 及 SubWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
type W2 struct {
SubWorkers []SubWorker
Wg *sync.WaitGroup
MaxNum int
ChPool chan chan string
QuitChan chan struct{}
}


type SubWorker struct {
JobChan chan string
}

func (sw *SubWorker) Run(wg *sync.WaitGroup, poolCh chan chan string, quitCh chan struct{}) {
if sw.JobChan == nil {
sw.JobChan = make(chan string)
}
wg.Add(1)
go func() {
defer wg.Done()

for {
poolCh <- sw.JobChan
select {
case res := <-sw.JobChan:
logrus.Debugf("完成工作: %s \n", res)

case <-quitCh:
logrus.Debugf("消费者结束...... \n")
return
}
}
}()
}

作者的思路消费者独占一个工作接收管道,我把每个消费者都做成 SubWorker 并实现 Run 方法,Run 方法其实就是作者的消费者工作核心逻辑。可以看到他会像双层管道中 poolCh(以参数形式传入),工作过程为把自己自身的管道放入 poolCh ,然后在再接收自身管道任务并执行。quitCh 为结束该消费者的管道。

W2 工作者方法实现

Dispatch

1
2
3
4
5
6
7
8
9
10
11
12
13
func (w *W2) Dispatch(job string) {
jobChan := <-w.ChPool

select {
case jobChan <- job:
logrus.Debugf("发送任务 : %s 完成 \n", job)
return

case <-w.QuitChan:
logrus.Debugf("发送者(%s)结束 \n", job)
return
}
}

分发任务和作者思路一致,获取双层管道 ChPool 中的消费者管道,并把任务分发过去。

StartPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (w *W2) StartPool() {
if w.ChPool == nil {
w.ChPool = make(chan chan string, w.MaxNum)
}

if w.SubWorkers == nil {
w.SubWorkers = make([]SubWorker, w.MaxNum)
}

if w.Wg == nil {
w.Wg = &sync.WaitGroup{}
}

for i := 0; i < len(w.SubWorkers); i++ {
w.SubWorkers[i].Run(w.Wg, w.ChPool, w.QuitChan)
}
}

启动并发池,将最大数量的 SubWorker 设置出来,并 Run 起这些消费者形成并发池。

Stop

1
2
3
4
5
6
func (w *W2) Stop() {
close(w.QuitChan)
w.Wg.Wait()

close(w.ChPool)
}

结束,结束这里是作者没有考虑到的,优雅的结束一个并发组件是极其重要的,如果每次使用你所启动的并发都不考虑用完结束,那么将存在着极大的内存溢出的风险,影响机器性能。这里的结束其实就是关闭 QuitChan, 这个结束管道同时控制着消费者和生产者,我们看一下关闭结束管道后会发生些什么事。首先消费者将会在接收任务时触发关闭管道的 case 并退出该协程,生产者也会在发送任务的时候触发关闭 case 退出协程。

这里有一个值得注意的点,生产者过多会阻塞在第一步获取 poolCh 并发管道的时候,而消费者的在触发关闭管道的 case 前是不会被阻塞的,所以我们只要等待消费者把所有任务做完并全部退出,之后将并发池管道关闭,对于一个关闭的管道尝试去获取他数据都会获取到该数据类型的零值,生产者获取关闭的并发管道池将会获取 nil chan 在尝试将任务放入该管道时会阻塞,此时将会触发关闭管道的 case。此时所有生产者也都会退出协程。

这个退出方式的缺点是如果量请求并发堆积,将会放弃多数堆积请求。你可以将请求停掉,然后等待所有的生产者退出再退出消费者,这种方式其实缺点也很明显,你需要等待很久。所以我时默认触发退出时请求时可以进行丢弃的。

代码改进后可以进行优雅退出并且不会产生因生产者阻塞内存泄漏的风险。

启动

1
2
3
4
5
6
7
8
9
10
11

func DealW2(max int) {
w := NewWorker(w2, max)
w.StartPool()
for i := 0; i < 10*max; i++ {
go w.Dispatch(fmt.Sprintf("%d", i))
}

w.Stop()
}

启动其实就是起了额定数量的消费者,并发送十倍的请求量。

我的思路工作者 W1

1
2
3
4
5
6
7
type W1 struct {
WgSend *sync.WaitGroup
Wg *sync.WaitGroup
MaxNum int
Ch chan string
DispatchStop chan struct{}
}

我的工作者内部设计了两个 waitGroup 一个用于控制生产者的协程生命周期一个用于控制消费者的协程生命周期。为什么要这么做呢,因为在我的思路中,停止程序时一定是要工作者先行退出的,因为生产者在不断的向管道中发送消息,消费者是以管道关闭的方式结束退出的。管道关闭必须结束发送端否则会崩溃。当然也可以一个控制结束管道控制所有的发送者和接收者等到所有都结束再进行管道关闭。这样代码会更复杂一些。

工作者W1方法实现

Dispatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (w *W1) Dispatch(job string) {
w.WgSend.Add(10 * w.MaxNum)
for i := 0; i < 10*w.MaxNum; i++ {
go func(i int) {
defer w.WgSend.Done()

select {
case w.Ch <- fmt.Sprintf("%d", i):
return
case <-w.DispatchStop:
logrus.Debugln("退出发送 job: ", fmt.Sprintf("%d", i))
return
}
}(i)
}
}

发送者就是给将任务发送到管道中

StartPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (w *W1) StartPool() {
if w.Ch == nil {
w.Ch = make(chan string, w.MaxNum)
}

if w.WgSend == nil {
w.WgSend = &sync.WaitGroup{}
}

if w.Wg == nil {
w.Wg = &sync.WaitGroup{}
}

if w.DispatchStop == nil {
w.DispatchStop = make(chan struct{})
}

w.Wg.Add(w.MaxNum)
for i := 0; i < w.MaxNum; i++ {
go func() {
defer w.Wg.Done()
for v := range w.Ch {
logrus.Debugf("完成工作: %s \n", v)
}
}()
}
}

启动消费者并发池,并发池就简单很就是起并发接收管道数据。

Stop

1
2
3
4
5
6
7
func (w *W1) Stop() {
close(w.DispatchStop)
w.WgSend.Wait()

close(w.Ch)
w.Wg.Wait()
}

结束方法就是进行先结束所有的发送者,等待所有的发送者结束再结束消费者。

启动

1
2
3
4
5
6
func DealW1(max int) {
w := NewWorker(w1, max)
w.StartPool()
w.Dispatch("")
w.Stop()
}

启动其实就是将额定的消费者和生产者进行启动并等待优雅退出

分析

使用 benchmark 进行性能分析

1
2
3
4
5
6
7
8
9
10
11
12
func BenchmarkDealW1(b *testing.B) {
for i := 0; i < b.N; i++ {
DealW1(100000)
}
}

func BenchmarkDealW2(b *testing.B) {
for i := 0; i < b.N; i++ {
DealW2(100000)
}
}

先进行大并发量的性能分析

image-20211022001304078

进行多次测试发现执行时间上W1也就是我的思路更好,W2则在内存分配上更具优势,但其实优势都不明显。

再看看普通量的并发表现,刚刚定并发量是100000,现在定为200试一下:

image-20211022002039175

可以看到从时间和空间角度都是我的W1思路更优的,所以我认为简单的使用我的思路来写就可以。

总结

作者使用双层管道的原因应该是作者认为获取管道任务需要以消费者注册管道方式去限制及调度,其实 go 语言中的 channel 在并发获取管道内容的时候 channel 会维护一个接收者队列,如果接收者去接收一个空管道,那么接收者是会被塞入接收者队列,并在管道中有任务的时候按照顺序来进行接收者的调度,所以只要控制管道接收者数量就可以实现并发量的控制。

题外话

这个例子让我想起初期用 go 的一个并发写法,抛出来看看有什么问题(与原文无关,完全是突然想起来总结一下):

  • 下面两段代码:代码(一) ,代码(二)的区别在哪,使用下面代码(一)处理并发存在什么问题

在我刚接触Go的时候,我学习团队大佬的并发任务调度写法是这样的:

代码(一):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87

const maxCurrency = 256

type Worker struct {
WorkChan chan <- interface{}
Total int64
WorkQueue sync.Map
}

func (w *Worker)add(){
atomic.AddInt64( &w.Total,1)
}

func (w *Worker)sub(){
atomic.AddInt64( &w.Total,1)
}

func(w *Worker)Do(){
for task := range w.WorkChan{
// 任务重复检查
if _,ok:= w.WorkQueue.LoadOrStore(task, struct {}{});!ok{
fmt.Println("work is doing",task)
continue
}

// 并发量控制
for atomic.LoadInt64(&w.Total) >= maxCurrency{
time.Sleep(1 * time.Second)
}

//核心逻辑
w.add()
go func() {
defer w.sub()

time.Sleep(1*time.Second)
fmt.Println("work done",task)
}()
}
}

func getTasks()([]interface{},error){
return []interface{}{"task1,task2,task3"},nil
}

func (w *Worker)SendWorkTask(){
tick := time.NewTicker(10 * time.Second)
for range tick.C{
tasks,err := getTasks()
if err != nil{
continue
}

for _,task := range tasks{
w. WorkChan <- task
}
}
}

func main(){
newWorker := &Worker{
WorkQueue: sync.Map{},
WorkChan: make(chan interface{},256),
}


go newWorker.SendWorkTask()
go newWorker.Do()
}

func (w *Worker)SendWorkTask(){
tick := time.NewTicker(10 * time.Second)
for range tick.C{
w. WorkChan <- "work"
}
}

func main(){
newWorker := &Worker{
WorkQueue: sync.Map{},
WorkChan: make(chan interface{},256),
}


go newWorker.SendWorkTask()
go newWorker.Do()
}

那么换个思路我把管道拿掉,我把并发任务直接写在发送方法中,同样的限制并发数。那么代码逻辑就变成了这样:

代码(二):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
type Worker struct {
WorkChan chan <- interface{}
Total int64
WorkQueue sync.Map
}

func (w *Worker)add(){
atomic.AddInt64( &w.Total,1)
}

func (w *Worker)sub(){
atomic.AddInt64( &w.Total,1)
}

func (w *Worker)SendWorkTask(){
tick := time.NewTicker(10 * time.Second)
for range tick.C{
tasks,err := getTasks()
if err != nil{
continue
}

for _,task := range tasks{
// 任务重复检查
if _,ok:= w.WorkQueue.LoadOrStore(task, struct {}{});!ok{
fmt.Println("work is doing",task)
continue
}

// 并发量控制
for atomic.LoadInt64(&w.Total) >= maxCurrency{
time.Sleep(1 * time.Second)
}

//核心逻辑
w.add()
go func(t interface{}) {
defer w.sub()

time.Sleep(1*time.Second)
fmt.Println("work done",task)
}(task)
}
}
}

func main(){
newWorker := &Worker{
WorkQueue: sync.Map{},
WorkChan: make(chan interface{},256),
}


go newWorker.SendWorkTask()
}

这个代码其实简单了很多,而且没有了管道,反而减少了管道调度生产消费者以及阻塞时上锁的性能消耗。那么我们要管道干嘛呢?优势在哪?

两个优势

  • 解耦生产者和消费者

这两段代码看上去是一样的效果,但是第一段代码其实利用管道将生产者消费者解耦了,这样以来我们可以把并发的操控逻辑单独抽取出来,将生产者消费者抽象出来,这样只要核心处理逻辑就好(其实就是利用接口和多态)。

  • 使数据处理更佳平滑,不卡顿

在获取数据后加一个加入缓存队列的唯一意义是在并发量有坑位的时候的迅速补上,而不是先去查数据再补上。这可能就是几毫秒的速度差距,根本可以忽略。

缺陷

这个并发的写法缺陷也十分明显,他虽然控制了并发量但他控制不了每个协程的生命周期,使用计数的方式控制并发,那么每个go程都将是执行完就泯灭的,所以根本没法去控制它的生命周期,试想如果你的程序重启了很多次,这个数字每次都归零,那么将会堆积大量野的go程造成内存泄漏,好一点也会大大影响性能增加延迟。