在掌握了 Goroutine 和 Channel 的基础知识后,我们需要了解 Go 标准库 sync 包提供的各种同步工具。这些工具能帮助我们更好地控制并发程序的执行流程,避免数据竞争,提高程序的可靠性。
sync.WaitGroup:等待一组 Goroutine 完成
基本用法
WaitGroup 用于等待一组 Goroutine 完成。它有三个方法:
Add(delta int):增加计数器Done():减少计数器(相当于Add(-1))Wait():阻塞直到计数器为 0
1package main
2
3import (
4 "fmt"
5 "sync"
6 "time"
7)
8
9func worker(id int, wg *sync.WaitGroup) {
10 defer wg.Done() // 函数结束时调用 Done()
11
12 fmt.Printf("Worker %d starting\n", id)
13 time.Sleep(time.Second)
14 fmt.Printf("Worker %d done\n", id)
15}
16
17func main() {
18 var wg sync.WaitGroup
19
20 // 启动 5 个 worker
21 for i := 1; i <= 5; i++ {
22 wg.Add(1) // 每启动一个 goroutine,计数器 +1
23 go worker(i, &wg)
24 }
25
26 wg.Wait() // 等待所有 worker 完成
27 fmt.Println("All workers completed")
28}
注意事项
Add()必须在Wait()之前调用Add()通常在启动 goroutine 之前调用,而不是在 goroutine 内部- 必须传递
WaitGroup的指针,而不是值拷贝
sync.Mutex:互斥锁
基本用法
Mutex(互斥锁)用于保护共享资源,确保同一时间只有一个 goroutine 可以访问。
1package main
2
3import (
4 "fmt"
5 "sync"
6)
7
8type SafeCounter struct {
9 mu sync.Mutex
10 count int
11}
12
13func (c *SafeCounter) Inc() {
14 c.mu.Lock() // 加锁
15 c.count++
16 c.mu.Unlock() // 解锁
17}
18
19func (c *SafeCounter) Value() int {
20 c.mu.Lock()
21 defer c.mu.Unlock()
22 return c.count
23}
24
25func main() {
26 counter := SafeCounter{}
27 var wg sync.WaitGroup
28
29 // 启动 1000 个 goroutine 同时增加计数器
30 for i := 0; i < 1000; i++ {
31 wg.Add(1)
32 go func() {
33 defer wg.Done()
34 counter.Inc()
35 }()
36 }
37
38 wg.Wait()
39 fmt.Println("Final count:", counter.Value()) // 输出:1000
40}
注意事项
- 必须成对使用
Lock()和Unlock() - 推荐使用
defer确保解锁 - 不要复制包含
Mutex的结构体(会导致锁失效)
sync.RWMutex:读写锁
基本用法
RWMutex(读写锁)允许多个读操作同时进行,但写操作是独占的。
1package main
2
3import (
4 "fmt"
5 "sync"
6 "time"
7)
8
9type Cache struct {
10 mu sync.RWMutex
11 data map[string]string
12}
13
14func (c *Cache) Get(key string) string {
15 c.mu.RLock() // 读锁
16 defer c.mu.RUnlock()
17 return c.data[key]
18}
19
20func (c *Cache) Set(key, value string) {
21 c.mu.Lock() // 写锁
22 defer c.mu.Unlock()
23 c.data[key] = value
24}
25
26func main() {
27 cache := Cache{data: make(map[string]string)}
28
29 // 写入数据
30 cache.Set("name", "Go")
31
32 // 多个 goroutine 同时读取(不会阻塞)
33 var wg sync.WaitGroup
34 for i := 0; i < 10; i++ {
35 wg.Add(1)
36 go func(id int) {
37 defer wg.Done()
38 fmt.Printf("Reader %d: %s\n", id, cache.Get("name"))
39 }(i)
40 }
41
42 wg.Wait()
43}
使用场景
- 读多写少的场景
- 读操作不会修改数据
- 需要提高并发读性能
sync.Once:确保只执行一次
基本用法
Once 确保某个操作只执行一次,常用于单例模式或初始化。
1package main
2
3import (
4 "fmt"
5 "sync"
6)
7
8var (
9 instance *Singleton
10 once sync.Once
11)
12
13type Singleton struct {
14 data string
15}
16
17func GetInstance() *Singleton {
18 once.Do(func() {
19 fmt.Println("Creating singleton instance")
20 instance = &Singleton{data: "singleton"}
21 })
22 return instance
23}
24
25func main() {
26 var wg sync.WaitGroup
27
28 // 多个 goroutine 同时获取实例
29 for i := 0; i < 10; i++ {
30 wg.Add(1)
31 go func() {
32 defer wg.Done()
33 s := GetInstance()
34 fmt.Println(s.data)
35 }()
36 }
37
38 wg.Wait()
39 // "Creating singleton instance" 只会打印一次
40}
使用场景
- 单例模式
- 配置文件只加载一次
- 数据库连接池初始化
sync.Cond:条件变量
基本用法
Cond 用于在 goroutine 之间协调,等待或通知某个条件的发生。
1package main
2
3import (
4 "fmt"
5 "sync"
6 "time"
7)
8
9func main() {
10 var mu sync.Mutex
11 cond := sync.NewCond(&mu)
12 ready := false
13
14 // 等待者
15 go func() {
16 mu.Lock()
17 for !ready {
18 cond.Wait() // 等待条件满足
19 }
20 fmt.Println("Condition met, proceeding...")
21 mu.Unlock()
22 }()
23
24 // 通知者
25 time.Sleep(time.Second)
26 mu.Lock()
27 ready = true
28 cond.Signal() // 通知一个等待的 goroutine
29 mu.Unlock()
30
31 time.Sleep(time.Second)
32}
方法说明
Wait():释放锁并等待,被唤醒后重新获取锁Signal():唤醒一个等待的 goroutineBroadcast():唤醒所有等待的 goroutine
信号量(Semaphore)
使用 golang.org/x/sync/semaphore
信号量用于限制同时访问资源的 goroutine 数量。
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/semaphore"
7 "time"
8)
9
10func main() {
11 // 创建一个权重为 3 的信号量(最多 3 个并发)
12 sem := semaphore.NewWeighted(3)
13 ctx := context.Background()
14
15 // 启动 10 个任务
16 for i := 1; i <= 10; i++ {
17 // 获取信号量(如果已满,会阻塞)
18 if err := sem.Acquire(ctx, 1); err != nil {
19 fmt.Println("Failed to acquire semaphore:", err)
20 break
21 }
22
23 go func(id int) {
24 defer sem.Release(1) // 释放信号量
25
26 fmt.Printf("Task %d started\n", id)
27 time.Sleep(time.Second)
28 fmt.Printf("Task %d completed\n", id)
29 }(i)
30 }
31
32 // 等待所有任务完成
33 sem.Acquire(ctx, 3)
34}
使用场景
- 限制并发数(如数据库连接池)
- 控制资源访问(如限制 API 调用频率)
errgroup:带错误处理的 WaitGroup
使用 golang.org/x/sync/errgroup
errgroup 提供了更方便的错误处理和上下文管理。
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/errgroup"
7 "time"
8)
9
10func fetchURL(url string) error {
11 time.Sleep(time.Second)
12 if url == "bad-url" {
13 return fmt.Errorf("failed to fetch %s", url)
14 }
15 fmt.Printf("Fetched %s\n", url)
16 return nil
17}
18
19func main() {
20 g, ctx := errgroup.WithContext(context.Background())
21
22 urls := []string{"url1", "url2", "bad-url", "url3"}
23
24 for _, url := range urls {
25 url := url // 避免闭包问题
26 g.Go(func() error {
27 // 如果 context 被取消,立即返回
28 select {
29 case <-ctx.Done():
30 return ctx.Err()
31 default:
32 return fetchURL(url)
33 }
34 })
35 }
36
37 // 等待所有任务完成,返回第一个错误
38 if err := g.Wait(); err != nil {
39 fmt.Println("Error:", err)
40 }
41}
特点
- 自动管理 WaitGroup
- 返回第一个非 nil 错误
- 提供 Context 取消机制
最佳实践
1. 选择合适的同步工具
- 简单等待:使用
WaitGroup - 保护共享数据:使用
Mutex或RWMutex - 单次初始化:使用
Once - 限制并发数:使用信号量
- 需要错误处理:使用
errgroup
2. 避免常见错误
- 不要复制包含锁的结构体
- 确保锁的成对使用(Lock/Unlock)
- WaitGroup 的 Add 要在 Wait 之前调用
- 避免在持有锁时进行耗时操作
3. 性能考虑
- 读多写少用
RWMutex - 尽量减小锁的粒度
- 考虑使用 Channel 替代锁
小结
Go 的 sync 包提供了丰富的并发同步工具。合理使用这些工具,可以写出高效、安全的并发程序。记住:优先使用 Channel,必要时才使用锁。
练习题
- 使用
sync.Map实现一个线程安全的缓存(提示:sync.Map是 Go 1.9 引入的并发安全的 map) - 使用信号量实现一个连接池,限制最多 5 个并发连接
- 使用
errgroup并发下载多个文件,如果任一下载失败,取消所有其他下载