如何处理一组并发任务

这片文章是承接上一篇 聊聊 ErrorGroup 的用法和拓展 ErrorGroup 官方包的创建初衷就是给我们提供一个正确的处理一组任务的并发方式,经过拓展,我们还可以控制最大并发任务量,之后进一步拓展 Group 的用法更加多样,适用业务也更加多元,所以我这片文章题目就扩大到了处理一组并发任务。本文将从多个 Group 的多元拓展包入手,对并发处理任务的要点及实际处理方式进行总结和提炼,力求看后可以掌握处理并发任务的核心要点。

Facebook 拓展:facebookgo/errgroup

这个 face book 的拓展包实际是对 waitgroup 的拓展,这个包不但可以用来记录并发控制并发协程的生命周期,还可以对所有并发任务的错误进行处理及记录。

Group 结构

1
2
3
4
5
6
7
8
// Group is similar to a sync.WaitGroup, but allows for collecting errors.
// The collected errors are never reset, so unlike a sync.WaitGroup, this Group
// can only be used _once_. That is, you may only call Wait on it once.
type Group struct {
wg sync.WaitGroup
mu sync.Mutex
errors MultiError
}

可以看到这个 group 的结构是在 WaitGroup 结构的基础上增加了一个锁及一个 MultiError 的结构,MultiError 就是处理并发协程错误输出的结构。

MultiError 结构及实现方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// MultiError allows returning a group of errors as one error.
type MultiError []error

// Error returns a concatenated string of all contained errors.
func (m MultiError) Error() string {
l := len(m)
if l == 0 {
panic("MultiError with no errors")
}
if l == 1 {
panic("MultiError with only 1 error")
}
var b bytes.Buffer
b.WriteString("multiple errors: ")
for i, e := range m {
b.WriteString(e.Error())
if i != l-1 {
b.WriteString(" | ")
}
}
return b.String()
}

// NewMultiError returns nil if all input errors passed in are nil. Otherwise,
// it coalesces all input errors into a single error instance. Useful for
// code like this:
//
// func doThisAndThat() error {
// err1 := tryThis()
// err2 := tryThat()
// return errgroup.NewMultiError(err1, err2)
// }
//
func NewMultiError(errs ...error) error {
var multiErr MultiError
for _, err := range errs {
if err != nil {
multiErr = append(multiErr, err)
}
}

if len(multiErr) == 1 {
return multiErr[0]
} else if len(multiErr) > 1 {
return multiErr
}
return nil
}

multiError 的实现方法有两个:

一个是实现 error 类型的 Error( ) 聚合所有的 error 描描述。

另一个是用来新建一个 multiErr ,如果是单个 error 传入则直接翻出,多个则新建 multiError,空则翻出nil,这个新建的方法是独立的,与主要控制逻辑无关。

Group 的实现方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// Add adds delta, which may be negative. See sync.WaitGroup.Add documentation
// for details.
func (g *Group) Add(delta int) {
g.wg.Add(delta)
}

// Done decrements the Group counter.
func (g *Group) Done() {
g.wg.Done()
}

// Error adds an error to return in Wait. The error must not be nil.
func (g *Group) Error(e error) {
if e == nil {
panic("error must not be nil")
}
g.mu.Lock()
defer g.mu.Unlock()
g.errors = append(g.errors, e)
}

// Wait blocks until the Group counter is zero. If no errors were recorded, it
// returns nil. If one error was recorded, it returns it as is. If more than
// one error was recorded it returns a MultiError which is a slice of errors.
func (g *Group) Wait() error {
g.wg.Wait()
g.mu.Lock()
defer g.mu.Unlock()
errors := g.errors
l := len(errors)
if l == 0 {
return nil
}
if l == 1 {
return errors[0]
}
return errors
}

Add 和 Done 方法和 WaitGroup 一致,新增了 Error 方法用于为 Group 结构收集 error。Wait 方法不仅进行协程生命周期的控制,还通过判断收集的 errors 来翻出正确的 error。

这个多协程的 error 处理方式是值得学习的这个Multi 结构也是可以为我们所用的。

有了上面几个拓展包(上接 聊聊 ErrorGroup 的用法和拓展 )我们接下来看几个功能更加强大的 Group 并发任务处理包。

SizedGroup/ErrSizedGroup

go-pkgz/syncs 提供了两个 Group 并发原语,分别是 SizedGroup 和 ErrSizedGroup。

这个拓展包设计巧妙,他在任务消费的 Go 方法中通过信号量的方式实现了两种控制并发的模式,你可以控制 goroutine 的数量,还可以不限制 goroutine 数量只限制执行任务的 goroutine 数量。只控制执行任务的活跃 gouroutine 的数量这样的设计可以让我们不再去纠结阻塞延迟的问题,我们可以肆无忌惮的发送任务给消费者,消费者不会阻塞直接启动并发。

先看代码,然后我在举一个简单的例子来说明一下仅控制活跃 goroutine 能解决的实际业务问题。

Semaphore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Semaphore implementation, counted lock only. Implements sync.Locker interface, thread safe.
type semaphore struct {
sync.Locker
ch chan struct{}
}

// NewSemaphore makes Semaphore with given capacity
func NewSemaphore(capacity int) sync.Locker {
if capacity <= 0 {
capacity = 1
}
return &semaphore{ch: make(chan struct{}, capacity)}
}

// Lock acquires semaphore, can block if out of capacity.
func (s *semaphore) Lock() {
s.ch <- struct{}{}
}

// Unlock releases semaphore, can block if nothing acquired before.
func (s *semaphore) Unlock() {
<-s.ch
}

这是作者实现的一个信号量的结构,简单的解释一下:

这个结构其实就是用于控制的并发量的计数结构。使用 NewSemaphore 创建一个给定容量的管道,这个容量就是限制最大并发的数量。使用 lock 方法来获取一个信号量,使用 Unlock 方法来释放一个信号量。注意获取和释放的一一对应关系,防止造成永久阻塞。如果并发量达到额定数量即管道容量被占满,那么下一个lock将会阻塞直到释放一个容量为止。

Option

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type options struct {
ctx context.Context
cancel context.CancelFunc
preLock bool
termOnError bool
}

// GroupOption functional option type
type GroupOption func(o *options)

// Context passes ctx and makes it cancelable
func Context(ctx context.Context) GroupOption {
return func(o *options) {
o.ctx, o.cancel = context.WithCancel(ctx)
}
}

// Preemptive sets locking mode preventing spawning waiting goroutine. May cause Go call to block!
func Preemptive(o *options) {
o.preLock = true
}

// TermOnErr prevents new goroutines to start after first error
func TermOnErr(o *options) {
o.termOnError = true
}

Option 结构用于设定一些选项:

ctx 初始Group 结构时放入的上下文,用于传入所有执行逻辑函数,主要是监控判断该上下文是否已被取消。

cancel 控制上下文的取消。

preLock 参数为 true 即为控制所有 goroutine 的数量,为 false 即为控制活跃 goroutine 的数量。

termOnError 这个字段是用在 ErrsizeGroup 中的,表示只要有一个 error 出现那么这组子任务就会全部返出或取消

辅助结构介绍完后我们来进行主结构介绍:

SizeGroup

1
2
3
4
5
6
7
8
// SizedGroup has the same role as WaitingGroup but adds a limit of the amount of goroutines started concurrently.
// Uses similar Go() scheduling as errgrp.Group, thread safe.
// SizedGroup interface enforces constructor usage and doesn't allow direct creation of sizedGroup
type SizedGroup struct {
options
wg sync.WaitGroup
sema sync.Locker
}

这是实现自任务并发控制的Group 结构,如前面所说这个结构可以用来控制 goroutine 数量,也可以控制活跃 goroutine 数量。

options 结构刚刚已经介绍过,用来控制一些任务执行管控方式。

wg 就是一个 WaitGroup 结构用来控制任务协程的生命周期。

sema 是一个 sync.Locker 接口,这里作者使用的其实就是我们刚开始介绍的 Semaphore 结构它也实现了 sync.Locker 接口。这个 sema 字段主要就是使用信号量的方式来控制 goroutine 数量。

接下来看下 sizeGroup 实现的方法

NewSizedGroup

1
2
3
4
5
6
7
8
9
// NewSizedGroup makes wait group with limited size alive goroutines
func NewSizedGroup(size int, opts ...GroupOption) *SizedGroup {
res := SizedGroup{sema: NewSemaphore(size)}
res.options.ctx = context.Background()
for _, opt := range opts {
opt(&res.options)
}
return &res
}

初始化一个 SizeGroup 的结构可以看到主要是option 和 sema 的初始化。

Go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// Go calls the given function in a new goroutine.
// Every call will be unblocked, but some goroutines may wait if semaphore locked.
func (g *SizedGroup) Go(fn func(ctx context.Context)) {

canceled := func() bool {
select {
case <-g.ctx.Done():
return true
default:
return false
}
}

if canceled() {
return
}

g.wg.Add(1)

if g.preLock {
g.sema.Lock()
}

go func() {
defer g.wg.Done()

if canceled() {
return
}

if !g.preLock {
g.sema.Lock()
}

fn(g.ctx)
g.sema.Unlock()
}()
}

这个方法就是在启动逻辑函数的同时控制并发数量。在进入函数是先检查上下文是否被取消,没有的话进行任务量加1,之后判断option 中的 prelock 是否开启。开启的话直接判断是否到达信号量,到达的话就阻塞等待信号量释放,没有的话继续启动 goroutine 去执行函数逻辑。如果是关闭的那么将直接启动 goroutine 但会在执行函数逻辑的时候进行信号量的判断来决定是执行函数还是阻塞。最后函数执行完成进行信号量释放。

Wait

1
2
3
4
5
// Wait blocks until the SizedGroup counter is zero.
// See sync.WaitGroup documentation for more information.
func (g *SizedGroup) Wait() {
g.wg.Wait()
}

这个 wait 函数就比较简单了,就是等待所有的 goroutine 结束

这个包还有一个 ErrSizedGroup 的结构,这个结构进行了并发任务的错误处理:

ErrSizedGroup

1
2
3
4
5
6
7
8
9
10
11
12
// ErrSizedGroup is a SizedGroup with error control. Works the same as errgrp.Group, i.e. returns first error.
// Can work as regular errgrp.Group or with early termination. Thread safe.
// ErrSizedGroup interface enforces constructor usage and doesn't allow direct creation of errSizedGroup
type ErrSizedGroup struct {
options
wg sync.WaitGroup
sema sync.Locker

err *multierror
errLock sync.RWMutex
errOnce sync.Once
}

相对于SizeGroup,多出了三个用于错误处理的结构:

err 字段是作者自己实现的 multierror 结构,该结构实现了 error 接口。我们来看一下这个结构:

mutierror

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
type multierror struct {
errors []error
lock sync.Mutex
}

func (m *multierror) append(err error) *multierror {
m.lock.Lock()
m.errors = append(m.errors, err)
m.lock.Unlock()
return m
}

func (m *multierror) errorOrNil() error {
m.lock.Lock()
defer m.lock.Unlock()
if len(m.errors) == 0 {
return nil
}
return m
}

// Error returns multierror string
func (m *multierror) Error() string {
m.lock.Lock()
defer m.lock.Unlock()
if len(m.errors) == 0 {
return ""
}

errs := []string{}

for n, e := range m.errors {
errs = append(errs, fmt.Sprintf("[%d] {%s}", n, e.Error()))
}
return fmt.Sprintf("%d error(s) occurred: %s", len(m.errors), strings.Join(errs, ", "))
}

这是一个线程安全的切片结构类型为 error 因为其实现了 Error() 方法所以同样为 error 接口类型。它实现的方法也十分简单一个是线程安全的 append ,一个是判断是否没有 error 的 errorOrNil 方法,最后一个就是 Error 放法实现了多个 error 的聚合描述。

ErrSizedGroup 方法实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// NewErrSizedGroup makes wait group with limited size alive goroutines.
// By default all goroutines will be started but will wait inside. For limited number of goroutines use Preemptive() options.
// TermOnErr will skip (won't start) all other goroutines if any error returned.
func NewErrSizedGroup(size int, options ...GroupOption) *ErrSizedGroup {

res := ErrSizedGroup{
sema: NewSemaphore(size),
err: new(multierror),
}

for _, opt := range options {
opt(&res.options)
}

return &res
}

// Go calls the given function in a new goroutine.
// The first call to return a non-nil error cancels the group if termOnError; its error will be
// returned by Wait. If no termOnError all errors will be collected in multierror.
func (g *ErrSizedGroup) Go(f func() error) {

g.wg.Add(1)

if g.preLock {
g.sema.Lock()
}

go func() {
defer g.wg.Done()

// terminated will be true if any error happened before and g.termOnError
terminated := func() bool {
if !g.termOnError {
return false
}
g.errLock.RLock()
defer g.errLock.RUnlock()
return g.err.errorOrNil() != nil
}

if terminated() {
return // terminated due prev error, don't run anything in this group anymore
}

if !g.preLock {
g.sema.Lock()
}

if err := f(); err != nil {

g.errLock.Lock()
g.err = g.err.append(err)
g.errLock.Unlock()

g.errOnce.Do(func() { // call context cancel once
if g.cancel != nil {
g.cancel()
}
})
}
g.sema.Unlock()
}()
}

// Wait blocks until all function calls from the Go method have returned, then
// returns all errors (if any) wrapped with multierror from them.
func (g *ErrSizedGroup) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.err.errorOrNil()
}

总体上和SizeGroup 差不多,但是Go 函数在启动goroutine 之前会去判断 err ,如果是 termOnErr 字段打开那么只要遇到一次 Err 那么其他将要执行的Go直接跳过。

但这个Go 方法有几个问题:

  • 没必要在外层再去设置一个锁,mutierr 本身内部就有锁,这个切片是线程安全的,没必要在外层在做一个 errLock 的读写锁。
  • 没有判断是否 ctx 被取消,
  • 在出现error时调用了 g.cancel() 这个方法并没有对应的 ctx 来接收,而且即便时函数对 ctx 进行了监听,那么一旦有一个错误所有函数都会结束,那么做 terminated 这个函数的意义就没有了,直接判断ctx.done 可以达到同样的效果。

Wait 方法就比较简单了,就是等待所有 goroutine 结束后返出错误

上面都是类似 ErrorGroup 的拓展与改造,其主要思想都是对单个逻辑函数进行并发处理,并简单进行一些并发数量控制,并发任务的取消等。任务的编排还是要你在此基础上进行的,下面我们来看几个直接进行多任务编排的包:

gollback

这个包主要是进行子任务并发的编排处理,包括All,Race,Retry等几种方法,下面我们来看看他是如果进行并发任务编排的:

包中新建了一个函数签名类型,一个内置 response 结构,函数签名类型用来统一传入的函数类型,response 结构用来接收函数执行结果。

1
2
3
4
5
6
7
8
// AsyncFunc represents asynchronous function
type AsyncFunc func(ctx context.Context) (interface{}, error)

type response struct {
res interface{}
err error
}

All

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// All method returns when all of the callbacks passed as an iterable have finished,
// returned responses and errors are ordered according to callback order
// will panic if context is nil
func All(ctx context.Context, fns ...AsyncFunc) ([]interface{}, []error) {
if ctx == nil {
panic("nil context provided")
}

rs := make([]interface{}, len(fns))
errs := make([]error, len(fns))

var wg sync.WaitGroup
wg.Add(len(fns))

for i, fn := range fns {
go func(index int, f AsyncFunc) {
defer wg.Done()

var r response
r.res, r.err = f(ctx)

rs[index] = r.res
errs[index] = r.err
}(i, fn)
}

wg.Wait()

return rs, errs
}

All 方法是等待所有任务执行完成,并把所有任务的结果进行收集返出

Race

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// Race method returns a response as soon as one of the callbacks in an iterable executes without an error,
// otherwise last error is returned
// will panic if context is nil
func Race(ctx context.Context, fns ...AsyncFunc) (interface{}, error) {
if ctx == nil {
panic("nil context provided")
}

out := make(chan *response, 1)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

for i, fn := range fns {
go func(index int, f AsyncFunc) {
c := make(chan *response, 1)

go func() {
defer close(c)
var r response
r.res, r.err = f(ctx)

c <- &r
}()

for {
select {
case <-ctx.Done():
if index == len(fns)-1 {
out <- &response{
err: ctx.Err(),
}
}

return
case r := <-c:
if r.err == nil || index == len(fns)-1 {
out <- r
}

return
}
}
}(i, fn)
}

r := <-out

return r.res, r.err
}

Race方法其实是子任务并发执行,如果有一个任务完成就直接返出结果,该方法在起并发执行任务时使用了一个异步处理的方式,并监听每个子任务是否会超时,一旦超时就会返出错误,如果没有超时就会在某个任务成功执行后直接返出该任务结果,如果所有任务全部失败那么就返出最后一个错误。

Retry

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Retry method retries callback given amount of times until it executes without an error,
// when retries = 0 it will retry infinitely
// will panic if context is nil
func Retry(ctx context.Context, retires int, fn AsyncFunc) (interface{}, error) {
if ctx == nil {
panic("nil context provided")
}

i := 1

for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
var r response
r.res, r.err = fn(ctx)

if r.err == nil || i == retires {
return r.res, r.err
}

i++
}
}
}

Retry 方法其实就是在出现错误的并且重试次数小于额定数量时重试,在 ctx 被取消时直接返出ctx.Err。

Hunch

Hunch提供的功能和 gollback 类似,不过它提供的方法更多,而且它提供的和 gollback 相应的方法,也有一些不同。我来一一介绍下。

先看下内建的一些结构和辅助方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// Executable represents a singular logic block.
// It can be used with several functions.
type Executable func(context.Context) (interface{}, error)

// ExecutableInSequence represents one of a sequence of logic blocks.
type ExecutableInSequence func(context.Context, interface{}) (interface{}, error)

// IndexedValue stores the output of Executables,
// along with the index of the source Executable for ordering.
type IndexedValue struct {
Index int
Value interface{}
}

// IndexedExecutableOutput stores both output and error values from a Excetable.
type IndexedExecutableOutput struct {
Value IndexedValue
Err error
}

func pluckVals(iVals []IndexedValue) []interface{} {
vals := []interface{}{}
for _, val := range iVals {
vals = append(vals, val.Value)
}

return vals
}

func sortIdxVals(iVals []IndexedValue) []IndexedValue {
sorted := make([]IndexedValue, len(iVals))
copy(sorted, iVals)
sort.SliceStable(
sorted,
func(i, j int) bool {
return sorted[i].Index < sorted[j].Index
},
)

return sorted
}

有两个指定的函数签名:Executable,ExecutableInSequence。

其中 Executable 代表单个的函数逻辑,ExecutableInSequence 代表函数之间需要关联的函数逻辑。

IndexedValue 是一个带下标的数据结构,相对应的辅助方法 sortIdxVals 可将其根据 index 排序。

Take

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92

// Take returns the first `num` values outputted by the Executables.
func Take(parentCtx context.Context, num int, execs ...Executable) ([]interface{}, error) {
execCount := len(execs)

if num > execCount {
num = execCount
}

// Create a new sub-context for possible cancelation.
ctx, cancel := context.WithCancel(parentCtx)
defer cancel()

output := make(chan IndexedExecutableOutput, 1)
go runExecs(ctx, output, execs)

fail := make(chan error, 1)
success := make(chan []IndexedValue, 1)
go takeUntilEnough(fail, success, min(len(execs), num), output)

select {

case <-parentCtx.Done():
// Stub comment to fix a test coverage bug.
return nil, parentCtx.Err()

case err := <-fail:
cancel()
if parentCtxErr := parentCtx.Err(); parentCtxErr != nil {
return nil, parentCtxErr
}
return nil, err

case uVals := <-success:
cancel()
return pluckVals(uVals), nil
}
}

func runExecs(ctx context.Context, output chan<- IndexedExecutableOutput, execs []Executable) {
var wg sync.WaitGroup
for i, exec := range execs {
wg.Add(1)

go func(i int, exec Executable) {
val, err := exec(ctx)
if err != nil {
output <- IndexedExecutableOutput{
IndexedValue{i, nil},
err,
}
return
}

output <- IndexedExecutableOutput{
IndexedValue{i, val},
nil,
}
wg.Done()
}(i, exec)
}

wg.Wait()
close(output)
}

func takeUntilEnough(fail chan error, success chan []IndexedValue, num int, output chan IndexedExecutableOutput) {
uVals := make([]IndexedValue, num)

enough := false
outputCount := 0
for r := range output {
if enough {
continue
}

if r.Err != nil {
enough = true
fail <- r.Err
continue
}

uVals[outputCount] = r.Value
outputCount++

if outputCount == num {
enough = true
success <- uVals
continue
}
}
}

Take 方法的作用是获取前指定个数的函数成功结果,即返出结果。

使用了 pipeline 的并发思想(执行一个函数得到一个结果管道传给下一个函数继续对结果处理)这样做的好处就是处理逻辑不用相互等待,只要有任务那么每一个层级都会启动,就像本身你要煮十个蛋才能去切但切和煮之间互不影响,最好的做法肯定是煮熟一个就同时去切一个。

Take 方法首先起 goroutine 进行函数的逻辑处理并将得到结果和序号进行组装传入管道,另起一个 goroutine 从管道中接收任务并聚合,在拿到num 个结果后将结果返出给 success 管道。

最后获取管道结果,错误,或超时取消处理。

All

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// All returns all the outputs from all Executables, order guaranteed.
func All(parentCtx context.Context, execs ...Executable) ([]interface{}, error) {
// Create a new sub-context for possible cancelation.
ctx, cancel := context.WithCancel(parentCtx)
defer cancel()

output := make(chan IndexedExecutableOutput, 1)
go runExecs(ctx, output, execs)

fail := make(chan error, 1)
success := make(chan []IndexedValue, 1)
go takeUntilEnough(fail, success, len(execs), output)

select {

case <-parentCtx.Done():
// Stub comment to fix a test coverage bug.
return nil, parentCtx.Err()

case err := <-fail:
cancel()
if parentCtxErr := parentCtx.Err(); parentCtxErr != nil {
return nil, parentCtxErr
}
return nil, err

case uVals := <-success:
cancel()
return pluckVals(sortIdxVals(uVals)), nil
}
}

All 方法就比较简单和 Take 基本一致只是 All 获取的所有的函数的结果。

Last

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/*
Last returns the last `num` values outputted by the Executables.
*/
func Last(parentCtx context.Context, num int, execs ...Executable) ([]interface{}, error) {
execCount := len(execs)
if num > execCount {
num = execCount
}
start := execCount - num

vals, err := Take(parentCtx, execCount, execs...)
if err != nil {
return nil, err
}

return vals[start:], err
}

Last 方法也很简单在Take的基础上取最后的指定个数的执行结果。

Retry

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// MaxRetriesExceededError stores how many times did an Execution run before exceeding the limit.
// The retries field holds the value.
type MaxRetriesExceededError struct {
retries int
}

func (err MaxRetriesExceededError) Error() string {
var word string
switch err.retries {
case 0:
word = "infinity"
case 1:
word = "1 time"
default:
word = fmt.Sprintf("%v times", err.retries)
}

return fmt.Sprintf("Max retries exceeded (%v).\n", word)
}

// Retry attempts to get a value from an Executable instead of an Error.
// It will keeps re-running the Executable when failed no more than `retries` times.
// Also, when the parent Context canceled, it returns the `Err()` of it immediately.
func Retry(parentCtx context.Context, retries int, fn Executable) (interface{}, error) {
ctx, cancel := context.WithCancel(parentCtx)
defer cancel()

c := 0
fail := make(chan error, 1)
success := make(chan interface{}, 1)

for {
go func() {
val, err := fn(ctx)
if err != nil {
fail <- err
return
}
success <- val
}()

select {
//
case <-parentCtx.Done():
// Stub comment to fix a test coverage bug.
return nil, parentCtx.Err()

case <-fail:
if parentCtxErr := parentCtx.Err(); parentCtxErr != nil {
return nil, parentCtxErr
}

c++
if retries == 0 || c < retries {
continue
}
return nil, MaxRetriesExceededError{c}

case val := <-success:
return val, nil
}
}
}

Retry 方法会进行错误重试,直到获取到结果或是达到重试次数上限。

Waterfall

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// Waterfall runs `ExecutableInSequence`s one by one,
// passing previous result to next Executable as input.
// When an error occurred, it stop the process then returns the error.
// When the parent Context canceled, it returns the `Err()` of it immediately.
func Waterfall(parentCtx context.Context, execs ...ExecutableInSequence) (interface{}, error) {
ctx, cancel := context.WithCancel(parentCtx)
defer cancel()

var lastVal interface{}
execCount := len(execs)
i := 0
fail := make(chan error, 1)
success := make(chan interface{}, 1)

for {
go func() {
val, err := execs[i](ctx, lastVal)
if err != nil {
fail <- err
return
}
success <- val
}()

select {

case <-parentCtx.Done():
// Stub comment to fix a test coverage bug.
return nil, parentCtx.Err()

case err := <-fail:
if parentCtxErr := parentCtx.Err(); parentCtxErr != nil {
return nil, parentCtxErr
}

return nil, err

case val := <-success:
lastVal = val
i++
if i == execCount {
return val, nil
}

continue
}
}
}

Waterfall 方法比较有意思,每个逻辑函数的结果都是下一个逻辑函数的参数循环往复直到拿到最后一个函数执行的结果

schedgroup

schedgroup是 Matt Layher 开发的 worker pool,可以指定任务在某个时间或者某个时间之后执行。设计十分巧妙,但目前只有107个赞,有点难以理解。

这个库实际模仿了go官方库里的 timer 原语,我开始用 timer 的时候是尽量避免新建定时器 newtimer 的因为我觉得每个 timer 应该是都维护了一个goroutine ,不断的查看现在的时间,所以我每次都使用 reset 来重置 timer,然后接收该timer 中的管道信号。其实并不是,在你启动 timer 的时候其实不论你新建多少个 timer 实体,timer都仅仅启动一个 gorotuine。每次新建的timer 都会被维护到一个runtime.timer 的小顶堆中,gorotuine 只需要不停的去获取最小时间设置的 timer 看是否触发,触发后再去取下一个timer。这不是我门的主题感性趣的可以看下这片文章:How Do They Do It: Timers in Go

Schedgroup 的思路也是一样的,在新建 Group 的时候会启动一个 monitor 函数的 goroutine ,同时 Group 结构中维护了一个 tasks 小顶堆结构,该结构以触发时间 deadline 为排序指标。monitor 不停的去获取最小触发时间的定时任务,当定时任务到时间的时候会执行对应的任务方法形成 goroutine 。

好了大概的思路有了我们一起来看下具体的逻辑。

这个并发原语包含的方法如下:

1
2
3
4
5
type Group
func New(ctx context.Context) *Group
func (g *Group) Delay(delay time.Duration, fn func())
func (g *Group) Schedule(when time.Time, fn func())
func (g *Group) Wait() error

Group

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// Although unnecessary, explicit break labels should be used in all select
// statements in this package so that test coverage tools are able to identify
// which cases have been triggered.

// A Group is a goroutine worker pool which schedules tasks to be performed
// after a specified time. A Group must be created with the New constructor.
// Once Wait is called, New must be called to create a new Group to schedule
// more tasks.
type Group struct {
// Atomics must come first per sync/atomic.
waiting *uint32

// Context/cancelation support.
ctx context.Context
cancel func()

// Task runner and a heap of tasks to be run.
wg sync.WaitGroup
mu sync.Mutex
tasks tasks

// Signals for when a task is added and how many tasks remain on the heap.
addC chan struct{}
lenC chan int
}

// New creates a new Group which will use ctx for cancelation. If cancelation
// is not a concern, use context.Background().
func New(ctx context.Context) *Group {
// Monitor goroutine context and cancelation.
mctx, cancel := context.WithCancel(ctx)

g := &Group{
waiting: new(uint32),

ctx: ctx,
cancel: cancel,

addC: make(chan struct{}),
lenC: make(chan int),
}

g.wg.Add(1)
go func() {
defer g.wg.Done()
g.monitor(mctx)
}()

return g
}

Group 结构是主体结构,其中包含:

waiting : 使用来控制只执行一次的 wait 方法的轻量级锁

ctx : 初始化上下文用于监听取消

cancel :初始化取消方法,用于级连取消

addC : 增加定时管道,用于通知增加定时任务信号

lenC : 定时任务总量管道用于接收目前还有多少任务

Delay & Schedule

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Delay schedules a function to run at or after the specified delay. Delay
// is a convenience wrapper for Schedule which adds delay to the current time.
// Specifying a negative delay will cause the task to be scheduled immediately.
//
// If Delay is called after a call to Wait, Delay will panic.
func (g *Group) Delay(delay time.Duration, fn func()) {
g.Schedule(time.Now().Add(delay), fn)
}

// Schedule schedules a function to run at or after the specified time.
// Specifying a past time will cause the task to be scheduled immediately.
//
// If Schedule is called after a call to Wait, Schedule will panic.
func (g *Group) Schedule(when time.Time, fn func()) {
if atomic.LoadUint32(g.waiting) != 0 {
panic("schedgroup: attempted to schedule task after Group.Wait was called")
}

g.mu.Lock()
defer g.mu.Unlock()

heap.Push(&g.tasks, task{
Deadline: when,
Call: fn,
})

// Notify monitor that a new task has been pushed on to the heap.
select {
case g.addC <- struct{}{}:
break
default:
break
}
}

Schedule 是给定时任务对增加一个给定时间的任务,加入任务后发送加入任务的通知信号

Delay 是添加一个延时某个时间就执行的定时任务。

Wait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// Wait waits for the completion of all scheduled tasks, or for cancelation of
// the context passed to New. Wait will only returns errors due to context
// cancelation. If no context is associated the the Group, wait never returns
// an error.
//
// Once Wait is called, any further calls to Delay or Schedule will panic. If
// Wait is called more than once, Wait will panic.
func (g *Group) Wait() error {
if v := atomic.SwapUint32(g.waiting, 1); v != 0 {
panic("schedgroup: multiple calls to Group.Wait")
}

// Context cancelation takes priority.
if err := g.ctx.Err(); err != nil {
return err
}

// See if the task heap is already empty. If so, we can exit early.
g.mu.Lock()
if g.tasks.Len() == 0 {
// Release the mutex immediately so that any running jobs are able to
// complete and send on g.lenC.
g.mu.Unlock()
g.cancel()
g.wg.Wait()
return nil
}
g.mu.Unlock()

// Wait on context cancelation or for the number of items in the heap
// to reach 0.
var n int
for {
select {
case <-g.ctx.Done():
return g.ctx.Err()
case n = <-g.lenC:
// Context cancelation takes priority.
if err := g.ctx.Err(); err != nil {
return err
}
}

if n == 0 {
// No more tasks left, cancel the monitor goroutine and wait for
// all tasks to complete.
g.cancel()
g.wg.Wait()
return nil
}
}
}

wait 方法就是等待所有任务都执行完成后退出,它的逻辑是监听目前任务堆的数量,当为0时,等待所有定时任务执行完成,cancel 掉monitor , 优雅退出。

Monitor & Trigger

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// monitor triggers tasks at the interval specified by g.Interval until ctx
// is canceled.
func (g *Group) monitor(ctx context.Context) {
t := time.NewTimer(0)
defer t.Stop()

for {
if ctx.Err() != nil {
// Context canceled.
return
}

now := time.Now()
var tickC <-chan time.Time

// Start any tasks that are ready as of now.
next := g.trigger(now)
if !next.IsZero() {
// Wait until the next scheduled task is ready.
t.Reset(next.Sub(now))
tickC = t.C
} else {
t.Stop()
}

select {
case <-ctx.Done():
// Context canceled.
return
case <-g.addC:
// A new task was added, check task heap again.
//lint:ignore SA4011 intentional break for code coverage
break
case <-tickC:
// An existing task should be ready as of now.
//lint:ignore SA4011 intentional break for code coverage
break
}
}
}

// trigger checks for scheduled tasks and runs them if they are scheduled
// on or after the time specified by now.
func (g *Group) trigger(now time.Time) time.Time {
g.mu.Lock()
defer func() {
// Notify how many tasks are left on the heap so Wait can stop when
// appropriate.
select {
case g.lenC <- g.tasks.Len():
break
default:
// Wait hasn't been called.
break
}

g.mu.Unlock()
}()

for g.tasks.Len() > 0 {
next := &g.tasks[0]
if next.Deadline.After(now) {
// Earliest scheduled task is not ready.
return next.Deadline
}

// This task is ready, pop it from the heap and run it.
t := heap.Pop(&g.tasks).(task)
g.wg.Add(1)
go func() {
defer g.wg.Done()
t.Call()
}()
}

return time.Time{}
}

monitor 和 trigger 这个两个方法一定是该包的核心逻辑

trigger 方法其实就是去获取最小时间的定时任务,拿到后看任务是否有到触发时间,没有到触发时间的话就直接将该时间返回,到了的话就把任务从堆中取出,直接起 goroutine 去做对应任务的逻辑函数,然后立刻检查当前的任务堆中最小时间任务,继续重复上述逻辑。
__
monitor 是在 new group 时就启动的函数,他的任务就是去调用触发函数,他的作用是对 trigger 函数进行合理的触发,触发的条件有两个,一个是在添加任务时去重新触发,防止添加了一个小时间任务,一个是在当前最小任务还未到调用时间,等待到达掉用时间去调用trigger 让他对其进行调用。

完成整个包的梳理,是不是对于怎么使用它已经心里有数了,在使用的时候不需要新建多个 group 结构,直接起一个全局的 group ,不断的向里面追加任务即可,如果新建多个将对函数性能和内存占用产生极大的浪费。

总结

本文梳理了 ErrorGroup 的拓展函数拓展函数可以帮助你进行goroutine的生命周期管理,并发量维护,错误统计等基础工作,你可以用该结构进行一组子任务的并发,编排则需要你在函数外部继续处理。

之后介绍了几个任务编排的包,其主要逻辑就是对一组函数签名进行类型的统一,然后即可对一组函数进行统一编排比如等待全部完成收集所有结果,只收集前几个后几个函数的结果,一个函数结果要作为另一个函数的参数,重复执行知道错误返出,只要有错误进行所有函数的结束丢弃结果等等不同逻辑的编排。

最后介绍了定时任务并发的包使用方式。

可以说一组任务的并发编排的大部分业务场景都已经涵盖其中,你只要打通这些包的逻辑进行合理的改写合并优化,你的并发业务一定不难实现。