想要写好Go并发不得不掌握的数据结构(3)
SingleFlight
SingleFlight 是 Go 开发组提供的一个扩展并发原语。它的作用是,在处理多个 goroutine 同时调用同一个函数的时候,只让一个 goroutine 去调用这个函数,等到这个 goroutine 返回结果的时候,再把结果返回给这几个同时调用的 goroutine,这样可以减少并发调用的数量。
其实,sync.Once 不是只在并发的时候保证只有一个 goroutine 执行函数 f,而是会保证永远只执行一次,而 SingleFlight 是每次调用都重新执行,并且在多个请求同时调用的时候只有一个执行。它们两个面对的场景是不同的,sync.Once 主要是用在单次初始化场景中,而 SingleFlight 主要用在合并并发请求的场景中,尤其是缓存场景
如果你学会了 SingleFlight,在面对秒杀等大并发请求的场景,而且这些请求都是读请求时,你就可以把这些请求合并为一个请求,这样,你就可以将后端服务的压力从 n 降到 1。尤其是在面对后端是数据库这样的服务的时候,采用 SingleFlight 可以极大地提高性能。那么,话不多说,就让我们开始学习 SingleFlight 吧。
实现原理
SingleFlight 使用互斥锁 Mutex 和 Map 来实现。Mutex 提供并发时的读写保护,Map 用来保存同一个 key 的正在处理(in flight)的请求。
SingleFlight 的数据结构是 Group,它提供了三个方法。Do, DoChan,Forget
- Do:这个方法执行一个函数,并返回函数执行的结果。你需要提供一个 key,对于同一个 key,在同一时间只有一个在执行,同一个 key 并发的请求会等待。第一个执行的请求返回的结果,就是它的返回结果。函数 fn 是一个无参的函数,返回一个结果或者 error,而 Do 方法会返回函数执行的结果或者是 error,shared 会指示 v 是否返回给多个请求。
- DoChan:类似 Do 方法,只不过是返回一个 chan,等 fn 函数执行完,产生了结果以后,就能从这个 chan 中接收这个结果。
- Forget:告诉 Group 忘记这个 key。这样一来,之后这个 key 请求会执行 f,而不是等待前一个未完成的 fn 函数的结果。
下面,我们来看具体的实现方法。
首先,SingleFlight 定义一个辅助对象 call,这个 call 就代表正在执行 fn 函数的请求或者是已经执行完的请求。Group 代表 SingleFlight。
1 | // 代表一个正在处理的请求,或者已经处理完的请求 |
我们只需要查看一个 Do 方法,DoChan 的处理方法是类似的。
1 | func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { |
doCall 方法会实际调用函数 fn:
1 | func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { |
在这段代码中,你要注意下第 7 行。在默认情况下,forgotten==false,所以第 8 行默认会被调用,也就是说,第一个请求完成后,后续的同一个 key 的请求又重新开始新一次的 fn 函数的调用。
应用场景
了解了 SingleFlight 的实现原理,下面我们来看看它都应用于什么场景中。
第一个是在net/lookup.go中,如果同时有查询同一个 host 的请求,lookupGroup 会把这些请求 merge 到一起,只需要一个请求就可以了:
1 | // lookupGroup merges LookupIPAddr calls together for lookups for the same |
第二个是 Go 在查询仓库版本信息时,将并发的请求合并成 1 个请求:
1 | func metaImportsForPrefix(importPrefix string, mod ModuleMode, security web.SecurityMode) (*urlpkg.URL, []metaImport, error) { |
需要注意的是,这里涉及到了缓存的问题。上面的代码会把结果放在缓存中,这也是常用的一种解决缓存击穿的例子。
设计缓存问题时,我们常常需要解决缓存穿透、缓存雪崩和缓存击穿问题。缓存击穿问题是指,在平常高并发的系统中,大量的请求同时查询一个 key 时,如果这个 key 正好过期失效了,就会导致大量的请求都打到数据库上。这就是缓存击穿。
用 SingleFlight 来解决缓存击穿问题再合适不过了。因为,这个时候,只要这些对同一个 key 的并发请求的其中一个到数据库中查询,就可以了,这些并发的请求可以共享同一个结果。因为是缓存查询,不用考虑幂等性问题。
事实上,在 Go 生态圈知名的缓存框架 groupcache 中,就使用了较早的 Go 标准库的 SingleFlight 实现。接下来,我就来给你介绍一下 groupcache 是如何使用 SingleFlight 解决缓存击穿问题的。groupcache 中的 SingleFlight 只有一个方法:
1 | func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) |
SingleFlight 的作用是,在加载一个缓存项的时候,合并对同一个 key 的 load 的并发请求:
1 | type Group struct { |
循环栅栏 CyclicBarrier
循环栅栏(CyclicBarrier),它常常应用于重复进行一组 goroutine 同时执行的场景中。
CyclicBarrier允许一组 goroutine 彼此等待,到达一个共同的执行点。同时,因为它可以被重复使用,所以叫循环栅栏。具体的机制是,大家都在栅栏前等待,等全部都到齐了,就抬起栅栏放行
你可能会觉得,CyclicBarrier 和 WaitGroup 的功能有点类似,确实是这样。不过,CyclicBarrier 更适合用在“固定数量的 goroutine 等待同一个执行点”的场景中,而且在放行 goroutine 之后,CyclicBarrier 可以重复利用,不像 WaitGroup 重用的时候,必须小心翼翼避免 panic。
如果使用 WaitGroup 实现的话,调用比较复杂,不像 CyclicBarrier 那么清爽。更重要的是,如果想重用 WaitGroup,你还要保证,将 WaitGroup 的计数值重置到 n 的时候不会出现并发问题。WaitGroup 更适合用在“一个 goroutine 等待一组 goroutine 到达同一个执行点”的场景中,或者是不需要重用的场景中。
实现原理
CyclicBarrier 有两个初始化方法:
第一个是 New 方法,它只需要一个参数,来指定循环栅栏参与者的数量;
第二个方法是 NewWithAction,它额外提供一个函数,可以在每一次到达执行点的时候执行一次。具体的时间点是在最后一个参与者到达之后,但是其它的参与者还未被放行之前。我们可以利用它,做放行之前的一些共享状态的更新等操作。
这两个方法的签名如下:
1 | func New(parties int) CyclicBarrier |
CyclicBarrier 是一个接口,定义的方法如下:
1 | type CyclicBarrier interface { |
循环栅栏的使用也很简单。循环栅栏的参与者只需调用 Await 等待,等所有的参与者都到达后,再执行下一步。当执行下一步的时候,循环栅栏的状态又恢复到初始的状态了,可以迎接下一轮同样多的参与者。
有一道非常经典的并发编程的题目,非常适合使用循环栅栏,下面我们来看一下。
并发趣题:一氧化二氢制造工厂
题目是这样的:有一个名叫大自然的搬运工的工厂,生产一种叫做一氧化二氢的神秘液体。这种液体的分子是由一个氧原子和两个氢原子组成的,也就是水。这个工厂有多条生产线,每条生产线负责生产氧原子或者是氢原子,每条生产线由一个 goroutine 负责。这些生产线会通过一个栅栏,只有一个氧原子生产线和两个氢原子生产线都准备好,才能生成出一个水分子,否则所有的生产线都会处于等待状态。也就是说,一个水分子必须由三个不同的生产线提供原子,而且水分子是一个一个按照顺序产生的,每生产一个水分子,就会打印出 HHO、HOH、OHH 三种形式的其中一种。HHH、OOH、OHO、HOO、OOO 都是不允许的。生产线中氢原子的生产线为 2N 条,氧原子的生产线为 N 条。
你可以先想一下,我们怎么来实现呢?首先,我们来定义一个 H2O 辅助数据类型,它包含两个信号量的字段和一个循环栅栏。
- semaH 信号量:控制氢原子。一个水分子需要两个氢原子,所以,氢原子的空槽数资源数设置为 2。
- semaO 信号量:控制氧原子。一个水分子需要一个氧原子,所以资源数的空槽数设置为 1。
- 循环栅栏:等待两个氢原子和一个氧原子填补空槽,直到任务完成。
1 | package water |
接下来,我们看看各条流水线的处理情况。流水线分为氢原子处理流水线和氧原子处理流水线,首先,我们先看一下氢原子的流水线:如果有可用的空槽,氢原子的流水线的处理方法是 hydrogen,hydrogen 方法就会占用一个空槽(h2o.semaH.Acquire),输出一个 H 字符,然后等待栅栏放行。等其它的 goroutine 填补了氢原子的另一个空槽和氧原子的空槽之后,程序才可以继续进行。
1 | func (h2o *H2O) hydrogen(releaseHydrogen func()) { |
然后是氧原子的流水线。氧原子的流水线处理方法是 oxygen, oxygen 方法是等待氧原子的空槽,然后输出一个 O,就等待栅栏放行。放行后,释放氧原子空槽位。
1 | func (h2o *H2O) oxygen(releaseOxygen func()) { |
在栅栏放行之前,只有两个氢原子的空槽位和一个氧原子的空槽位。只有等栅栏放行之后,这些空槽位才会被释放。栅栏放行,就意味着一个水分子组成成功。这个算法是不是正确呢?我们来编写一个单元测试检测一下。
1 | package water |
atomic
Mutex、RWMutex 等并发原语的实现时,你可以看到,最底层是通过 atomic 包中的一些原子操作来实现的
在很多场景中,使用并发原语实现起来比较复杂,而原子操作可以帮助我们更轻松地实现底层的优化。
原子操作的基础知识
Package sync/atomic 实现了同步算法底层的原子的内存操作原语,我们把它叫做原子操作原语,它提供了一些实现原子操作的方法。之所以叫原子操作,是因为一个原子在执行的时候,其它线程不会看到执行一半的操作结果。在其它线程看来,原子操作要么执行完了,要么还没有执行,就像一个最小的粒子 - 原子一样,不可分割。
CPU 提供了基础的原子操作,不过,不同架构的系统的原子操作是不一样的。对于单处理器单核系统来说,如果一个操作是由一个 CPU 指令来实现的,那么它就是原子操作,比如它的 XCHG 和 INC 等指令。如果操作是基于多条指令来实现的,那么,执行的过程中可能会被中断,并执行上下文切换,这样的话,原子性的保证就被打破了,因为这个时候,操作可能只执行了一半。
在多处理器多核系统中,原子操作的实现就比较复杂了。由于 cache 的存在,单个核上的单个指令进行原子操作的时候,你要确保其它处理器或者核不访问此原子操作的地址,或者是确保其它处理器或者核总是访问原子操作之后的最新的值。x86 架构中提供了指令前缀 LOCK,LOCK 保证了指令(比如 LOCK CMPXCHG op1、op2)不会受其它处理器或 CPU 核的影响,有些指令(比如 XCHG)本身就提供 Lock 的机制。不同的 CPU 架构提供的原子操作指令的方式也是不同的,比如对于多核的 MIPS 和 ARM,提供了 LL/SC(Load Link/Store Conditional)指令,可以帮助实现原子操作(ARMLL/SC 指令 LDREX 和 STREX)。因为不同的 CPU 架构甚至不同的版本提供的原子操作的指令是不同的,所以,要用一种编程语言实现支持不同架构的原子操作是相当有难度的。不过,还好这些都不需要你操心,因为 Go 提供了一个通用的原子操作的 API,将更底层的不同的架构下的实现封装成 atomic 包,提供了修改类型的原子操作(atomic read-modify-write,RMW)和加载存储类型的原子操作(Load 和 Store)的 API,稍后我会一一介绍。
atomic 原子操作的应用场景
使用 atomic 的一些方法,我们可以实现更底层的一些优化。如果使用 Mutex 等并发原语进行这些优化,虽然可以解决问题,但是这些并发原语的实现逻辑比较复杂,对性能还是有一定的影响的。
举个例子:假设你想在程序中使用一个标志(flag,比如一个 bool 类型的变量),来标识一个定时任务是否已经启动执行了,你会怎么做呢?
我们先来看看加锁的方法。如果使用 Mutex 和 RWMutex,在读取和设置这个标志的时候加锁,是可以做到互斥的、保证同一时刻只有一个定时任务在执行的,所以使用 Mutex 或者 RWMutex 是一种解决方案。
其实,这个场景中的问题不涉及到对资源复杂的竞争逻辑,只是会并发地读写这个标志,这类场景就适合使用 atomic 的原子操作。具体怎么做呢?你可以使用一个 uint32 类型的变量,如果这个变量的值是 0,就标识没有任务在执行,如果它的值是 1,就标识已经有任务在完成了。你看,是不是很简单呢?
再来看一个例子。假设你在开发应用程序的时候,需要从配置服务器中读取一个节点的配置信息。而且,在这个节点的配置发生变更的时候,你需要重新从配置服务器中拉取一份新的配置并更新。你的程序中可能有多个 goroutine 都依赖这份配置,涉及到对这个配置对象的并发读写,你可以使用读写锁实现对配置对象的保护。在大部分情况下,你也可以利用 atomic 实现配置对象的更新和加载。
atomic 原子操作还是实现 lock-free 数据结构的基石。
在实现 lock-free 的数据结构时,我们可以不使用互斥锁,这样就不会让线程因为等待互斥锁而阻塞休眠,而是让线程保持继续处理的状态。另外,不使用互斥锁的话,lock-free 的数据结构还可以提供并发的性能。
atomic 提供的方法
关于 atomic,还有一个地方你一定要记住,atomic 操作的对象是一个地址,你需要把可寻址的变量的地址作为参数传递给方法,而不是把变量的值传递给方法。
Add
1 | // AddInt32 atomically adds delta to *addr and returns the new value. |
Add 方法就是给第一个参数地址中的值增加一个 delta 值。
于有符号的整数来说,delta 可以是一个负数,相当于减去一个值。对于无符号的整数和 uinptr 类型来说,怎么实现减去一个值呢?毕竟,atomic 并没有提供单独的减法操作。
我来跟你说一种方法。你可以利用计算机补码的规则,把减法变成加法。以 uint32 类型为例:
1 | AddUint32(&x, ^uint32(c-1)). |
如果是对 uint64 的值进行操作,那么,就把上面的代码中的 uint32 替换成 uint64。尤其是减 1 这种特殊的操作,我们可以简化为:
1 | AddUint32(&x, ^uint32(0)) |
我们再来看看 CAS 方法。
CAS (CompareAndSwap)
以 int32 为例,我们学习一下 CAS 提供的功能。在 CAS 的方法签名中,需要提供要操作的地址、原数据值、新值,如下所示:
1 | func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) |
这个方法会比较当前 addr 地址里的值是不是 old,如果不等于 old,就返回 false;如果等于 old,就把此地址的值替换成 new 值,返回 true。这就相当于“判断相等才替换”。
1 | // CompareAndSwapInt32 executes the compare-and-swap operation for an int32 value. |
Swap
如果不需要比较旧值,只是比较粗暴地替换的话,就可以使用 Swap 方法,它替换后还可以返回旧值
1 | // SwapInt32 atomically stores new into *addr and returns the previous *addr value. |
Load
Load 方法会取出 addr 地址中的值,即使在多处理器、多核、有 CPU cache 的情况下,这个操作也能保证 Load 是一个原子操作。
1 | // LoadInt32 atomically loads *addr. |
Store
Store 方法会把一个值存入到指定的 addr 地址中,即使在多处理器、多核、有 CPU cache 的情况下,这个操作也能保证 Store 是一个原子操作。别的 goroutine 通过 Load 读取出来,不会看到存取了一半的值。
1 | // StoreInt32 atomically stores val into *addr. |
Value 类型
刚刚说的都是一些比较常见的类型,其实,atomic 还提供了一个特殊的类型:Value。它可以原子地存取对象类型,但也只能存取,不能 CAS 和 Swap,常常用在配置变更等场景中。
接下来,我以一个配置变更的例子,来演示 Value 类型的使用。这里定义了一个 Value 类型的变量 config, 用来存储配置信息。
首先,我们启动一个 goroutine,然后让它随机 sleep 一段时间,之后就变更一下配置,并通过我们前面学到的 Cond 并发原语,通知其它的 reader 去加载新的配置。
接下来,我们启动一个 goroutine 等待配置变更的信号,一旦有变更,它就会加载最新的配置。通过这个例子,你可以了解到 Value 的 Store/Load 方法的使用,因为它只有这两个方法,只要掌握了它们的使用,你就完全掌握了 Value 类型。
1 | type Config struct { |
第三方库的扩展
使用 atomic 实现 Lock-Free queue
atomic 常常用来实现 Lock-Free 的数据结构,这次我会给你展示一个 Lock-Free queue 的实现。Lock-Free queue 最出名的就是 Maged M. Michael 和 Michael L. Scott 1996 年发表的论文中的算法,算法比较简单,容易实现,伪代码的每一行都提供了注释,我就不在这里贴出伪代码了,因为我们使用 Go 实现这个数据结构的代码几乎和伪代码一样:
1 | package queue |
这个 lock-free 的实现使用了一个辅助头指针(head),头指针不包含有意义的数据,只是一个辅助的节点,这样的话,出队入队中的节点会更简单。入队的时候,通过 CAS 操作将一个元素添加到队尾,并且移动尾指针。出队的时候移除一个节点,并通过 CAS 操作移动 head 指针,同时在必要的时候移动尾指针
Mutext
018 年,Go 开发者将 fast path 和 slow path 拆成独立的方法,以便内联,提高性能。
2019 年也有一个 Mutex 的优化,虽然没有对 Mutex 做修改,但是,对于 Mutex 唤醒后持有锁的那个 waiter,调度器可以有更高的优先级去执行,这已经是很细致的性能优化了。为了避免代码过多,这里只列出当前的 Mutex 实现。
想要理解当前的 Mutex,我们需要好好泡一杯茶,仔细地品一品了。当然,现在的 Mutex 代码已经复杂得接近不可读的状态了,而且代码也非常长,删减后占了几乎三页纸。但是,作为第一个要详细介绍的同步原语,我还是希望能更清楚地剖析 Mutex 的实现,向你展示它的演化和为了一个貌似很小的 feature 不得不将代码变得非常复杂的原因。
当然,你也可以暂时略过这一段,以后慢慢品,只需要记住,Mutex 绝不容忍一个 goroutine 被落下,永远没有机会获取锁。不抛弃不放弃是它的宗旨,而且它也尽可能地让等待较长的 goroutine 更有机会获取到锁。
1 | type Mutex struct { |
什么时候不进行自旋
- 当前锁是饥饿状态
- 当前没有上锁
- 当前自旋次数带到设定限制
不同状态的取锁方式
- 饥饿状态中一定要走先入先出队列等待唤醒
- 非饥饿状态会直接抢锁
常见的 4 种错误场景
使用 Mutex 常见的错误场景有 4 类,分别是 Lock/Unlock 不是成对出现、Copy 已使用的 Mutex、重入和死锁。下面我们一一来看。
Lock/Unlock 不是成对出现
Lock/Unlock 没有成对出现,就意味着会出现死锁的情况,或者是因为 Unlock 一个未加锁的 Mutex 而导致 panic。
我们先来看看缺少 Unlock 的场景,常见的有三种情况:
代码中有太多的 if-else 分支,可能在某个分支中漏写了 Unlock;
在重构的时候把 Unlock 给删除了;
Unlock 误写成了 Lock。
在这种情况下,锁被获取之后,就不会被释放了,这也就意味着,其它的 goroutine 永远都没机会获取到锁。
我们再来看缺少 Lock 的场景,这就很简单了,一般来说就是误操作删除了 Lock。 比如先前使用 Mutex 都是正常的,结果后来其他人重构代码的时候,由于对代码不熟悉,或者由于开发者的马虎,把 Lock 调用给删除了,或者注释掉了。比如下面的代码,mu.Lock() 一行代码被删除了,直接 Unlock 一个未加锁的 Mutex 会 panic:
Copy 已使用的 Mutex
第二种误用是 Copy 已使用的 Mutex。在正式分析这个错误之前,我先交代一个小知识点,那就是 Package sync 的同步原语在使用后是不能复制的。
我们知道 Mutex 是最常用的一个同步原语,那它也是不能复制的。为什么呢?原因在于,Mutex 是一个有状态的对象,它的 state 字段记录这个锁的状态。如果你要复制一个已经加锁的 Mutex 给一个新的变量,那么新的刚初始化的变量居然被加锁了,这显然不符合你的期望,因为你期望的是一个零值的 Mutex。关键是在并发环境下,你根本不知道要复制的 Mutex 状态是什么,因为要复制的 Mutex 是由其它 goroutine 并发访问的,状态可能总是在变化。
重入
当一个线程获取锁时,如果没有其它线程拥有这个锁,那么,这个线程就成功获取到这个锁。之后,如果其它线程再请求这个锁,就会处于阻塞等待的状态。但是,如果拥有这把锁的线程再请求这把锁的话,不会阻塞,而是成功返回,所以叫可重入锁(有时候也叫做递归锁)。只要你拥有这把锁,你可以可着劲儿地调用,比如通过递归实现一些算法,调用者不会阻塞或者死锁。
了解了可重入锁的概念,那我们来看 Mutex 使用的错误场景。划重点了:Mutex 不是可重入的锁。想想也不奇怪,因为 Mutex 的实现中没有记录哪个 goroutine 拥有这把锁。
理论上,任何 goroutine 都可以随意地 Unlock 这把锁,所以没办法计算重入条件,毕竟,“臣妾做不到啊”!
死锁
接下来,我们来看第四种错误场景:死锁。
我先解释下什么是死锁。两个或两个以上的进程(或线程,goroutine)在执行过程中,因争夺共享资源而处于一种互相等待的状态,如果没有外部干涉,它们都将无法推进下去,此时,我们称系统处于死锁状态或系统产生了死锁。
我们来分析一下死锁产生的必要条件。如果你想避免死锁,只要破坏这四个条件中的一个或者几个,就可以了。
互斥: 至少一个资源是被排他性独享的,其他线程必须处于等待状态,直到资源被释放。
持有和等待:goroutine 持有一个资源,并且还在请求其它 goroutine 持有的资源,也就是咱们常说的“吃着碗里,看着锅里”的意思。
不可剥夺:资源只能由持有它的 goroutine 来释放。
环路等待:一般来说,存在一组等待进程,P={P1,P2,…,PN},P1 等待 P2 持有的资源,P2 等待 P3 持有的资源,依此类推,最后是 PN 等待 P1 持有的资源,这就形成了一个环路等待的死结。
Go 运行时,有死锁探测的功能,能够检查出是否出现了死锁的情况,如果出现了,这个时候你就需要调整策略来处理了。
你可以引入一个第三方的锁,大家都依赖这个锁进行业务处理,比如现在政府推行的一站式政务服务中心。或者是解决持有等待问题,物业不需要看到派出所的证明才给开物业证明,等等。
WaitGroup
首先,我们看看 WaitGroup 的数据结构。它包括了一个 noCopy 的辅助字段,一个 state1 记录 WaitGroup 状态的数组。
noCopy 的辅助字段,主要就是辅助 vet 工具检查是否通过 copy 赋值这个 WaitGroup 实例。我会在后面和你详细分析这个字段;
state1,一个具有复合意义的字段,包含 WaitGroup 的计数、阻塞在检查点的 waiter 数和信号量。
WaitGroup 的数据结构定义以及 state 信息的获取方法如下:
1 | type WaitGroup struct { |
因为对 64 位整数的原子操作要求整数的地址是 64 位对齐的,所以针对 64 位和 32 位环境的 state 字段的组成是不一样的。在 64 位环境下,state1 的第一个元素是 waiter 数,第二个元素是 WaitGroup 的计数值,第三个元素是信号量。在 32 位环境下,如果 state1 不是 64 位对齐的地址,那么 state1 的第一个元素是信号量,后两个元素分别是 waiter 数和计数值。然后,我们继续深入源码,看一下 Add、Done 和 Wait 这三个方法的实现。
Add 方法主要操作的是 state 的计数部分。你可以为计数值增加一个 delta 值,内部通过原子操作把这个值加到计数值上。需要注意的是,这个 delta 也可以是个负数,相当于为计数值减去一个值,Done 方法内部其实就是通过 Add(-1) 实现的。
1 | func (wg *WaitGroup) Add(delta int) { |
Wait 方法的实现逻辑是:不断检查 state 的值。如果其中的计数值变为了 0,那么说明所有的任务已完成,调用者不必再等待,直接返回。如果计数值大于 0,说明此时还有任务没完成,那么调用者就变成了等待者,需要加入 waiter 队列,并且阻塞住自己。
1 | func (wg *WaitGroup) Wait() { |
使用 WaitGroup 时的常见错误
常见问题一:计数器设置为负值
WaitGroup 的计数器的值必须大于等于 0。我们在更改这个计数值的时候,WaitGroup 会先做检查,如果计数值被设置为负数,就会导致 panic。
不可add写为负数
不可done 比add 多
常见问题二:不期望的 Add 时机
在使用 WaitGroup 的时候,你一定要遵循的原则就是,等所有的 Add 方法调用之后再调用 Wait,否则就可能导致 panic 或者不期望的结果。
我们构造这样一个场景:只有部分的 Add/Done 执行完后,Wait 就返回。我们看一个例子:启动四个 goroutine,每个 goroutine 内部调用 Add(1) 然后调用 Done(),主 goroutine 调用 Wait 等待任务完成。
1 | func main() { |
在这个例子中,我们原本设想的是,等四个 goroutine 都执行完毕后输出 Done 的信息,但是它的错误之处在于,将 WaitGroup.Add 方法的调用放在了子 gorotuine 中。等主 goorutine 调用 Wait 的时候,因为四个任务 goroutine 一开始都休眠,所以可能 WaitGroup 的 Add 方法还没有被调用,WaitGroup 的计数还是 0,所以它并没有等待四个子 goroutine 执行完毕才继续执行,而是立刻执行了下一步。
导致这个错误的原因是,没有遵循先完成所有的 Add 之后才 Wait。要解决这个问题,一个方法是,预先设置计数值:
1 | func main() { |
另一种方法是在启动子 goroutine 之前才调用 Add:
1 | func main() { |
常见问题三:前一个 Wait 还没结束就重用 WaitGroup
只要 WaitGroup 的计数值恢复到零值的状态,那么它就可以被看作是新创建的 WaitGroup,被重复使用。
但是,如果我们在 WaitGroup 的计数值还没有恢复到零值的时候就重用,就会导致程序 panic。我们看一个例子,初始设置 WaitGroup 的计数值为 1,启动一个 goroutine 先调用 Done 方法,接着就调用 Add 方法,Add 方法有可能和主 goroutine 并发执行。
1 | func main() { |
在这个例子中,第 6 行虽然让 WaitGroup 的计数恢复到 0,但是因为第 9 行有个 waiter 在等待,如果等待 Wait 的 goroutine,刚被唤醒就和 Add 调用(第 7 行)有并发执行的冲突,所以就会出现 panic。
WaitGroup 虽然可以重用,但是是有一个前提的,那就是必须等到上一轮的 Wait 完成之后,才能重用 WaitGroup 执行下一轮的 Add/Wait,如果你在 Wait 还没执行完的时候就调用下一轮 Add 方法,就有可能出现 panic。
noCopy:辅助 vet 检查
我们刚刚在学习 WaitGroup 的数据结构时,提到了里面有一个 noCopy 字段。你还记得它的作用吗?其实,它就是指示 vet 工具在做检查的时候,这个数据结构不能做值复制使用。更严谨地说,是不能在第一次使用之后复制使用 ( must not be copied after first use)。
你可能会说了,为什么要把 noCopy 字段单独拿出来讲呢?一方面,把 noCopy 字段穿插到 waitgroup 代码中讲解,容易干扰我们对 WaitGroup 整体的理解。另一方面,也是非常重要的原因,noCopy 是一个通用的计数技术,其他并发原语中也会用到,所以单独介绍有助于你以后在实践中使用这个技术。
但是,WaitGroup 同步原语不就是 Add、Done 和 Wait 方法吗?vet 能检查出来吗?其实是可以的。通过给 WaitGroup 添加一个 noCopy 字段,我们就可以为 WaitGroup 实现 Locker 接口,这样 vet 工具就可以做复制检查了。而且因为 noCopy 字段是未输出类型,所以 WaitGroup 不会暴露 Lock/Unlock 方法。
noCopy 字段的类型是 noCopy,它只是一个辅助的、用来帮助 vet 检查用的类型:
1 | type noCopy struct{} |
如果你想要自己定义的数据结构不被复制使用,或者说,不能通过 vet 工具检查出复制使用的报警,就可以通过嵌入 noCopy 这个数据类型来实现。