我们知道了singleflight的用法,使用 singleflight 我们可以抑制多个请求,极大地节约带宽、增加系统吞吐量、提升性能。那么,singleflight 底层是如何实现的呢?本文我们来分析一番。
整体结构#
singleflight 的核心是将同时段的多个请求抑制,只有一个请求能够真正请求资源,其他请求都阻塞。上一篇提到,singleflight的公开api仅包括:
Group
对象: 它表示处理"相同数据"的一系列工作,在这里“重复请求”将会被抑制Result
对象: 表示执行真正业务逻辑的结果对象Do
方法: 执行请求抑制DoChan
方法: 与Do
相同,只是结果返回 <-chan Result
从这些api我们大致可以知道,调用 Do
或者 DoChan
方法就是我们所述的核心“请求”,可以猜测: 对于同一个 key
,首先调用的会执行真正的逻辑,方法返回之前的后续所有相同的 key
调用都会阻塞,当第一个请求返回后,阻塞的这些调用就直接使用其返回值作为自己的返回值了。
顺着上述猜想的逻辑,我们看看singleflight的源码实现。代码不多,算上注释一共就200来行,我们来一一分析。
Group struct#
首先看看 Group
的代码:
1
2
3
4
| type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
|
Group
表示处理相同数据的一系列工作,这些工作存储到一个 map[string]*call
的结构中,为了保证并发安全,Group
内部持有 sync.Mutex
锁用来保护这个 map
的读写。
Group
有一个非常重要的两个方法 Do
和 DoChan
, 在上一篇已经介绍过了。
再来回顾一下 Do
方法的定义:
1
| func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
|
已经介绍过,这里再详细看看这些参数:
- key: 标记为同一请求的key,相同的key认为是相同的请求。这个key其实就是底层
map
的key,下边会详细介绍 fn
: 真正执行业务逻辑的方法,该方法没有任何参数且返回一个任意对象 interface{}
和一个 error
,这就是真正执行业务逻辑的标准方法,我们把 fn
方法称为业务方法
返回值:
v
: 就是 fn 方法返回的第一个值err
: fn 方法返回的第二值 errorshared
: 当抑制了其他请求,返回 true
, 也就是说将真正执行业务逻辑的请求返回结果共享给其他请求后,该值为 true
, 否则为 false
。简言之,如果有其他同 key
的 Do
方法调用被抑制了,则返回 true
,否则为 false
。
DoChan
方法与 Do
方法的区别在于返回值不同:
1
| func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result
|
DoChan
返回一个只读的 <-chan Result
,这用调用方就可以通过 chan
来接收结果。
call struct#
再来看看 singleflight 一个重要的结构 call
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| type call struct {
wg sync.WaitGroup
// val、err均表示Group的Do方法的返回值,它们在 WaitGroup 完成之前只能写入一次,
// WaitGroup 完成之后就只能读
val interface{}
err error
// dups 表示重复调用 Do 方法的次数,chans 表示抑制调用的返回chan,调用 DoChan 方法时
// 会向通道中写入结果,调用方读取
// 在 WaitGroup 完成之前会抑制持有 mutex,
// 在 WaitGroup 结束后,这些字段将被读取但不会被写入。
dups int
chans []chan<- Result
}
|
简单而言,call
表示 Do
或者 DoChan
方法中业务方法 fn
的调用,它内部持有 sync.WaitGroup
, 用来抑制请求,当首次执行业务方法 fn
时调用 WaitGroup.Add(1)
,重复请求则调用 WaitGroup.Wait()
方法阻塞它们,从而保证只会调用一次 fn
方法。
Do 方法详细实现#
再看看 Do
的实现代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
| func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
// 第一次创建map,call 会在后续创建
if g.m == nil {
g.m = make(map[string]*call)
}
// 如果 key 已经存在,说明是重复请求,记录重复次数
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
// 关键点,等待 fn 方法调用结束,然后就可以获取 call 上的返回值
c.wg.Wait()
// 处理错误
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
// 返回结果
return c.val, c.err, true
}
// 创建 call
c := new(call)
// 调用业务方法 fn 时,WaitGroup 设置为 1,其他重复调用均会 wait
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
// 调用真正业务逻辑方法 fn
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
|
关键点在于 call
上的这个 WaitGroup
, 它是实现的关键。当可以调用业务方法 fn
时,首先创建 call
,然后设置 WaitGroup
并发通行数量为1。Group
上 map
如果存在相同的key,则说明调用方会重复调用,调用需要被抑制,那么先增加重复数量,然后调用 WaitGroup.Wait()
阻塞调用,当 fn
调用结束后会执行 WaitGroup.Done()
(下文有介绍),然后重复调用阻塞结束,返回 call
上的结果和错误。每次调用 Do
方法都需要先加锁,再解锁,保证并发读写原子性。
再来看看业务方法 fn
是如何调用的,也就是 doCall()
方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
| func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false
// 使用两次 defer 来区分错误
defer func() {
if !normalReturn && !recovered {
c.err = errGoexit
}
g.mu.Lock()
defer g.mu.Unlock()
// fn 调用结束,WaitGroup done,阻塞调用可以返回了
c.wg.Done()
// 调用完成后删除
if g.m[key] == c {
delete(g.m, key)
}
if e, ok := c.err.(*panicError); ok {
// 确保panic不能被recover,防止 chan 永久阻塞
if len(c.chans) > 0 {
go panic(e)
select {} // 保留此 goroutine,以便它出现在crash dump中
} else {
panic(e)
}
} else if c.err == errGoexit {
} else {
// 正常返回,向 call 的 chans 写入结果
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
func() {
defer func() {
if !normalReturn {
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()
// 调用 fn
c.val, c.err = fn()
normalReturn = true
}()
if !normalReturn {
recovered = true
}
}
|
整个方法虽然代码看起来多,其实都是在区分和处理错误,真正的逻辑其实就一句:
调用 fn
,并将返回值和错误赋值给 call
,最后的结果和错误处理都在 defer 的匿名函数中,defer 中会调用 call
上 WaitGroup
的 Done
方法,被阻塞的调用就可以直接获取到call
上的返回结果。
DoChan 详细实现#
看完了 Do
方法,那么 DoChan
方法的实现就很简单了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn)
return ch
}
|
与 Do
方法逻辑类似,只是每次调用都会创建一个 chan
, 并放入 call
的 chans
属性中,并返回该 chan
给调用方。同样,只有第一个能够调用业务方法 fn
的调用会创建 call
。
其实在调用 Do
方法时,call
结构体中的 chans
属性都是 nil
。也就是用不到该属性,它是给 DoChan
方法设计的,在 doCall
方法中,向 chans
写入数据的逻辑前边已经介绍过:
1
2
3
4
| // 正常返回,向 call 的 chans 写入结果
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
|
至此,DoChan
方法的逻辑就很清楚了: 为每个调用方创建一个 chan
, 它们可以读取它来获得结果,重复调用读取 chan
被阻塞,直到第一次调用 fn
成功,此时后会向 chans
写入结果。由于 chan
本身是阻塞的,不再需要调用 WaitGroup.Wait()
了。
singleflight 的实现主要依赖 sync.WaitGroup
和 sync.Mutex
,利用 WaitGroup
限制并发请求数量,利用 Mutex
加锁保证并发安全性。DoChan
和 Do
方法的区别在于处理结果上,前者多了对于 chan
的管理。
相关阅读