我们知道了singleflight的用法,使用 singleflight 我们可以抑制多个请求,极大地节约带宽、增加系统吞吐量、提升性能。那么,singleflight 底层是如何实现的呢?本文我们来分析一番。

整体结构

singleflight 的核心是将同时段的多个请求抑制,只有一个请求能够真正请求资源,其他请求都阻塞。上一篇提到,singleflight的公开api仅包括:

  • Group 对象: 它表示处理"相同数据"的一系列工作,在这里“重复请求”将会被抑制
  • Result 对象: 表示执行真正业务逻辑的结果对象
  • Do 方法: 执行请求抑制
  • DoChan 方法: 与Do相同,只是结果返回 <-chan Result

从这些api我们大致可以知道,调用 Do 或者 DoChan 方法就是我们所述的核心“请求”,可以猜测: 对于同一个 key,首先调用的会执行真正的逻辑,方法返回之前的后续所有相同的 key 调用都会阻塞,当第一个请求返回后,阻塞的这些调用就直接使用其返回值作为自己的返回值了。

顺着上述猜想的逻辑,我们看看singleflight的源码实现。代码不多,算上注释一共就200来行,我们来一一分析。

Group struct

首先看看 Group 的代码:

1
2
3
4
type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // lazily initialized
}

Group 表示处理相同数据的一系列工作,这些工作存储到一个 map[string]*call 的结构中,为了保证并发安全,Group 内部持有 sync.Mutex 锁用来保护这个 map 的读写。

Group 有一个非常重要的两个方法 DoDoChan, 在上一篇已经介绍过了。

再来回顾一下 Do 方法的定义:

1
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)

已经介绍过,这里再详细看看这些参数:

  • key: 标记为同一请求的key,相同的key认为是相同的请求。这个key其实就是底层map的key,下边会详细介绍
  • fn: 真正执行业务逻辑的方法,该方法没有任何参数且返回一个任意对象 interface{} 和一个 error,这就是真正执行业务逻辑的标准方法,我们把 fn 方法称为业务方法

返回值:

  • v: 就是 fn 方法返回的第一个值
  • err: fn 方法返回的第二值 error
  • shared: 当抑制了其他请求,返回 true, 也就是说将真正执行业务逻辑的请求返回结果共享给其他请求后,该值为 true, 否则为 false。简言之,如果有其他同 keyDo 方法调用被抑制了,则返回 true,否则为 false

DoChan 方法与 Do 方法的区别在于返回值不同:

1
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result

DoChan 返回一个只读的 <-chan Result,这用调用方就可以通过 chan 来接收结果。

call struct

再来看看 singleflight 一个重要的结构 call:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
type call struct {
	wg sync.WaitGroup

	// val、err均表示Group的Do方法的返回值,它们在 WaitGroup 完成之前只能写入一次,
	// WaitGroup 完成之后就只能读
	val interface{}
	err error

	// dups 表示重复调用 Do 方法的次数,chans 表示抑制调用的返回chan,调用 DoChan 方法时
	// 会向通道中写入结果,调用方读取
	// 在 WaitGroup 完成之前会抑制持有 mutex,
	// 在 WaitGroup 结束后,这些字段将被读取但不会被写入。
	dups  int
	chans []chan<- Result
}

简单而言,call 表示 Do 或者 DoChan 方法中业务方法 fn 的调用,它内部持有 sync.WaitGroup, 用来抑制请求,当首次执行业务方法 fn 时调用 WaitGroup.Add(1),重复请求则调用 WaitGroup.Wait() 方法阻塞它们,从而保证只会调用一次 fn 方法。

Do 方法详细实现

再看看 Do 的实现代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
	g.mu.Lock()
	// 第一次创建map,call 会在后续创建
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	// 如果 key 已经存在,说明是重复请求,记录重复次数
	if c, ok := g.m[key]; ok {
		c.dups++
		g.mu.Unlock()
		// 关键点,等待 fn 方法调用结束,然后就可以获取 call 上的返回值
		c.wg.Wait()

		// 处理错误
		if e, ok := c.err.(*panicError); ok {
			panic(e)
		} else if c.err == errGoexit {
			runtime.Goexit()
		}
		// 返回结果
		return c.val, c.err, true
	}
	// 创建 call
	c := new(call)
	// 调用业务方法 fn 时,WaitGroup 设置为 1,其他重复调用均会 wait
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	// 调用真正业务逻辑方法 fn
	g.doCall(c, key, fn)
	return c.val, c.err, c.dups > 0
}

关键点在于 call 上的这个 WaitGroup, 它是实现的关键。当可以调用业务方法 fn 时,首先创建 call,然后设置 WaitGroup 并发通行数量为1。Groupmap 如果存在相同的key,则说明调用方会重复调用,调用需要被抑制,那么先增加重复数量,然后调用 WaitGroup.Wait() 阻塞调用,当 fn 调用结束后会执行 WaitGroup.Done() (下文有介绍),然后重复调用阻塞结束,返回 call 上的结果和错误。每次调用 Do 方法都需要先加锁,再解锁,保证并发读写原子性。

再来看看业务方法 fn 是如何调用的,也就是 doCall() 方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
	normalReturn := false
	recovered := false

	// 使用两次 defer 来区分错误
	defer func() {
		if !normalReturn && !recovered {
			c.err = errGoexit
		}

		g.mu.Lock()
		defer g.mu.Unlock()
		// fn 调用结束,WaitGroup done,阻塞调用可以返回了
		c.wg.Done()
		// 调用完成后删除
		if g.m[key] == c {
			delete(g.m, key)
		}

		if e, ok := c.err.(*panicError); ok {
			// 确保panic不能被recover,防止 chan 永久阻塞
			if len(c.chans) > 0 {
				go panic(e)
				select {} // 保留此 goroutine,以便它出现在crash dump中
			} else {
				panic(e)
			}
		} else if c.err == errGoexit {
		} else {
			// 正常返回,向 call 的 chans 写入结果
			for _, ch := range c.chans {
				ch <- Result{c.val, c.err, c.dups > 0}
			}
		}
	}()

	func() {
		defer func() {
			if !normalReturn {
				if r := recover(); r != nil {
					c.err = newPanicError(r)
				}
			}
		}()
		
		// 调用 fn
		c.val, c.err = fn()
		normalReturn = true
	}()

	if !normalReturn {
		recovered = true
	}
}

整个方法虽然代码看起来多,其实都是在区分和处理错误,真正的逻辑其实就一句:

1
c.val, c.err = fn()

调用 fn,并将返回值和错误赋值给 call,最后的结果和错误处理都在 defer 的匿名函数中,defer 中会调用 callWaitGroupDone 方法,被阻塞的调用就可以直接获取到call上的返回结果。

DoChan 详细实现

看完了 Do 方法,那么 DoChan 方法的实现就很简单了:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
	ch := make(chan Result, 1)
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		c.dups++
		c.chans = append(c.chans, ch)
		g.mu.Unlock()
		return ch
	}
	c := &call{chans: []chan<- Result{ch}}
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	go g.doCall(c, key, fn)

	return ch
}

Do 方法逻辑类似,只是每次调用都会创建一个 chan, 并放入 callchans 属性中,并返回该 chan 给调用方。同样,只有第一个能够调用业务方法 fn 的调用会创建 call

其实在调用 Do 方法时,call 结构体中的 chans 属性都是 nil。也就是用不到该属性,它是给 DoChan 方法设计的,在 doCall 方法中,向 chans 写入数据的逻辑前边已经介绍过:

1
2
3
4
// 正常返回,向 call 的 chans 写入结果
for _, ch := range c.chans {
	ch <- Result{c.val, c.err, c.dups > 0}
}

至此,DoChan 方法的逻辑就很清楚了: 为每个调用方创建一个 chan, 它们可以读取它来获得结果,重复调用读取 chan 被阻塞,直到第一次调用 fn 成功,此时后会向 chans 写入结果。由于 chan 本身是阻塞的,不再需要调用 WaitGroup.Wait()了。

总结

singleflight 的实现主要依赖 sync.WaitGroupsync.Mutex,利用 WaitGroup 限制并发请求数量,利用 Mutex 加锁保证并发安全性。DoChanDo 方法的区别在于处理结果上,前者多了对于 chan 的管理。


相关阅读