在掌握了 Goroutine 和 Channel 的基础知识后,我们需要了解 Go 标准库 sync 包提供的各种同步工具。这些工具能帮助我们更好地控制并发程序的执行流程,避免数据竞争,提高程序的可靠性。
sync.WaitGroup:等待一组 Goroutine 完成
基本用法
WaitGroup 用于等待一组 Goroutine 完成。它有三个方法:
Add(delta int):增加计数器Done():减少计数器(相当于Add(-1))Wait():阻塞直到计数器为 0
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // 函数结束时调用 Done()
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
// 启动 5 个 worker
for i := 1; i <= 5; i++ {
wg.Add(1) // 每启动一个 goroutine,计数器 +1
go worker(i, &wg)
}
wg.Wait() // 等待所有 worker 完成
fmt.Println("All workers completed")
}注意事项
Add()必须在Wait()之前调用Add()通常在启动 goroutine 之前调用,而不是在 goroutine 内部必须传递
WaitGroup的指针,而不是值拷贝
sync.Mutex:互斥锁
基本用法
Mutex(互斥锁)用于保护共享资源,确保同一时间只有一个 goroutine 可以访问。
package main
import (
"fmt"
"sync"
)
type SafeCounter struct {
mu sync.Mutex
count int
}
func (c *SafeCounter) Inc() {
c.mu.Lock() // 加锁
c.count++
c.mu.Unlock() // 解锁
}
func (c *SafeCounter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}
func main() {
counter := SafeCounter{}
var wg sync.WaitGroup
// 启动 1000 个 goroutine 同时增加计数器
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Inc()
}()
}
wg.Wait()
fmt.Println("Final count:", counter.Value()) // 输出:1000
}注意事项
必须成对使用
Lock()和Unlock()推荐使用
defer确保解锁不要复制包含
Mutex的结构体(会导致锁失效)
sync.RWMutex:读写锁
基本用法
RWMutex(读写锁)允许多个读操作同时进行,但写操作是独占的。
package main
import (
"fmt"
"sync"
"time"
)
type Cache struct {
mu sync.RWMutex
data map[string]string
}
func (c *Cache) Get(key string) string {
c.mu.RLock() // 读锁
defer c.mu.RUnlock()
return c.data[key]
}
func (c *Cache) Set(key, value string) {
c.mu.Lock() // 写锁
defer c.mu.Unlock()
c.data[key] = value
}
func main() {
cache := Cache{data: make(map[string]string)}
// 写入数据
cache.Set("name", "Go")
// 多个 goroutine 同时读取(不会阻塞)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("Reader %d: %s\n", id, cache.Get("name"))
}(i)
}
wg.Wait()
}使用场景
读多写少的场景
读操作不会修改数据
需要提高并发读性能
sync.Once:确保只执行一次
基本用法
Once 确保某个操作只执行一次,常用于单例模式或初始化。
package main
import (
"fmt"
"sync"
)
var (
instance *Singleton
once sync.Once
)
type Singleton struct {
data string
}
func GetInstance() *Singleton {
once.Do(func() {
fmt.Println("Creating singleton instance")
instance = &Singleton{data: "singleton"}
})
return instance
}
func main() {
var wg sync.WaitGroup
// 多个 goroutine 同时获取实例
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
s := GetInstance()
fmt.Println(s.data)
}()
}
wg.Wait()
// "Creating singleton instance" 只会打印一次
}使用场景
单例模式
配置文件只加载一次
数据库连接池初始化
sync.Cond:条件变量
基本用法
Cond 用于在 goroutine 之间协调,等待或通知某个条件的发生。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var mu sync.Mutex
cond := sync.NewCond(&mu)
ready := false
// 等待者
go func() {
mu.Lock()
for !ready {
cond.Wait() // 等待条件满足
}
fmt.Println("Condition met, proceeding...")
mu.Unlock()
}()
// 通知者
time.Sleep(time.Second)
mu.Lock()
ready = true
cond.Signal() // 通知一个等待的 goroutine
mu.Unlock()
time.Sleep(time.Second)
}方法说明
Wait():释放锁并等待,被唤醒后重新获取锁Signal():唤醒一个等待的 goroutineBroadcast():唤醒所有等待的 goroutine
信号量(Semaphore)
使用 golang.org/x/sync/semaphore
信号量用于限制同时访问资源的 goroutine 数量。
package main
import (
"context"
"fmt"
"golang.org/x/sync/semaphore"
"time"
)
func main() {
// 创建一个权重为 3 的信号量(最多 3 个并发)
sem := semaphore.NewWeighted(3)
ctx := context.Background()
// 启动 10 个任务
for i := 1; i <= 10; i++ {
// 获取信号量(如果已满,会阻塞)
if err := sem.Acquire(ctx, 1); err != nil {
fmt.Println("Failed to acquire semaphore:", err)
break
}
go func(id int) {
defer sem.Release(1) // 释放信号量
fmt.Printf("Task %d started\n", id)
time.Sleep(time.Second)
fmt.Printf("Task %d completed\n", id)
}(i)
}
// 等待所有任务完成
sem.Acquire(ctx, 3)
}使用场景
限制并发数(如数据库连接池)
控制资源访问(如限制 API 调用频率)
errgroup:带错误处理的 WaitGroup
使用 golang.org/x/sync/errgroup
errgroup 提供了更方便的错误处理和上下文管理。
package main
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
"time"
)
func fetchURL(url string) error {
time.Sleep(time.Second)
if url == "bad-url" {
return fmt.Errorf("failed to fetch %s", url)
}
fmt.Printf("Fetched %s\n", url)
return nil
}
func main() {
g, ctx := errgroup.WithContext(context.Background())
urls := []string{"url1", "url2", "bad-url", "url3"}
for _, url := range urls {
url := url // 避免闭包问题
g.Go(func() error {
// 如果 context 被取消,立即返回
select {
case <-ctx.Done():
return ctx.Err()
default:
return fetchURL(url)
}
})
}
// 等待所有任务完成,返回第一个错误
if err := g.Wait(); err != nil {
fmt.Println("Error:", err)
}
}特点
自动管理 WaitGroup
返回第一个非 nil 错误
提供 Context 取消机制
最佳实践
1. 选择合适的同步工具
简单等待:使用
WaitGroup保护共享数据:使用
Mutex或RWMutex单次初始化:使用
Once限制并发数:使用信号量
需要错误处理:使用
errgroup
2. 避免常见错误
不要复制包含锁的结构体
确保锁的成对使用(Lock/Unlock)
WaitGroup 的 Add 要在 Wait 之前调用
避免在持有锁时进行耗时操作
3. 性能考虑
读多写少用
RWMutex尽量减小锁的粒度
考虑使用 Channel 替代锁
小结
Go 的 sync 包提供了丰富的并发同步工具。合理使用这些工具,可以写出高效、安全的并发程序。记住:优先使用 Channel,必要时才使用锁。
练习题
使用
sync.Map实现一个线程安全的缓存(提示:sync.Map是 Go 1.9 引入的并发安全的 map)使用信号量实现一个连接池,限制最多 5 个并发连接
使用
errgroup并发下载多个文件,如果任一下载失败,取消所有其他下载