Machinery, Go's Distributed Task Processor – Architecture and Source Code Deep Dive

Have you ever written long, tangled, tightly coupled code for a complex distributed concurrency scenario that needed task orchestration? Code that sprouts bugs when it runs, eats your time and energy, and that nobody else can read or take over?

Go actually has a ready-made wheel that helps you implement complex logic while keeping your code clean, loosely coupled, robust, and light on bugs: the distributed task queue Machinery.

This article explains how to use Machinery, and the pitfalls to watch for, in close combination with the source code, so that you master it from the roots up.

Advantages

Why use a task processor at all? Before adopting any wheel, it pays to be clear about its advantages; if you don't fully understand them, you may end up using it for its own sake, and in many scenarios it would actually cost you system performance. So let's first look at Machinery's advantages:

Machinery lets you define serial, parallel, delayed, retried, scheduled, and other task execution modes, and once they are defined you can orchestrate multiple tasks, adjusting the order in which they complete. It also caches processing results; failed results can be reprocessed, making the program more robust, and results can be read straight from the cache, improving response time:

  • Simpler code: this is the biggest reason to adopt Machinery. (If you only run a few scheduled jobs with no scheduling between tasks, you can probably do without it.) Machinery distills task-scheduling logic down so you don't have to write complex inter-task orchestration yourself. That greatly simplifies your code, cuts the implementer's workload, and above all reduces bugs and the time spent chasing them.
  • Better code: Machinery's handling of task orchestration and result retrieval is not only unified but also strong on execution efficiency and decoupling. For example, it splits task execution from result handling, removing business-logic coupling, and it uses a score-ordered queue for delayed tasks, lowering memory and CPU consumption.
  • Distributed support: no need to worry about distributed data sharing, storage, or locking around task execution.

Features

Machinery is a third-party open-source asynchronous task queue built on distributed message dispatching, with these features:

  • Task retry mechanism
  • Delayed tasks
  • Scheduled (periodic) tasks
  • Task callbacks
  • Task result recording
  • Workflow modes: Chain, Group, Chord
  • Multiple brokers: Redis, AMQP, AWS SQS
  • Multiple backends: Redis, Memcache, AMQP, MongoDB

Machinery is currently at its v1 stable release and can be fetched with go get github.com/RichardKnop/machinery/v1.

Architecture

A task queue is, in short, a producer-consumer model in which producers drive consumers: user requests generate tasks, producers keep inserting tasks into the queue, and the queue's processor acts as the consumer, continuously consuming them. With this framework in mind, here is a simple diagram of Machinery's design:

[Figure: a simple diagram of Machinery's architecture]

This diagram of mine is fairly superficial, so let's dig deeper through the source. The first things to sort out are the main resource structures:

  • Server: the business core. The interface methods Server exposes cover all task-orchestration operations; for simple usage, understanding Server alone is enough.

  • Broker: a storage-layer interface whose main job is pushing tasks into and popping them out of the task queue; task concurrency control and delays also live at this layer.

  • Backend: a storage-layer interface, mainly used to update and fetch task execution results, states, and so on.

  • Worker: the processing layer. It drives Server, Broker, and Backend to fetch tasks, execute them, and handle execution states and results.

  • Task: the processing layer comprising the Task, Signature, Group, Chain, and Chord structures; it implements the task-orchestration logic.

Let's go through them in detail:

Server

The Server struct is Machinery's core business object, holding the configuration, the task logic, the Broker, and the Backend:

// Server is the main Machinery object and stores all configuration
// All the tasks workers process are registered against the server
type Server struct {
	config          *config.Config
	registeredTasks *sync.Map // a *sync.Map in current v1 (note the Store call in RegisterTasks below)
	broker          brokersiface.Broker
	backend         backendsiface.Backend
}

You could say the Server contains all of Machinery's moving parts. Let's examine, piece by piece, how the author designed it.

The struct shows that the business core holds two storage-layer interfaces, broker and backend; different underlying storage services produce different implementations of them. This article uses Redis for the walkthrough; other storage media differ slightly in implementation details because of their APIs, but the interfaces Machinery exposes are similar.

config

// Config holds all configuration for our program
type Config struct {
	Broker          string       `yaml:"broker" envconfig:"BROKER"`
	DefaultQueue    string       `yaml:"default_queue" envconfig:"DEFAULT_QUEUE"`
	ResultBackend   string       `yaml:"result_backend" envconfig:"RESULT_BACKEND"`
	ResultsExpireIn int          `yaml:"results_expire_in" envconfig:"RESULTS_EXPIRE_IN"`
	AMQP            *AMQPConfig  `yaml:"amqp"`
	SQS             *SQSConfig   `yaml:"sqs"`
	Redis           *RedisConfig `yaml:"redis"`
	TLSConfig       *tls.Config
	// NoUnixSignals - when set disables signal handling in machinery
	NoUnixSignals bool            `yaml:"no_unix_signals" envconfig:"NO_UNIX_SIGNALS"`
	DynamoDB      *DynamoDBConfig `yaml:"dynamodb"`
}

The configuration is mostly about the underlying storage services, as the struct itself shows; how to configure each one is spelled out clearly in Machinery's code.

I want to highlight DefaultQueue, which I consider important and easy to overlook. Machinery's core design is a queue storing tasks, each consisting of a function name and its arguments, plus a Worker launched to keep draining that queue. If you start a worker with the server's NewWorker method, DefaultQueue is the name of the queue it consumes: with Redis as the underlying task queue, the Redis list key is exactly what you set DefaultQueue to.

Why does this matter? If you don't understand how queues are assigned and process everything with a single worker, every task ends up in this one DefaultQueue, which severely limits the variety of your orchestration design. For example, if one task is high priority and should run the moment it is triggered, a single queue cannot guarantee that. The right approach is to use NewCustomQueueWorker with a dedicated queue.

Another parameter worth noting is NormalTasksPollPeriod in the Redis config: the worker fetches tasks with the BLPOP command, and this parameter decides how long BLPOP blocks when no task is pending.
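To make this concrete, here is a minimal sketch of a Redis-backed config (the address and queue name are placeholders of mine, not from the article):

import "github.com/RichardKnop/machinery/v1/config"

// newConfig returns a config whose DefaultQueue names the Redis list key
// and whose NormalTasksPollPeriod bounds how long BLPOP blocks when idle.
func newConfig() *config.Config {
	return &config.Config{
		Broker:        "redis://localhost:6379", // placeholder Redis address
		DefaultQueue:  "machinery_tasks",        // Redis list key of the default task queue
		ResultBackend: "redis://localhost:6379",
		Redis: &config.RedisConfig{
			NormalTasksPollPeriod: 1000, // BLPOP block time in ms when no task is pending
		},
	}
}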

Server's interface methods

type Server interface {
	GetBroker() brokersiface.Broker
	GetConfig() *config.Config
	RegisterTasks(namedTaskFuncs map[string]interface{}) error
	SendTaskWithContext(ctx context.Context, signature *tasks.Signature) (*result.AsyncResult, error)
	SendTask(signature *tasks.Signature) (*result.AsyncResult, error)
	SendChainWithContext(ctx context.Context, chain *tasks.Chain) (*result.ChainAsyncResult, error)
	SendChain(chain *tasks.Chain) (*result.ChainAsyncResult, error)
	SendGroupWithContext(ctx context.Context, group *tasks.Group, sendConcurrency int) ([]*result.AsyncResult, error)
	SendGroup(group *tasks.Group, sendConcurrency int) ([]*result.AsyncResult, error)
	SendChordWithContext(ctx context.Context, chord *tasks.Chord, sendConcurrency int) (*result.ChordAsyncResult, error)
	SendChord(chord *tasks.Chord, sendConcurrency int) (*result.ChordAsyncResult, error)
}

The methods exposed by the Server interface are all pragmatic, nothing flashy. The first two return internal structures; RegisterTasks registers all the functions your orchestration needs into the Server's registeredTasks, where the Server looks them up on demand.

Registration deserves a closer look, because the signatures of registered functions are restricted.

Server task-function registration

// RegisterTasks registers all tasks at once
func (server *Server) RegisterTasks(namedTaskFuncs map[string]interface{}) error {
	for _, task := range namedTaskFuncs {
		if err := tasks.ValidateTask(task); err != nil {
			return err
		}
	}

	for k, v := range namedTaskFuncs {
		server.registeredTasks.Store(k, v)
	}

	server.broker.SetRegisteredTaskNames(server.GetRegisteredTaskNames())
	return nil
}

// ValidateTask validates task function using reflection and makes sure
// it has a proper signature. Functions used as tasks must return at least a
// single value and the last return type must be error
func ValidateTask(task interface{}) error {
	v := reflect.ValueOf(task)
	t := v.Type()

	// Task must be a function
	if t.Kind() != reflect.Func {
		return ErrTaskMustBeFunc
	}

	// Task must return at least a single value
	if t.NumOut() < 1 {
		return ErrTaskReturnsNoValue
	}

	// Last return value must be error
	lastReturnType := t.Out(t.NumOut() - 1)
	errorInterface := reflect.TypeOf((*error)(nil)).Elem()
	if !lastReturnType.Implements(errorInterface) {
		return ErrLastReturnValueMustBeError
	}

	return nil
}

The batch-registration parameter is a map[string]interface{}: each entry maps a unique function name to a function. The signature restrictions are visible in ValidateTask: the value must be a function; it must have at least one return value; and the last return value must implement the error interface.
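For instance, this sketch shows signatures that pass and fail validation (function names are mine):

// Valid: at least one return value, and the last one implements error.
func Add(a, b int64) (int64, error) {
	return a + b, nil
}

// Also valid: a context.Context may come first (see Task.UseContext below).
func AddWithContext(ctx context.Context, a, b int64) (int64, error) {
	return a + b, nil
}

// Invalid: no return value, so ValidateTask returns ErrTaskReturnsNoValue.
// func FireAndForget(a int64) {}

err := server.RegisterTasks(map[string]interface{}{
	"add":          Add,
	"add_with_ctx": AddWithContext,
})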

The remaining Send* methods all publish tasks, in their various orchestration forms, for workers to execute.

Server creates a Worker

// NewWorker creates Worker instance
func (server *Server) NewWorker(consumerTag string, concurrency int) *Worker {
	return &Worker{
		server:      server,
		ConsumerTag: consumerTag,
		Concurrency: concurrency,
		Queue:       "",
	}
}

// NewCustomQueueWorker creates Worker instance with Custom Queue
func (server *Server) NewCustomQueueWorker(consumerTag string, concurrency int, queue string) *Worker {
	return &Worker{
		server:      server,
		ConsumerTag: consumerTag,
		Concurrency: concurrency,
		Queue:       queue,
	}
}


The code above shows the two ways to build a worker, NewWorker and NewCustomQueueWorker; their difference was explained earlier. Now for the worker itself:
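As a quick sketch, a default-queue worker and a dedicated-queue worker side by side (the queue name and consumer tags are placeholders):

// Consumes cnf.DefaultQueue; concurrency 0 falls back to twice the CPU count.
defaultWorker := server.NewWorker("default_worker", 0)

// Consumes its own queue; signatures sent with RoutingKey "urgent_tasks"
// land here and are never stuck behind default-queue traffic.
urgentWorker := server.NewCustomQueueWorker("urgent_worker", 4, "urgent_tasks")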

Worker

The Worker is the executing core of the architecture: it fetches tasks according to the task type and orchestration mode, runs them, and finally emits and records the results. Once you understand Worker and Server you can use Machinery comfortably, so after introducing the worker I'll show how the package is used.

// Worker represents a single worker process
type Worker struct {
	server            *Server
	ConsumerTag       string
	Concurrency       int
	Queue             string
	errorHandler      func(err error)
	preTaskHandler    func(*tasks.Signature)
	postTaskHandler   func(*tasks.Signature)
	preConsumeHandler func(*Worker) bool
}

  • Server: the worker embeds the Server structure introduced above. As noted, the Server records the task functions, the data-access interface Broker, and the result-handling interface Backend, so the Worker is in effect a structure derived from the Server, which makes the design clearer.

  • ConsumerTag: not used anywhere.

  • Concurrency: the maximum number of tasks a worker executes concurrently; if less than 1, it defaults to twice the number of CPU cores.

  • Queue: the storage key of the task-trigger queue.

  • errorHandler: the handler invoked before an error is returned; see worker.LaunchAsync. This one matters. By default, once a task function errors out, a message is sent to the error channel, LaunchAsync exits, and the worker is torn down gracefully. If you don't want every error to kill the worker, define your own errorHandler: it can simply log the error, or ship it to a logging service, after which the worker resumes consuming. You can also re-launch a worker with worker.LaunchAsync upon receiving the error. See the sketch after this list.

  • preTaskHandler: a handler run before a task executes; see worker.Process.

  • postTaskHandler: a handler run after a task executes; see worker.Process.

  • preConsumeHandler: decides whether a task may be consumed, evaluated before consumption; see broker.ConsumeOne.
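Here is the error-handler sketch referenced above (the log call is illustrative; per the author's note, with a handler installed the worker keeps consuming instead of tearing down):

worker := server.NewWorker("resilient_worker", 0)

// Broker errors are handed to the handler and the consume loop retries,
// rather than the error propagating out through errorsChan.
worker.SetErrorHandler(func(err error) {
	log.Printf("machinery worker error: %v", err)
})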

Worker's methods

// Launch starts a new worker process. The worker subscribes
// to the default queue and processes incoming registered tasks
func (worker *Worker) Launch() error {}

// LaunchAsync is a non blocking version of Launch
func (worker *Worker) LaunchAsync(errorsChan chan<- error) {}

// CustomQueue returns Custom Queue of the running worker process
func (worker *Worker) CustomQueue() string {}

// Quit tears down the running worker process
func (worker *Worker) Quit() {}

// Process handles received tasks and triggers success/error callbacks
func (worker *Worker) Process(signature *tasks.Signature) error {}

// taskRetry decrements RetryCount counter and republishes the task to the queue
func (worker *Worker) taskRetry(signature *tasks.Signature) error {}

// retryTaskIn republishes the task to the queue with ETA of now + retryIn.Seconds()
func (worker *Worker) retryTaskIn(signature *tasks.Signature, retryIn time.Duration) error {}

// taskSucceeded updates the task state and triggers success callbacks or a
// chord callback if this was the last task of a group with a chord callback
func (worker *Worker) taskSucceeded(signature *tasks.Signature, taskResults []*tasks.TaskResult) error {}

// taskFailed updates the task state and triggers error callbacks
func (worker *Worker) taskFailed(signature *tasks.Signature, taskErr error) error {}

// Returns true if the worker uses AMQP backend
func (worker *Worker) hasAMQPBackend() bool {}

// SetErrorHandler sets a custom error handler for task errors
// A default behavior is just to log the error after all the retry attempts fail
func (worker *Worker) SetErrorHandler(handler func(err error)) {}

//SetPreTaskHandler sets a custom handler func before a job is started
func (worker *Worker) SetPreTaskHandler(handler func(*tasks.Signature)) {}

//SetPostTaskHandler sets a custom handler for the end of a job
func (worker *Worker) SetPostTaskHandler(handler func(*tasks.Signature)) {}

//SetPreConsumeHandler sets a custom handler called before a task is consumed
func (worker *Worker) SetPreConsumeHandler(handler func(*Worker) bool) {}

//GetServer returns server
func (worker *Worker) GetServer() *Server {}
func (worker *Worker) PreConsumeHandler() bool {}
func RedactURL(urlString string) string {}

The pivotal method among these is LaunchAsync, which starts a worker that runs asynchronously; once launched, the worker begins its endless task-processing loop.

Let's look at how a worker is launched.

Worker launch

// Launch starts a new worker process. The worker subscribes
// to the default queue and processes incoming registered tasks
func (worker *Worker) Launch() error {
	errorsChan := make(chan error)

	worker.LaunchAsync(errorsChan)

	return <-errorsChan
}

// LaunchAsync is a non blocking version of Launch
func (worker *Worker) LaunchAsync(errorsChan chan<- error) {
	cnf := worker.server.GetConfig()
	broker := worker.server.GetBroker()

	// Log some useful information about worker configuration
	log.INFO.Printf("Launching a worker with the following settings:")
	log.INFO.Printf("- Broker: %s", RedactURL(cnf.Broker))
	if worker.Queue == "" {
		log.INFO.Printf("- DefaultQueue: %s", cnf.DefaultQueue)
	} else {
		log.INFO.Printf("- CustomQueue: %s", worker.Queue)
	}
	log.INFO.Printf("- ResultBackend: %s", RedactURL(cnf.ResultBackend))
	if cnf.AMQP != nil {
		log.INFO.Printf("- AMQP: %s", cnf.AMQP.Exchange)
		log.INFO.Printf(" - Exchange: %s", cnf.AMQP.Exchange)
		log.INFO.Printf(" - ExchangeType: %s", cnf.AMQP.ExchangeType)
		log.INFO.Printf(" - BindingKey: %s", cnf.AMQP.BindingKey)
		log.INFO.Printf(" - PrefetchCount: %d", cnf.AMQP.PrefetchCount)
	}

	var signalWG sync.WaitGroup
	// Goroutine to start broker consumption and handle retries when broker connection dies
	go func() {
		for {
			retry, err := broker.StartConsuming(worker.ConsumerTag, worker.Concurrency, worker)

			if retry {
				if worker.errorHandler != nil {
					worker.errorHandler(err)
				} else {
					log.WARNING.Printf("Broker failed with error: %s", err)
				}
			} else {
				signalWG.Wait()
				errorsChan <- err // stop the goroutine
				return
			}
		}
	}()
	if !cnf.NoUnixSignals {
		sig := make(chan os.Signal, 1)
		signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
		var signalsReceived uint

		// Goroutine Handle SIGINT and SIGTERM signals
		go func() {
			for s := range sig {
				log.WARNING.Printf("Signal received: %v", s)
				signalsReceived++

				if signalsReceived < 2 {
					// After first Ctrl+C start quitting the worker gracefully
					log.WARNING.Print("Waiting for running tasks to finish before shutting down")
					signalWG.Add(1)
					go func() {
						worker.Quit()
						errorsChan <- ErrWorkerQuitGracefully
						signalWG.Done()
					}()
				} else {
					// Abort the program when user hits Ctrl+C second time in a row
					errorsChan <- ErrWorkerQuitAbruptly
				}
			}
		}()
	}
}

The launch logic spawns a goroutine that maintains a broker consumer and blocks on the error channel; alongside it, a signal channel is watched for graceful shutdown. A nice touch: after launch, one Ctrl+C ends the worker gracefully, while a second Ctrl+C forces an immediate exit.
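The two launch styles in a sketch (worker creation omitted):

// Blocking: Launch wraps LaunchAsync and waits on the error channel.
// err := worker.Launch()

// Non-blocking: run the worker beside other work and watch the channel.
errorsChan := make(chan error, 1)
worker.LaunchAsync(errorsChan)
// ... other work ...
if err := <-errorsChan; err != nil {
	log.Printf("worker stopped: %v", err)
}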

The core call in this logic is broker.StartConsuming, which keeps fetching the tasks that are due and hands them to the worker's Process method, so let's look at Process first:

// Process handles received tasks and triggers success/error callbacks
func (worker *Worker) Process(signature *tasks.Signature) error {
	// If the task is not registered with this worker, do not continue
	// but only return nil as we do not want to restart the worker process
	if !worker.server.IsTaskRegistered(signature.Name) {
		return nil
	}

	taskFunc, err := worker.server.GetRegisteredTask(signature.Name)
	if err != nil {
		return nil
	}

	// Update task state to RECEIVED
	if err = worker.server.GetBackend().SetStateReceived(signature); err != nil {
		return fmt.Errorf("Set state to 'received' for task %s returned error: %s", signature.UUID, err)
	}

	// Prepare task for processing
	task, err := tasks.NewWithSignature(taskFunc, signature)
	// if this failed, it means the task is malformed, probably has invalid
	// signature, go directly to task failed without checking whether to retry
	if err != nil {
		worker.taskFailed(signature, err)
		return err
	}

	// try to extract trace span from headers and add it to the function context
	// so it can be used inside the function if it has context.Context as the first
	// argument. Start a new span if it isn't found.
	taskSpan := tracing.StartSpanFromHeaders(signature.Headers, signature.Name)
	tracing.AnnotateSpanWithSignatureInfo(taskSpan, signature)
	task.Context = opentracing.ContextWithSpan(task.Context, taskSpan)

	// Update task state to STARTED
	if err = worker.server.GetBackend().SetStateStarted(signature); err != nil {
		return fmt.Errorf("Set state to 'started' for task %s returned error: %s", signature.UUID, err)
	}

	//Run handler before the task is called
	if worker.preTaskHandler != nil {
		worker.preTaskHandler(signature)
	}

	//Defer run handler for the end of the task
	if worker.postTaskHandler != nil {
		defer worker.postTaskHandler(signature)
	}

	// Call the task
	results, err := task.Call()
	if err != nil {
		// If a tasks.ErrRetryTaskLater was returned from the task,
		// retry the task after specified duration
		retriableErr, ok := interface{}(err).(tasks.ErrRetryTaskLater)
		if ok {
			return worker.retryTaskIn(signature, retriableErr.RetryIn())
		}

		// Otherwise, execute default retry logic based on signature.RetryCount
		// and signature.RetryTimeout values
		if signature.RetryCount > 0 {
			return worker.taskRetry(signature)
		}

		return worker.taskFailed(signature, err)
	}

	return worker.taskSucceeded(signature, results)
}

The logic is simple: fetch the matching task function from the Server and run it; depending on the result or error, Process calls taskFailed or taskSucceeded, which record everything through the Backend interface.

One piece of design worth highlighting is the retry mechanism, which is quite flexible. You can set the retry count and retry timeout directly on the triggering Signature; notably, the configured timeout is not a fixed wait before every attempt. A Fibonacci-style backoff stretches successive retry delays, so a persistently failing task doesn't waste CPU by retrying nonstop. Alternatively, where a specific step may fail, you can return machinery's ErrRetryTaskLater from inside the task function to retry only in that case.
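A sketch of the in-task retry style (the fetch helper is hypothetical):

import (
	"time"

	"github.com/RichardKnop/machinery/v1/tasks"
)

func FetchRemote(url string) (string, error) {
	body, err := fetch(url) // hypothetical downstream call
	if err != nil {
		// Requeue this invocation with ETA = now + 10s instead of failing outright.
		return "", tasks.NewErrRetryTaskLater("remote not ready", 10*time.Second)
	}
	return body, nil
}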

At this point the picture still doesn't fully connect. We know to create a Server, register task functions, create a worker to execute them, and trigger tasks through the Server when scheduling is needed. But how are arguments passed, and how are tasks orchestrated? Triggering, as just mentioned, goes through the Server's exposed Send* methods, and you'll notice there are several of them. So next, the structures that describe triggered tasks and their orchestration:

Task triggers come as several structures: signature, group, chain, and chord. Signature is the foundation and triggers a single task.

Signature

// Signature represents a single task invocation
type Signature struct {
	UUID           string
	Name           string
	RoutingKey     string
	ETA            *time.Time
	GroupUUID      string
	GroupTaskCount int
	Args           []Arg
	Headers        Headers
	Priority       uint8
	Immutable      bool
	RetryCount     int
	RetryTimeout   int
	OnSuccess      []*Signature
	OnError        []*Signature
	ChordCallback  *Signature
	// MessageGroupId for Broker, e.g. SQS
	BrokerMessageGroupId string
	// ReceiptHandle of SQS Message
	SQSReceiptHandle string
	// StopTaskDeletionOnError used with sqs when we want to send failed messages to dlq,
	// and don't want machinery to delete from source queue
	StopTaskDeletionOnError bool
	// IgnoreWhenTaskNotRegistered auto removes the request when there is no handler available
	// When this is true a task with no handler will be ignored and not placed back in the queue
	IgnoreWhenTaskNotRegistered bool
}

A signature is the trigger switch for a single task, and also where arguments are carried and results are linked, so it is tied to the task's entire lifecycle.

  • UUID: the task's unique identifier. As the NewSignature constructor below shows, the UUID is generated inside the library.
    // NewSignature creates a new task signature
    func NewSignature(name string, args []Arg) (*Signature, error) {
    	signatureID := uuid.New().String()
    	return &Signature{
    		UUID: fmt.Sprintf("task_%v", signatureID),
    		Name: name,
    		Args: args,
    	}, nil
    }
  • Name: if you've read the above carefully, you already know what this is: the name under which the handler function was registered with the Server. When a task fires, the Worker uses Name to look up the task function.
  • RoutingKey: pay close attention to this one. It is the underlying task-queue key (the queue) discussed earlier. When you use NewCustomQueueWorker to create workers consuming different queues, setting RoutingKey to the matching queue stores the triggering Signature in that queue, awaiting execution. So RoutingKey corresponds to the queue; if unset, it falls back to the DefaultQueue configured initially. The code below shows how the broker fixes up the RoutingKey before storing a signature.
// AdjustRoutingKey makes sure the routing key is correct.
// If the routing key is an empty string:
// a) set it to binding key for direct exchange type
// b) set it to default queue name
func (b *Broker) AdjustRoutingKey(s *tasks.Signature) {
	if s.RoutingKey != "" {
		return
	}

	s.RoutingKey = b.GetConfig().DefaultQueue
}
  • ETA: the scheduled execution time, used both when orchestrating delayed tasks and for retries. For delayed orchestration you set the execution time yourself; for retry delays the library does it, creating a signature with a delayed trigger time and storing it under the RoutingKey.
  • GroupUUID and GroupTaskCount: the ID of the task group this task belongs to, and the total number of tasks in that group.
  • Immutable: decides whether a chained task receives the previous task's results as arguments. The default false means the previous function's results are appended as the next one's arguments.
  • IgnoreWhenTaskNotRegistered: decides whether a signature whose handler is not registered goes back to the end of the queue. If true the trigger is silently dropped; if false it is requeued at the tail.
  • OnSuccess: callbacks triggered on success.
  • OnError: callbacks triggered on failure.
  • ChordCallback: the callback triggered when the chord's group completes.
  • Args: arguably the most important field: the arguments passed when the task function is invoked. Let's look at its structure first:
// Arg represents a single argument passed to invocation of a task
type Arg struct {
	Name  string      `bson:"name"`
	Type  string      `bson:"type"`
	Value interface{} `bson:"value"`
}

The Name field is of little practical use; presumably the author initially meant to match arguments by name and later found it hard to implement. Type is the argument's type, Value its value.
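A sketch of building a signature by hand (the task name must match one registered earlier; tasks.NewSignature does the same plus UUID generation):

signature := &tasks.Signature{
	Name: "add", // looked up in the Server's registeredTasks by the worker
	Args: []tasks.Arg{
		{Type: "int64", Value: 1}, // Type must be a key of typesMap (shown below)
		{Type: "int64", Value: 1},
	},
}
asyncResult, err := server.SendTask(signature)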

To understand how the whole function invocation works, we need another structure: Task. It appeared in worker.Process above: Process builds a Task from the Signature fetched through the broker plus the task function fetched from the Server, then executes it with task.Call(). Suddenly everything clicks. Here are Task and its constructors:

Task

// Task wraps a signature and methods used to reflect task arguments and
// return values after invoking the task
type Task struct {
	TaskFunc   reflect.Value
	UseContext bool
	Context    context.Context
	Args       []reflect.Value
}

TaskFunc is the reflect.Value of the task function registered with the Server. Args holds the reflect.Value of each element derived from the signature's Args.

Creating a Task:

// NewWithSignature is the same as New but injects the signature
func NewWithSignature(taskFunc interface{}, signature *Signature) (*Task, error) {
	args := signature.Args
	ctx := context.Background()
	ctx = context.WithValue(ctx, signatureCtx, signature)
	task := &Task{
		TaskFunc: reflect.ValueOf(taskFunc),
		Context:  ctx,
	}

	taskFuncType := reflect.TypeOf(taskFunc)
	if taskFuncType.NumIn() > 0 {
		arg0Type := taskFuncType.In(0)
		if IsContextType(arg0Type) {
			task.UseContext = true
		}
	}

	if err := task.ReflectArgs(args); err != nil {
		return nil, fmt.Errorf("Reflect task args error: %s", err)
	}

	return task, nil
}

// New tries to use reflection to convert the function and arguments
// into a reflect.Value and prepare it for invocation
func New(taskFunc interface{}, args []Arg) (*Task, error) {
	task := &Task{
		TaskFunc: reflect.ValueOf(taskFunc),
		Context:  context.Background(),
	}

	taskFuncType := reflect.TypeOf(taskFunc)
	if taskFuncType.NumIn() > 0 {
		arg0Type := taskFuncType.In(0)
		if IsContextType(arg0Type) {
			task.UseContext = true
		}
	}

	if err := task.ReflectArgs(args); err != nil {
		return nil, fmt.Errorf("Reflect task args error: %s", err)
	}

	return task, nil
}

UseContext simply records whether the function's first parameter is a context.Context.

After the Task is built, task.ReflectArgs() reflects the argument list into values:

Generic handling of Args

// ReflectArgs converts []TaskArg to []reflect.Value
func (t *Task) ReflectArgs(args []Arg) error {
argValues := make([]reflect.Value, len(args))

for i, arg := range args {
argValue, err := ReflectValue(arg.Type, arg.Value)
if err != nil {
return err
}
argValues[i] = argValue
}

t.Args = argValues
return nil
}


// ReflectValue converts interface{} to reflect.Value based on string type
func ReflectValue(valueType string, value interface{}) (reflect.Value, error) {
if strings.HasPrefix(valueType, "[]") {
return reflectValues(valueType, value)
}

return reflectValue(valueType, value)
}

// reflectValue converts interface{} to reflect.Value based on string type
// representing a base type (not a slice)
func reflectValue(valueType string, value interface{}) (reflect.Value, error) {
theType, ok := typesMap[valueType]
if !ok {
return reflect.Value{}, NewErrUnsupportedType(valueType)
}
theValue := reflect.New(theType)

// Booleans
if theType.String() == "bool" {
boolValue, err := getBoolValue(theType.String(), value)
if err != nil {
return reflect.Value{}, err
}

theValue.Elem().SetBool(boolValue)
return theValue.Elem(), nil
}

// Integers
if strings.HasPrefix(theType.String(), "int") {
intValue, err := getIntValue(theType.String(), value)
if err != nil {
return reflect.Value{}, err
}

theValue.Elem().SetInt(intValue)
return theValue.Elem(), err
}

// Unsigned integers
if strings.HasPrefix(theType.String(), "uint") {
uintValue, err := getUintValue(theType.String(), value)
if err != nil {
return reflect.Value{}, err
}

theValue.Elem().SetUint(uintValue)
return theValue.Elem(), err
}

// Floating point numbers
if strings.HasPrefix(theType.String(), "float") {
floatValue, err := getFloatValue(theType.String(), value)
if err != nil {
return reflect.Value{}, err
}

theValue.Elem().SetFloat(floatValue)
return theValue.Elem(), err
}

// Strings
if theType.String() == "string" {
stringValue, err := getStringValue(theType.String(), value)
if err != nil {
return reflect.Value{}, err
}

theValue.Elem().SetString(stringValue)
return theValue.Elem(), nil
}

return reflect.Value{}, NewErrUnsupportedType(valueType)
}

// reflectValues converts interface{} to reflect.Value based on string type
// representing a slice of values
func reflectValues(valueType string, value interface{}) (reflect.Value, error) {
theType, ok := typesMap[valueType]
if !ok {
return reflect.Value{}, NewErrUnsupportedType(valueType)
}

// For NULL we return an empty slice
if value == nil {
return reflect.MakeSlice(theType, 0, 0), nil
}

var theValue reflect.Value

// Booleans
if theType.String() == "[]bool" {
bools := reflect.ValueOf(value)

theValue = reflect.MakeSlice(theType, bools.Len(), bools.Len())
for i := 0; i < bools.Len(); i++ {
boolValue, err := getBoolValue(strings.Split(theType.String(), "[]")[1], bools.Index(i).Interface())
if err != nil {
return reflect.Value{}, err
}

theValue.Index(i).SetBool(boolValue)
}

return theValue, nil
}

// Integers
if strings.HasPrefix(theType.String(), "[]int") {
ints := reflect.ValueOf(value)

theValue = reflect.MakeSlice(theType, ints.Len(), ints.Len())
for i := 0; i < ints.Len(); i++ {
intValue, err := getIntValue(strings.Split(theType.String(), "[]")[1], ints.Index(i).Interface())
if err != nil {
return reflect.Value{}, err
}

theValue.Index(i).SetInt(intValue)
}

return theValue, nil
}

// Unsigned integers
if strings.HasPrefix(theType.String(), "[]uint") || theType.String() == "[]byte" {

// Decode the base64 string if the value type is []uint8 or it's alias []byte
// See: https://golang.org/pkg/encoding/json/#Marshal
// > Array and slice values encode as JSON arrays, except that []byte encodes as a base64-encoded string
if reflect.TypeOf(value).String() == "string" {
output, err := base64.StdEncoding.DecodeString(value.(string))
if err != nil {
return reflect.Value{}, err
}
value = output
}

uints := reflect.ValueOf(value)

theValue = reflect.MakeSlice(theType, uints.Len(), uints.Len())
for i := 0; i < uints.Len(); i++ {
uintValue, err := getUintValue(strings.Split(theType.String(), "[]")[1], uints.Index(i).Interface())
if err != nil {
return reflect.Value{}, err
}

theValue.Index(i).SetUint(uintValue)
}

return theValue, nil
}

// Floating point numbers
if strings.HasPrefix(theType.String(), "[]float") {
floats := reflect.ValueOf(value)

theValue = reflect.MakeSlice(theType, floats.Len(), floats.Len())
for i := 0; i < floats.Len(); i++ {
floatValue, err := getFloatValue(strings.Split(theType.String(), "[]")[1], floats.Index(i).Interface())
if err != nil {
return reflect.Value{}, err
}

theValue.Index(i).SetFloat(floatValue)
}

return theValue, nil
}

// Strings
if theType.String() == "[]string" {
strs := reflect.ValueOf(value)

theValue = reflect.MakeSlice(theType, strs.Len(), strs.Len())
for i := 0; i < strs.Len(); i++ {
strValue, err := getStringValue(strings.Split(theType.String(), "[]")[1], strs.Index(i).Interface())
if err != nil {
return reflect.Value{}, err
}

theValue.Index(i).SetString(strValue)
}

return theValue, nil
}

return reflect.Value{}, NewErrUnsupportedType(valueType)
}

As these functions show, ReflectValue allocates fresh storage of the type named by the Arg and copies the Arg's value into it.

Note that arguments are deep-copied before the function runs; this is done with failure retries and the requeuing of unregistered tasks in mind. So don't expect to pass a pointer in and collect results through it.

The type restrictions are equally clear from the source: only types present in the typesMap below may be used:

Type restrictions on Args

typesMap = map[string]reflect.Type{
	// base types
	"bool":    reflect.TypeOf(true),
	"int":     reflect.TypeOf(int(1)),
	"int8":    reflect.TypeOf(int8(1)),
	"int16":   reflect.TypeOf(int16(1)),
	"int32":   reflect.TypeOf(int32(1)),
	"int64":   reflect.TypeOf(int64(1)),
	"uint":    reflect.TypeOf(uint(1)),
	"uint8":   reflect.TypeOf(uint8(1)),
	"uint16":  reflect.TypeOf(uint16(1)),
	"uint32":  reflect.TypeOf(uint32(1)),
	"uint64":  reflect.TypeOf(uint64(1)),
	"float32": reflect.TypeOf(float32(0.5)),
	"float64": reflect.TypeOf(float64(0.5)),
	"string":  reflect.TypeOf(string("")),
	// slices
	"[]bool":    reflect.TypeOf(make([]bool, 0)),
	"[]int":     reflect.TypeOf(make([]int, 0)),
	"[]int8":    reflect.TypeOf(make([]int8, 0)),
	"[]int16":   reflect.TypeOf(make([]int16, 0)),
	"[]int32":   reflect.TypeOf(make([]int32, 0)),
	"[]int64":   reflect.TypeOf(make([]int64, 0)),
	"[]uint":    reflect.TypeOf(make([]uint, 0)),
	"[]uint8":   reflect.TypeOf(make([]uint8, 0)),
	"[]uint16":  reflect.TypeOf(make([]uint16, 0)),
	"[]uint32":  reflect.TypeOf(make([]uint32, 0)),
	"[]uint64":  reflect.TypeOf(make([]uint64, 0)),
	"[]float32": reflect.TypeOf(make([]float32, 0)),
	"[]float64": reflect.TypeOf(make([]float64, 0)),
	"[]byte":    reflect.TypeOf(make([]byte, 0)),
	"[]string":  reflect.TypeOf([]string{""}),
}

Task function invocation

Arguments are passed strictly in the index order of the Args list. With that in mind, here is the execution function, task.Call():

// Call attempts to call the task with the supplied arguments.
//
// `err` is set in the return value in two cases:
// 1. The reflected function invocation panics (e.g. due to a mismatched
// argument list).
// 2. The task func itself returns a non-nil error.
func (t *Task) Call() (taskResults []*TaskResult, err error) {
// retrieve the span from the task's context and finish it as soon as this function returns
if span := opentracing.SpanFromContext(t.Context); span != nil {
defer span.Finish()
}

defer func() {
// Recover from panic and set err.
if e := recover(); e != nil {
switch e := e.(type) {
default:
err = ErrTaskPanicked
case error:
err = e
case string:
err = errors.New(e)
}

// mark the span as failed and dump the error and stack trace to the span
if span := opentracing.SpanFromContext(t.Context); span != nil {
opentracing_ext.Error.Set(span, true)
span.LogFields(
opentracing_log.Error(err),
opentracing_log.Object("stack", string(debug.Stack())),
)
}

// Print stack trace
log.ERROR.Printf("%v stack: %s", err, debug.Stack())
}
}()

args := t.Args

if t.UseContext {
ctxValue := reflect.ValueOf(t.Context)
args = append([]reflect.Value{ctxValue}, args...)
}

// Invoke the task
results := t.TaskFunc.Call(args)

// Task must return at least a value
if len(results) == 0 {
return nil, ErrTaskReturnsNoValue
}

// Last returned value
lastResult := results[len(results)-1]

// If the last returned value is not nil, it has to be of error type, if that
// is not the case, return error message, otherwise propagate the task error
// to the caller
if !lastResult.IsNil() {
// If the result implements Retriable interface, return instance of Retriable
retriableErrorInterface := reflect.TypeOf((*Retriable)(nil)).Elem()
if lastResult.Type().Implements(retriableErrorInterface) {
return nil, lastResult.Interface().(ErrRetryTaskLater)
}

// Otherwise, check that the result implements the standard error interface,
// if not, return ErrLastReturnValueMustBeError error
errorInterface := reflect.TypeOf((*error)(nil)).Elem()
if !lastResult.Type().Implements(errorInterface) {
return nil, ErrLastReturnValueMustBeError
}

// Return the standard error
return nil, lastResult.Interface().(error)
}

// Convert reflect values to task results
taskResults = make([]*TaskResult, len(results)-1)
for i := 0; i < len(results)-1; i++ {
val := results[i].Interface()
typeStr := reflect.TypeOf(val).String()
taskResults[i] = &TaskResult{
Type: typeStr,
Value: val,
}
}

return taskResults, err
}

With all of this covered, you have surely connected Machinery's entire logic end to end; let's see how to use it.

Usage

With the architecture and source walked through, you should have a rough idea of how to use it. Let's check a simple example against your expectations; after the example we'll look at the storage layer, Broker and Backend.

Starting the service


func TestRedisRedisWorkerQuitRaceCondition(t *testing.T) {
	repeat := 3
	for i := 0; i < repeat; i++ {
		redisURL := os.Getenv("REDIS_URL")
		if redisURL == "" {
			t.Skip("REDIS_URL is not defined")
		}

		// Redis broker, Redis result backend
		cnf := &config.Config{
			Broker:        fmt.Sprintf("redis://%v", redisURL),
			DefaultQueue:  "test_queue",
			ResultBackend: fmt.Sprintf("redis://%v", redisURL),
			Lock:          fmt.Sprintf("redis://%v", redisURL),
		}

		server, _ := machinery.NewServer(cnf)
		worker := server.NewWorker("test_worker", 0)

		errorsChan := make(chan error, 1)

		// Check Quit() immediately after LaunchAsync() will shutdown gracefully
		// and not panic on close(b.stopChan)
		worker.LaunchAsync(errorsChan)
		worker.Quit()

		if err := <-errorsChan; err != nil {
			t.Errorf("Error shutting down machinery worker gracefully %+v", err)
			continue
		}
	}
}

Let's start with an official test case: it creates a Server, then launches a worker with worker.LaunchAsync; since NewWorker is used, DefaultQueue serves as the storage key of the underlying task queue.

The loop runs again whenever errorsChan reports an error. I picked this simple example because it relaunches after an error, but only three times; if you want it to relaunch forever, drop the repeat limit, or set the worker's error handler instead.
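A sketch of the unbounded variant (purely illustrative):

for {
	errorsChan := make(chan error, 1)
	worker.LaunchAsync(errorsChan)
	if err := <-errorsChan; err != nil {
		log.Printf("worker exited: %v, relaunching", err)
		continue
	}
	break // clean shutdown, stop relaunching
}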

Triggering tasks and fetching results

With the Server, Machinery's operating entity, in hand, we can trigger tasks through its Send* methods and then fetch the results:

func testSendTask(server Server, t *testing.T) {
addTask := newAddTask(1, 1)

asyncResult, err := server.SendTask(addTask)
if err != nil {
t.Error(err)
}

results, err := asyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
t.Error(err)
}

if len(results) != 1 {
t.Errorf("Number of results returned = %d. Wanted %d", len(results), 1)
}

if results[0].Interface() != int64(2) {
t.Errorf(
"result = %v(%v), want int64(2)",
results[0].Type().String(),
results[0].Interface(),
)
}

sumTask := newSumTask([]int64{1, 2})
asyncResult, err = server.SendTask(sumTask)
if err != nil {
t.Error(err)
}

results, err = asyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
t.Error(err)
}

if len(results) != 1 {
t.Errorf("Number of results returned = %d. Wanted %d", len(results), 1)
}

if results[0].Interface() != int64(3) {
t.Errorf(
"result = %v(%v), want int64(3)",
results[0].Type().String(),
results[0].Interface(),
)
}
}

This official example shows how to trigger a single task and fetch its results; the result structure is covered under Backend below.

Storage handling

With the design, usage, and caveats covered, let's take a closer look at how the task queue and the result states are stored.

As mentioned, two roles matter here: broker and backend. One stores and retrieves the task queue; the other handles task results and states.

Broker

Broker is an interface the author designed for storing and retrieving the task-trigger queue; each underlying storage service has its own implementation. We only cover Redis here.

Creating a Broker

When a Server is created, the configuration decides which Broker is built:


// BrokerFactory creates a new object of iface.Broker
// Currently only AMQP/S broker is supported
func BrokerFactory(cnf *config.Config) (brokeriface.Broker, error) {
if strings.HasPrefix(cnf.Broker, "amqp://") {
return amqpbroker.New(cnf), nil
}

if strings.HasPrefix(cnf.Broker, "amqps://") {
return amqpbroker.New(cnf), nil
}

if strings.HasPrefix(cnf.Broker, "redis://") || strings.HasPrefix(cnf.Broker, "rediss://") {
var scheme string
if strings.HasPrefix(cnf.Broker, "redis://") {
scheme = "redis://"
} else {
scheme = "rediss://"
}
parts := strings.Split(cnf.Broker, scheme)
if len(parts) != 2 {
return nil, fmt.Errorf(
"Redis broker connection string should be in format %shost:port, instead got %s", scheme,
cnf.Broker,
)
}
brokers := strings.Split(parts[1], ",")
if len(brokers) > 1 || (cnf.Redis != nil && cnf.Redis.ClusterMode) {
return redisbroker.NewGR(cnf, brokers, 0), nil
} else {
redisHost, redisPassword, redisDB, err := ParseRedisURL(cnf.Broker)
if err != nil {
return nil, err
}
return redisbroker.New(cnf, redisHost, redisPassword, "", redisDB), nil
}
}

if strings.HasPrefix(cnf.Broker, "redis+socket://") {
redisSocket, redisPassword, redisDB, err := ParseRedisSocketURL(cnf.Broker)
if err != nil {
return nil, err
}

return redisbroker.New(cnf, "", redisPassword, redisSocket, redisDB), nil
}

if strings.HasPrefix(cnf.Broker, "eager") {
return eagerbroker.New(), nil
}

if _, ok := os.LookupEnv("DISABLE_STRICT_SQS_CHECK"); ok {
//disable SQS name check, so that users can use this with local simulated SQS
//where sql broker url might not start with https://sqs

//even when disabling strict SQS naming check, make sure its still a valid http URL
if strings.HasPrefix(cnf.Broker, "https://") || strings.HasPrefix(cnf.Broker, "http://") {
return sqsbroker.New(cnf), nil
}
} else {
if strings.HasPrefix(cnf.Broker, "https://sqs") {
return sqsbroker.New(cnf), nil
}
}

if strings.HasPrefix(cnf.Broker, "gcppubsub://") {
projectID, subscriptionName, err := ParseGCPPubSubURL(cnf.Broker)
if err != nil {
return nil, err
}
return gcppubsubbroker.New(cnf, projectID, subscriptionName)
}

return nil, fmt.Errorf("Factory failed with broker URL: %v", cnf.Broker)
}

The function is self-explanatory, so no further commentary.

The Redis Broker struct

// Broker represents a Redis broker
type Broker struct {
	common.Broker
	common.RedisConnector
	host         string
	password     string
	db           int
	pool         *redis.Pool
	consumingWG  sync.WaitGroup // wait group to make sure whole consumption completes
	processingWG sync.WaitGroup // use wait group to make sure task processing completes
	delayedWG    sync.WaitGroup
	// If set, path to a socket file overrides hostname
	socketPath           string
	redsync              *redsync.Redsync
	redisOnce            sync.Once
	redisDelayedTasksKey string
}

The fields are mostly unremarkable. The three wait groups track three different asynchronous actors: the consumer started by the worker, the concurrent task executors, and the delayed-task processor. redisDelayedTasksKey comes from the DelayedTasksKey field in the config's Redis section; if empty it defaults to "delayed_tasks". It names a sorted set ordered by time, from which the worker keeps moving due tasks into the task-trigger queue.
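A sketch of a delayed task: setting ETA makes the broker store the signature in this sorted set, scored by timestamp, instead of pushing it straight onto the list (the task name is a placeholder):

eta := time.Now().UTC().Add(5 * time.Minute)
signature := &tasks.Signature{
	Name: "send_report",
	ETA:  &eta, // due in 5 minutes; moved to the trigger queue once due
}
_, err := server.SendTask(signature)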

Broker's interface methods

// Broker - a common interface for all brokers
type Broker interface {
	GetConfig() *config.Config
	SetRegisteredTaskNames(names []string)
	IsTaskRegistered(name string) bool
	StartConsuming(consumerTag string, concurrency int, p TaskProcessor) (bool, error)
	StopConsuming()
	Publish(ctx context.Context, task *tasks.Signature) error
	GetPendingTasks(queue string) ([]*tasks.Signature, error)
	GetDelayedTasks() ([]*tasks.Signature, error)
	AdjustRoutingKey(s *tasks.Signature)
}

GetConfig returns the config the Server was built with; GetPendingTasks and GetDelayedTasks fetch all pending triggers from the respective queues; AdjustRoutingKey was covered earlier.

Now to the Broker's consumption logic.

The most important function is StartConsuming.

Broker starts a task consumer

// StartConsuming enters a loop and waits for incoming messages
func (b *Broker) StartConsuming(consumerTag string, concurrency int, taskProcessor iface.TaskProcessor) (bool, error) {
b.consumingWG.Add(1)
defer b.consumingWG.Done()

if concurrency < 1 {
concurrency = runtime.NumCPU() * 2
}

b.Broker.StartConsuming(consumerTag, concurrency, taskProcessor)

conn := b.open()
defer conn.Close()

// Ping the server to make sure connection is live
_, err := conn.Do("PING")
if err != nil {
b.GetRetryFunc()(b.GetRetryStopChan())

// Return err if retry is still true.
// If retry is false, broker.StopConsuming() has been called and
// therefore Redis might have been stopped. Return nil exit
// StartConsuming()
if b.GetRetry() {
return b.GetRetry(), err
}
return b.GetRetry(), errs.ErrConsumerStopped
}

// Channel to which we will push tasks ready for processing by worker
deliveries := make(chan []byte, concurrency)
pool := make(chan struct{}, concurrency)

// initialize worker pool with maxWorkers workers
for i := 0; i < concurrency; i++ {
pool <- struct{}{}
}

// A receiving goroutine keeps popping messages from the queue by BLPOP
// If the message is valid and can be unmarshaled into a proper structure
// we send it to the deliveries channel
go func() {

log.INFO.Print("[*] Waiting for messages. To exit press CTRL+C")

for {
select {
// A way to stop this goroutine from b.StopConsuming
case <-b.GetStopChan():
close(deliveries)
return
case <-pool:
select {
case <-b.GetStopChan():
close(deliveries)
return
default:
}

if taskProcessor.PreConsumeHandler() {
task, _ := b.nextTask(getQueue(b.GetConfig(), taskProcessor))
//TODO: should this error be ignored?
if len(task) > 0 {
deliveries <- task
}
}

pool <- struct{}{}
}
}
}()

// A goroutine to watch for delayed tasks and push them to deliveries
// channel for consumption by the worker
b.delayedWG.Add(1)
go func() {
defer b.delayedWG.Done()

for {
select {
// A way to stop this goroutine from b.StopConsuming
case <-b.GetStopChan():
return
default:
task, err := b.nextDelayedTask(b.redisDelayedTasksKey)
if err != nil {
continue
}

signature := new(tasks.Signature)
decoder := json.NewDecoder(bytes.NewReader(task))
decoder.UseNumber()
if err := decoder.Decode(signature); err != nil {
log.ERROR.Print(errs.NewErrCouldNotUnmarshalTaskSignature(task, err))
}

if err := b.Publish(context.Background(), signature); err != nil {
log.ERROR.Print(err)
}
}
}
}()

if err := b.consume(deliveries, concurrency, taskProcessor); err != nil {
return b.GetRetry(), err
}

// Waiting for any tasks being processed to finish
b.processingWG.Wait()

return b.GetRetry(), nil
}


The function maintains two goroutines. One consumes the trigger queue, acting as the producer for the consuming side: it keeps fetching tasks and sends them down the deliveries channel. The other watches the delayed-task queue: it checks which delayed tasks are due and pushes the due ones into the trigger queue via Publish.

Let's focus on how a task actually gets consumed.

Task fetching:

// nextTask pops next available task from the default queue
func (b *Broker) nextTask(queue string) (result []byte, err error) {
	conn := b.open()
	defer conn.Close()

	pollPeriodMilliseconds := 1000 // default poll period for normal tasks
	if b.GetConfig().Redis != nil {
		configuredPollPeriod := b.GetConfig().Redis.NormalTasksPollPeriod
		if configuredPollPeriod > 0 {
			pollPeriodMilliseconds = configuredPollPeriod
		}
	}
	pollPeriod := time.Duration(pollPeriodMilliseconds) * time.Millisecond

	// Issue 548: BLPOP expects an integer timeout expressed in seconds.
	// The call will fail if the value is a float. Convert to integer using
	// math.Ceil():
	//   math.Ceil(0.0) --> 0 (block indefinitely)
	//   math.Ceil(0.2) --> 1 (timeout after 1 second)
	pollPeriodSeconds := math.Ceil(pollPeriod.Seconds())

	items, err := redis.ByteSlices(conn.Do("BLPOP", queue, pollPeriodSeconds))
	if err != nil {
		return []byte{}, err
	}

	// items[0] - the name of the key where an element was popped
	// items[1] - the value of the popped element
	if len(items) != 2 {
		return []byte{}, redis.ErrNil
	}

	result = items[1]

	return result, nil
}

Simple enough: it uses Redis's blocking left pop (BLPOP) to fetch one task trigger.

Task consumption:
// consume takes delivered messages from the channel and manages a worker pool
// to process tasks concurrently
func (b *Broker) consume(deliveries <-chan []byte, concurrency int, taskProcessor iface.TaskProcessor) error {
errorsChan := make(chan error, concurrency*2)
pool := make(chan struct{}, concurrency)

// init pool for Worker tasks execution, as many slots as Worker concurrency param
go func() {
for i := 0; i < concurrency; i++ {
pool <- struct{}{}
}
}()

for {
select {
case err := <-errorsChan:
return err
case d, open := <-deliveries:
if !open {
return nil
}
if concurrency > 0 {
// get execution slot from pool (blocks until one is available)
select {
case <-b.GetStopChan():
b.requeueMessage(d, taskProcessor)
continue
case <-pool:
}
}

b.processingWG.Add(1)

// Consume the task inside a goroutine so multiple tasks
// can be processed concurrently
go func() {
if err := b.consumeOne(d, taskProcessor); err != nil {
errorsChan <- err
}

b.processingWG.Done()

if concurrency > 0 {
// give slot back to pool
pool <- struct{}{}
}
}()
}
}
}

// consumeOne processes a single message using TaskProcessor
func (b *Broker) consumeOne(delivery []byte, taskProcessor iface.TaskProcessor) error {
signature := new(tasks.Signature)
decoder := json.NewDecoder(bytes.NewReader(delivery))
decoder.UseNumber()
if err := decoder.Decode(signature); err != nil {
return errs.NewErrCouldNotUnmarshalTaskSignature(delivery, err)
}

// If the task is not registered, we requeue it,
// there might be different workers for processing specific tasks
if !b.IsTaskRegistered(signature.Name) {
if signature.IgnoreWhenTaskNotRegistered {
return nil
}
log.INFO.Printf("Task not registered with this worker. Requeuing message: %s", delivery)
b.requeueMessage(delivery, taskProcessor)
return nil
}

log.DEBUG.Printf("Received new message: %s", delivery)

return taskProcessor.Process(signature)
}

As noted before, the core here is the worker's Process call. It is worth studying the author's concurrency control: like StartConsuming's, it uses a buffered channel as a semaphore.
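Distilled, the semaphore pattern is just a buffered channel of empty structs (a generic sketch, not machinery code; Job, jobs, and process are hypothetical):

pool := make(chan struct{}, concurrency)
for i := 0; i < concurrency; i++ {
	pool <- struct{}{} // pre-fill the pool with execution slots
}

for job := range jobs {
	<-pool // take a slot; blocks once concurrency is saturated
	go func(j Job) {
		defer func() { pool <- struct{}{} }() // return the slot
		process(j)
	}(job)
}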

Backend

Like the Broker, the Backend is an author-designed storage interface, used to record task execution states and results.

Creating a Backend

// BackendFactory creates a new object of backends.Interface
// Currently supported backends are AMQP/S and Memcache
func BackendFactory(cnf *config.Config) (backendiface.Backend, error) {

if strings.HasPrefix(cnf.ResultBackend, "amqp://") {
return amqpbackend.New(cnf), nil
}

if strings.HasPrefix(cnf.ResultBackend, "amqps://") {
return amqpbackend.New(cnf), nil
}

if strings.HasPrefix(cnf.ResultBackend, "memcache://") {
parts := strings.Split(cnf.ResultBackend, "memcache://")
if len(parts) != 2 {
return nil, fmt.Errorf(
"Memcache result backend connection string should be in format memcache://server1:port,server2:port, instead got %s",
cnf.ResultBackend,
)
}
servers := strings.Split(parts[1], ",")
return memcachebackend.New(cnf, servers), nil
}

if strings.HasPrefix(cnf.ResultBackend, "redis://") || strings.HasPrefix(cnf.ResultBackend, "rediss://") {
var scheme string
if strings.HasPrefix(cnf.ResultBackend, "redis://") {
scheme = "redis://"
} else {
scheme = "rediss://"
}
parts := strings.Split(cnf.ResultBackend, scheme)
addrs := strings.Split(parts[1], ",")
if len(addrs) > 1 || (cnf.Redis != nil && cnf.Redis.ClusterMode) {
return redisbackend.NewGR(cnf, addrs, 0), nil
} else {
redisHost, redisPassword, redisDB, err := ParseRedisURL(cnf.ResultBackend)

if err != nil {
return nil, err
}

return redisbackend.New(cnf, redisHost, redisPassword, "", redisDB), nil
}
}

if strings.HasPrefix(cnf.ResultBackend, "redis+socket://") {
redisSocket, redisPassword, redisDB, err := ParseRedisSocketURL(cnf.ResultBackend)
if err != nil {
return nil, err
}

return redisbackend.New(cnf, "", redisPassword, redisSocket, redisDB), nil
}

if strings.HasPrefix(cnf.ResultBackend, "mongodb://") ||
strings.HasPrefix(cnf.ResultBackend, "mongodb+srv://") {
return mongobackend.New(cnf)
}

if strings.HasPrefix(cnf.ResultBackend, "eager") {
return eagerbackend.New(), nil
}

if strings.HasPrefix(cnf.ResultBackend, "null") {
return nullbackend.New(), nil
}

if strings.HasPrefix(cnf.ResultBackend, "https://dynamodb") {
return dynamobackend.New(cnf), nil
}

return nil, fmt.Errorf("Factory failed with result backend: %v", cnf.ResultBackend)
}

Its construction mirrors the Broker's, so I'll spare you the repetition.

The Redis Backend struct

// Backend represents a Redis result backend
type Backend struct {
	common.Backend
	host     string
	password string
	db       int
	pool     *redis.Pool
	// If set, path to a socket file overrides hostname
	socketPath string
	redsync    *redsync.Redsync
	redisOnce  sync.Once
	common.RedisConnector
}

Backend's interface methods

// Backend - a common interface for all result backends
type Backend interface {
	// Group related functions
	InitGroup(groupUUID string, taskUUIDs []string) error
	GroupCompleted(groupUUID string, groupTaskCount int) (bool, error)
	GroupTaskStates(groupUUID string, groupTaskCount int) ([]*tasks.TaskState, error)
	TriggerChord(groupUUID string) (bool, error)

	// Setting / getting task state
	SetStatePending(signature *tasks.Signature) error
	SetStateReceived(signature *tasks.Signature) error
	SetStateStarted(signature *tasks.Signature) error
	SetStateRetry(signature *tasks.Signature) error
	SetStateSuccess(signature *tasks.Signature, results []*tasks.TaskResult) error
	SetStateFailure(signature *tasks.Signature, err string) error
	GetState(taskUUID string) (*tasks.TaskState, error)

	// Purging stored tasks states and group meta data
	IsAMQP() bool
	PurgeState(taskUUID string) error
	PurgeGroupMeta(groupUUID string) error
}

The method names speak for themselves; a quick rundown:

The Backend's main job is to record a task's succession of execution states and results, keyed by the Signature's UUID. In Redis each record is a String keyed by the signature's UUID; while a signature executes, a TaskState with the matching UUID is kept up to date:

// TaskState represents a state of a task
type TaskState struct {
	TaskUUID  string        `bson:"_id"`
	TaskName  string        `bson:"task_name"`
	State     string        `bson:"state"`
	Results   []*TaskResult `bson:"results"`
	Error     string        `bson:"error"`
	CreatedAt time.Time     `bson:"created_at"`
	TTL       int64         `bson:"ttl,omitempty"`
}

Explore the rest yourself, including result storage and the state/result bookkeeping for task groups.

  • SetStatePending: called from Server.SendTask to store a pending TaskState.
  • SetStateReceived: called when the Broker first picks up a signature, updating the TaskState to received.
  • SetStateStarted: called right before the task runs, updating the TaskState to started.

The rest follow the same pattern; reading them alongside the worker's Process and taskSucceeded methods works best.

Fetching results through the Backend

As seen in the code above, every task triggered through the Server returns a result object. This is the structure the author designed for fetching results via the Backend:

// AsyncResult represents a task result
type AsyncResult struct {
	Signature *tasks.Signature
	taskState *tasks.TaskState
	backend   iface.Backend
}

// NewAsyncResult creates AsyncResult instance
func NewAsyncResult(signature *tasks.Signature, backend iface.Backend) *AsyncResult {
	return &AsyncResult{
		Signature: signature,
		taskState: new(tasks.TaskState),
		backend:   backend,
	}
}

Signature is an old friend here, taskState has been covered, and backend is the Backend interface used to fetch results. Here is how results are retrieved:

// Get returns task results (synchronous blocking call)
func (asyncResult *AsyncResult) Get(sleepDuration time.Duration) ([]reflect.Value, error) {
	for {
		results, err := asyncResult.Touch()

		if results == nil && err == nil {
			time.Sleep(sleepDuration)
		} else {
			return results, err
		}
	}
}

// Touch the state and don't wait
func (asyncResult *AsyncResult) Touch() ([]reflect.Value, error) {
	if asyncResult.backend == nil {
		return nil, ErrBackendNotConfigured
	}

	asyncResult.GetState()

	// Purge state if we are using AMQP backend
	if asyncResult.backend.IsAMQP() && asyncResult.taskState.IsCompleted() {
		asyncResult.backend.PurgeState(asyncResult.taskState.TaskUUID)
	}

	if asyncResult.taskState.IsFailure() {
		return nil, errors.New(asyncResult.taskState.Error)
	}

	if asyncResult.taskState.IsSuccess() {
		return tasks.ReflectTaskResults(asyncResult.taskState.Results)
	}

	return nil, nil
}

// GetState returns latest task state
func (asyncResult *AsyncResult) GetState() *tasks.TaskState {
	if asyncResult.taskState.IsCompleted() {
		return asyncResult.taskState
	}

	taskState, err := asyncResult.backend.GetState(asyncResult.Signature.UUID)
	if err == nil {
		asyncResult.taskState = taskState
	}

	return asyncResult.taskState
}

The core method is GetState; Touch returns the result values once the task has succeeded; Get wraps Touch in a polling loop, sleeping sleepDuration between attempts until the task completes or fails.

Task orchestration

Next, orchestration. Machinery provides three orchestration modes:

  • Group: runs a set of asynchronous tasks that do not affect one another.
  • Chain: runs a sequence of tasks in order; a task's outputs can feed into the next task's inputs (a minimal sketch follows this list).
  • Chord: runs a group of tasks and, once they all finish, invokes a callback.
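As a quick taste of Chain before we go through Group and Chord in detail, a minimal sketch (the add task is assumed registered as in the earlier examples):

t1 := tasks.Signature{Name: "add", Args: []tasks.Arg{{Type: "int64", Value: 1}, {Type: "int64", Value: 1}}}
t2 := tasks.Signature{Name: "add", Args: []tasks.Arg{{Type: "int64", Value: 5}}} // t1's result is appended as the second argument

chain, _ := tasks.NewChain(&t1, &t2)
chainResult, err := server.SendChain(chain)
if err == nil {
	results, _ := chainResult.Get(5 * time.Millisecond) // (1+1)+5 = 7
	_ = results
}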

Now let's take a quick look at these three structures:

Group

The Group struct

// Group creates a set of tasks to be executed in parallel
type Group struct {
	GroupUUID string
	Tasks     []*Signature
}

Creating a Group

// NewGroup creates a new group of tasks to be processed in parallel
func NewGroup(signatures ...*Signature) (*Group, error) {
	// Generate a group UUID
	groupUUID := uuid.New().String()
	groupID := fmt.Sprintf("group_%v", groupUUID)

	// Auto generate task UUIDs if needed, group tasks by common group UUID
	for _, signature := range signatures {
		if signature.UUID == "" {
			signatureID := uuid.New().String()
			signature.UUID = fmt.Sprintf("task_%v", signatureID)
		}
		signature.GroupUUID = groupID
		signature.GroupTaskCount = len(signatures)
	}

	return &Group{
		GroupUUID: groupID,
		Tasks:     signatures,
	}, nil
}

A Group simply holds multiple Signatures under one generated UUID. To run several independent tasks concurrently and asynchronously, orchestrate them with this structure; it behaves somewhat like a WaitGroup. Once built, trigger it with Server.SendGroup. The points to watch are how a Group's success or failure is defined, and how the function results are collected.

Group usage

Triggering

Trigger with Server.SendGroupWithContext:

// SendGroupWithContext will inject the trace context in all the signature headers before publishing it
func (server *Server) SendGroupWithContext(ctx context.Context, group *tasks.Group, sendConcurrency int) ([]*result.AsyncResult, error) {
span, _ := opentracing.StartSpanFromContext(ctx, "SendGroup", tracing.ProducerOption(), tracing.MachineryTag, tracing.WorkflowGroupTag)
defer span.Finish()

tracing.AnnotateSpanWithGroupInfo(span, group, sendConcurrency)

// Make sure result backend is defined
if server.backend == nil {
return nil, errors.New("Result backend required")
}

asyncResults := make([]*result.AsyncResult, len(group.Tasks))

var wg sync.WaitGroup
wg.Add(len(group.Tasks))
errorsChan := make(chan error, len(group.Tasks)*2)

// Init group
server.backend.InitGroup(group.GroupUUID, group.GetUUIDs())

// Init the tasks Pending state first
for _, signature := range group.Tasks {
if err := server.backend.SetStatePending(signature); err != nil {
errorsChan <- err
continue
}
}

pool := make(chan struct{}, sendConcurrency)
go func() {
for i := 0; i < sendConcurrency; i++ {
pool <- struct{}{}
}
}()

for i, signature := range group.Tasks {

if sendConcurrency > 0 {
<-pool
}

go func(s *tasks.Signature, index int) {
defer wg.Done()

// Publish task

err := server.broker.Publish(ctx, s)

if sendConcurrency > 0 {
pool <- struct{}{}
}

if err != nil {
errorsChan <- fmt.Errorf("Publish message error: %s", err)
return
}

asyncResults[index] = result.NewAsyncResult(s, server.backend)
}(signature, i)
}

done := make(chan int)
go func() {
wg.Wait()
done <- 1
}()

select {
case err := <-errorsChan:
return asyncResults, err
case <-done:
return asyncResults, nil
}
}

As the trigger function shows, a Group does little more than publish its signatures into the trigger queue one by one, returning a matching slice of result structures for collecting results. The one difference is that backend.InitGroup adds a group record to the Backend's result store.

// GroupMeta stores useful metadata about tasks within the same group
// E.g. UUIDs of all tasks which are used in order to check if all tasks
// completed successfully or not and thus whether to trigger chord callback
type GroupMeta struct {
	GroupUUID      string    `bson:"_id"`
	TaskUUIDs      []string  `bson:"task_uuids"`
	ChordTriggered bool      `bson:"chord_triggered"`
	Lock           bool      `bson:"lock"`
	CreatedAt      time.Time `bson:"created_at"`
	TTL            int64     `bson:"ttl,omitempty"`
}

// InitGroup creates and saves a group meta data object
func (b *Backend) InitGroup(groupUUID string, taskUUIDs []string) error {
	groupMeta := &tasks.GroupMeta{
		GroupUUID: groupUUID,
		TaskUUIDs: taskUUIDs,
		CreatedAt: time.Now().UTC(),
	}

	encoded, err := json.Marshal(groupMeta)
	if err != nil {
		return err
	}

	conn := b.open()
	defer conn.Close()

	expiration := int64(b.getExpiration().Seconds())
	_, err = conn.Do("SET", groupUUID, encoded, "EX", expiration)
	if err != nil {
		return err
	}

	return nil
}

This GroupMeta record is stored in the Backend under the GroupUUID as its key. It ties the Signatures together, so all the corresponding task results can later be fetched through it (see backend.GroupTaskStates).

Execution

Group execution involves nothing special: all the tasks it contains are simply placed into the task-trigger queue.

Results

Result retrieval is likewise just collecting the result set of those tasks, with no special handling.

Group example

Here is an official usage example:


func testSendGroup(server Server, t *testing.T, sendConcurrency int) {
t1, t2, t3 := newAddTask(1, 1), newAddTask(2, 2), newAddTask(5, 6)

group, err := tasks.NewGroup(t1, t2, t3)
if err != nil {
t.Fatal(err)
}

asyncResults, err := server.SendGroup(group, sendConcurrency)
if err != nil {
t.Error(err)
}

expectedResults := []int64{2, 4, 11}

actualResults := make([]int64, 3)

for i, asyncResult := range asyncResults {
results, err := asyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
t.Error(err)
}

if len(results) != 1 {
t.Errorf("Number of results returned = %d. Wanted %d", len(results), 1)
}

intResult, ok := results[0].Interface().(int64)
if !ok {
t.Errorf("Could not convert %v to int64", results[0].Interface())
}
actualResults[i] = intResult
}

sort.Sort(ascendingInt64s(actualResults))

if !reflect.DeepEqual(expectedResults, actualResults) {
t.Errorf(
"expected results = %v, actual results = %v",
expectedResults,
actualResults,
)
}
}

SendGroup also supports concurrency control: if you enqueue a very large batch at once, raise the send concurrency.

Chord

The Chord struct

// Chord adds an optional callback to the group to be executed
// after all tasks in the group finished
type Chord struct {
	Group    *Group
	Callback *Signature
}

Structurally, a Chord is a Group plus a single Callback task. Its purpose, predictably, is to run a callback once all of the group's tasks have succeeded.

Creating a Chord

// NewChord creates a new chord (a group of tasks with a single callback
// to be executed after all tasks in the group has completed)
func NewChord(group *Group, callback *Signature) (*Chord, error) {
	if callback.UUID == "" {
		// Generate a UUID for the chord callback
		callbackUUID := uuid.New().String()
		callback.UUID = fmt.Sprintf("chord_%v", callbackUUID)
	}

	// Add a chord callback to all tasks
	for _, signature := range group.Tasks {
		signature.ChordCallback = callback
	}

	return &Chord{Group: group, Callback: callback}, nil
}

Chord usage

Triggering

A chord is triggered with Server.SendChordWithContext:

// SendChordWithContext will inject the trace context in all the signature headers before publishing it
func (server *Server) SendChordWithContext(ctx context.Context, chord *tasks.Chord, sendConcurrency int) (*result.ChordAsyncResult, error) {
span, _ := opentracing.StartSpanFromContext(ctx, "SendChord", tracing.ProducerOption(), tracing.MachineryTag, tracing.WorkflowChordTag)
defer span.Finish()

tracing.AnnotateSpanWithChordInfo(span, chord, sendConcurrency)

_, err := server.SendGroupWithContext(ctx, chord.Group, sendConcurrency)
if err != nil {
return nil, err
}

return result.NewChordAsyncResult(
chord.Group.Tasks,
chord.Callback,
server.backend,
), nil
}

Execution

During execution a chord is really no different from a group: the asynchronous tasks inside the group run concurrently. The difference is that once every group task has succeeded, the callback is pushed onto the task trigger queue:

// taskSucceeded updates the task state and triggers success callbacks or a
// chord callback if this was the last task of a group with a chord callback
func (worker *Worker) taskSucceeded(signature *tasks.Signature, taskResults []*tasks.TaskResult) error {
// Update task state to SUCCESS
if err := worker.server.GetBackend().SetStateSuccess(signature, taskResults); err != nil {
return fmt.Errorf("Set state to 'success' for task %s returned error: %s", signature.UUID, err)
}

// Log human readable results of the processed task
var debugResults = "[]"
results, err := tasks.ReflectTaskResults(taskResults)
if err != nil {
log.WARNING.Print(err)
} else {
debugResults = tasks.HumanReadableResults(results)
}
log.DEBUG.Printf("Processed task %s. Results = %s", signature.UUID, debugResults)

// Trigger success callbacks

for _, successTask := range signature.OnSuccess {
if signature.Immutable == false {
// Pass results of the task to success callbacks
for _, taskResult := range taskResults {
successTask.Args = append(successTask.Args, tasks.Arg{
Type: taskResult.Type,
Value: taskResult.Value,
})
}
}

worker.server.SendTask(successTask)
}

// If the task was not part of a group, just return
if signature.GroupUUID == "" {
return nil
}

// There is no chord callback, just return
if signature.ChordCallback == nil {
return nil
}

// Check if all task in the group has completed
groupCompleted, err := worker.server.GetBackend().GroupCompleted(
signature.GroupUUID,
signature.GroupTaskCount,
)
if err != nil {
return fmt.Errorf("Completed check for group %s returned error: %s", signature.GroupUUID, err)
}

// If the group has not yet completed, just return
if !groupCompleted {
return nil
}

// Defer purging of group meta queue if we are using AMQP backend
if worker.hasAMQPBackend() {
defer worker.server.GetBackend().PurgeGroupMeta(signature.GroupUUID)
}

// Trigger chord callback
shouldTrigger, err := worker.server.GetBackend().TriggerChord(signature.GroupUUID)
if err != nil {
return fmt.Errorf("Triggering chord for group %s returned error: %s", signature.GroupUUID, err)
}

// Chord has already been triggered
if !shouldTrigger {
return nil
}

// Get task states
taskStates, err := worker.server.GetBackend().GroupTaskStates(
signature.GroupUUID,
signature.GroupTaskCount,
)
if err != nil {
log.ERROR.Printf(
"Failed to get tasks states for group:[%s]. Task count:[%d]. The chord may not be triggered. Error:[%s]",
signature.GroupUUID,
signature.GroupTaskCount,
err,
)
return nil
}

// Append group tasks' return values to chord task if it's not immutable
for _, taskState := range taskStates {
if !taskState.IsSuccess() {
return nil
}

if signature.ChordCallback.Immutable == false {
// Pass results of the task to the chord callback
for _, taskResult := range taskState.Results {
signature.ChordCallback.Args = append(signature.ChordCallback.Args, tasks.Arg{
Type: taskResult.Type,
Value: taskResult.Value,
})
}
}
}

// Send the chord task
_, err = worker.server.SendTask(signature.ChordCallback)
if err != nil {
return err
}

return nil
}

As you can see, after a task succeeds the worker checks whether it belongs to a group; if it does, it determines whether every task in that group has succeeded by calling the Backend's GroupCompleted method:


// GroupCompleted returns true if all tasks in a group finished
func (b *Backend) GroupCompleted(groupUUID string, groupTaskCount int) (bool, error) {
conn := b.open()
defer conn.Close()

groupMeta, err := b.getGroupMeta(conn, groupUUID)
if err != nil {
return false, err
}

taskStates, err := b.getStates(conn, groupMeta.TaskUUIDs...)
if err != nil {
return false, err
}

var countSuccessTasks = 0
for _, taskState := range taskStates {
if taskState.IsCompleted() {
countSuccessTasks++
}
}

return countSuccessTasks == groupTaskCount, nil
}

This method loads the taskState of every task in the group and treats the group as complete only when all of them have finished. If the group is complete, TriggerChord is called to ensure the callback fires exactly once (the GroupMeta's ChordTriggered flag guards against duplicate triggers); the winning worker then pushes the Callback signature onto the task trigger queue.

Results

Fetching a chord's results differs accordingly, since the callback introduces its own state:

// ChordAsyncResult represents a result of a chord
type ChordAsyncResult struct {
groupAsyncResults []*AsyncResult
chordAsyncResult *AsyncResult
backend iface.Backend
}

// NewChordAsyncResult creates ChordAsyncResult instance
func NewChordAsyncResult(groupTasks []*tasks.Signature, chordCallback *tasks.Signature, backend iface.Backend) *ChordAsyncResult {
asyncResults := make([]*AsyncResult, len(groupTasks))
for i, task := range groupTasks {
asyncResults[i] = NewAsyncResult(task, backend)
}
return &ChordAsyncResult{
groupAsyncResults: asyncResults,
chordAsyncResult: NewAsyncResult(chordCallback, backend),
backend: backend,
}
}

This is the chord's result structure; it simply keeps the plain group tasks and the callback task separate.

// Get returns result of a chord (synchronous blocking call)
func (chordAsyncResult *ChordAsyncResult) Get(sleepDuration time.Duration) ([]reflect.Value, error) {
if chordAsyncResult.backend == nil {
return nil, ErrBackendNotConfigured
}

var err error
for _, asyncResult := range chordAsyncResult.groupAsyncResults {
_, err = asyncResult.Get(sleepDuration)
if err != nil {
return nil, err
}
}

return chordAsyncResult.chordAsyncResult.Get(sleepDuration)
}

Fetching the results is nothing special either: the group results are drained one by one, and then the callback's result is returned.
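
Chord example

The article quotes no official chord example, so here is a sketch in the style of the group test above, reusing its newAddTask/newMultipleTask helpers (the multiply task is assumed to multiply all of its integer args):

// Sketch: run three additions concurrently, then multiply their results
// in the chord callback: (1+1) * (2+2) * (5+6) = 88.
func testSendChord(server Server, t *testing.T) {
	t1, t2, t3 := newAddTask(1, 1), newAddTask(2, 2), newAddTask(5, 6)

	group, err := tasks.NewGroup(t1, t2, t3)
	if err != nil {
		t.Fatal(err)
	}

	chord, err := tasks.NewChord(group, newMultipleTask())
	if err != nil {
		t.Fatal(err)
	}

	chordAsyncResult, err := server.SendChord(chord, 10)
	if err != nil {
		t.Error(err)
	}

	results, err := chordAsyncResult.Get(time.Duration(time.Millisecond * 5))
	if err != nil {
		t.Error(err)
	}

	if len(results) != 1 {
		t.Errorf("Number of results returned = %d. Wanted %d", len(results), 1)
	}

	if results[0].Interface() != int64(88) {
		t.Errorf("result = %v, want int64(88)", results[0].Interface())
	}
}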

Chain

Chain structure

// Chain creates a chain of tasks to be executed one after another
type Chain struct {
Tasks []*Signature
}

Chain is a structure for executing multiple tasks sequentially. If you understand Signature well enough, you can wire up sequential execution yourself, but with Chain, designing a sequential task group becomes remarkably convenient.

Creating a Chain

// NewChain creates a new chain of tasks to be processed one by one, passing
// results unless task signatures are set to be immutable
func NewChain(signatures ...*Signature) (*Chain, error) {
// Auto generate task UUIDs if needed
for _, signature := range signatures {
if signature.UUID == "" {
signatureID := uuid.New().String()
signature.UUID = fmt.Sprintf("task_%v", signatureID)
}
}

for i := len(signatures) - 1; i > 0; i-- {
if i > 0 {
signatures[i-1].OnSuccess = []*Signature{signatures[i]}
}
}

chain := &Chain{Tasks: signatures}

return chain, nil
}

A new Chain is built by walking the task list backwards, pushing each task into the OnSuccess callbacks of the task before it, all the way down to the first task. The meaning is simply that when the first task finishes, the second one runs, and so on.
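
A quick sketch of the linkage NewChain produces (task names hypothetical):

a := &tasks.Signature{Name: "A"}
b := &tasks.Signature{Name: "B"}
c := &tasks.Signature{Name: "C"}

chain, _ := tasks.NewChain(a, b, c)

// NewChain wired A.OnSuccess = [B] and B.OnSuccess = [C],
// so publishing A alone is enough to drive the whole chain.
fmt.Println(chain.Tasks[0].OnSuccess[0].Name)              // B
fmt.Println(chain.Tasks[0].OnSuccess[0].OnSuccess[0].Name) // C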

Chain usage

Triggering

A chain is triggered with Server.SendChainWithContext:

// SendChainWithContext will inject the trace context in all the signature headers before publishing it
func (server *Server) SendChainWithContext(ctx context.Context, chain *tasks.Chain) (*result.ChainAsyncResult, error) {
span, _ := opentracing.StartSpanFromContext(ctx, "SendChain", tracing.ProducerOption(), tracing.MachineryTag, tracing.WorkflowChainTag)
defer span.Finish()

tracing.AnnotateSpanWithChainInfo(span, chain)

return server.SendChain(chain)
}


// SendChain triggers a chain of tasks
func (server *Server) SendChain(chain *tasks.Chain) (*result.ChainAsyncResult, error) {
_, err := server.SendTask(chain.Tasks[0])
if err != nil {
return nil, err
}

return result.NewChainAsyncResult(chain.Tasks, server.backend), nil
}

Triggering publishes only the first Task; a new ChainAsyncResult structure is then created to receive the results.

Execution

Execution needs no special handling: the worker already pushes the OnSuccess callbacks onto the queue whenever a task succeeds.

Results

Because we only want the result of the chain's final task, result fetching is a little different here, so the author introduces a dedicated result-collection structure for chains:

// ChainAsyncResult represents a result of a chain of tasks
type ChainAsyncResult struct {
asyncResults []*AsyncResult
backend iface.Backend
}

Fetching a chain's task results:

// Get returns results of a chain of tasks (synchronous blocking call)
func (chainAsyncResult *ChainAsyncResult) Get(sleepDuration time.Duration) ([]reflect.Value, error) {
if chainAsyncResult.backend == nil {
return nil, ErrBackendNotConfigured
}

var (
results []reflect.Value
err error
)

for _, asyncResult := range chainAsyncResult.asyncResults {
results, err = asyncResult.Get(sleepDuration)
if err != nil {
return nil, err
}
}

return results, err
}

// GetWithTimeout returns results of a chain of tasks with timeout (synchronous blocking call)
func (chainAsyncResult *ChainAsyncResult) GetWithTimeout(timeoutDuration, sleepDuration time.Duration) ([]reflect.Value, error) {
if chainAsyncResult.backend == nil {
return nil, ErrBackendNotConfigured
}

var (
results []reflect.Value
err error
)

timeout := time.NewTimer(timeoutDuration)
ln := len(chainAsyncResult.asyncResults)
lastResult := chainAsyncResult.asyncResults[ln-1]

for {
select {
case <-timeout.C:
return nil, ErrTimeoutReached
default:

for _, asyncResult := range chainAsyncResult.asyncResults {
_, err = asyncResult.Touch()
if err != nil {
return nil, err
}
}

results, err = lastResult.Touch()
if err != nil {
return nil, err
}
if results != nil {
return results, err
}
time.Sleep(sleepDuration)
}
}
}

Both methods ultimately return the last task's result. If you need per-task results, you can of course fetch them from the individual tasks, since each task's result has already been collected into the ChainAsyncResult.
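
For an intermediate task's result, a minimal sketch (assuming you still hold the chain's signatures and the concrete *Server): every task state is recorded in the Backend keyed by task UUID, so you can build an AsyncResult for it directly:

// Sketch: poll the result of the chain's second task directly.
mid := result.NewAsyncResult(chain.Tasks[1], server.GetBackend())
vals, err := mid.Get(5 * time.Millisecond)
if err != nil {
	return err
}
_ = vals // reflect.Values of the second task's return values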

Chain example

func testSendChain(server Server, t *testing.T) {
t1, t2, t3 := newAddTask(2, 2), newAddTask(5, 6), newMultipleTask(4)

chain, err := tasks.NewChain(t1, t2, t3)
if err != nil {
t.Fatal(err)
}

chainAsyncResult, err := server.SendChain(chain)
if err != nil {
t.Error(err)
}

results, err := chainAsyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
t.Error(err)
}

if len(results) != 1 {
t.Errorf("Number of results returned = %d. Wanted %d", len(results), 1)
}

if results[0].Interface() != int64(60) {
t.Errorf(
"result = %v(%v), want int64(60)",
results[0].Type().String(),
results[0].Interface(),
)
}
}

The official example is also quite simple and worth a look.

Periodic tasks

The author also leverages Cron's scheduling power to integrate periodic task orchestration:

Triggering


// RegisterPeriodicTask register a periodic task which will be triggered periodically
func (server *Server) RegisterPeriodicTask(spec, name string, signature *tasks.Signature) error {
//check spec
schedule, err := cron.ParseStandard(spec)
if err != nil {
return err
}

f := func() {
//get lock
err := server.lock.LockWithRetries(utils.GetLockName(name, spec), schedule.Next(time.Now()).UnixNano()-1)
if err != nil {
return
}

//send task
_, err = server.SendTask(tasks.CopySignature(signature))
if err != nil {
log.ERROR.Printf("periodic task failed. task name is: %s. error is %s", name, err.Error())
}
}

_, err = server.scheduler.AddFunc(spec, f)
return err
}

// RegisterPeriodicChain register a periodic chain which will be triggered periodically
func (server *Server) RegisterPeriodicChain(spec, name string, signatures ...*tasks.Signature) error {
//check spec
schedule, err := cron.ParseStandard(spec)
if err != nil {
return err
}

f := func() {
// new chain
chain, _ := tasks.NewChain(tasks.CopySignatures(signatures...)...)

//get lock
err := server.lock.LockWithRetries(utils.GetLockName(name, spec), schedule.Next(time.Now()).UnixNano()-1)
if err != nil {
return
}

//send task
_, err = server.SendChain(chain)
if err != nil {
log.ERROR.Printf("periodic task failed. task name is: %s. error is %s", name, err.Error())
}
}

_, err = server.scheduler.AddFunc(spec, f)
return err
}

// RegisterPeriodicGroup register a periodic group which will be triggered periodically
func (server *Server) RegisterPeriodicGroup(spec, name string, sendConcurrency int, signatures ...*tasks.Signature) error {
//check spec
schedule, err := cron.ParseStandard(spec)
if err != nil {
return err
}

f := func() {
// new group
group, _ := tasks.NewGroup(tasks.CopySignatures(signatures...)...)

//get lock
err := server.lock.LockWithRetries(utils.GetLockName(name, spec), schedule.Next(time.Now()).UnixNano()-1)
if err != nil {
return
}

//send task
_, err = server.SendGroup(group, sendConcurrency)
if err != nil {
log.ERROR.Printf("periodic task failed. task name is: %s. error is %s", name, err.Error())
}
}

_, err = server.scheduler.AddFunc(spec, f)
return err
}

// RegisterPeriodicChord register a periodic chord which will be triggered periodically
func (server *Server) RegisterPeriodicChord(spec, name string, sendConcurrency int, callback *tasks.Signature, signatures ...*tasks.Signature) error {
//check spec
schedule, err := cron.ParseStandard(spec)
if err != nil {
return err
}

f := func() {
// new chord
group, _ := tasks.NewGroup(tasks.CopySignatures(signatures...)...)
chord, _ := tasks.NewChord(group, tasks.CopySignature(callback))

//get lock
err := server.lock.LockWithRetries(utils.GetLockName(name, spec), schedule.Next(time.Now()).UnixNano()-1)
if err != nil {
return
}

//send task
_, err = server.SendChord(chord, sendConcurrency)
if err != nil {
log.ERROR.Printf("periodic task failed. task name is: %s. error is %s", name, err.Error())
}
}

_, err = server.scheduler.AddFunc(spec, f)
return err
}

As you can see, the author thoughtfully provides a corresponding periodic trigger method for every orchestration structure, with distributed deployments supported. Note that the distributed lock name is built from the cron spec plus the task name, so make sure no two registrations share that combination.
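
As a hedged illustration of that pitfall (names hypothetical): two registrations sharing both name and spec derive the same lock, so on each tick only one of them actually publishes:

// Both calls below derive the same lock name (same task name + same spec),
// so per tick only one of the two callbacks wins the lock and sends its task;
// the loser fails LockWithRetries and returns without logging anything.
server.RegisterPeriodicTask("*/5 * * * *", "cleanup", sigA)
server.RegisterPeriodicTask("*/5 * * * *", "cleanup", sigB) // silently starved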

Example

// The author ships no example, so here is an ad-hoc call (task name hypothetical).
// Note: cron.ParseStandard expects a standard 5-field spec, so a Quartz-style
// 6-field "0/3 * * * * ?" (every 3 seconds) would fail to parse.
func main() {
	// Run the registered task named "checkTask" every three minutes.
	server.RegisterPeriodicTask("*/3 * * * *", "checkTask", &tasks.Signature{Name: "checkTask"})
}
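
The other orchestration structures register analogously; a minimal sketch for a periodic chain (task names hypothetical):

// Rebuild and trigger a two-step chain at the top of every hour.
err := server.RegisterPeriodicChain(
	"0 * * * *",    // standard 5-field spec
	"hourlyReport", // feeds the distributed lock name; keep name+spec unique
	&tasks.Signature{Name: "fetchData"},
	&tasks.Signature{Name: "buildReport"},
)
if err != nil {
	panic(err)
}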

Summary

Machinery is built on top of two storage structures. The first is the task trigger queue, accessed through the Broker interface; the unit it stores is the Signature, uniquely identified by its UUID. The second is the record of task results and states, accessed through the Backend interface; its record structure is TaskState, whose TaskUUID links back to the Signature's UUID.

The externally exposed entry point is the Server, which holds the Broker, the Backend, and the map of registered task functions; a task function is looked up by the Name field of its Signature. The scheduling structure is the Worker, which continually pulls triggered tasks from the Broker, executes the task functions, and writes results to the Backend at each stage of execution. To keep that logic clean, the Task structure is split out: Task handles the actual function invocation, while the Worker schedules and coordinates, reacting to each task's outcome. To meet orchestration needs, the Group, Chord, and Chain structures are derived on top, but the entire execution and scheduling model still rests on triggering individual Signatures; only the scheduling and state bookkeeping differ slightly.

This article took quite a long time to write: the library really is convenient, but without a deeper understanding it is hard to put it to full use. I hope this walkthrough helps with the code you write next.