想要写好Go并发不得不掌握的数据结构(3)

SingleFlight

SingleFlight 是 Go 开发组提供的一个扩展并发原语。它的作用是,在处理多个 goroutine 同时调用同一个函数的时候,只让一个 goroutine 去调用这个函数,等到这个 goroutine 返回结果的时候,再把结果返回给这几个同时调用的 goroutine,这样可以减少并发调用的数量。

其实,sync.Once 不是只在并发的时候保证只有一个 goroutine 执行函数 f,而是会保证永远只执行一次,而 SingleFlight 是每次调用都重新执行,并且在多个请求同时调用的时候只有一个执行。它们两个面对的场景是不同的,sync.Once 主要是用在单次初始化场景中,而 SingleFlight 主要用在合并并发请求的场景中,尤其是缓存场景

如果你学会了 SingleFlight,在面对秒杀等大并发请求的场景,而且这些请求都是读请求时,你就可以把这些请求合并为一个请求,这样,你就可以将后端服务的压力从 n 降到 1。尤其是在面对后端是数据库这样的服务的时候,采用 SingleFlight 可以极大地提高性能。那么,话不多说,就让我们开始学习 SingleFlight 吧。

实现原理

SingleFlight 使用互斥锁 Mutex 和 Map 来实现。Mutex 提供并发时的读写保护,Map 用来保存同一个 key 的正在处理(in flight)的请求。

SingleFlight 的数据结构是 Group,它提供了三个方法。Do, DoChan,Forget

  • Do:这个方法执行一个函数,并返回函数执行的结果。你需要提供一个 key,对于同一个 key,在同一时间只有一个在执行,同一个 key 并发的请求会等待。第一个执行的请求返回的结果,就是它的返回结果。函数 fn 是一个无参的函数,返回一个结果或者 error,而 Do 方法会返回函数执行的结果或者是 error,shared 会指示 v 是否返回给多个请求。
  • DoChan:类似 Do 方法,只不过是返回一个 chan,等 fn 函数执行完,产生了结果以后,就能从这个 chan 中接收这个结果。
  • Forget:告诉 Group 忘记这个 key。这样一来,之后这个 key 请求会执行 f,而不是等待前一个未完成的 fn 函数的结果。

下面,我们来看具体的实现方法。

首先,SingleFlight 定义一个辅助对象 call,这个 call 就代表正在执行 fn 函数的请求或者是已经执行完的请求。Group 代表 SingleFlight。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 代表一个正在处理的请求,或者已经处理完的请求
type call struct {
wg sync.WaitGroup


// 这个字段代表处理完的值,在waitgroup完成之前只会写一次
// waitgroup完成之后就读取这个值
val interface{}
err error

// 指示当call在处理时是否要忘掉这个key
forgotten bool
dups int
chans []chan<- Result
}

// group代表一个singleflight对象
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}

我们只需要查看一个 Do 方法,DoChan 的处理方法是类似的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {//如果已经存在相同的key
c.dups++
g.mu.Unlock()
c.wg.Wait() //等待这个key的第一个请求完成
return c.val, c.err, true //使用第一个key的请求结果
}
c := new(call) // 第一个请求,创建一个call
c.wg.Add(1)
g.m[key] = c //加入到key map中
g.mu.Unlock()


g.doCall(c, key, fn) // 调用方法
return c.val, c.err, c.dups > 0
}

doCall 方法会实际调用函数 fn:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
c.val, c.err = fn()
c.wg.Done()


g.mu.Lock()
if !c.forgotten { // 已调用完,删除这个key
delete(g.m, key)
}
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
g.mu.Unlock()
}

在这段代码中,你要注意下第 7 行。在默认情况下,forgotten==false,所以第 8 行默认会被调用,也就是说,第一个请求完成后,后续的同一个 key 的请求又重新开始新一次的 fn 函数的调用。

应用场景

了解了 SingleFlight 的实现原理,下面我们来看看它都应用于什么场景中。

第一个是在net/lookup.go中,如果同时有查询同一个 host 的请求,lookupGroup 会把这些请求 merge 到一起,只需要一个请求就可以了:

1
2
3
4
// lookupGroup merges LookupIPAddr calls together for lookups for the same
// host. The lookupGroup key is the LookupIPAddr.host argument.
// The return values are ([]IPAddr, error).
lookupGroup singleflight.Group

第二个是 Go 在查询仓库版本信息时,将并发的请求合并成 1 个请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func metaImportsForPrefix(importPrefix string, mod ModuleMode, security web.SecurityMode) (*urlpkg.URL, []metaImport, error) {
// 使用缓存保存请求结果
setCache := func(res fetchResult) (fetchResult, error) {
fetchCacheMu.Lock()
defer fetchCacheMu.Unlock()
fetchCache[importPrefix] = res
return res, nil

// 使用 SingleFlight请求
resi, _, _ := fetchGroup.Do(importPrefix, func() (resi interface{}, err error) {
fetchCacheMu.Lock()
// 如果缓存中有数据,那么直接从缓存中取
if res, ok := fetchCache[importPrefix]; ok {
fetchCacheMu.Unlock()
return res, nil
}
fetchCacheMu.Unlock()
......

需要注意的是,这里涉及到了缓存的问题。上面的代码会把结果放在缓存中,这也是常用的一种解决缓存击穿的例子。

设计缓存问题时,我们常常需要解决缓存穿透、缓存雪崩和缓存击穿问题。缓存击穿问题是指,在平常高并发的系统中,大量的请求同时查询一个 key 时,如果这个 key 正好过期失效了,就会导致大量的请求都打到数据库上。这就是缓存击穿。

用 SingleFlight 来解决缓存击穿问题再合适不过了。因为,这个时候,只要这些对同一个 key 的并发请求的其中一个到数据库中查询,就可以了,这些并发的请求可以共享同一个结果。因为是缓存查询,不用考虑幂等性问题。

事实上,在 Go 生态圈知名的缓存框架 groupcache 中,就使用了较早的 Go 标准库的 SingleFlight 实现。接下来,我就来给你介绍一下 groupcache 是如何使用 SingleFlight 解决缓存击穿问题的。groupcache 中的 SingleFlight 只有一个方法:

1
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error)

SingleFlight 的作用是,在加载一个缓存项的时候,合并对同一个 key 的 load 的并发请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type Group struct {
。。。。。。
// loadGroup ensures that each key is only fetched once
// (either locally or remotely), regardless of the number of
// concurrent callers.
loadGroup flightGroup
......
}

func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
// 从cache, peer, local尝试查询cache
return value, nil
})
if err == nil {
value = viewi.(ByteView)
}
return
}

循环栅栏 CyclicBarrier

循环栅栏(CyclicBarrier),它常常应用于重复进行一组 goroutine 同时执行的场景中。

CyclicBarrier允许一组 goroutine 彼此等待,到达一个共同的执行点。同时,因为它可以被重复使用,所以叫循环栅栏。具体的机制是,大家都在栅栏前等待,等全部都到齐了,就抬起栅栏放行

你可能会觉得,CyclicBarrier 和 WaitGroup 的功能有点类似,确实是这样。不过,CyclicBarrier 更适合用在“固定数量的 goroutine 等待同一个执行点”的场景中,而且在放行 goroutine 之后,CyclicBarrier 可以重复利用,不像 WaitGroup 重用的时候,必须小心翼翼避免 panic。

如果使用 WaitGroup 实现的话,调用比较复杂,不像 CyclicBarrier 那么清爽。更重要的是,如果想重用 WaitGroup,你还要保证,将 WaitGroup 的计数值重置到 n 的时候不会出现并发问题。WaitGroup 更适合用在“一个 goroutine 等待一组 goroutine 到达同一个执行点”的场景中,或者是不需要重用的场景中。

实现原理

CyclicBarrier 有两个初始化方法:

第一个是 New 方法,它只需要一个参数,来指定循环栅栏参与者的数量;

第二个方法是 NewWithAction,它额外提供一个函数,可以在每一次到达执行点的时候执行一次。具体的时间点是在最后一个参与者到达之后,但是其它的参与者还未被放行之前。我们可以利用它,做放行之前的一些共享状态的更新等操作。

这两个方法的签名如下:

1
2
func New(parties int) CyclicBarrier
func NewWithAction(parties int, barrierAction func() error) CyclicBarrier

CyclicBarrier 是一个接口,定义的方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type CyclicBarrier interface {
// 等待所有的参与者到达,如果被ctx.Done()中断,会返回ErrBrokenBarrier
Await(ctx context.Context) error

// 重置循环栅栏到初始化状态。如果当前有等待者,那么它们会返回ErrBrokenBarrier
Reset()

// 返回当前等待者的数量
GetNumberWaiting() int

// 参与者的数量
GetParties() int

// 循环栅栏是否处于中断状态
IsBroken() bool
}

循环栅栏的使用也很简单。循环栅栏的参与者只需调用 Await 等待,等所有的参与者都到达后,再执行下一步。当执行下一步的时候,循环栅栏的状态又恢复到初始的状态了,可以迎接下一轮同样多的参与者。

有一道非常经典的并发编程的题目,非常适合使用循环栅栏,下面我们来看一下。

并发趣题:一氧化二氢制造工厂

题目是这样的:有一个名叫大自然的搬运工的工厂,生产一种叫做一氧化二氢的神秘液体。这种液体的分子是由一个氧原子和两个氢原子组成的,也就是水。这个工厂有多条生产线,每条生产线负责生产氧原子或者是氢原子,每条生产线由一个 goroutine 负责。这些生产线会通过一个栅栏,只有一个氧原子生产线和两个氢原子生产线都准备好,才能生成出一个水分子,否则所有的生产线都会处于等待状态。也就是说,一个水分子必须由三个不同的生产线提供原子,而且水分子是一个一个按照顺序产生的,每生产一个水分子,就会打印出 HHO、HOH、OHH 三种形式的其中一种。HHH、OOH、OHO、HOO、OOO 都是不允许的。生产线中氢原子的生产线为 2N 条,氧原子的生产线为 N 条。

你可以先想一下,我们怎么来实现呢?首先,我们来定义一个 H2O 辅助数据类型,它包含两个信号量的字段和一个循环栅栏。

  • semaH 信号量:控制氢原子。一个水分子需要两个氢原子,所以,氢原子的空槽数资源数设置为 2。
  • semaO 信号量:控制氧原子。一个水分子需要一个氧原子,所以资源数的空槽数设置为 1。
  • 循环栅栏:等待两个氢原子和一个氧原子填补空槽,直到任务完成。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package water
import (
"context"
"github.com/marusama/cyclicbarrier"
"golang.org/x/sync/semaphore"
)
// 定义水分子合成的辅助数据结构
type H2O struct {
semaH *semaphore.Weighted // 氢原子的信号量
semaO *semaphore.Weighted // 氧原子的信号量
b cyclicbarrier.CyclicBarrier // 循环栅栏,用来控制合成
}
func New() *H2O {
return &H2O{
semaH: semaphore.NewWeighted(2), //氢原子需要两个
semaO: semaphore.NewWeighted(1), // 氧原子需要一个
b: cyclicbarrier.New(3), // 需要三个原子才能合成
}
}

接下来,我们看看各条流水线的处理情况。流水线分为氢原子处理流水线和氧原子处理流水线,首先,我们先看一下氢原子的流水线:如果有可用的空槽,氢原子的流水线的处理方法是 hydrogen,hydrogen 方法就会占用一个空槽(h2o.semaH.Acquire),输出一个 H 字符,然后等待栅栏放行。等其它的 goroutine 填补了氢原子的另一个空槽和氧原子的空槽之后,程序才可以继续进行。

1
2
3
4
5
6
7
func (h2o *H2O) hydrogen(releaseHydrogen func()) {
h2o.semaH.Acquire(context.Background(), 1)

releaseHydrogen() // 输出H
h2o.b.Await(context.Background()) //等待栅栏放行
h2o.semaH.Release(1) // 释放氢原子空槽
}

然后是氧原子的流水线。氧原子的流水线处理方法是 oxygen, oxygen 方法是等待氧原子的空槽,然后输出一个 O,就等待栅栏放行。放行后,释放氧原子空槽位。

1
2
3
4
5
6
7
func (h2o *H2O) oxygen(releaseOxygen func()) {
h2o.semaO.Acquire(context.Background(), 1)

releaseOxygen() // 输出O
h2o.b.Await(context.Background()) //等待栅栏放行
h2o.semaO.Release(1) // 释放氢原子空槽
}

在栅栏放行之前,只有两个氢原子的空槽位和一个氧原子的空槽位。只有等栅栏放行之后,这些空槽位才会被释放。栅栏放行,就意味着一个水分子组成成功。这个算法是不是正确呢?我们来编写一个单元测试检测一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package water


import (
"math/rand"
"sort"
"sync"
"testing"
"time"
)


func TestWaterFactory(t *testing.T) {
//用来存放水分子结果的channel
var ch chan string
releaseHydrogen := func() {
ch <- "H"
}
releaseOxygen := func() {
ch <- "O"
}

// 300个原子,300个goroutine,每个goroutine并发的产生一个原子
var N = 100
ch = make(chan string, N*3)


h2o := New()

// 用来等待所有的goroutine完成
var wg sync.WaitGroup
wg.Add(N * 3)

// 200个氢原子goroutine
for i := 0; i < 2*N; i++ {
go func() {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
h2o.hydrogen(releaseHydrogen)
wg.Done()
}()
}
// 100个氧原子goroutine
for i := 0; i < N; i++ {
go func() {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
h2o.oxygen(releaseOxygen)
wg.Done()
}()
}

//等待所有的goroutine执行完
wg.Wait()

// 结果中肯定是300个原子
if len(ch) != N*3 {
t.Fatalf("expect %d atom but got %d", N*3, len(ch))
}

// 每三个原子一组,分别进行检查。要求这一组原子中必须包含两个氢原子和一个氧原子,这样才能正确组成一个水分子。
var s = make([]string, 3)
for i := 0; i < N; i++ {
s[0] = <-ch
s[1] = <-ch
s[2] = <-ch
sort.Strings(s)


water := s[0] + s[1] + s[2]
if water != "HHO" {
t.Fatalf("expect a water molecule but got %s", water)
}
}
}

atomic

Mutex、RWMutex 等并发原语的实现时,你可以看到,最底层是通过 atomic 包中的一些原子操作来实现的

在很多场景中,使用并发原语实现起来比较复杂,而原子操作可以帮助我们更轻松地实现底层的优化。

原子操作的基础知识

Package sync/atomic 实现了同步算法底层的原子的内存操作原语,我们把它叫做原子操作原语,它提供了一些实现原子操作的方法。之所以叫原子操作,是因为一个原子在执行的时候,其它线程不会看到执行一半的操作结果。在其它线程看来,原子操作要么执行完了,要么还没有执行,就像一个最小的粒子 - 原子一样,不可分割。

CPU 提供了基础的原子操作,不过,不同架构的系统的原子操作是不一样的。对于单处理器单核系统来说,如果一个操作是由一个 CPU 指令来实现的,那么它就是原子操作,比如它的 XCHG 和 INC 等指令。如果操作是基于多条指令来实现的,那么,执行的过程中可能会被中断,并执行上下文切换,这样的话,原子性的保证就被打破了,因为这个时候,操作可能只执行了一半。

在多处理器多核系统中,原子操作的实现就比较复杂了。由于 cache 的存在,单个核上的单个指令进行原子操作的时候,你要确保其它处理器或者核不访问此原子操作的地址,或者是确保其它处理器或者核总是访问原子操作之后的最新的值。x86 架构中提供了指令前缀 LOCK,LOCK 保证了指令(比如 LOCK CMPXCHG op1、op2)不会受其它处理器或 CPU 核的影响,有些指令(比如 XCHG)本身就提供 Lock 的机制。不同的 CPU 架构提供的原子操作指令的方式也是不同的,比如对于多核的 MIPS 和 ARM,提供了 LL/SC(Load Link/Store Conditional)指令,可以帮助实现原子操作(ARMLL/SC 指令 LDREX 和 STREX)。因为不同的 CPU 架构甚至不同的版本提供的原子操作的指令是不同的,所以,要用一种编程语言实现支持不同架构的原子操作是相当有难度的。不过,还好这些都不需要你操心,因为 Go 提供了一个通用的原子操作的 API,将更底层的不同的架构下的实现封装成 atomic 包,提供了修改类型的原子操作(atomic read-modify-write,RMW)和加载存储类型的原子操作(Load 和 Store)的 API,稍后我会一一介绍。

atomic 原子操作的应用场景

使用 atomic 的一些方法,我们可以实现更底层的一些优化。如果使用 Mutex 等并发原语进行这些优化,虽然可以解决问题,但是这些并发原语的实现逻辑比较复杂,对性能还是有一定的影响的。

举个例子:假设你想在程序中使用一个标志(flag,比如一个 bool 类型的变量),来标识一个定时任务是否已经启动执行了,你会怎么做呢?

我们先来看看加锁的方法。如果使用 Mutex 和 RWMutex,在读取和设置这个标志的时候加锁,是可以做到互斥的、保证同一时刻只有一个定时任务在执行的,所以使用 Mutex 或者 RWMutex 是一种解决方案。

其实,这个场景中的问题不涉及到对资源复杂的竞争逻辑,只是会并发地读写这个标志,这类场景就适合使用 atomic 的原子操作。具体怎么做呢?你可以使用一个 uint32 类型的变量,如果这个变量的值是 0,就标识没有任务在执行,如果它的值是 1,就标识已经有任务在完成了。你看,是不是很简单呢?

再来看一个例子。假设你在开发应用程序的时候,需要从配置服务器中读取一个节点的配置信息。而且,在这个节点的配置发生变更的时候,你需要重新从配置服务器中拉取一份新的配置并更新。你的程序中可能有多个 goroutine 都依赖这份配置,涉及到对这个配置对象的并发读写,你可以使用读写锁实现对配置对象的保护。在大部分情况下,你也可以利用 atomic 实现配置对象的更新和加载。

atomic 原子操作还是实现 lock-free 数据结构的基石。

在实现 lock-free 的数据结构时,我们可以不使用互斥锁,这样就不会让线程因为等待互斥锁而阻塞休眠,而是让线程保持继续处理的状态。另外,不使用互斥锁的话,lock-free 的数据结构还可以提供并发的性能。

atomic 提供的方法

关于 atomic,还有一个地方你一定要记住,atomic 操作的对象是一个地址,你需要把可寻址的变量的地址作为参数传递给方法,而不是把变量的值传递给方法。

Add

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// AddInt32 atomically adds delta to *addr and returns the new value.
// Consider using the more ergonomic and less error-prone [Int32.Add] instead.
func AddInt32(addr *int32, delta int32) (new int32)

// AddUint32 atomically adds delta to *addr and returns the new value.
// To subtract a signed positive constant value c from x, do AddUint32(&x, ^uint32(c-1)).
// In particular, to decrement x, do AddUint32(&x, ^uint32(0)).
// Consider using the more ergonomic and less error-prone [Uint32.Add] instead.
func AddUint32(addr *uint32, delta uint32) (new uint32)

// AddInt64 atomically adds delta to *addr and returns the new value.
// Consider using the more ergonomic and less error-prone [Int64.Add] instead
// (particularly if you target 32-bit platforms; see the bugs section).
func AddInt64(addr *int64, delta int64) (new int64)

// AddUint64 atomically adds delta to *addr and returns the new value.
// To subtract a signed positive constant value c from x, do AddUint64(&x, ^uint64(c-1)).
// In particular, to decrement x, do AddUint64(&x, ^uint64(0)).
// Consider using the more ergonomic and less error-prone [Uint64.Add] instead
// (particularly if you target 32-bit platforms; see the bugs section).
func AddUint64(addr *uint64, delta uint64) (new uint64)

// AddUintptr atomically adds delta to *addr and returns the new value.
// Consider using the more ergonomic and less error-prone [Uintptr.Add] instead.
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)

Add 方法就是给第一个参数地址中的值增加一个 delta 值。

于有符号的整数来说,delta 可以是一个负数,相当于减去一个值。对于无符号的整数和 uinptr 类型来说,怎么实现减去一个值呢?毕竟,atomic 并没有提供单独的减法操作。

我来跟你说一种方法。你可以利用计算机补码的规则,把减法变成加法。以 uint32 类型为例:

1
AddUint32(&x, ^uint32(c-1)).

如果是对 uint64 的值进行操作,那么,就把上面的代码中的 uint32 替换成 uint64。尤其是减 1 这种特殊的操作,我们可以简化为:

1
AddUint32(&x, ^uint32(0))

我们再来看看 CAS 方法。

CAS (CompareAndSwap)

以 int32 为例,我们学习一下 CAS 提供的功能。在 CAS 的方法签名中,需要提供要操作的地址、原数据值、新值,如下所示:

1
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

这个方法会比较当前 addr 地址里的值是不是 old,如果不等于 old,就返回 false;如果等于 old,就把此地址的值替换成 new 值,返回 true。这就相当于“判断相等才替换”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// CompareAndSwapInt32 executes the compare-and-swap operation for an int32 value.
// Consider using the more ergonomic and less error-prone [Int32.CompareAndSwap] instead.
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

// CompareAndSwapInt64 executes the compare-and-swap operation for an int64 value.
// Consider using the more ergonomic and less error-prone [Int64.CompareAndSwap] instead
// (particularly if you target 32-bit platforms; see the bugs section).
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)

// CompareAndSwapUint32 executes the compare-and-swap operation for a uint32 value.
// Consider using the more ergonomic and less error-prone [Uint32.CompareAndSwap] instead.
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)

// CompareAndSwapUint64 executes the compare-and-swap operation for a uint64 value.
// Consider using the more ergonomic and less error-prone [Uint64.CompareAndSwap] instead
// (particularly if you target 32-bit platforms; see the bugs section).
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)

// CompareAndSwapUintptr executes the compare-and-swap operation for a uintptr value.
// Consider using the more ergonomic and less error-prone [Uintptr.CompareAndSwap] instead.
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)

// CompareAndSwapPointer executes the compare-and-swap operation for a unsafe.Pointer value.
// Consider using the more ergonomic and less error-prone [Pointer.CompareAndSwap] instead.
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)

Swap

如果不需要比较旧值,只是比较粗暴地替换的话,就可以使用 Swap 方法,它替换后还可以返回旧值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// SwapInt32 atomically stores new into *addr and returns the previous *addr value.
// Consider using the more ergonomic and less error-prone [Int32.Swap] instead.
func SwapInt32(addr *int32, new int32) (old int32)

// SwapInt64 atomically stores new into *addr and returns the previous *addr value.
// Consider using the more ergonomic and less error-prone [Int64.Swap] instead
// (particularly if you target 32-bit platforms; see the bugs section).
func SwapInt64(addr *int64, new int64) (old int64)

// SwapUint32 atomically stores new into *addr and returns the previous *addr value.
// Consider using the more ergonomic and less error-prone [Uint32.Swap] instead.
func SwapUint32(addr *uint32, new uint32) (old uint32)

// SwapUint64 atomically stores new into *addr and returns the previous *addr value.
// Consider using the more ergonomic and less error-prone [Uint64.Swap] instead
// (particularly if you target 32-bit platforms; see the bugs section).
func SwapUint64(addr *uint64, new uint64) (old uint64)

// SwapUintptr atomically stores new into *addr and returns the previous *addr value.
// Consider using the more ergonomic and less error-prone [Uintptr.Swap] instead.
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)

// SwapPointer atomically stores new into *addr and returns the previous *addr value.
// Consider using the more ergonomic and less error-prone [Pointer.Swap] instead.
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)

Load

Load 方法会取出 addr 地址中的值,即使在多处理器、多核、有 CPU cache 的情况下,这个操作也能保证 Load 是一个原子操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// LoadInt32 atomically loads *addr.
// Consider using the more ergonomic and less error-prone [Int32.Load] instead.
func LoadInt32(addr *int32) (val int32)

// LoadInt64 atomically loads *addr.
// Consider using the more ergonomic and less error-prone [Int64.Load] instead
// (particularly if you target 32-bit platforms; see the bugs section).
func LoadInt64(addr *int64) (val int64)

// LoadUint32 atomically loads *addr.
// Consider using the more ergonomic and less error-prone [Uint32.Load] instead.
func LoadUint32(addr *uint32) (val uint32)

// LoadUint64 atomically loads *addr.
// Consider using the more ergonomic and less error-prone [Uint64.Load] instead
// (particularly if you target 32-bit platforms; see the bugs section).
func LoadUint64(addr *uint64) (val uint64)

// LoadUintptr atomically loads *addr.
// Consider using the more ergonomic and less error-prone [Uintptr.Load] instead.
func LoadUintptr(addr *uintptr) (val uintptr)

// LoadPointer atomically loads *addr.
// Consider using the more ergonomic and less error-prone [Pointer.Load] instead.
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)

Store

Store 方法会把一个值存入到指定的 addr 地址中,即使在多处理器、多核、有 CPU cache 的情况下,这个操作也能保证 Store 是一个原子操作。别的 goroutine 通过 Load 读取出来,不会看到存取了一半的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// StoreInt32 atomically stores val into *addr.
// Consider using the more ergonomic and less error-prone [Int32.Store] instead.
func StoreInt32(addr *int32, val int32)

// StoreInt64 atomically stores val into *addr.
// Consider using the more ergonomic and less error-prone [Int64.Store] instead
// (particularly if you target 32-bit platforms; see the bugs section).
func StoreInt64(addr *int64, val int64)

// StoreUint32 atomically stores val into *addr.
// Consider using the more ergonomic and less error-prone [Uint32.Store] instead.
func StoreUint32(addr *uint32, val uint32)

// StoreUint64 atomically stores val into *addr.
// Consider using the more ergonomic and less error-prone [Uint64.Store] instead
// (particularly if you target 32-bit platforms; see the bugs section).
func StoreUint64(addr *uint64, val uint64)

// StoreUintptr atomically stores val into *addr.
// Consider using the more ergonomic and less error-prone [Uintptr.Store] instead.
func StoreUintptr(addr *uintptr, val uintptr)

// StorePointer atomically stores val into *addr.
// Consider using the more ergonomic and less error-prone [Pointer.Store] instead.
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)

Value 类型

刚刚说的都是一些比较常见的类型,其实,atomic 还提供了一个特殊的类型:Value。它可以原子地存取对象类型,但也只能存取,不能 CAS 和 Swap,常常用在配置变更等场景中。

接下来,我以一个配置变更的例子,来演示 Value 类型的使用。这里定义了一个 Value 类型的变量 config, 用来存储配置信息。

首先,我们启动一个 goroutine,然后让它随机 sleep 一段时间,之后就变更一下配置,并通过我们前面学到的 Cond 并发原语,通知其它的 reader 去加载新的配置。

接下来,我们启动一个 goroutine 等待配置变更的信号,一旦有变更,它就会加载最新的配置。通过这个例子,你可以了解到 Value 的 Store/Load 方法的使用,因为它只有这两个方法,只要掌握了它们的使用,你就完全掌握了 Value 类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
type Config struct {
NodeName string
Addr string
Count int32
}

func loadNewConfig() Config {
return Config{
NodeName: "北京",
Addr: "10.77.95.27",
Count: rand.Int31(),
}
}
func main() {
var config atomic.Value
config.Store(loadNewConfig())
var cond = sync.NewCond(&sync.Mutex{})

// 设置新的config
go func() {
for {
time.Sleep(time.Duration(5+rand.Int63n(5)) * time.Second)
config.Store(loadNewConfig())
cond.Broadcast() // 通知等待着配置已变更
}
}()

go func() {
for {
cond.L.Lock()
cond.Wait() // 等待变更信号
c := config.Load().(Config) // 读取新的配置
fmt.Printf("new config: %+v\n", c)
cond.L.Unlock()
}
}()

select {}
}

第三方库的扩展

使用 atomic 实现 Lock-Free queue

atomic 常常用来实现 Lock-Free 的数据结构,这次我会给你展示一个 Lock-Free queue 的实现。Lock-Free queue 最出名的就是 Maged M. Michael 和 Michael L. Scott 1996 年发表的论文中的算法,算法比较简单,容易实现,伪代码的每一行都提供了注释,我就不在这里贴出伪代码了,因为我们使用 Go 实现这个数据结构的代码几乎和伪代码一样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package queue
import (
"sync/atomic"
"unsafe"
)
// lock-free的queue
type LKQueue struct {
head unsafe.Pointer
tail unsafe.Pointer
}
// 通过链表实现,这个数据结构代表链表中的节点
type node struct {
value interface{}
next unsafe.Pointer
}
func NewLKQueue() *LKQueue {
n := unsafe.Pointer(&node{})
return &LKQueue{head: n, tail: n}
}
// 入队
func (q *LKQueue) Enqueue(v interface{}) {
n := &node{value: v}
for {
tail := load(&q.tail)
next := load(&tail.next)
if tail == load(&q.tail) { // 尾还是尾
if next == nil { // 还没有新数据入队
if cas(&tail.next, next, n) { //增加到队尾
cas(&q.tail, tail, n) //入队成功,移动尾巴指针
return
}
} else { // 已有新数据加到队列后面,需要移动尾指针
cas(&q.tail, tail, next)
}
}
}
}
// 出队,没有元素则返回nil
func (q *LKQueue) Dequeue() interface{} {
for {
head := load(&q.head)
tail := load(&q.tail)
next := load(&head.next)
if head == load(&q.head) { // head还是那个head
if head == tail { // head和tail一样
if next == nil { // 说明是空队列
return nil
}
// 只是尾指针还没有调整,尝试调整它指向下一个
cas(&q.tail, tail, next)
} else {
// 读取出队的数据
v := next.value
// 既然要出队了,头指针移动到下一个
if cas(&q.head, head, next) {
return v // Dequeue is done. return
}
}
}
}
}

// 将unsafe.Pointer原子加载转换成node
func load(p *unsafe.Pointer) (n *node) {
return (*node)(atomic.LoadPointer(p))
}

// 封装CAS,避免直接将*node转换成unsafe.Pointer
func cas(p *unsafe.Pointer, old, new *node) (ok bool) {
return atomic.CompareAndSwapPointer(
p, unsafe.Pointer(old), unsafe.Pointer(new))
}

这个 lock-free 的实现使用了一个辅助头指针(head),头指针不包含有意义的数据,只是一个辅助的节点,这样的话,出队入队中的节点会更简单。入队的时候,通过 CAS 操作将一个元素添加到队尾,并且移动尾指针。出队的时候移除一个节点,并通过 CAS 操作移动 head 指针,同时在必要的时候移动尾指针

Mutext

018 年,Go 开发者将 fast path 和 slow path 拆成独立的方法,以便内联,提高性能。

2019 年也有一个 Mutex 的优化,虽然没有对 Mutex 做修改,但是,对于 Mutex 唤醒后持有锁的那个 waiter,调度器可以有更高的优先级去执行,这已经是很细致的性能优化了。为了避免代码过多,这里只列出当前的 Mutex 实现。

想要理解当前的 Mutex,我们需要好好泡一杯茶,仔细地品一品了。当然,现在的 Mutex 代码已经复杂得接近不可读的状态了,而且代码也非常长,删减后占了几乎三页纸。但是,作为第一个要详细介绍的同步原语,我还是希望能更清楚地剖析 Mutex 的实现,向你展示它的演化和为了一个貌似很小的 feature 不得不将代码变得非常复杂的原因。

e0c23794c8a1d355a7a183400c036276.jpg

当然,你也可以暂时略过这一段,以后慢慢品,只需要记住,Mutex 绝不容忍一个 goroutine 被落下,永远没有机会获取锁。不抛弃不放弃是它的宗旨,而且它也尽可能地让等待较长的 goroutine 更有机会获取到锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
type Mutex struct {
state int32
sema uint32
}

const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving // 从state字段中分出一个饥饿标记
mutexWaiterShift = iota

starvationThresholdNs = 1e6
)

func (m *Mutex) Lock() {
// Fast path: 幸运之路,一下就获取到了锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// Slow path:缓慢之路,尝试自旋竞争或饥饿状态下饥饿goroutine竞争
m.lockSlow()
}

func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false // 此goroutine的饥饿标记
awoke := false // 唤醒标记
iter := 0 // 自旋次数
old := m.state // 当前的锁的状态
for {
// 锁是非饥饿状态,锁还没被释放,尝试自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state // 再次获取锁的状态,之后会检查是否锁被释放了
continue
}
new := old
if old&mutexStarving == 0 {
new |= mutexLocked // 非饥饿状态,加锁
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift // waiter数量加1
}
if starving && old&mutexLocked != 0 {
new |= mutexStarving // 设置饥饿状态
}
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken // 新状态清除唤醒标记
}
// 成功设置新状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 原来锁的状态已释放,并且不是饥饿状态,正常请求到了锁,返回
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 处理饥饿状态

// 如果以前就在队列里面,加入到队列头
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 阻塞等待
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 唤醒之后检查锁是否应该处于饥饿状态
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 如果锁已经处于饥饿状态,直接抢到锁,返回
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 有点绕,加锁并且将waiter数减1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving // 最后一个waiter或者已经不饥饿了,清除饥饿标记
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
}

func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}

func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
runtime_Semrelease(&m.sema, true, 1)
}
}

什么时候不进行自旋

  • 当前锁是饥饿状态
  • 当前没有上锁
  • 当前自旋次数带到设定限制

不同状态的取锁方式

  • 饥饿状态中一定要走先入先出队列等待唤醒
  • 非饥饿状态会直接抢锁

常见的 4 种错误场景

使用 Mutex 常见的错误场景有 4 类,分别是 Lock/Unlock 不是成对出现、Copy 已使用的 Mutex、重入和死锁。下面我们一一来看。

Lock/Unlock 不是成对出现

Lock/Unlock 没有成对出现,就意味着会出现死锁的情况,或者是因为 Unlock 一个未加锁的 Mutex 而导致 panic。

我们先来看看缺少 Unlock 的场景,常见的有三种情况:

代码中有太多的 if-else 分支,可能在某个分支中漏写了 Unlock;

在重构的时候把 Unlock 给删除了;

Unlock 误写成了 Lock。

在这种情况下,锁被获取之后,就不会被释放了,这也就意味着,其它的 goroutine 永远都没机会获取到锁。

我们再来看缺少 Lock 的场景,这就很简单了,一般来说就是误操作删除了 Lock。 比如先前使用 Mutex 都是正常的,结果后来其他人重构代码的时候,由于对代码不熟悉,或者由于开发者的马虎,把 Lock 调用给删除了,或者注释掉了。比如下面的代码,mu.Lock() 一行代码被删除了,直接 Unlock 一个未加锁的 Mutex 会 panic:

Copy 已使用的 Mutex

第二种误用是 Copy 已使用的 Mutex。在正式分析这个错误之前,我先交代一个小知识点,那就是 Package sync 的同步原语在使用后是不能复制的。

我们知道 Mutex 是最常用的一个同步原语,那它也是不能复制的。为什么呢?原因在于,Mutex 是一个有状态的对象,它的 state 字段记录这个锁的状态。如果你要复制一个已经加锁的 Mutex 给一个新的变量,那么新的刚初始化的变量居然被加锁了,这显然不符合你的期望,因为你期望的是一个零值的 Mutex。关键是在并发环境下,你根本不知道要复制的 Mutex 状态是什么,因为要复制的 Mutex 是由其它 goroutine 并发访问的,状态可能总是在变化。

重入

当一个线程获取锁时,如果没有其它线程拥有这个锁,那么,这个线程就成功获取到这个锁。之后,如果其它线程再请求这个锁,就会处于阻塞等待的状态。但是,如果拥有这把锁的线程再请求这把锁的话,不会阻塞,而是成功返回,所以叫可重入锁(有时候也叫做递归锁)。只要你拥有这把锁,你可以可着劲儿地调用,比如通过递归实现一些算法,调用者不会阻塞或者死锁。

了解了可重入锁的概念,那我们来看 Mutex 使用的错误场景。划重点了:Mutex 不是可重入的锁。想想也不奇怪,因为 Mutex 的实现中没有记录哪个 goroutine 拥有这把锁。

理论上,任何 goroutine 都可以随意地 Unlock 这把锁,所以没办法计算重入条件,毕竟,“臣妾做不到啊”!

死锁

接下来,我们来看第四种错误场景:死锁。

我先解释下什么是死锁。两个或两个以上的进程(或线程,goroutine)在执行过程中,因争夺共享资源而处于一种互相等待的状态,如果没有外部干涉,它们都将无法推进下去,此时,我们称系统处于死锁状态或系统产生了死锁。

我们来分析一下死锁产生的必要条件。如果你想避免死锁,只要破坏这四个条件中的一个或者几个,就可以了。

互斥: 至少一个资源是被排他性独享的,其他线程必须处于等待状态,直到资源被释放。

持有和等待:goroutine 持有一个资源,并且还在请求其它 goroutine 持有的资源,也就是咱们常说的“吃着碗里,看着锅里”的意思。

不可剥夺:资源只能由持有它的 goroutine 来释放。

环路等待:一般来说,存在一组等待进程,P={P1,P2,…,PN},P1 等待 P2 持有的资源,P2 等待 P3 持有的资源,依此类推,最后是 PN 等待 P1 持有的资源,这就形成了一个环路等待的死结。

Go 运行时,有死锁探测的功能,能够检查出是否出现了死锁的情况,如果出现了,这个时候你就需要调整策略来处理了。

你可以引入一个第三方的锁,大家都依赖这个锁进行业务处理,比如现在政府推行的一站式政务服务中心。或者是解决持有等待问题,物业不需要看到派出所的证明才给开物业证明,等等。

WaitGroup

首先,我们看看 WaitGroup 的数据结构。它包括了一个 noCopy 的辅助字段,一个 state1 记录 WaitGroup 状态的数组。

noCopy 的辅助字段,主要就是辅助 vet 工具检查是否通过 copy 赋值这个 WaitGroup 实例。我会在后面和你详细分析这个字段;

state1,一个具有复合意义的字段,包含 WaitGroup 的计数、阻塞在检查点的 waiter 数和信号量。

WaitGroup 的数据结构定义以及 state 信息的获取方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type WaitGroup struct {
// 避免复制使用的一个技巧,可以告诉vet工具违反了复制使用的规则
noCopy noCopy
// 64bit(8bytes)的值分成两段,高32bit是计数值,低32bit是waiter的计数
// 另外32bit是用作信号量的
// 因为64bit值的原子操作需要64bit对齐,但是32bit编译器不支持,所以数组中的元素在不同的架构中不一样,具体处理看下面的方法
// 总之,会找到对齐的那64bit作为state,其余的32bit做信号量
state1 [3]uint32
}


// 得到state的地址和信号量的地址
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
// 如果地址是64bit对齐的,数组前两个元素做state,后一个元素做信号量
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
// 如果地址是32bit对齐的,数组后两个元素用来做state,它可以用来做64bit的原子操作,第一个元素32bit用来做信号量
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}

因为对 64 位整数的原子操作要求整数的地址是 64 位对齐的,所以针对 64 位和 32 位环境的 state 字段的组成是不一样的。在 64 位环境下,state1 的第一个元素是 waiter 数,第二个元素是 WaitGroup 的计数值,第三个元素是信号量。在 32 位环境下,如果 state1 不是 64 位对齐的地址,那么 state1 的第一个元素是信号量,后两个元素分别是 waiter 数和计数值。然后,我们继续深入源码,看一下 Add、Done 和 Wait 这三个方法的实现。

Add 方法主要操作的是 state 的计数部分。你可以为计数值增加一个 delta 值,内部通过原子操作把这个值加到计数值上。需要注意的是,这个 delta 也可以是个负数,相当于为计数值减去一个值,Done 方法内部其实就是通过 Add(-1) 实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
// 高32bit是计数值v,所以把delta左移32,增加到计数上
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32) // 当前计数值
w := uint32(state) // waiter count

if v > 0 || w == 0 {
return
}

// 如果计数值v为0并且waiter的数量w不为0,那么state的值就是waiter的数量
// 将waiter的数量设置为0,因为计数值v也是0,所以它们俩的组合*statep直接设置为0即可。此时需要并唤醒所有的waiter
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}


// Done方法实际就是计数器减1
func (wg *WaitGroup) Done() {
wg.Add(-1)
}

Wait 方法的实现逻辑是:不断检查 state 的值。如果其中的计数值变为了 0,那么说明所有的任务已完成,调用者不必再等待,直接返回。如果计数值大于 0,说明此时还有任务没完成,那么调用者就变成了等待者,需要加入 waiter 队列,并且阻塞住自己。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()

for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32) // 当前计数值
w := uint32(state) // waiter的数量
if v == 0 {
// 如果计数值为0, 调用这个方法的goroutine不必再等待,继续执行它后面的逻辑即可
return
}
// 否则把waiter数量加1。期间可能有并发调用Wait的情况,所以最外层使用了一个for循环
if atomic.CompareAndSwapUint64(statep, state, state+1) {
// 阻塞休眠等待
runtime_Semacquire(semap)
// 被唤醒,不再阻塞,返回
return
}
}
}

使用 WaitGroup 时的常见错误

常见问题一:计数器设置为负值

WaitGroup 的计数器的值必须大于等于 0。我们在更改这个计数值的时候,WaitGroup 会先做检查,如果计数值被设置为负数,就会导致 panic。

不可add写为负数

不可done 比add 多

常见问题二:不期望的 Add 时机

在使用 WaitGroup 的时候,你一定要遵循的原则就是,等所有的 Add 方法调用之后再调用 Wait,否则就可能导致 panic 或者不期望的结果。

我们构造这样一个场景:只有部分的 Add/Done 执行完后,Wait 就返回。我们看一个例子:启动四个 goroutine,每个 goroutine 内部调用 Add(1) 然后调用 Done(),主 goroutine 调用 Wait 等待任务完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func main() {
var wg sync.WaitGroup
go dosomething(100, &wg) // 启动第一个goroutine
go dosomething(110, &wg) // 启动第二个goroutine
go dosomething(120, &wg) // 启动第三个goroutine
go dosomething(130, &wg) // 启动第四个goroutine

wg.Wait() // 主goroutine等待完成
fmt.Println("Done")
}

func dosomething(millisecs time.Duration, wg *sync.WaitGroup) {
duration := millisecs * time.Millisecond
time.Sleep(duration) // 故意sleep一段时间

wg.Add(1)
fmt.Println("后台执行, duration:", duration)
wg.Done()
}

在这个例子中,我们原本设想的是,等四个 goroutine 都执行完毕后输出 Done 的信息,但是它的错误之处在于,将 WaitGroup.Add 方法的调用放在了子 gorotuine 中。等主 goorutine 调用 Wait 的时候,因为四个任务 goroutine 一开始都休眠,所以可能 WaitGroup 的 Add 方法还没有被调用,WaitGroup 的计数还是 0,所以它并没有等待四个子 goroutine 执行完毕才继续执行,而是立刻执行了下一步。

导致这个错误的原因是,没有遵循先完成所有的 Add 之后才 Wait。要解决这个问题,一个方法是,预先设置计数值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func main() {
var wg sync.WaitGroup
wg.Add(4) // 预先设定WaitGroup的计数值

go dosomething(100, &wg) // 启动第一个goroutine
go dosomething(110, &wg) // 启动第二个goroutine
go dosomething(120, &wg) // 启动第三个goroutine
go dosomething(130, &wg) // 启动第四个goroutine

wg.Wait() // 主goroutine等待
fmt.Println("Done")
}

func dosomething(millisecs time.Duration, wg *sync.WaitGroup) {
duration := millisecs * time.Millisecond
time.Sleep(duration)

fmt.Println("后台执行, duration:", duration)
wg.Done()
}

另一种方法是在启动子 goroutine 之前才调用 Add:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func main() {
var wg sync.WaitGroup

dosomething(100, &wg) // 调用方法,把计数值加1,并启动任务goroutine
dosomething(110, &wg) // 调用方法,把计数值加1,并启动任务goroutine
dosomething(120, &wg) // 调用方法,把计数值加1,并启动任务goroutine
dosomething(130, &wg) // 调用方法,把计数值加1,并启动任务goroutine

wg.Wait() // 主goroutine等待,代码逻辑保证了四次Add(1)都已经执行完了
fmt.Println("Done")
}

func dosomething(millisecs time.Duration, wg *sync.WaitGroup) {
wg.Add(1) // 计数值加1,再启动goroutine

go func() {
duration := millisecs * time.Millisecond
time.Sleep(duration)
fmt.Println("后台执行, duration:", duration)
wg.Done()
}()
}

常见问题三:前一个 Wait 还没结束就重用 WaitGroup

只要 WaitGroup 的计数值恢复到零值的状态,那么它就可以被看作是新创建的 WaitGroup,被重复使用。

但是,如果我们在 WaitGroup 的计数值还没有恢复到零值的时候就重用,就会导致程序 panic。我们看一个例子,初始设置 WaitGroup 的计数值为 1,启动一个 goroutine 先调用 Done 方法,接着就调用 Add 方法,Add 方法有可能和主 goroutine 并发执行。

1
2
3
4
5
6
7
8
9
10
func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
time.Sleep(time.Millisecond)
wg.Done() // 计数器减1
wg.Add(1) // 计数值加1
}()
wg.Wait() // 主goroutine等待,有可能和第7行并发执行
}

在这个例子中,第 6 行虽然让 WaitGroup 的计数恢复到 0,但是因为第 9 行有个 waiter 在等待,如果等待 Wait 的 goroutine,刚被唤醒就和 Add 调用(第 7 行)有并发执行的冲突,所以就会出现 panic。

WaitGroup 虽然可以重用,但是是有一个前提的,那就是必须等到上一轮的 Wait 完成之后,才能重用 WaitGroup 执行下一轮的 Add/Wait,如果你在 Wait 还没执行完的时候就调用下一轮 Add 方法,就有可能出现 panic。

noCopy:辅助 vet 检查

我们刚刚在学习 WaitGroup 的数据结构时,提到了里面有一个 noCopy 字段。你还记得它的作用吗?其实,它就是指示 vet 工具在做检查的时候,这个数据结构不能做值复制使用。更严谨地说,是不能在第一次使用之后复制使用 ( must not be copied after first use)。

你可能会说了,为什么要把 noCopy 字段单独拿出来讲呢?一方面,把 noCopy 字段穿插到 waitgroup 代码中讲解,容易干扰我们对 WaitGroup 整体的理解。另一方面,也是非常重要的原因,noCopy 是一个通用的计数技术,其他并发原语中也会用到,所以单独介绍有助于你以后在实践中使用这个技术。

但是,WaitGroup 同步原语不就是 Add、Done 和 Wait 方法吗?vet 能检查出来吗?其实是可以的。通过给 WaitGroup 添加一个 noCopy 字段,我们就可以为 WaitGroup 实现 Locker 接口,这样 vet 工具就可以做复制检查了。而且因为 noCopy 字段是未输出类型,所以 WaitGroup 不会暴露 Lock/Unlock 方法。

noCopy 字段的类型是 noCopy,它只是一个辅助的、用来帮助 vet 检查用的类型:

1
2
3
4
5
6
type noCopy struct{}

// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}

如果你想要自己定义的数据结构不被复制使用,或者说,不能通过 vet 工具检查出复制使用的报警,就可以通过嵌入 noCopy 这个数据类型来实现。