Tunny 库源码深度解读

Tunny 库是一个用于生成和管理goroutine池的Golang库,制定一个方法可以限制该方法并发量及控制生命周期。这是一个设计简单但功能强大且通用易用的库,之前我们说过 Machinery 库,该库功能当然强大的多,他支持分布式且支持不同任务的编排,Tunny就没有这种功能,但Tunny库异常简单轻量如果不是大型的事件推动型项目,那么Tunny就足够支持其场景。

本文解读一下Tunny库的源码并且对比 Machinery 库来总结下两位作者在设计通用场景功能时的不同思路。

源码解读

架构

我们先来看看Tunny 库的大体架构:

主体pool:

1
2
3
4
5
6
7
8
9
10
11
12
// Pool is a struct that manages a collection of workers, each with their own
// goroutine. The Pool can initialize, expand, compress and close the workers,
// as well as processing jobs with the workers synchronously.
type Pool struct {
ctor func() Worker
workers []*workerWrapper
reqChan chan workRequest

workerMut sync.Mutex
queuedJobs int64
}

Pool 结构作者设计作为使用该库的主体,消费者生成及生产者的调用都是通过该结构实现的,其中 ctor 是生成接口 worker 的方法worker 接口是最小的消费单位作者设计其来执行业务的主要逻辑。作者设计该字段为生产worker的函数而不是直接将worker放入应该是为了方便使用者调用,使生产worker的方法和我们的主逻辑解耦。workers 是pool 结构维护的生产者队列。reqChan 是消费和生产的通信的主通道,生产者使用该通道激活一个消费者消费。workerMut 是一个并发锁,主要用在workers数量的增减时。最后的que queJobs就是记录当前进行的并发任务量。

新建pool:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// New creates a new Pool of workers that starts with n workers. You must
// provide a constructor function that creates new Worker types and when you
// change the size of the pool the constructor will be called to create each new
// Worker.
func New(n int, ctor func() Worker) *Pool {
p := &Pool{
ctor: ctor,
reqChan: make(chan workRequest),
}
p.SetSize(n)

return p
}

使用 New 方法来新建一个pool结构,参数n 根据业务逻辑来进行并发量的设计,n为同时间内业务逻辑的并发处理量。参数 ctor生成worker接口的方法,worker接口我们可以自己定义,当然也可用作者设计好的两个实现worker接口的逻辑这个接下来会说。最后初始化一下reqChan。

可以看到在初始化一个pool后紧接着调用了 pool 结构实现的 SetSize 方法并传入了参数n,根据我刚才对参数n的描述不难想象这个函数是构建并发消费者的函数紧接着我们就会说到这个。

pool 结构实现的方法:
SiteSize
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// SetSize changes the total number of workers in the Pool. This can be called
// by any goroutine at any time unless the Pool has been stopped, in which case
// a panic will occur.
func (p *Pool) SetSize(n int) {
p.workerMut.Lock()
defer p.workerMut.Unlock()

lWorkers := len(p.workers)
if lWorkers == n {
return
}

// Add extra workers if N > len(workers)
for i := lWorkers; i < n; i++ {
p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
}

// Asynchronously stop all workers > N
for i := n; i < lWorkers; i++ {
p.workers[i].stop()
}

// Synchronously wait for all workers > N to stop
for i := n; i < lWorkers; i++ {
p.workers[i].join()
}

// Remove stopped workers from slice
p.workers = p.workers[:n]
}

这个就是我们刚刚说的构建并发消费者的逻辑,这个方法不但构建了并发消费者而且其可以对消费者的数量进行增减,只要再此调用该函数并设置你想要的并发数量就并发数量就可以改为你当前设置的值。

构建消费者的逻辑是这段:

1
2
3
4
// Add extra workers if N > len(workers)
for i := lWorkers; i < n; i++ {
p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
}

这段逻辑使用了newWorkerWrapper的方法生成了WorkerWrapper并追加进了pool 结构的消费者队列里。WorkerWrapper是作者设计的控制业务逻辑核心结构,这个结构对我们注入的业务函数进行了包裹实,这个结构后面会说。

Process
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Process will use the Pool to process a payload and synchronously return the
// result. Process can be called safely by any goroutines, but will panic if the
// Pool has been stopped.
func (p *Pool) Process(payload interface{}) interface{} {
atomic.AddInt64(&p.queuedJobs, 1)

request, open := <-p.reqChan
if !open {
panic(ErrPoolNotRunning)
}

request.jobChan <- payload

payload, open = <-request.retChan
if !open {
panic(ErrWorkerClosed)
}

atomic.AddInt64(&p.queuedJobs, -1)
return payload
}

该方法为触发一次业务逻辑执行并获取结果,通过获取reqChan的数据来触发一次业务逻辑的执行,作者设计的并发思路是并发消费者尝试去发送请求到一个无缓冲管道也就是reqChan,生产者在要执行逻辑的时候去尝试获取一个消费者的消费请求,从现实中来看这就是打工者自动自发去请求老板有什么活干,老板告知后开始干活的场景。

我们看下reqChan获取的workRequest结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

// workRequest is a struct containing context representing a workers intention
// to receive a work payload.
type workRequest struct {
// jobChan is used to send the payload to this worker.
jobChan chan<- interface{}

// retChan is used to read the result from this worker.
retChan <-chan interface{}

// interruptFunc can be called to cancel a running job. When called it is no
// longer necessary to read from retChan.
interruptFunc func()
}

作者对于生产端和消费端的联系上是很有意思的,消费者把jobChan任务参数管道和retChan以及interruptFunc中断本次任务函数发给生产端,生产的会去把消费参数通过jobChan发给消费者,然后消费者产生结果再把结果发到retChan由生产端接收。

我在看到后感觉很奇怪,为什么作者不去用正常的思维让生产端直接发送任务参数及结果管道给消费者在把结果给生产端呢?后面我发现作者这样做的好处就是可以在消费端只维护一个retChan就好,否则生产端每次都要产生一个retChan并且在获取结果后或者任务中断时还要去关闭他,逻辑写起来反而复杂,如果不关闭就容易造成内存泄漏,作者这种写法就只在一个消费者里维护一个retChan就好,更加简单易维护。作者这么写还有一个原因是需要在任务启动前就把任务中断函数传送给生产端,所以必不可少的是消费端需要先传送信息给生产端如此一来作者这样的设计就十分巧妙了。

当然我们控制消费端超时中断的还可以使用 context,让生产端每次生成一个超时context,传给消费端,这样一样可以在两边同步中断信息,但劣势和之前一样每次生产端调用都会产生context,如果不是高并发调用还好,并发调用其实很容易出现内存占用过大,或是gc占用cpu过高的问题,作者的设计思路其实是让所有资源都控制在消费端,因为消费端是固定的那么两端公用的资源在消费端产生和控制是更加合理的。

最后一个函数interruptFunc则是给生产端通知消费端终止任务用的,如果一个任务用时太长我们可以去中断它开始下一个,防止生产端任务堆积。

ProcessTimed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// ProcessTimed will use the Pool to process a payload and synchronously return
// the result. If the timeout occurs before the job has finished the worker will
// be interrupted and ErrJobTimedOut will be returned. ProcessTimed can be
// called safely by any goroutines.
func (p *Pool) ProcessTimed(
payload interface{},
timeout time.Duration,
) (interface{}, error) {
atomic.AddInt64(&p.queuedJobs, 1)
defer atomic.AddInt64(&p.queuedJobs, -1)

tout := time.NewTimer(timeout)

var request workRequest
var open bool

select {
case request, open = <-p.reqChan:
if !open {
return nil, ErrPoolNotRunning
}
case <-tout.C:
return nil, ErrJobTimedOut
}

select {
case request.jobChan <- payload:
case <-tout.C:
request.interruptFunc()
return nil, ErrJobTimedOut
}

select {
case payload, open = <-request.retChan:
if !open {
return nil, ErrWorkerClosed
}
case <-tout.C:
request.interruptFunc()
return nil, ErrJobTimedOut
}

tout.Stop()
return payload, nil
}

这个方法是一个可以控制超时中断的方法,因为生产者和消费者使用retchan进行结果数据通信所以在生产觉得时间太久了我不等了的时候消费者也要知道生产者不再等待,因为生产者不去接收消费者的结果那么消费者将会一直阻塞,所以消费者需要在生产者不等的时候接到一个信号,收到这个信号就会丢弃当前结果不在传入retChan。这个信息的传输方式就靠这个一开传给生产者的in terruptFunc()方法,该方法对workerWrapper结构中的interruptChan做关闭,且在消费者go协程将监听将数据传给retChan出的结果平行监听,在发现interrupt管道关闭后会丢掉当前结果开始下一个任务的监听。

注意 :这个ProcessTimed方法才是作者设计该并发的精髓,在作者设计的业务逻辑执行并发中,任务执行和任务结果获取是同步的也就是说每次生产者出发一次业务逻辑都会等待获得结果才结束自己的生命周期,这时就需要超时操作来防止任务堆积或是内存泄漏。那么作者把任务处理和结果获取做成同步有什么意义呢,这个在之后对比michinary的并发处理时会给出答案。

Close
1
2
3
4
5
// Close will terminate all workers and close the job channel of this Pool.
func (p *Pool) Close() {
p.SetSize(0)
close(p.reqChan)
}

Close就是停掉当前所有的消费者workerwrrapper,并关掉reqChan。

任务处理控控制结构 workerWrapper:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// workerWrapper takes a Worker implementation and wraps it within a goroutine
// and channel arrangement. The workerWrapper is responsible for managing the
// lifetime of both the Worker and the goroutine.
type workerWrapper struct {
worker Worker
interruptChan chan struct{}

// reqChan is NOT owned by this type, it is used to send requests for work.
reqChan chan<- workRequest

// closeChan can be closed in order to cleanly shutdown this worker.
closeChan chan struct{}

// closedChan is closed by the run() goroutine when it exits.
closedChan chan struct{}
}

该结构是管理控制业务逻辑并发的结构,他负责控制业务逻辑并发处理的开始,中断,结束以及最大并发量。

新建 workerWrapper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func newWorkerWrapper(
reqChan chan<- workRequest,
worker Worker,
) *workerWrapper {
w := workerWrapper{
worker: worker,
interruptChan: make(chan struct{}),
reqChan: reqChan,
closeChan: make(chan struct{}),
closedChan: make(chan struct{}),
}

go w.run()

return &w
}

我们可以注意到刚刚的setsize方法中调用了这个新建函数。这个新建函数每建立一个就会异步跑起来一个业务处理控制器。所以调用多少次这个新建逻辑就是业务逻辑最大并发量就是多少

workerWrapper 结构方法实现:

run
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (w *workerWrapper) run() {
jobChan, retChan := make(chan interface{}), make(chan interface{})
defer func() {
w.worker.Terminate()
close(retChan)
close(w.closedChan)
}()

for {
// NOTE: Blocking here will prevent the worker from closing down.
w.worker.BlockUntilReady()
select {
case w.reqChan <- workRequest{
jobChan: jobChan,
retChan: retChan,
interruptFunc: w.interrupt,
}:
select {
case payload := <-jobChan:
result := w.worker.Process(payload)
select {
case retChan <- result:
case <-w.interruptChan:
w.interruptChan = make(chan struct{})
}
case _, _ = <-w.interruptChan:
w.interruptChan = make(chan struct{})
}
case <-w.closeChan:
return
}
}
}

run函数有三个阶段的等待,1.等待reqchan请求被接收触发业务逻辑。2.等待生产者将jobchan的业务参数传入。3.等待业务逻辑执行完成将结果传回给生产者。可以看到第一个阶段起到的作用是触发,不涉及业务逻辑的处理,第二个阶段在等待业务逻辑参数,也还未执行业务逻辑但其实消费者已经启动了消费流程,这个阶段也有可能因为生产端等待reqchan过久而在发送同时超时的,所以在获取参数的同时也做了中断处理。第三个阶段就是处理业务逻辑并等待结果的核心阶段这一阶段可能由于业务逻辑延迟导致超时,生产端在规定时间内无法获取结果就会中断。

interrupt

中断函数,将该函数传给生产端实现生产端消费端同步中断信号。

1
2
3
4
func (w *workerWrapper) interrupt() {
close(w.interruptChan)
w.worker.Interrupt()
}

这里使用 close(w.interruptChan),方式来同步超时中断信号。

值得注意的是,在中断当前数据消费后,还调用了w.worker.Interrupt()方法,worker是最小的业务逻辑处理者,在worker是一个复杂时间久且含有for循环的函数时可以自定义worker的中断方法。实现中断生产消费端的同时业务逻辑也中断的逻辑。所以如果你的业务逻辑耗时较长且有for循环不断处理问题你可以自定义一个woker实现中断的方法。

stop & join

stop 和 join 方法

1
2
3
4
5
6
7
func (w *workerWrapper) stop() {
close(w.closeChan)
}

func (w *workerWrapper) join() {
<-w.closedChan
}

这两个逻辑十分简单,stop逻辑就是停下当前”run”起来的业务逻辑控制器。join逻辑就是等待当前的workerrapper逻辑真的被停掉。

业务处理接口 Worker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Worker is an interface representing a Tunny working agent. It will be used to
// block a calling goroutine until ready to process a job, process that job
// synchronously, interrupt its own process call when jobs are abandoned, and
// clean up its resources when being removed from the pool.
//
// Each of these duties are implemented as a single method and can be averted
// when not needed by simply implementing an empty func.
type Worker interface {
// Process will synchronously perform a job and return the result.
Process(interface{}) interface{}

// BlockUntilReady is called before each job is processed and must block the
// calling goroutine until the Worker is ready to process the next job.
BlockUntilReady()

// Interrupt is called when a job is cancelled. The worker is responsible
// for unblocking the Process implementation.
Interrupt()

// Terminate is called when a Worker is removed from the processing pool
// and is responsible for cleaning up any held resources.
Terminate()
}

Worker 是作者抽象出来的一个最小的业务逻辑执行器。这个执行器只服务业务执行(Process)中断(Interrupt)和结束(Terminate)还有一个等待函数(BlockUntilReady)在实现接口函数时你应该将业务处理逻辑注入。

closuerWorker
1
2
3
4
5
6
7
8
9
10
11
12
13
// closureWorker is a minimal Worker implementation that simply wraps a
// func(interface{}) interface{}
type closureWorker struct {
processor func(interface{}) interface{}
}

func (w *closureWorker) Process(payload interface{}) interface{} {
return w.processor(payload)
}

func (w *closureWorker) BlockUntilReady() {}
func (w *closureWorker) Interrupt() {}
func (w *closureWorker) Terminate() {}

这个结构是作者为了方便使用实现的一个Worker接口对应结构体。该结构只是注入了一个通用的函数签名func(interface{}) interface{}在Process方法中进行调用。你可以制作更加复杂的worker接口实现。比如将context,及其cancel()方法注入结构体实现函数中断,或是注入一个terminateChan 实现函数结束。

使用closuerWorker新建pool的方法 NewFunc

1
2
3
4
5
6
7
8
9
// NewFunc creates a new Pool of workers where each worker will process using
// the provided func.
func NewFunc(n int, f func(interface{}) interface{}) *Pool {
return New(n, func() Worker {
return &closureWorker{
processor: f,
}
})
}
callbackWorker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// callbackWorker is a minimal Worker implementation that attempts to cast
// each job into func() and either calls it if successful or returns
// ErrJobNotFunc.
type callbackWorker struct{}

func (w *callbackWorker) Process(payload interface{}) interface{} {
f, ok := payload.(func())
if !ok {
return ErrJobNotFunc
}
f()
return nil
}

func (w *callbackWorker) BlockUntilReady() {}
func (w *callbackWorker) Interrupt() {}
func (w *callbackWorker) Terminate() {}

者个结构是作者做出来的一个对应无参数函数业务逻辑的执行者实现。这个结构中payload 是一个func()类型。并且直接调用。

这个结构虽然对函数的参数和返回值有限制都么有,但其通用性很高不用提前将业务逻辑钉死并注入只要穿入func()类型函数即可。

与Machinery的并发方式对比

我写一个简单的伪代码来描述machinery的并发处理设计:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118

func getTaskFormRedisList() string {
fmt.Println("getTaskFormRedisList")
time.Sleep(1 * time.Second)
return "hello world! "
}

type MachineryPool struct {
signature chan string
currency int
stopChan chan struct{}
group *sync.WaitGroup
dealFunc func(string)
}

func NewMachineryPool(f func(string), currency int) *MachineryPool {
return &MachineryPool{
signature: make(chan string),
currency: currency,
stopChan: make(chan struct{}),
group: &sync.WaitGroup{},
dealFunc: f,
}
}

func DealSig(sig string) {
fmt.Println("get signature message : ", sig)
}

func (m *MachineryPool) run() {
m.group.Add(1)
go func() {
defer m.group.Done()
for {
select {
case <-m.stopChan:
fmt.Println("producer has return")
return
default:
m.signature <- getTaskFormRedisList()

}
}
}()

deliverChan := make(chan string)
m.group.Add(1)
go func() {
defer m.group.Done()
for {
select {
case sig := <-m.signature:
if sig != "" {
deliverChan <- sig
}
case <-m.stopChan:
fmt.Println("consumer has return")
return

}

}

}()

m.ConsumeOne(deliverChan)
}

func (m *MachineryPool) Stop() {
m.stopChan <- struct{}{}
m.group.Wait()
}

func (m *MachineryPool) ConsumeOne(deliverChan chan string) {
currencyChan := make(chan struct{}, m.currency)
go func() {
for i := 0; i < m.currency; i++ {
currencyChan <- struct{}{}
}
}()

for {
select {
case sig, open := <-deliverChan:
if !open {
return
}
select {
case <-currencyChan:
m.group.Add(1)
go func() {
m.dealFunc(sig)
m.group.Done()
currencyChan <- struct{}{}
}()

case <-m.stopChan:
fmt.Println("consume one has been return")
return

}

case <-m.stopChan:
fmt.Println("consume one has been return")
return

}
}
}

func TestMachineryPool(t *testing.T) {
p := NewMachineryPool(DealSig, 10)
go p.run()

time.Sleep(10 * time.Second)
p.Stop()
}

我只是用伪代码把Machinery的并发处理逻辑简单的写出来,并不是说machinery的逻辑就是这么写的,原代码设计十分复杂。我只是把他的并发控制的逻辑大致搞了出来,用来对比两个作者的思路有何不同。

并发量控制

并发控制的方式在Machinery中使用的是固定信号量channel模式,准备一个你需要控制并发量大小的channel,你可以先塞满每个并发出一个,也可以先空着每个并发入一个。Tunny 使用的模式则是固定消费者数量模式来固定最大并发量。在业务逻辑处理量大,高并发,且不太需要实时性时,将消费者和生产者解耦是推荐切必要的,这样可以避免同一时间处理任务过多导致生产者阻塞。当需要实时性且任务处理时间不长使用Tunny是非常好的选择,他在触发逻辑处理后等待结果,且在出错时或超时时立刻重试,业务逻辑处理的稳定性和结果获取的实时性都更好。

结果获取

Machinery 将业务逻辑执行和结果获取进行了解耦,这样做的好处是将处理业务逻辑的任务单独独立出来,如此一来生产者和消费也实现了完全解耦合,生产者只要去做一个任务触发就可以不管了,接下来使用获取结果的方法去尝试获取结果。而Tunny就需要生产者一直等到结果获取到,或是超时报错。这时生产者其实是容易堆积的。

这样做的好处就是你可以去第一时间重试失败的业务逻辑,在处理一些需要可靠性实时性高的需求,且业务逻辑是调用其他地方的接口来处理容易遇到网络等问题,这时就可以第一时间及时重试,Machinery 的重试是再次把逻辑放回消息队列,此时就不保证什么时候触发了,触发时间其实是不可控的。

生命周期控制

结束消费者两者都是使用的管道控制。在优雅退出的逻辑中,等待所有消费者结束的处理还是有些不一样,Machinery是使用经典的WaitGroup,而Tunny 的处理就更加轻量,用了一个ClosedChan来查看是否所有的并发都已经退出,这种处理方式其实是建立在固定消费者的并发控制逻辑模式的前提下的,在只控制固定消费者并发的前提下推荐使用这种方法来等待所有协程结束,但在Machinery这种未知数量的并发控制还是必须使用waitGroup。

总结

Machinery 和 Tunny 都有其适用的最佳场景,都是处理并发的利器。我们在使用的同时也需要学习作者在设计并发处理的思路和套路,不同的业务逻辑我们总是需要不同处理方式,并不是说使用他们的包就万事大吉了,在处理高并发的业务逻辑时学到作者的设计和抽象方式举一反三形成最适合业务的并发处理模式才是关键。