聊聊 ErrorGroup 的用法和拓展

在写业务代码时经常碰到需要将一个通用的父任务拆成几个小任务并发执行的场景。此时需要将一个大的任务拆成几个小任务并发执行,来提高程序效率。

那我是怎么做的呢,我会在一个函数中启动我需要的协程来并发完成任务,每个协程构建一个管道进行结果传输,最后起一个for select 收集每个不同的管道任务。使用wait group来控制每个协程生命周期。

其实类似的业务场景已经有很多大神进行了抽象并形成了简单易用的代码框架,本文将以 go 官方的 ErrGroup 库为切入点对各路大佬的拓展库进行分析。

为什么要写这个文章呢,因为我在看这些库时发现,在这个基础上大佬们约拓展越丰富,把这些代码看下来感觉自己对业务场景的抽象能力以及处理能力增加了很多,但这些必须总结下来!因为一定会忘记,尤其是长时间不用,一旦出现相似的场景,那么很难直接回忆起来,所以提笔写了这篇文章。有各路大佬保驾护航那一定能快速形成没有坑的优秀代码。

官方 ErrorGroup 介绍

ErrGroup 是 Go 官方提供的一个同步扩展库。接下来,我来给你介绍一下 ErrGroup 的基本用法和几种应用场景。基本用法golang.org/x/sync/errgroup 包下定义了一个 Group struct,它就是我们要介绍的 ErrGroup 并发原语,底层也是基于 WaitGroup 实现的。在使用 ErrGroup 时,我们要用到三个方法,分别是 WithContext、Go 和 Wait。

WithContext

在创建一个 Group 对象时,需要使用 WithContext 方法:

1
func WithContext(ctx context.Context) (*Group, context.Context)

这个方法返回一个 Group 实例,同时还会返回一个使用 context.WithCancel(ctx) 生成的新 Context。一旦有一个子任务返回错误,或者是 Wait 调用返回,这个新 Context 就会被 cancel。Group 的零值也是合法的,只不过,你就没有一个可以监控是否 cancel 的 Context 了。

看看 WithContext 的源码:

1
2
3
4
5
6
7
8
9
// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancel(ctx)
return &Group{cancel: cancel}, ctx
}

可以看到这是新形成一个 Context,我们知道 Context 的子 Context 在执行他的 cancel 方法时,父 Context 将不会接收到结束信号 Done。所以在使用 g.cancel 时要注意,如果你穿入 Context 正控制着一些程序那么他们将不会被终止。会终止的只是返回 Context 控制的程序。

Go

1
func (g *Group) Go(f func() error)

传入的子任务函数 f 是类型为 func() error 的函数,如果任务执行成功,就返回 nil,否则就返回 error,并且会 cancel 那个新的 Context。

看看 Go 方法的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Go calls the given function in a new goroutine.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func() error) {
g.wg.Add(1)

go func() {
defer g.wg.Done()

if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
}

这里可以看到每个Go方法都会起一个协程来做你传入的逻辑函数,如果有错误的话那么只执行一次错误赋值,就是只将第一次出错的值赋给 g.err。

Wait

Wait 方法其实类似于 WaitGroup 的 Wait 方法,直接看源码:

1
2
3
4
5
6
7
8
9
// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.err
}

他在等待所有的Go方法启动的协程结束后将 Context 取消然后返回 err。

官方的 ErrGroup 非常简单,其实就是解决小型多任务并发任务,为什么说小型呢,因为它没有控制并发量的设计,在需要进行大量任务并行的业务场景下是不适用的。但好像控制并发量这个业务逻辑很容易实现,确实如此,与其自己去拓展不如直接拿大佬的去用,很多拓展包都实现了控制并发量的ErrGroup。

B站拓展包:bilibili/errgroup

这是 B 站微服务框架中的一个拓展包。他解决了官方 ErrGroup 的几个痛点

  • 控制并发量
  • Recover 住协程的 Panic 并打出堆栈信息

先看一下抽象出的 Group 结构

Group

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid and does not cancel on error.
type Group struct {
err error
wg sync.WaitGroup
errOnce sync.Once

workerOnce sync.Once
ch chan func(ctx context.Context) error
chs []func(ctx context.Context) error

ctx context.Context
cancel func()
}

相比官方的结构,B站的结构多出了一个函数签名管道和一个函数签名切片。接下来我们看看具体是怎么用的。接下来看看他是怎么通过这个两个结构实现并发控制的。

WithContext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// WithContext create a Group.
// given function from Go will receive this context,
func WithContext(ctx context.Context) *Group {
return &Group{ctx: ctx}
}

// WithCancel create a new Group and an associated Context derived from ctx.
//
// given function from Go will receive context derived from this ctx,
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithCancel(ctx context.Context) *Group {
ctx, cancel := context.WithCancel(ctx)
return &Group{ctx: ctx, cancel: cancel}
}

B站的拓展包把 Context 直接放入了返回的 Group 结构,返回仅返回一个 Group 结构指针。

Go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Go calls the given function in a new goroutine.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func(ctx context.Context) error) {
g.wg.Add(1)
if g.ch != nil {
select {
case g.ch <- f:
default:
g.chs = append(g.chs, f)
}
return
}
go g.do(f)
}

Go方法可以看出并不是直接起协程的(如果管道已经初始化好了),而是优先将函数签名放入管道,管道如果满了就放入切片。

do

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (g *Group) do(f func(ctx context.Context) error) {
ctx := g.ctx
if ctx == nil {
ctx = context.Background()
}
var err error
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 64<<10)
buf = buf[:runtime.Stack(buf, false)]
err = fmt.Errorf("errgroup: panic recovered: %s\n%s", r, buf)
}
if err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
g.wg.Done()
}()
err = f(ctx)
}

do方法才是执行逻辑的方法,Go 方法是在“官方的 Go 方法”之外包的一层函数签名控制层。这里有两个点值得注意:

  • 传入的函数签名并不是 0 参数的而是添加了 ctx 参数,如果为空的化会把 Group 结构中存储的 context 传入,这个改动应该是防止忘记使用 ctx 来控制协程的生命周期。
  • 在函数结束时添加 recover 函数,并将堆栈信息保存。

GOMAXPROCS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// GOMAXPROCS set max goroutine to work.
func (g *Group) GOMAXPROCS(n int) {
if n <= 0 {
panic("errgroup: GOMAXPROCS must great than 0")
}
g.workerOnce.Do(func() {
g.ch = make(chan func(context.Context) error, n)
for i := 0; i < n; i++ {
go func() {
for f := range g.ch {
g.do(f)
}
}()
}
})
}

这是整个拓展的精髓所在,这个函数其实是起了一个并发池来控制协程数量,传入最大协程数量进行并发消费管道里的函数签名。

这样就可以明白它是如何控制并发数量的,他的ch是用来接收任务的,而并发处理任务的数量是固定。如果管道超过额定值堵塞了那么就会放入缓存切片中。那么切片中的任务是如何消费的呢,结下来看 wait 函数。

Wait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
if g.ch != nil {
for _, f := range g.chs {
g.ch <- f
}
}
g.wg.Wait()
if g.ch != nil {
close(g.ch) // let all receiver exit
}
if g.cancel != nil {
g.cancel()
}
return g.err
}

可以看到 Wait 函数先把切片中的任务放入管道进行处理再等待所有启动的任务完成,完成之后关掉管道(结束所有协程),调用cancel,结束掉 context。

所以只有在进入Wait 的时候才会进行切片中的任务执行。

整个流程梳理下来其实就是启动一个固定数量的并发池消费任务,Go函数其实是向管道中发送任务的生产者,这个设计中有意思的是他的协程生命周期的控制,他的控制方式是每发送一个任务都进行 WaitGroup 加一,在最后结束时的 wait 函数中进行等待,等待所有的请求都处理完才会关闭管道,返出错误。

问题

值得注意的是这个包几个比较坑的问题

  • Go方法并发的去调用在量很多的情况下会产生死锁,因为他的切片不是线程安全的,如果要并发,并发数量一定不能过大,一旦动用了任务切片,那么很有可能就在wait 方法那里 hold 住了。这个可以加个锁来优化。
  • Wg watigroup 只在Go方法中进行Add(),并没有控制消费者的并发,Wait的逻辑就是分发者都分发完成,直接关闭管道,让消费者并发池自行销毁,不去管控,一旦逻辑中有完全hold住的方法那么容易产生内存泄漏。

还有一个更好的优化方法,就是利用双层管道,并发池和双层管道外层的缓存数量相同,每个并发的消费者都先生成一个管道,然后在接收管道中的任务,Go 方法就是从双层管道中获取管道然后将任务放入,如果并发池都在忙则会直接 hold 住。可以点击这里看一下我优化后的代码。

neilotoole/errgroup

我们再看一个拓展,这个是较新的一个 ErrGroup 扩展库,它可以直接替换官方的 ErrGroup,方法都一样,原有功能也一样,只不过增加了可以控制并发 goroutine 的功能。

Group

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid and does not cancel on error.
//
// This Group implementation differs from sync/errgroup in that instead
// of each call to Go spawning a new Go routine, the f passed to Go
// is sent to a queue channel (qCh), and is picked up by one of N
// worker goroutines. The number of goroutines (numG) and the queue
// channel size (qSize) are args to WithContextN. The zero Group and
// the Group returned by WithContext both use default values (the value
// of runtime.NumCPU) for the numG and qSize args. A side-effect of this
// implementation is that the Go method will block while qCh is full: in
// contrast, errgroup.Group's Go method never blocks (it always spawns
// a new goroutine).
type Group struct {
cancel func()

wg sync.WaitGroup

errOnce sync.Once
err error

// numG is the maximum number of goroutines that can be started.
numG int

// qSize is the capacity of qCh, used for buffering funcs
// passed to method Go.
qSize int

// qCh is the buffer used to hold funcs passed to method Go
// before they are picked up by worker goroutines.
qCh chan func() error

// qMu protects qCh.
qMu sync.Mutex

// gCount tracks the number of worker goroutines.
gCount int64
}

相比官方的库,他多了函数签名管道 qCh,最大并发数量 numG,最大管道缓存量 qSize,以及一个保护锁 qMu。看完刚刚B站的拓展包是不是感觉大致可以猜出来他是怎么控制并发的。你可以大胆的想一想这些参数的作用。

WithContextN

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

// WithContext returns a new Group and an associated Context derived from ctx.
// It is equivalent to WithContextN(ctx, 0, 0).
func WithContext(ctx context.Context) (*Group, context.Context) {
return WithContextN(ctx, 0, 0) // zero indicates default values
}

// WithContextN returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
//
// Param numG controls the number of worker goroutines. Param qSize
// controls the size of the queue channel that holds functions passed
// to method Go: while the queue channel is full, Go blocks.
// If numG <= 0, the value of runtime.NumCPU is used; if qSize is
// also <= 0, a qSize of runtime.NumCPU is used.
func WithContextN(ctx context.Context, numG, qSize int) (*Group, context.Context) {
ctx, cancel := context.WithCancel(ctx)
return &Group{cancel: cancel, numG: numG, qSize: qSize}, ctx
}

看 WithContextN 函数其实是赋值了并发量和管道容量

Go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// Go adds the given function to a queue of functions that are called
// by one of g's worker goroutines.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
//
// Go may block while g's qCh is full.
func (g *Group) Go(f func() error) {
g.qMu.Lock()
if g.qCh == nil {
// We need to initialize g.

// The zero value of numG would mean no worker goroutine
// would be created, which would be daft.
// We want the "effective" zero value to be runtime.NumCPU.
if g.numG == 0 {
// Benchmarking has shown that the optimal numG and
// qSize values depend on the particular workload. In
// the absence of any other deciding factor, we somewhat
// arbitrarily default to NumCPU, which seems to perform
// reasonably in benchmarks. Users that care about performance
// tuning will use the WithContextN func to specify the numG
// and qSize args.
g.numG = runtime.NumCPU()
if g.qSize == 0 {
g.qSize = g.numG
}
}

g.qCh = make(chan func() error, g.qSize)

// Being that g.Go has been invoked, we'll need at
// least one goroutine.
atomic.StoreInt64(&g.gCount, 1)
g.startG()

g.qMu.Unlock()

g.qCh <- f

return
}

g.qCh <- f

// Check if we can or should start a new goroutine?
g.maybeStartG()

g.qMu.Unlock()

}

可以看到如果管道为 nil 那么Go 方法将会为之创建一个管道,管道容量是 qSize 参数。如果 qSize 为 0 那么并发量 numG 就是管道容量,如果 numG 也为 0 那么将会取 CPU 核数为并发量。之后将 gCount 设置为 1 调用 startG 函数,最终把任务放入管道。

如果管道一开始不为 nil 那么将会直接把任务放入管道,调用 maybeStartG 函数。看一下 startG 和 maybeStartG 函数

maybeStartG

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// maybeStartG might start a new worker goroutine, if
// needed and allowed.
func (g *Group) maybeStartG() {
if len(g.qCh) == 0 {
// No point starting a new goroutine if there's
// nothing in qCh
return
}

// We have at least one item in qCh. Maybe it's time to start
// a new worker goroutine?
if atomic.AddInt64(&g.gCount, 1) > int64(g.numG) {
// Nope: not allowed. Starting a new goroutine would put us
// over the numG limit, so we back out.
atomic.AddInt64(&g.gCount, -1)
return
}

// It's safe to start a new worker goroutine.
g.startG()
}

startG

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// startG starts a new worker goroutine.
func (g *Group) startG() {
g.wg.Add(1)
go func() {
defer g.wg.Done()
defer atomic.AddInt64(&g.gCount, -1)

var f func() error

for {
// Block until f is received from qCh or
// the channel is closed.
f = <-g.qCh
if f == nil {
// qCh was closed, time for this goroutine
// to die.
return
}

if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})

return
}
}
}()
}

可以看到 maybeStartG 是判断当前的并发数量是否足够,如果超出了并发量将不再启动消费者并发,startG 就是启动一个消费者的并发,接收 qCh 中的任务并进行处理。

Wait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
g.qMu.Lock()

if g.qCh != nil {
// qCh is typically initialized by the first call to method Go.
// qCh can be nil if Wait is invoked before the first
// call to Go, hence this check before we close qCh.
close(g.qCh)
}

// Wait for the worker goroutines to finish.
g.wg.Wait()

// All of the worker goroutines have finished,
// so it's safe to set qCh to nil.
g.qCh = nil

g.qMu.Unlock()

if g.cancel != nil {
g.cancel()
}

return g.err
}

最后看一下 wait 函数,关闭管道,等待所有并发结束,最后 cancel 掉 context 返回 err。

最后整理一下整体的逻辑,Go 方法一样也是一个生产者,像管道中传送任务,当管道满了以后将会 hold 住,这个拓展的好处是他不存在线程不安全的数据结构所以并发调用 Go 是没有问题的,Go 发送任务的同时也承担着启动消费者的重任,区别于B站的一下启动定量的并发这个启动时按需启动的,启动多少个 Go 就启动多少个并发,直到并发量达到上限。

缺陷

  • 这个设计同样也不能进行并发调用 Go,因为最后结束的时候是关闭 qCh 来结束所有消费者,此时如果并发的去调用 Go 方法,将会由于管道充满而产生阻塞,大量Go会等待锁的释放,此时关闭管道,等待的Go会继续发送消息到管道从而导致程序崩溃
  • Go方法中由于涉及到参数赋值所以赋值的参数必须是线程安全的,代码一开始就进行了数据上锁,消耗无谓的性能

总结

官方的 ErrGroup 仅仅解决了并发处理任务,并控制每个协程生命周期。而我们在现实的业务总往往希望并发不要无序的扩张,但这两个拓展包也是各有各的问题,都控制了并发,但却无法并发的去使用 Go 方法,其实我更倾向于B站的拓展,他的协程控制更加科学,如果将切片改成线程安全的比如缓冲管道或是sync Map 都可以实现并发调用 Go 方法。但他的结束方式其实需要等待所有的请求都结束,可能会导致等待时间过长但也没问题总是因该把所有请求处理完再结束。也不排除有的时候可能需要的是丢弃掉还未进行处理的请求并结束。