想要写好Go并发不得不掌握的数据结构(4)

Context

如果我们遇到了下面的一些场景,也可以考虑使用 Context:

上下文信息传递 (request-scoped),比如处理 http 请求、在请求处理链路上传递信息;

控制子 goroutine 的运行;

超时控制的方法调用;

可以取消的方法调用。

所以,我们需要掌握 Context 的具体用法,这样才能在不影响主要业务流程实现的时候,实现一些通用的信息传递,或者是能够和其它 goroutine 协同工作,提供 timeout、cancel 等机制。

Context 基本使用方法

包 context 定义了 Context 接口,Context 的具体实现包括 4 个方法,分别是 Deadline、Done、Err 和 Value,如下所示:

1
2
3
4
5
6
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}

Deadline 方法会返回这个 Context 被取消的截止日期。如果没有设置截止日期,ok 的值是 false。后续每次调用这个对象的 Deadline 方法时,都会返回和第一次调用相同的结果。

Done 方法返回一个 Channel 对象。在 Context 被取消时,此 Channel 会被 close,如果没被取消,可能会返回 nil。后续的 Done 调用总是返回相同的结果。当 Done 被 close 的时候,你可以通过 ctx.Err 获取错误信息。Done 这个方法名其实起得并不好,因为名字太过笼统,不能明确反映 Done 被 close 的原因,因为 cancel、timeout、deadline 都可能导致 Done 被 close,不过,目前还没有一个更合适的方法名称。

关于 Done 方法,你必须要记住的知识点就是:如果 Done 没有被 close,Err 方法返回 nil;如果 Done 被 close,Err 方法会返回 Done 被 close 的原因。

Value 返回此 ctx 中和指定的 key 相关联的 value。

  • context.Background():返回一个非 nil 的、空的 Context,没有任何值,不会被 cancel,不会超时,没有截止日期。一般用在主函数、初始化、测试以及创建根 Context 的时候。
  • context.TODO():返回一个非 nil 的、空的 Context,没有任何值,不会被 cancel,不会超时,没有截止日期。当你不清楚是否该用 Context,或者目前还不知道要传递一些什么上下文信息的时候,就可以使用这个方法。

创建特殊用途 Context 的方法

几种创建特殊用途 Context 的方法:WithValue、WithCancel、WithTimeout 和 WithDeadline,包括它们的功能以及实现方式。

WithValue

WithValue 基于 parent Context 生成一个新的 Context,保存了一个 key-value 键值对。它常常用来传递上下文。

1
2
3
4
type valueCtx struct {
Context
key, val interface{}
}

它持有一个 key-value 键值对,还持有 parent 的 Context。它覆盖了 Value 方法,优先从自己的存储中检查这个 key,不存在的话会从 parent 中继续检查。

Go 标准库实现的 Context 还实现了链式查找。如果不存在,还会向 parent Context 去查找,如果 parent 还是 valueCtx 的话,还是遵循相同的原则:valueCtx 会嵌入 parent,所以还是会查找 parent 的 Value 方法的。

WithCancel

WithCancel 方法返回 parent 的副本,只是副本中的 Done Channel 是新建的对象,它的类型是 cancelCtx。

我们常常在一些需要主动取消长时间的任务时,创建这种类型的 Context,然后把这个 Context 传给长时间执行任务的 goroutine。当需要中止任务时,我们就可以 cancel 这个 Context,这样长时间执行任务的 goroutine,就可以通过检查这个 Context,知道 Context 已经被取消了。

WithCancel 返回值中的第二个值是一个 cancel 函数。其实,这个返回值的名称(cancel)和类型(Cancel)也非常迷惑人。

记住,不是只有你想中途放弃,才去调用 cancel,只要你的任务正常完成了,就需要调用 cancel,这样,这个 Context 才能释放它的资源(通知它的 children 处理 cancel,从它的 parent 中把自己移除,甚至释放相关的 goroutine)。很多同学在使用这个方法的时候,都会忘记调用 cancel,切记切记,而且一定尽早释放。

1
2
3
4
5
6
7
8
9
10
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)// 把c朝上传播
return &c, func() { c.cancel(true, Canceled) }
}

// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{Context: parent}
}

代码中调用的 propagateCancel 方法会顺着 parent 路径往上找,直到找到一个 cancelCtx,或者为 nil。如果不为空,就把自己加入到这个 cancelCtx 的 child,以便这个 cancelCtx 被取消的时候通知自己。如果为空,会新起一个 goroutine,由它来监听 parent 的 Done 是否已关闭。

当这个 cancelCtx 的 cancel 函数被调用的时候,或者 parent 的 Done 被 close 的时候,这个 cancelCtx 的 Done 才会被 close。

cancel 是向下传递的,如果一个 WithCancel 生成的 Context 被 cancel 时,如果它的子 Context(也有可能是孙,或者更低,依赖子的类型)也是 cancelCtx 类型的,就会被 cancel,但是不会向上传递。parent Context 不会因为子 Context 被 cancel 而 cancel。

cancelCtx 被取消时,它的 Err 字段就是下面这个 Canceled 错误:

1
var Canceled = errors.New("context canceled")
如果取消,取消的是哪些context?

看下cancelCtx的源码:

1
2
3
4
5
6
7
8
9
10
11
12
// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
Context

mu sync.Mutex // protects following fields
done atomic.Value // of chan struct{}, created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
cause error // set to non-nil by the first cancel call
}

children 是一个字典存放自己所有的下级是cancelCtx的儿子

那么是如何实现的呢

1
2
3
4
5
6
7
8
9
func withCancel(parent Context) *cancelCtx {
if parent == nil {
panic("cannot create context from nil parent")
}
c := &cancelCtx{}
c.propagateCancel(parent, c)
return c
}

这是用父context来生成一个子CancelCtx

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// propagateCancel arranges for child to be canceled when parent is.
// It sets the parent context of cancelCtx.
func (c *cancelCtx) propagateCancel(parent Context, child canceler) {
c.Context = parent

done := parent.Done()
if done == nil {
return // parent is never canceled
}

select {
case <-done:
// parent is already canceled
child.cancel(false, parent.Err(), Cause(parent))
return
default:
}

if p, ok := parentCancelCtx(parent); ok {
// parent is a *cancelCtx, or derives from one.
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err, p.cause)
} else {
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
return
}

if a, ok := parent.(afterFuncer); ok {
// parent implements an AfterFunc method.
c.mu.Lock()
stop := a.AfterFunc(func() {
child.cancel(false, parent.Err(), Cause(parent))
})
c.Context = stopCtx{
Context: parent,
stop: stop,
}
c.mu.Unlock()
return
}

goroutines.Add(1)
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err(), Cause(parent))
case <-child.Done():
}
}()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// parentCancelCtx returns the underlying *cancelCtx for parent.
// It does this by looking up parent.Value(&cancelCtxKey) to find
// the innermost enclosing *cancelCtx and then checking whether
// parent.Done() matches that *cancelCtx. (If not, the *cancelCtx
// has been wrapped in a custom implementation providing a
// different done channel, in which case we should not bypass it.)
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
done := parent.Done()
if done == closedchan || done == nil {
return nil, false
}
p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
if !ok {
return nil, false
}
pdone, _ := p.done.Load().(chan struct{})
if pdone != done {
return nil, false
}
return p, true
}

如果父亲ctx也是一个cancelCtx那么就将儿子的Cancel方法放入父亲的children字典集合中,那么我们看下Cancel方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
// cancel sets c.cause to cause if this is the first time c is canceled.
func (c *cancelCtx) cancel(removeFromParent bool, err, cause error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
if cause == nil {
cause = err
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
c.err = err
c.cause = cause
d, _ := c.done.Load().(chan struct{})
if d == nil {
c.done.Store(closedchan)
} else {
close(d)
}
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err, cause)
}
c.children = nil
c.mu.Unlock()

if removeFromParent {
removeChild(c.Context, c)
}
}

可以看到在取消方法执行后会先去把自己的done chanel关闭,然后去取儿子字典集合中的cancel把自己的儿子的cancel方法进行执行。我们尝试去理解这个逻辑,首先要有一个概念,那就是context其实是一个子context指向父context的一个链表,每个父cancel context的儿子如果也是cancel context 那么父context的children中就会加入这个子context,我们尝试从最高级的父context进行取消那么可以思考到所有儿子cancel context的done 都会被关闭,我们再去思考如果从最低级的子context去取消那么取消的只会是子context自己。

WithTimeout

WithTimeout 其实是和 WithDeadline 一样,只不过一个参数是超时时间,一个参数是截止时间。超时时间加上当前时间,其实就是截止时间,因此,WithTimeout 的实现是:

1
2
3
4
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
// 当前时间+timeout就是deadline
return WithDeadline(parent, time.Now().Add(timeout))
}

WithDeadline

WithDeadline 会返回一个 parent 的副本,并且设置了一个不晚于参数 d 的截止时间,类型为 timerCtx(或者是 cancelCtx)。

如果它的截止时间晚于 parent 的截止时间,那么就以 parent 的截止时间为准,并返回一个类型为 cancelCtx 的 Context,因为 parent 的截止时间到了,就会取消这个 cancelCtx。

如果当前时间已经超过了截止时间,就直接返回一个已经被 cancel 的 timerCtx。否则就会启动一个定时器,到截止时间取消这个 timerCtx。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
// 如果parent的截止时间更早,直接返回一个cancelCtx即可
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
propagateCancel(parent, c) // 同cancelCtx的处理逻辑
dur := time.Until(d)
if dur <= 0 { //当前时间已经超过了截止时间,直接cancel
c.cancel(true, DeadlineExceeded)
return c, func() { c.cancel(false, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
// 设置一个定时器,到截止时间后取消
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}

和 cancelCtx 一样,WithDeadline(WithTimeout)返回的 cancel 一定要调用,并且要尽可能早地被调用,这样才能尽早释放资源,不要单纯地依赖截止时间被动取消。正确的使用姿势是啥呢?我们来看一个例子。

1
2
3
4
5
func slowOperationWithTimeout(ctx context.Context) (Result, error) {
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel() // 一旦慢操作完成就立马调用cancel
return slowOperation(ctx)
}

Cond

Go 标准库提供 Cond 原语的目的是,为等待 / 通知场景下的并发问题提供支持。Cond 通常应用于等待某个条件的一组 goroutine,等条件变为 true 的时候,其中一个 goroutine 或者所有的 goroutine 都会被唤醒执行。

顾名思义,Cond 是和某个条件相关,这个条件需要一组 goroutine 协作共同完成,在条件还没有满足的时候,所有等待这个条件的 goroutine 都会被阻塞住,只有这一组 goroutine 通过协作达到了这个条件,等待的 goroutine 才可能继续进行下去。

那这里等待的条件是什么呢?等待的条件,可以是某个变量达到了某个阈值或者某个时间点,也可以是一组变量分别都达到了某个阈值,还可以是某个对象的状态满足了特定的条件。总结来讲,等待的条件是一种可以用来计算结果是 true 还是 false 的条件。

从开发实践上,我们真正使用 Cond 的场景比较少,因为一旦遇到需要使用 Cond 的场景,我们更多地会使用 Channel 的方式(我会在第 12 和第 13 讲展开 Channel 的用法)去实现,因为那才是更地道的 Go 语言的写法,甚至 Go 的开发者有个“把 Cond 从标准库移除”的提议(issue 21165)。而有的开发者认为,Cond 是唯一难以掌握的 Go 并发原语。至于其中原因,我先卖个关子,到这一讲的后半部分我再和你解释。

Cond 的实现

1
2
3
4
5
type Cond
func NeWCond(l Locker) *Cond
func (c *Cond) Broadcast()
func (c *Cond) Signal()
func (c *Cond) Wait()

首先,Cond 关联的 Locker 实例可以通过 c.L 访问,它内部维护着一个先入先出的等待队列

然后,我们分别看下它的三个方法 Broadcast、Signal 和 Wait 方法。

Signal 方法,允许调用者 Caller 唤醒一个等待此 Cond 的 goroutine。如果此时没有等待的 goroutine,显然无需通知 waiter;如果 Cond 等待队列中有一个或者多个等待的 goroutine,则需要从等待队列中移除第一个 goroutine 并把它唤醒。在其他编程语言中,比如 Java 语言中,Signal 方法也被叫做 notify 方法。

Broadcast 方法,允许调用者 Caller 唤醒所有等待此 Cond 的 goroutine。如果此时没有等待的 goroutine,显然无需通知 waiter;如果 Cond 等待队列中有一个或者多个等待的 goroutine,则清空所有等待的 goroutine,并全部唤醒。在其他编程语言中,比如 Java 语言中,Broadcast 方法也被叫做 notifyAll 方法。

同样地,调用 Broadcast 方法时,也不强求你一定持有 c.L 的锁。

Wait 方法,会把调用者 Caller 放入 Cond 的等待队列中并阻塞,直到被 Signal 或者 Broadcast 的方法从等待队列中移除并唤醒。

调用 Wait 方法时必须要持有 c.L 的锁。

知道了 Cond 提供的三个方法后,我们再通过一个百米赛跑开始时的例子,来学习下 Cond 的使用方法。10 个运动员进入赛场之后需要先做拉伸活动活动筋骨,向观众和粉丝招手致敬,在自己的赛道上做好准备;等所有的运动员都准备好之后,裁判员才会打响发令枪。每个运动员做好准备之后,将 ready 加一,表明自己做好准备了,同时调用 Broadcast 方法通知裁判员。

因为裁判员只有一个,所以这里可以直接替换成 Signal 方法调用。调用 Broadcast 方法的时候,我们并没有请求 c.L 锁,只是在更改等待变量的时候才使用到了锁。裁判员会等待运动员都准备好。虽然每个运动员准备好之后都唤醒了裁判员,但是裁判员被唤醒之后需要检查等待条件是否满足(运动员都准备好了)。可以看到,裁判员被唤醒之后一定要检查等待条件,如果条件不满足还是要继续等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func main() {
c := sync.NewCond(&sync.Mutex{})
var ready int

for i := 0; i < 10; i++ {
go func(i int) {
time.Sleep(time.Duration(rand.Int63n(10)) * time.Second)

// 加锁更改等待条件
c.L.Lock()
ready++
c.L.Unlock()

log.Printf("运动员#%d 已准备就绪\n", i)
// 广播唤醒所有的等待者
c.Broadcast()
}(i)
}

c.L.Lock()
for ready != 10 {
c.Wait()
log.Println("裁判员被唤醒一次")
}
c.L.Unlock()

//所有的运动员是否就绪
log.Println("所有运动员都准备就绪。比赛开始,3,2,1, ......")
}

它的复杂在于:一,这段代码有时候需要加锁,有时候可以不加;二,Wait 唤醒后需要检查条件;三,条件变量的更改,其实是需要原子操作或者互斥锁保护的。所以,有的开发者会认为,Cond 是唯一难以掌握的 Go 并发原语。

Cond 的实现原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
type Cond struct {
noCopy noCopy

// 当观察或者修改等待条件的时候需要加锁
L Locker

// 等待队列
notify notifyList
checker copyChecker
}

func NewCond(l Locker) *Cond {
return &Cond{L: l}
}

func (c *Cond) Wait() {
c.checker.check()
// 增加到等待队列中
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
// 阻塞休眠直到被唤醒
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}

func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}

func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}

runtime_notifyListXXX 是运行时实现的方法,实现了一个等待 / 通知的队列。如果你想深入学习这部分,可以再去看看 runtime/sema.go 代码中。

copyChecker 是一个辅助结构,可以在运行时检查 Cond 是否被复制使用。

Signal 和 Broadcast 只涉及到 notifyList 数据结构,不涉及到锁。

Wait 把调用者加入到等待队列时会释放锁,在被唤醒之后还会请求锁。

在阻塞休眠期间,调用者是不持有锁的,这样能让其他 goroutine 有机会检查或者更新等待变量。

使用 Cond 的 2 个常见错误

以前面百米赛跑的程序为例,在调用 cond.Wait 时,把前后的 Lock/Unlock 注释掉,如下面的代码中的第 20 行和第 25 行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func main() {
c := sync.NewCond(&sync.Mutex{})
var ready int

for i := 0; i < 10; i++ {
go func(i int) {
time.Sleep(time.Duration(rand.Int63n(10)) * time.Second)

// 加锁更改等待条件
c.L.Lock()
ready++
c.L.Unlock()

log.Printf("运动员#%d 已准备就绪\n", i)
// 广播唤醒所有的等待者
c.Broadcast()
}(i)
}

// c.L.Lock()
for ready != 10 {
c.Wait()
log.Println("裁判员被唤醒一次")
}
// c.L.Unlock()

//所有的运动员是否就绪
log.Println("所有运动员都准备就绪。比赛开始,3,2,1, ......")
}

出现这个问题的原因在于,cond.Wait 方法的实现是,把当前调用者加入到 notify 队列之中后会释放锁(如果不释放锁,其他 Wait 的调用者就没有机会加入到 notify 队列中了),然后一直等待;等调用者被唤醒之后,又会去争抢这把锁。如果调用 Wait 之前不加锁的话,就有可能 Unlock 一个未加锁的 Locker。所以切记,调用 cond.Wait 方法之前一定要加锁。

使用 Cond 的另一个常见错误是,只调用了一次 Wait,没有检查等待条件是否满足,结果条件没满足,程序就继续执行了。出现这个问题的原因在于,误以为 Cond 的使用,就像 WaitGroup 那样调用一下 Wait 方法等待那么简单。比如下面的代码中,把第 21 行和第 24 行注释掉:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func main() {
c := sync.NewCond(&sync.Mutex{})
var ready int

for i := 0; i < 10; i++ {
go func(i int) {
time.Sleep(time.Duration(rand.Int63n(10)) * time.Second)

// 加锁更改等待条件
c.L.Lock()
ready++
c.L.Unlock()

log.Printf("运动员#%d 已准备就绪\n", i)
// 广播唤醒所有的等待者
c.Broadcast()
}(i)
}

c.L.Lock()
// for ready != 10 {
c.Wait()
log.Println("裁判员被唤醒一次")
// }
c.L.Unlock()

//所有的运动员是否就绪
log.Println("所有运动员都准备就绪。比赛开始,3,2,1, ......")
}

运行这个程序,你会发现,可能只有几个运动员准备好之后程序就运行完了,而不是我们期望的所有运动员都准备好才进行下一步。原因在于,每一个运动员准备好之后都会唤醒所有的等待者,也就是这里的裁判员,比如第一个运动员准备好后就唤醒了裁判员,结果这个裁判员傻傻地没做任何检查,以为所有的运动员都准备好了,就继续执行了。

用例

Kubernetes 项目中定义了优先级队列 PriorityQueue 这样一个数据结构,用来实现 Pod 的调用。它内部有三个 Pod 的队列,即 activeQ、podBackoffQ 和 unschedulableQ,其中 activeQ 就是用来调度的活跃队列(heap)。

Pop 方法调用的时候,如果这个队列为空,并且这个队列没有 Close 的话,会调用 Cond 的 Wait 方法等待。

你可以看到,调用 Wait 方法的时候,调用者是持有锁的,并且被唤醒的时候检查等待条件(队列是否为空)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 从队列中取出一个元素
func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
p.lock.Lock()
defer p.lock.Unlock()
for p.activeQ.Len() == 0 { // 如果队列为空
if p.closed {
return nil, fmt.Errorf(queueClosed)
}
p.cond.Wait() // 等待,直到被唤醒
}
......
return pInfo, err
}

当 activeQ 增加新的元素时,会调用条件变量的 Boradcast 方法,通知被 Pop 阻塞的调用者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 增加元素到队列中
func (p *PriorityQueue) Add(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
pInfo := p.newQueuedPodInfo(pod)
if err := p.activeQ.Add(pInfo); err != nil {//增加元素到队列中
klog.Errorf("Error adding pod %v to the scheduling queue: %v", nsNameForPod(pod), err)
return err
}
......
p.cond.Broadcast() //通知其它等待的goroutine,队列中有元素了

return nil
}

这个优先级队列被关闭的时候,也会调用 Broadcast 方法,避免被 Pop 阻塞的调用者永远 hang 住。

1
2
3
4
5
6
7
func (p *PriorityQueue) Close() {
p.lock.Lock()
defer p.lock.Unlock()
close(p.stop)
p.closed = true
p.cond.Broadcast() //关闭时通知等待的goroutine,避免它们永远等待
}

Once

Once 可以用来执行且仅仅执行一次动作,常常用于单例对象的初始化场景。

看一个初始化链接的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"net"
"sync"
"time"
)

// 使用互斥锁保证线程(goroutine)安全
var connMu sync.Mutex
var conn net.Conn

func getConn() net.Conn {
connMu.Lock()
defer connMu.Unlock()

// 返回已创建好的连接
if conn != nil {
return conn
}

// 创建连接
conn, _ = net.DialTimeout("tcp", "baidu.com:80", 10*time.Second)
return conn
}

// 使用连接
func main() {
conn := getConn()
if conn == nil {
panic("conn is nil")
}
}

这种方式虽然实现起来简单,但是有性能问题。一旦连接创建好,每次请求的时候还是得竞争锁才能读取到这个连接,这是比较浪费资源的,因为连接如果创建好之后,其实就不需要锁的保护了。怎么办呢?

once这时候就派上用处了

Once 的使用场景

sync.Once 只暴露了一个方法 Do,你可以多次调用 Do 方法,但是只有第一次调用 Do 方法时 f 参数才会执行,这里的 f 是一个无参数无返回值的函数。

1
func (o *Once) Do(f func())

因为当且仅当第一次调用 Do 方法的时候参数 f 才会执行,即使第二次、第三次、第 n 次调用时 f 参数的值不一样,也不会被执行,比如下面的例子,虽然 f1 和 f2 是不同的函数,但是第二个函数 f2 就不会执行。

因为这里的 f 参数是一个无参数无返回的函数,所以你可能会通过闭包的方式引用外面的参数,比如:

1
2
3
4
5
6
7
8
var addr = "baidu.com"

var conn net.Conn
var err error

once.Do(func() {
conn, err = net.Dial("tcp", addr)
})

而且在实际的使用中,绝大多数情况下,你会使用闭包的方式去初始化外部的一个资源。你看,Once 的使用场景很明确,所以,在标准库内部实现中也常常能看到 Once 的身影。比如标准库内部cache的实现上,就使用了 Once 初始化 Cache 资源,包括 defaultDir 值的获取:

我给你重点介绍一下很值得我们学习的 math/big/sqrt.go 中实现的一个数据结构,它通过 Once 封装了一个只初始化一次的值:

1
2
3
4
5
6
7
8
9
10
11
12
13
 // 值是3.0或者0.0的一个数据结构
var threeOnce struct {
sync.Once
v *Float
}

// 返回此数据结构的值,如果还没有初始化为3.0,则初始化
func three() *Float {
threeOnce.Do(func() { // 使用Once初始化
threeOnce.v = NewFloat(3.0)
})
return threeOnce.v
}

Once 常常用来初始化单例资源,或者并发访问只需初始化一次的共享资源,或者在测试的时候初始化一次测试资源。

如何实现一个 Once?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type Once struct {
done uint32
m Mutex
}

func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}


func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
// 双检查
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}

使用 Once 可能出现的 2 种错误

第一种错误:死锁

你已经知道了 Do 方法会执行一次 f,但是如果 f 中再次调用这个 Once 的 Do 方法的话,就会导致死锁的情况出现。这还不是无限递归的情况,而是的的确确的 Lock 的递归调用导致的死锁

1
2
3
4
5
6
7
8
func main() {
var once sync.Once
once.Do(func() {
once.Do(func() {
fmt.Println("初始化")
})
})
}

当然,想要避免这种情况的出现,就不要在 f 参数中调用当前的这个 Once,不管是直接的还是间接的。

第二种错误:未初始化

如果 f 方法执行的时候 panic,或者 f 执行初始化资源的时候失败了,这个时候,Once 还是会认为初次执行已经成功了,即使再次调用 Do 方法,也不会再次执行 f。

比如下面的例子,由于一些防火墙的原因,googleConn 并没有被正确的初始化,后面如果想当然认为既然执行了 Do 方法 googleConn 就已经初始化的话,会抛出空指针的错误:

1
2
3
4
5
6
7
8
9
10
11
12
func main() {
var once sync.Once
var googleConn net.Conn // 到Google网站的一个连接

once.Do(func() {
// 建立到google.com的连接,有可能因为网络的原因,googleConn并没有建立成功,此时它的值为nil
googleConn, _ = net.Dial("tcp", "google.com:80")
})
// 发送http请求
googleConn.Write([]byte("GET / HTTP/1.1\r\nHost: google.com\r\n Accept: */*\r\n\r\n"))
io.Copy(os.Stdout, googleConn)
}

既然执行过 Once.Do 方法也可能因为函数执行失败的原因未初始化资源,并且以后也没机会再次初始化资源,那么这种初始化未完成的问题该怎么解决呢?

这里我来告诉你一招独家秘笈,我们可以自己实现一个类似 Once 的并发原语,既可以返回当前调用 Do 方法是否正确完成,还可以在初始化失败后调用 Do 方法再次尝试初始化,直到初始化成功才不再初始化了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 一个功能更加强大的Once
type Once struct {
m sync.Mutex
done uint32
}
// 传入的函数f有返回值error,如果初始化失败,需要返回失败的error
// Do方法会把这个error返回给调用者
func (o *Once) Do(f func() error) error {
if atomic.LoadUint32(&o.done) == 1 { //fast path
return nil
}
return o.slowDo(f)
}
// 如果还没有初始化
func (o *Once) slowDo(f func() error) error {
o.m.Lock()
defer o.m.Unlock()
var err error
if o.done == 0 { // 双检查,还没有初始化
err = f()
if err == nil { // 初始化成功才将标记置为已初始化
atomic.StoreUint32(&o.done, 1)
}
}
return err
}

我们所做的改变就是 Do 方法和参数 f 函数都会返回 error,如果 f 执行失败,会把这个错误信息返回。

对 slowDo 方法也做了调整,如果 f 调用失败,我们不会更改 done 字段的值,这样后续的 goroutine 还会继续调用 f。如果 f 执行成功,才会修改 done 的值为 1。