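This article follows on from the previous one, "A Look at ErrorGroup: Usage and Extensions". The official ErrorGroup package was created to give us a correct way to run a group of tasks concurrently. With extensions we can also cap the maximum number of concurrent tasks, and further extensions make Group more versatile and suitable for a wider range of workloads, so I have broadened the topic of this article to handling a group of concurrent tasks in general. Starting from several extension packages built around Group, this article summarizes the key points and practical techniques of concurrent task handling, so that after reading it you have a grasp of the essentials.
This extension package from Facebook is essentially an extension of WaitGroup. Besides tracking the lifecycle of the concurrent goroutines, it can also collect and record the errors of all concurrent tasks.
Group structure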
// Group is similar to a sync.WaitGroup, but allows for collecting errors.
// The collected errors are never reset, so unlike a sync.WaitGroup, this Group
// can only be used _once_. That is, you may only call Wait on it once.
type Group struct {
	wg     sync.WaitGroup
	mu     sync.Mutex
	errors MultiError
}
As you can see, this Group adds a mutex and a MultiError to a WaitGroup. MultiError is the type used to report the errors produced by the concurrent goroutines.
// MultiError allows returning a group of errors as one error.
type MultiError []error

// Error returns a concatenated string of all contained errors.
func (m MultiError) Error() string {
	l := len(m)
	if l == 0 {
		panic("MultiError with no errors")
	}
	if l == 1 {
		panic("MultiError with only 1 error")
	}
	var b bytes.Buffer
	b.WriteString("multiple errors: ")
	for i, e := range m {
		b.WriteString(e.Error())
		if i != l-1 {
			b.WriteString(" | ")
		}
	}
	return b.String()
}

// NewMultiError returns nil if all input errors passed in are nil. Otherwise,
// it coalesces all input errors into a single error instance. Useful for
// code like this:
//
//	func doThisAndThat() error {
//		err1 := tryThis()
//		err2 := tryThat()
//		return errgroup.NewMultiError(err1, err2)
//	}
func NewMultiError(errs ...error) error {
	var multiErr MultiError
	for _, err := range errs {
		if err != nil {
			multiErr = append(multiErr, err)
		}
	}

// Add adds delta, which may be negative. See sync.WaitGroup.Add documentation
// for details.
func (g *Group) Add(delta int) {
	g.wg.Add(delta)
}

// Done decrements the Group counter.
func (g *Group) Done() {
	g.wg.Done()
}

// Error adds an error to return in Wait. The error must not be nil.
func (g *Group) Error(e error) {
	if e == nil {
		panic("error must not be nil")
	}
	g.mu.Lock()
	defer g.mu.Unlock()
	g.errors = append(g.errors, e)
}

// Wait blocks until the Group counter is zero. If no errors were recorded, it
// returns nil. If one error was recorded, it returns it as is. If more than
// one error was recorded it returns a MultiError which is a slice of errors.
func (g *Group) Wait() error {
	g.wg.Wait()
	g.mu.Lock()
	defer g.mu.Unlock()
	errors := g.errors
	l := len(errors)
	if l == 0 {
		return nil
	}
	if l == 1 {
		return errors[0]
	}
	return errors
}
// SizedGroup has the same role as WaitingGroup but adds a limit of the amount of goroutines started concurrently.
// Uses similar Go() scheduling as errgrp.Group, thread safe.
// SizedGroup interface enforces constructor usage and doesn't allow direct creation of sizedGroup
type SizedGroup struct {
	options
	wg   sync.WaitGroup
	sema sync.Locker
}

// Go calls the given function in a new goroutine.
// Every call will be unblocked, but some goroutines may wait if semaphore locked.
func (g *SizedGroup) Go(fn func(ctx context.Context)) {

// Wait blocks until the SizedGroup counter is zero.
// See sync.WaitGroup documentation for more information.
func (g *SizedGroup) Wait() {
	g.wg.Wait()
}
This Wait function is straightforward: it simply waits for all goroutines to finish.
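The combination of a WaitGroup and a semaphore can be sketched with the standard library alone. The version below is only an illustration, not the package's implementation: it uses a buffered channel as the semaphore, where the package uses a sync.Locker, and `runLimited` is a hypothetical helper name. It also measures the peak number of tasks running at once so the limit can be checked.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited runs every task in its own goroutine but lets at most
// `limit` of them execute at the same time. It returns the highest
// number of tasks that were observed running simultaneously.
func runLimited(limit int, tasks []func()) int32 {
	var (
		wg      sync.WaitGroup
		running int32
		peak    int32
	)
	sema := make(chan struct{}, limit) // buffered channel used as a counting semaphore

	for _, task := range tasks {
		wg.Add(1)
		go func(task func()) {
			defer wg.Done()
			sema <- struct{}{}        // acquire a slot (blocks while the limit is reached)
			defer func() { <-sema }() // release the slot

			n := atomic.AddInt32(&running, 1)
			for { // record the peak concurrency seen so far
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			task()
			atomic.AddInt32(&running, -1)
		}(task)
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	tasks := make([]func(), 10)
	for i := range tasks {
		tasks[i] = func() { time.Sleep(10 * time.Millisecond) }
	}
	fmt.Println("peak concurrency:", runLimited(3, tasks))
}
```

All ten goroutines are started immediately, but only three get past the semaphore at a time. This mirrors the "every call will be unblocked, but some goroutines may wait" behavior described in the comment on Go.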
The package also provides an ErrSizedGroup type, which adds error handling for the concurrent tasks:
ErrSizedGroup
// ErrSizedGroup is a SizedGroup with error control. Works the same as errgrp.Group, i.e. returns first error.
// Can work as regular errgrp.Group or with early termination. Thread safe.
// ErrSizedGroup interface enforces constructor usage and doesn't allow direct creation of errSizedGroup
type ErrSizedGroup struct {
	options
	wg   sync.WaitGroup
	sema sync.Locker

	for n, e := range m.errors {
		errs = append(errs, fmt.Sprintf("[%d] {%s}", n, e.Error()))
	}
	return fmt.Sprintf("%d error(s) occurred: %s", len(m.errors), strings.Join(errs, ", "))
}

// NewErrSizedGroup makes wait group with limited size alive goroutines.
// By default all goroutines will be started but will wait inside. For limited number of goroutines use Preemptive() options.
// TermOnErr will skip (won't start) all other goroutines if any error returned.
func NewErrSizedGroup(size int, options ...GroupOption) *ErrSizedGroup {

	res := ErrSizedGroup{
		sema: NewSemaphore(size),
		err:  new(multierror),
	}

	for _, opt := range options {
		opt(&res.options)
	}

	return &res
}

// Go calls the given function in a new goroutine.
// The first call to return a non-nil error cancels the group if termOnError; its error will be
// returned by Wait. If no termOnError all errors will be collected in multierror.
func (g *ErrSizedGroup) Go(f func() error) {

	g.wg.Add(1)

	if g.preLock {
		g.sema.Lock()
	}

	go func() {
		defer g.wg.Done()

		// terminated will be true if any error happened before and g.termOnError
		terminated := func() bool {
			if !g.termOnError {
				return false
			}
			g.errLock.RLock()
			defer g.errLock.RUnlock()
			return g.err.errorOrNil() != nil
		}

		if terminated() {
			return // terminated due prev error, don't run anything in this group anymore
		}

			g.errOnce.Do(func() { // call context cancel once
				if g.cancel != nil {
					g.cancel()
				}
			})
		}
		g.sema.Unlock()
	}()
}

// Wait blocks until all function calls from the Go method have returned, then
// returns all errors (if any) wrapped with multierror from them.
func (g *ErrSizedGroup) Wait() error {
	g.wg.Wait()
	if g.cancel != nil {
		g.cancel()
	}
	return g.err.errorOrNil()
}
// All method returns when all of the callbacks passed as an iterable have finished,
// returned responses and errors are ordered according to callback order
// will panic if context is nil
func All(ctx context.Context, fns ...AsyncFunc) ([]interface{}, []error) {
	if ctx == nil {
		panic("nil context provided")
	}

// Race method returns a response as soon as one of the callbacks in an iterable executes without an error,
// otherwise last error is returned
// will panic if context is nil
func Race(ctx context.Context, fns ...AsyncFunc) (interface{}, error) {
	if ctx == nil {
		panic("nil context provided")
	}
// Retry method retries callback given amount of times until it executes without an error,
// when retries = 0 it will retry infinitely
// will panic if context is nil
func Retry(ctx context.Context, retires int, fn AsyncFunc) (interface{}, error) {
	if ctx == nil {
		panic("nil context provided")
	}

	i := 1

	for {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		default:
			var r response
			r.res, r.err = fn(ctx)

			if r.err == nil || i == retires {
				return r.res, r.err
			}
// Executable represents a singular logic block.
// It can be used with several functions.
type Executable func(context.Context) (interface{}, error)

// ExecutableInSequence represents one of a sequence of logic blocks.
type ExecutableInSequence func(context.Context, interface{}) (interface{}, error)

// IndexedValue stores the output of Executables,
// along with the index of the source Executable for ordering.
type IndexedValue struct {
	Index int
	Value interface{}
}

// IndexedExecutableOutput stores both output and error values from an Executable.
type IndexedExecutableOutput struct {
	Value IndexedValue
	Err   error
}

func pluckVals(iVals []IndexedValue) []interface{} {
	vals := []interface{}{}
	for _, val := range iVals {
		vals = append(vals, val.Value)
	}
// Take returns the first `num` values outputted by the Executables.
func Take(parentCtx context.Context, num int, execs ...Executable) ([]interface{}, error) {
	execCount := len(execs)

	if num > execCount {
		num = execCount
	}

	// Create a new sub-context for possible cancelation.
	ctx, cancel := context.WithCancel(parentCtx)
	defer cancel()

	output := make(chan IndexedExecutableOutput, 1)
	go runExecs(ctx, output, execs)

	case <-parentCtx.Done():
		// Stub comment to fix a test coverage bug.
		return nil, parentCtx.Err()

	case err := <-fail:
		cancel()
		if parentCtxErr := parentCtx.Err(); parentCtxErr != nil {
			return nil, parentCtxErr
		}
		return nil, err

	case uVals := <-success:
		cancel()
		return pluckVals(uVals), nil
	}
}

func runExecs(ctx context.Context, output chan<- IndexedExecutableOutput, execs []Executable) {
	var wg sync.WaitGroup
	for i, exec := range execs {
		wg.Add(1)
// All returns all the outputs from all Executables, order guaranteed.
func All(parentCtx context.Context, execs ...Executable) ([]interface{}, error) {
	// Create a new sub-context for possible cancelation.
	ctx, cancel := context.WithCancel(parentCtx)
	defer cancel()

	output := make(chan IndexedExecutableOutput, 1)
	go runExecs(ctx, output, execs)

	case <-parentCtx.Done():
		// Stub comment to fix a test coverage bug.
		return nil, parentCtx.Err()

	case err := <-fail:
		cancel()
		if parentCtxErr := parentCtx.Err(); parentCtxErr != nil {
			return nil, parentCtxErr
		}
		return nil, err

	case uVals := <-success:
		cancel()
		return pluckVals(sortIdxVals(uVals)), nil
	}
}
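The All method is straightforward and almost identical to Take. The difference is that All collects the results of every function and returns them in the order the Executables were passed in.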
Last
/*
Last returns the last `num` values outputted by the Executables.
*/
func Last(parentCtx context.Context, num int, execs ...Executable) ([]interface{}, error) {
	execCount := len(execs)
	if num > execCount {
		num = execCount
	}
	start := execCount - num

// MaxRetriesExceededError stores how many times did an Execution run before exceeding the limit.
// The retries field holds the value.
type MaxRetriesExceededError struct {
	retries int
}

func (err MaxRetriesExceededError) Error() string {
	var word string
	switch err.retries {
	case 0:
		word = "infinity"
	case 1:
		word = "1 time"
	default:
		word = fmt.Sprintf("%v times", err.retries)
	}

// Retry attempts to get a value from an Executable instead of an Error.
// It keeps re-running the Executable when it fails, no more than `retries` times.
// Also, when the parent Context is canceled, it returns the `Err()` of it immediately.
func Retry(parentCtx context.Context, retries int, fn Executable) (interface{}, error) {
	ctx, cancel := context.WithCancel(parentCtx)
	defer cancel()
// Waterfall runs `ExecutableInSequence`s one by one,
// passing previous result to next Executable as input.
// When an error occurs, it stops the process then returns the error.
// When the parent Context is canceled, it returns the `Err()` of it immediately.
func Waterfall(parentCtx context.Context, execs ...ExecutableInSequence) (interface{}, error) {
	ctx, cancel := context.WithCancel(parentCtx)
	defer cancel()

	var lastVal interface{}
	execCount := len(execs)
	i := 0
	fail := make(chan error, 1)
	success := make(chan interface{}, 1)

	for {
		go func() {
			val, err := execs[i](ctx, lastVal)
			if err != nil {
				fail <- err
				return
			}
			success <- val
		}()

		select {

		case <-parentCtx.Done():
			// Stub comment to fix a test coverage bug.
			return nil, parentCtx.Err()

		case err := <-fail:
			if parentCtxErr := parentCtx.Err(); parentCtxErr != nil {
				return nil, parentCtxErr
			}

			return nil, err

		case val := <-success:
			lastVal = val
			i++
			if i == execCount {
				return val, nil
			}
// Although unnecessary, explicit break labels should be used in all select
// statements in this package so that test coverage tools are able to identify
// which cases have been triggered.

// A Group is a goroutine worker pool which schedules tasks to be performed
// after a specified time. A Group must be created with the New constructor.
// Once Wait is called, New must be called to create a new Group to schedule
// more tasks.
type Group struct {
	// Atomics must come first per sync/atomic.
	waiting *uint32

	// Task runner and a heap of tasks to be run.
	wg    sync.WaitGroup
	mu    sync.Mutex
	tasks tasks

	// Signals for when a task is added and how many tasks remain on the heap.
	addC chan struct{}
	lenC chan int
}

// New creates a new Group which will use ctx for cancelation. If cancelation
// is not a concern, use context.Background().
func New(ctx context.Context) *Group {
	// Monitor goroutine context and cancelation.
	mctx, cancel := context.WithCancel(ctx)

// Delay schedules a function to run at or after the specified delay. Delay
// is a convenience wrapper for Schedule which adds delay to the current time.
// Specifying a negative delay will cause the task to be scheduled immediately.
//
// If Delay is called after a call to Wait, Delay will panic.
func (g *Group) Delay(delay time.Duration, fn func()) {
	g.Schedule(time.Now().Add(delay), fn)
}

// Schedule schedules a function to run at or after the specified time.
// Specifying a past time will cause the task to be scheduled immediately.
//
// If Schedule is called after a call to Wait, Schedule will panic.
func (g *Group) Schedule(when time.Time, fn func()) {
	if atomic.LoadUint32(g.waiting) != 0 {
		panic("schedgroup: attempted to schedule task after Group.Wait was called")
	}

// Wait waits for the completion of all scheduled tasks, or for cancelation of
// the context passed to New. Wait will only return errors due to context
// cancelation. If no context is associated with the Group, Wait never returns
// an error.
//
// Once Wait is called, any further calls to Delay or Schedule will panic. If
// Wait is called more than once, Wait will panic.
func (g *Group) Wait() error {
	if v := atomic.SwapUint32(g.waiting, 1); v != 0 {
		panic("schedgroup: multiple calls to Group.Wait")
	}

	// See if the task heap is already empty. If so, we can exit early.
	g.mu.Lock()
	if g.tasks.Len() == 0 {
		// Release the mutex immediately so that any running jobs are able to
		// complete and send on g.lenC.
		g.mu.Unlock()
		g.cancel()
		g.wg.Wait()
		return nil
	}
	g.mu.Unlock()

	// Wait on context cancelation or for the number of items in the heap
	// to reach 0.
	var n int
	for {
		select {
		case <-g.ctx.Done():
			return g.ctx.Err()
		case n = <-g.lenC:
			// Context cancelation takes priority.
			if err := g.ctx.Err(); err != nil {
				return err
			}
		}

		if n == 0 {
			// No more tasks left, cancel the monitor goroutine and wait for
			// all tasks to complete.
			g.cancel()
			g.wg.Wait()
			return nil
		}
	}
}
// monitor triggers tasks at the interval specified by g.Interval until ctx
// is canceled.
func (g *Group) monitor(ctx context.Context) {
	t := time.NewTimer(0)
	defer t.Stop()

	for {
		if ctx.Err() != nil {
			// Context canceled.
			return
		}

		now := time.Now()
		var tickC <-chan time.Time

		// Start any tasks that are ready as of now.
		next := g.trigger(now)
		if !next.IsZero() {
			// Wait until the next scheduled task is ready.
			t.Reset(next.Sub(now))
			tickC = t.C
		} else {
			t.Stop()
		}

		select {
		case <-ctx.Done():
			// Context canceled.
			return
		case <-g.addC:
			// A new task was added, check task heap again.
			//lint:ignore SA4011 intentional break for code coverage
			break
		case <-tickC:
			// An existing task should be ready as of now.
			//lint:ignore SA4011 intentional break for code coverage
			break
		}
	}
}

// trigger checks for scheduled tasks and runs them if they are scheduled
// on or after the time specified by now.
func (g *Group) trigger(now time.Time) time.Time {
	g.mu.Lock()
	defer func() {
		// Notify how many tasks are left on the heap so Wait can stop when
		// appropriate.
		select {
		case g.lenC <- g.tasks.Len():
			break
		default:
			// Wait hasn't been called.
			break
		}

		g.mu.Unlock()
	}()

	for g.tasks.Len() > 0 {
		next := &g.tasks[0]
		if next.Deadline.After(now) {
			// Earliest scheduled task is not ready.
			return next.Deadline
		}

		// This task is ready, pop it from the heap and run it.
		t := heap.Pop(&g.tasks).(task)
		g.wg.Add(1)
		go func() {
			defer g.wg.Done()
			t.Call()
		}()
	}

	return time.Time{}
}
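The monitor and trigger methods are clearly the core logic of this package.
trigger looks at the scheduled task with the earliest deadline and checks whether it is due. If it is not due yet, trigger simply returns that deadline. If it is due, trigger pops the task from the heap, starts a goroutine to run the task's function, and then immediately checks the earliest task remaining on the heap, repeating the same logic.
monitor is started when the Group is created with New. Its job is to call trigger at the right moments, and there are two of them. The first is when a task is added, so that a newly added task with an earlier deadline is not missed. The second is when the current earliest task is not yet due: monitor waits until that deadline and then calls trigger so the task gets run.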
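Having walked through the whole package, you probably have a good idea of how to use it. There is no need to create many Group values: start a single global Group and keep adding tasks to it. Every Group starts its own monitor goroutine in New, so creating many of them wastes both CPU time and memory.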