在开发过程中,尤其是web server,有时候我们需要并发的请求数据,对于某些数据,同时发起的多个请求其实拿到的数据都是相同的,如果不处理这类请求,那么每个请求都会获取一遍数据,这无疑是对资源的浪费。比如要从数据库查询相同 id
的一条数据,并发的多个请求都会执行一次 sql
语句,增加了数据库的压力。
有没有一种方案,当多个请求同时发起后,只有第一个请求去获取数据,在它返回之前,其他请求都各自阻塞等待直到真正执行数据获取的请求返回后,直接拿它的结果?我们把这种将同时发起的多个请求转为一个请求去执行真正业务逻辑的情况称为“请求抑制”。在 Go 中,singleflight
就是用来处理请求抑制的。
简介
singleflight
包全路径为 golang.org/x/sync/singleflight
, 目前版本是 v0.7.0
。
前边提到,singleflight
用于抑制同一时间获取相同数据的重复请求。当存在多个重复请求时,singleflight
保证只有一个请求能执行,其他请求阻塞,直到前边的请求返回后直接将其返回值共享(shared
)给阻塞的这些请求。
在使用之前,首先要理解何为"重复请求",如何区分"相同数据"。
- 相同数据:指并发下当前时间段多个请求获取的数据是完全相同的,比如获取全局的配置、查询天气数据等。
- 重复请求:指处理相同数据时,在一个请求从发起到返回之前这段时间,又有其他多个请求发起,那么这些请求就是重复请求。
理解了这两点,现在我们来看看 singleflight
的用法。
用法
singleflight
整体设计比较简单,公开的 api
包括:
Group
对象: 它表示处理"相同数据"的一系列工作,在这里“重复请求”将会被抑制Result
对象: 表示执行真正业务逻辑的结果对象Do
方法: 执行请求抑制,后文详述DoChan
方法: 与Do
相同,只是结果返回<-chan Result
Do 方法
Do
方法表示执行请求抑制,其定义如下:
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
// ...
}
首先,它需要在 singleflight.Group
实例下执行,用于抑制这一组请求。
两个参数:
- key: 这个参数用来指示"相同数据",也就是说相同的key代表相同数据,区分相同数据是正确使用
singleflight
的关键。 fn
: 真正执行业务逻辑的方法,该方法返回一个任意对象interface{}
和一个error
返回结果:
v
: 就是 fn 方法返回的第一个值err
: fn 方法返回的第二值 errorshared
: 当抑制了其他请求,返回true
, 也就是说将真正执行业务逻辑的请求返回结果共享给其他请求后,该值为true
, 否则为false
DoChan 方法
DoChan
方法与 Do
类似,只是将结果封装为 Result
并返回它的 chan
:
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
// ...
}
显然,该方法是为了并发调用设计的,该方法调用后可以并发执行其他代码。
示例
假设需要从数据库查询相同 id 的文章,文章并不会频繁更新。如果同时有多个请求需要执行查询,没有请求抑制则数据库需要执行多次重复的sql,浪费性能。使用 singleflight
则可以只执行一次sql,并将结果共享给这些查询请求,是他们得到相同的结果。来看看代码实现:
首先,来编写一个方法模拟从数据库查询文章:
var count int32
func getArticle(id int) (article string, err error) {
atomic.AddInt32(&count, 1)
time.Sleep(time.Duration(count) * time.Millisecond)
return fmt.Sprintf("article: %d", id), nil
}
这里使用一个 count
全局变量来记录真正访问数据库的次数,方法中模拟请求次数越多等待越久。
然后,并发请求它:
func doDemo() {
time.AfterFunc(1*time.Second, func() {
atomic.AddInt32(&count, -count)
})
var (
wg sync.WaitGroup
now = time.Now()
n = 10
)
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
var res string
var err error
var shared bool
res, err = getArticle(1)
if err != nil {
panic(err)
}
if res != "article: 1" {
panic("handle error")
}
fmt.Printf("article: %s, shared: %v\n", res, shared)
wg.Done()
}()
}
wg.Wait()
fmt.Printf("同时发起 %d 次请求, 真正查询次数: %d, 耗时: %s", n, count, time.Since(now))
}
在 main
方法中调用 doDemo()
,结果如下:
同时发起 10 次请求,真正查询次数: 10, 耗时: 10.392278s
现在,我们使用 singleflight
改造上边的 doDemo()
方法:
先添加一个方法:
func singleflightGetArticle(sg *singleflight.Group, id int) (string, error, bool) {
v, err, shared := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) {
fmt.Println("getArticle")
return getArticle(id)
})
return v.(string), err, shared
}
该方法需要传递一个 singleflight.Group
实例,这样就可以调用其 Do
方法来抑制相同请求,key
为文章 id,也就是说相同id的文章认为是相同数据。
调用它:
func doDemo() {
time.AfterFunc(1*time.Second, func() {
atomic.AddInt32(&count, -count)
})
var (
wg sync.WaitGroup
now = time.Now()
n = 10
sg = &singleflight.Group{}
)
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
var res string
var err error
var shared bool
res, err, shared = singleflightGetArticle(sg, 1)
if err != nil {
panic(err)
}
if res != "article: 1" {
panic("handle error")
}
fmt.Printf("article: %s, shared: %v\n", res, shared)
wg.Done()
}()
}
wg.Wait()
fmt.Printf("同时发起 %d 次请求, 真正查询次数: %d, 耗时: %s", n, count, time.Since(now))
}
该方法首先需要创建 singleflight.Group
实例。
再此执行结果:
同时发起 10 次请求,真正查询次数: 1, 耗时: 1.785056ms
可以看到真正执行数据库查询的只有一次请求。DoChan()
方法的调用与上边大同小异,唯一的区别在于需要自己去读取返回的 chan
。
使用场景
singleflight
的使用关键是要区分和识别出 “相同数据”,也就是要设计好 key
。其使用场景通常有:
- 数据懒加载: 首次请求加载数据,后来的请求直接查询该数据即可,
singleflight
可以避免多次去加载数据,比如加载全局配置等 - 冗余外部Api抑制:尤其是在微服务中,当存在多次调用外部api时,可以使用
singleflight
来避免冗余的API调用,比如向外部系统请求天气数据等 - 读多写少的数据查询:一些数据很少更新,但是读很频繁,也可以考虑使用
singleflight
来优化性能,但是要记录好日志以监控其行为和影响,避免一些异常情况
总结
并发时多个请求处理相同数据,可以使用 singleflight
来优化性能,避免一些不必要的操作。singleflight
还可以与缓存搭配使用,进一步提升系统性能和可靠性。