在掌握了 Goroutine 和 Channel 的基础知识后,我们需要了解 Go 标准库 sync 包提供的各种同步工具。这些工具能帮助我们更好地控制并发程序的执行流程,避免数据竞争,提高程序的可靠性。

sync.WaitGroup:等待一组 Goroutine 完成

基本用法

WaitGroup 用于等待一组 Goroutine 完成。它有三个方法:

  • Add(delta int):增加计数器
  • Done():减少计数器(相当于 Add(-1)
  • Wait():阻塞直到计数器为 0
 1package main
 2
 3import (
 4    "fmt"
 5    "sync"
 6    "time"
 7)
 8
 9func worker(id int, wg *sync.WaitGroup) {
10    defer wg.Done() // 函数结束时调用 Done()
11    
12    fmt.Printf("Worker %d starting\n", id)
13    time.Sleep(time.Second)
14    fmt.Printf("Worker %d done\n", id)
15}
16
17func main() {
18    var wg sync.WaitGroup
19    
20    // 启动 5 个 worker
21    for i := 1; i <= 5; i++ {
22        wg.Add(1) // 每启动一个 goroutine,计数器 +1
23        go worker(i, &wg)
24    }
25    
26    wg.Wait() // 等待所有 worker 完成
27    fmt.Println("All workers completed")
28}

注意事项

  • Add() 必须在 Wait() 之前调用
  • Add() 通常在启动 goroutine 之前调用,而不是在 goroutine 内部
  • 必须传递 WaitGroup 的指针,而不是值拷贝

sync.Mutex:互斥锁

基本用法

Mutex(互斥锁)用于保护共享资源,确保同一时间只有一个 goroutine 可以访问。

 1package main
 2
 3import (
 4    "fmt"
 5    "sync"
 6)
 7
 8type SafeCounter struct {
 9    mu    sync.Mutex
10    count int
11}
12
13func (c *SafeCounter) Inc() {
14    c.mu.Lock()   // 加锁
15    c.count++
16    c.mu.Unlock() // 解锁
17}
18
19func (c *SafeCounter) Value() int {
20    c.mu.Lock()
21    defer c.mu.Unlock()
22    return c.count
23}
24
25func main() {
26    counter := SafeCounter{}
27    var wg sync.WaitGroup
28    
29    // 启动 1000 个 goroutine 同时增加计数器
30    for i := 0; i < 1000; i++ {
31        wg.Add(1)
32        go func() {
33            defer wg.Done()
34            counter.Inc()
35        }()
36    }
37    
38    wg.Wait()
39    fmt.Println("Final count:", counter.Value()) // 输出:1000
40}

注意事项

  • 必须成对使用 Lock()Unlock()
  • 推荐使用 defer 确保解锁
  • 不要复制包含 Mutex 的结构体(会导致锁失效)

sync.RWMutex:读写锁

基本用法

RWMutex(读写锁)允许多个读操作同时进行,但写操作是独占的。

 1package main
 2
 3import (
 4    "fmt"
 5    "sync"
 6    "time"
 7)
 8
 9type Cache struct {
10    mu   sync.RWMutex
11    data map[string]string
12}
13
14func (c *Cache) Get(key string) string {
15    c.mu.RLock()         // 读锁
16    defer c.mu.RUnlock()
17    return c.data[key]
18}
19
20func (c *Cache) Set(key, value string) {
21    c.mu.Lock()          // 写锁
22    defer c.mu.Unlock()
23    c.data[key] = value
24}
25
26func main() {
27    cache := Cache{data: make(map[string]string)}
28    
29    // 写入数据
30    cache.Set("name", "Go")
31    
32    // 多个 goroutine 同时读取(不会阻塞)
33    var wg sync.WaitGroup
34    for i := 0; i < 10; i++ {
35        wg.Add(1)
36        go func(id int) {
37            defer wg.Done()
38            fmt.Printf("Reader %d: %s\n", id, cache.Get("name"))
39        }(i)
40    }
41    
42    wg.Wait()
43}

使用场景

  • 读多写少的场景
  • 读操作不会修改数据
  • 需要提高并发读性能

sync.Once:确保只执行一次

基本用法

Once 确保某个操作只执行一次,常用于单例模式或初始化。

 1package main
 2
 3import (
 4    "fmt"
 5    "sync"
 6)
 7
 8var (
 9    instance *Singleton
10    once     sync.Once
11)
12
13type Singleton struct {
14    data string
15}
16
17func GetInstance() *Singleton {
18    once.Do(func() {
19        fmt.Println("Creating singleton instance")
20        instance = &Singleton{data: "singleton"}
21    })
22    return instance
23}
24
25func main() {
26    var wg sync.WaitGroup
27    
28    // 多个 goroutine 同时获取实例
29    for i := 0; i < 10; i++ {
30        wg.Add(1)
31        go func() {
32            defer wg.Done()
33            s := GetInstance()
34            fmt.Println(s.data)
35        }()
36    }
37    
38    wg.Wait()
39    // "Creating singleton instance" 只会打印一次
40}

使用场景

  • 单例模式
  • 配置文件只加载一次
  • 数据库连接池初始化

sync.Cond:条件变量

基本用法

Cond 用于在 goroutine 之间协调,等待或通知某个条件的发生。

 1package main
 2
 3import (
 4    "fmt"
 5    "sync"
 6    "time"
 7)
 8
 9func main() {
10    var mu sync.Mutex
11    cond := sync.NewCond(&mu)
12    ready := false
13    
14    // 等待者
15    go func() {
16        mu.Lock()
17        for !ready {
18            cond.Wait() // 等待条件满足
19        }
20        fmt.Println("Condition met, proceeding...")
21        mu.Unlock()
22    }()
23    
24    // 通知者
25    time.Sleep(time.Second)
26    mu.Lock()
27    ready = true
28    cond.Signal() // 通知一个等待的 goroutine
29    mu.Unlock()
30    
31    time.Sleep(time.Second)
32}

方法说明

  • Wait():释放锁并等待,被唤醒后重新获取锁
  • Signal():唤醒一个等待的 goroutine
  • Broadcast():唤醒所有等待的 goroutine

信号量(Semaphore)

使用 golang.org/x/sync/semaphore

信号量用于限制同时访问资源的 goroutine 数量。

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/semaphore"
 7    "time"
 8)
 9
10func main() {
11    // 创建一个权重为 3 的信号量(最多 3 个并发)
12    sem := semaphore.NewWeighted(3)
13    ctx := context.Background()
14    
15    // 启动 10 个任务
16    for i := 1; i <= 10; i++ {
17        // 获取信号量(如果已满,会阻塞)
18        if err := sem.Acquire(ctx, 1); err != nil {
19            fmt.Println("Failed to acquire semaphore:", err)
20            break
21        }
22        
23        go func(id int) {
24            defer sem.Release(1) // 释放信号量
25            
26            fmt.Printf("Task %d started\n", id)
27            time.Sleep(time.Second)
28            fmt.Printf("Task %d completed\n", id)
29        }(i)
30    }
31    
32    // 等待所有任务完成
33    sem.Acquire(ctx, 3)
34}

使用场景

  • 限制并发数(如数据库连接池)
  • 控制资源访问(如限制 API 调用频率)

errgroup:带错误处理的 WaitGroup

使用 golang.org/x/sync/errgroup

errgroup 提供了更方便的错误处理和上下文管理。

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/errgroup"
 7    "time"
 8)
 9
10func fetchURL(url string) error {
11    time.Sleep(time.Second)
12    if url == "bad-url" {
13        return fmt.Errorf("failed to fetch %s", url)
14    }
15    fmt.Printf("Fetched %s\n", url)
16    return nil
17}
18
19func main() {
20    g, ctx := errgroup.WithContext(context.Background())
21    
22    urls := []string{"url1", "url2", "bad-url", "url3"}
23    
24    for _, url := range urls {
25        url := url // 避免闭包问题
26        g.Go(func() error {
27            // 如果 context 被取消,立即返回
28            select {
29            case <-ctx.Done():
30                return ctx.Err()
31            default:
32                return fetchURL(url)
33            }
34        })
35    }
36    
37    // 等待所有任务完成,返回第一个错误
38    if err := g.Wait(); err != nil {
39        fmt.Println("Error:", err)
40    }
41}

特点

  • 自动管理 WaitGroup
  • 返回第一个非 nil 错误
  • 提供 Context 取消机制

最佳实践

1. 选择合适的同步工具

  • 简单等待:使用 WaitGroup
  • 保护共享数据:使用 MutexRWMutex
  • 单次初始化:使用 Once
  • 限制并发数:使用信号量
  • 需要错误处理:使用 errgroup

2. 避免常见错误

  • 不要复制包含锁的结构体
  • 确保锁的成对使用(Lock/Unlock)
  • WaitGroup 的 Add 要在 Wait 之前调用
  • 避免在持有锁时进行耗时操作

3. 性能考虑

  • 读多写少用 RWMutex
  • 尽量减小锁的粒度
  • 考虑使用 Channel 替代锁

小结

Go 的 sync 包提供了丰富的并发同步工具。合理使用这些工具,可以写出高效、安全的并发程序。记住:优先使用 Channel,必要时才使用锁

练习题

  1. 使用 sync.Map 实现一个线程安全的缓存(提示:sync.Map 是 Go 1.9 引入的并发安全的 map)
  2. 使用信号量实现一个连接池,限制最多 5 个并发连接
  3. 使用 errgroup 并发下载多个文件,如果任一下载失败,取消所有其他下载

相关阅读