在掌握了 Goroutine 和 Channel 的基础知识后,我们需要了解 Go 标准库 sync 包提供的各种同步工具。这些工具能帮助我们更好地控制并发程序的执行流程,避免数据竞争,提高程序的可靠性。

sync.WaitGroup:等待一组 Goroutine 完成

基本用法

WaitGroup 用于等待一组 Goroutine 完成。它有三个方法:

  • Add(delta int):增加计数器

  • Done():减少计数器(相当于 Add(-1)

  • Wait():阻塞直到计数器为 0

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // 函数结束时调用 Done()

    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup

    // 启动 5 个 worker
    for i := 1; i <= 5; i++ {
        wg.Add(1) // 每启动一个 goroutine,计数器 +1
        go worker(i, &wg)
    }

    wg.Wait() // 等待所有 worker 完成
    fmt.Println("All workers completed")
}

注意事项

  • Add() 必须在 Wait() 之前调用

  • Add() 通常在启动 goroutine 之前调用,而不是在 goroutine 内部

  • 必须传递 WaitGroup 的指针,而不是值拷贝

sync.Mutex:互斥锁

基本用法

Mutex(互斥锁)用于保护共享资源,确保同一时间只有一个 goroutine 可以访问。

package main

import (
    "fmt"
    "sync"
)

type SafeCounter struct {
    mu    sync.Mutex
    count int
}

func (c *SafeCounter) Inc() {
    c.mu.Lock()   // 加锁
    c.count++
    c.mu.Unlock() // 解锁
}

func (c *SafeCounter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.count
}

func main() {
    counter := SafeCounter{}
    var wg sync.WaitGroup

    // 启动 1000 个 goroutine 同时增加计数器
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Inc()
        }()
    }

    wg.Wait()
    fmt.Println("Final count:", counter.Value()) // 输出:1000
}

注意事项

  • 必须成对使用 Lock()Unlock()

  • 推荐使用 defer 确保解锁

  • 不要复制包含 Mutex 的结构体(会导致锁失效)

sync.RWMutex:读写锁

基本用法

RWMutex(读写锁)允许多个读操作同时进行,但写操作是独占的。

package main

import (
    "fmt"
    "sync"
    "time"
)

type Cache struct {
    mu   sync.RWMutex
    data map[string]string
}

func (c *Cache) Get(key string) string {
    c.mu.RLock()         // 读锁
    defer c.mu.RUnlock()
    return c.data[key]
}

func (c *Cache) Set(key, value string) {
    c.mu.Lock()          // 写锁
    defer c.mu.Unlock()
    c.data[key] = value
}

func main() {
    cache := Cache{data: make(map[string]string)}

    // 写入数据
    cache.Set("name", "Go")

    // 多个 goroutine 同时读取(不会阻塞)
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            fmt.Printf("Reader %d: %s\n", id, cache.Get("name"))
        }(i)
    }

    wg.Wait()
}

使用场景

  • 读多写少的场景

  • 读操作不会修改数据

  • 需要提高并发读性能

sync.Once:确保只执行一次

基本用法

Once 确保某个操作只执行一次,常用于单例模式或初始化。

package main

import (
    "fmt"
    "sync"
)

var (
    instance *Singleton
    once     sync.Once
)

type Singleton struct {
    data string
}

func GetInstance() *Singleton {
    once.Do(func() {
        fmt.Println("Creating singleton instance")
        instance = &Singleton{data: "singleton"}
    })
    return instance
}

func main() {
    var wg sync.WaitGroup

    // 多个 goroutine 同时获取实例
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            s := GetInstance()
            fmt.Println(s.data)
        }()
    }

    wg.Wait()
    // "Creating singleton instance" 只会打印一次
}

使用场景

  • 单例模式

  • 配置文件只加载一次

  • 数据库连接池初始化

sync.Cond:条件变量

基本用法

Cond 用于在 goroutine 之间协调,等待或通知某个条件的发生。

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    ready := false

    // 等待者
    go func() {
        mu.Lock()
        for !ready {
            cond.Wait() // 等待条件满足
        }
        fmt.Println("Condition met, proceeding...")
        mu.Unlock()
    }()

    // 通知者
    time.Sleep(time.Second)
    mu.Lock()
    ready = true
    cond.Signal() // 通知一个等待的 goroutine
    mu.Unlock()

    time.Sleep(time.Second)
}

方法说明

  • Wait():释放锁并等待,被唤醒后重新获取锁

  • Signal():唤醒一个等待的 goroutine

  • Broadcast():唤醒所有等待的 goroutine

信号量(Semaphore)

使用 golang.org/x/sync/semaphore

信号量用于限制同时访问资源的 goroutine 数量。

package main

import (
    "context"
    "fmt"
    "golang.org/x/sync/semaphore"
    "time"
)

func main() {
    // 创建一个权重为 3 的信号量(最多 3 个并发)
    sem := semaphore.NewWeighted(3)
    ctx := context.Background()

    // 启动 10 个任务
    for i := 1; i <= 10; i++ {
        // 获取信号量(如果已满,会阻塞)
        if err := sem.Acquire(ctx, 1); err != nil {
            fmt.Println("Failed to acquire semaphore:", err)
            break
        }

        go func(id int) {
            defer sem.Release(1) // 释放信号量

            fmt.Printf("Task %d started\n", id)
            time.Sleep(time.Second)
            fmt.Printf("Task %d completed\n", id)
        }(i)
    }

    // 等待所有任务完成
    sem.Acquire(ctx, 3)
}

使用场景

  • 限制并发数(如数据库连接池)

  • 控制资源访问(如限制 API 调用频率)

errgroup:带错误处理的 WaitGroup

使用 golang.org/x/sync/errgroup

errgroup 提供了更方便的错误处理和上下文管理。

package main

import (
    "context"
    "fmt"
    "golang.org/x/sync/errgroup"
    "time"
)

func fetchURL(url string) error {
    time.Sleep(time.Second)
    if url == "bad-url" {
        return fmt.Errorf("failed to fetch %s", url)
    }
    fmt.Printf("Fetched %s\n", url)
    return nil
}

func main() {
    g, ctx := errgroup.WithContext(context.Background())

    urls := []string{"url1", "url2", "bad-url", "url3"}

    for _, url := range urls {
        url := url // 避免闭包问题
        g.Go(func() error {
            // 如果 context 被取消,立即返回
            select {
            case <-ctx.Done():
                return ctx.Err()
            default:
                return fetchURL(url)
            }
        })
    }

    // 等待所有任务完成,返回第一个错误
    if err := g.Wait(); err != nil {
        fmt.Println("Error:", err)
    }
}

特点

  • 自动管理 WaitGroup

  • 返回第一个非 nil 错误

  • 提供 Context 取消机制

最佳实践

1. 选择合适的同步工具

  • 简单等待:使用 WaitGroup

  • 保护共享数据:使用 MutexRWMutex

  • 单次初始化:使用 Once

  • 限制并发数:使用信号量

  • 需要错误处理:使用 errgroup

2. 避免常见错误

  • 不要复制包含锁的结构体

  • 确保锁的成对使用(Lock/Unlock)

  • WaitGroup 的 Add 要在 Wait 之前调用

  • 避免在持有锁时进行耗时操作

3. 性能考虑

  • 读多写少用 RWMutex

  • 尽量减小锁的粒度

  • 考虑使用 Channel 替代锁

小结

Go 的 sync 包提供了丰富的并发同步工具。合理使用这些工具,可以写出高效、安全的并发程序。记住:优先使用 Channel,必要时才使用锁

练习题

  1. 使用 sync.Map 实现一个线程安全的缓存(提示:sync.Map 是 Go 1.9 引入的并发安全的 map)

  2. 使用信号量实现一个连接池,限制最多 5 个并发连接

  3. 使用 errgroup 并发下载多个文件,如果任一下载失败,取消所有其他下载


相关阅读