Go同步原语:Mutex/RWMutex/WaitGroup
约 1701 字大约 6 分钟
gosyncmutex
2025-04-20
概述
Go虽然倡导"通过通信来共享内存"的CSP模型,但在某些场景下,传统的同步原语更加高效和直观。sync 包提供了一组精心设计的同步原语:Mutex、RWMutex、WaitGroup、Once、Pool、Cond。本文将深入分析它们的底层实现和正确使用方式。
sync.Mutex
状态机
Mutex内部使用一个int32的state字段编码多种状态:
type Mutex struct {
state int32 // 状态字段
sema uint32 // 信号量
}
// state的位含义:
// bit 0: locked(是否被锁定)
// bit 1: woken(是否有goroutine被唤醒)
// bit 2: starvation(是否进入饥饿模式)
// bit 3-31: waiter count(等待者数量)正常模式 vs 饥饿模式
正常模式:新到达的goroutine与被唤醒的goroutine竞争锁。新goroutine有优势(已在CPU上运行,且可能有多个),因此被唤醒的goroutine可能再次失败。
饥饿模式(Go 1.9+):当等待者等待超过1ms时触发。锁直接交给等待队列头部的goroutine,新到达的goroutine不自旋,直接排队。
// 自旋条件(runtime_canSpin):
// 1. 已自旋次数 < 4
// 2. GOMAXPROCS > 1(多核)
// 3. 至少有一个其他正在运行的P
// 4. 当前P的本地runqueue为空使用注意
var mu sync.Mutex
// 正确用法
mu.Lock()
defer mu.Unlock()
// 临界区代码...
// 注意事项:
// 1. Mutex不可复制(值传递会复制)
// 2. 不可重入(同一goroutine再次Lock会死锁)
// 3. Unlock未Lock的Mutex会panic
// 4. 通常将Mutex与保护的数据放在同一个struct中
type SafeCounter struct {
mu sync.Mutex
count int
}
func (c *SafeCounter) Inc() {
c.mu.Lock()
defer c.mu.Unlock()
c.count++
}sync.RWMutex
读写锁允许多个并发读取,但写入时互斥:
type RWMutex struct {
w Mutex // 写锁
writerSem uint32 // 写者信号量
readerSem uint32 // 读者信号量
readerCount atomic.Int32 // 读者计数(负值表示有写者等待)
readerWait atomic.Int32 // 写者需要等待的读者数量
}读写优先级
// Go的RWMutex是写优先的:
// 当有写者等待时,新的读者会被阻塞
// 这防止了写者饥饿
// 适用场景:读多写少
var cache struct {
sync.RWMutex
data map[string]string
}
func get(key string) (string, bool) {
cache.RLock()
defer cache.RUnlock()
v, ok := cache.data[key]
return v, ok
}
func set(key, value string) {
cache.Lock()
defer cache.Unlock()
cache.data[key] = value
}sync.WaitGroup
WaitGroup用于等待一组goroutine完成:
type WaitGroup struct {
state atomic.Uint64 // 高32位:计数器,低32位:等待者数量
sema uint32
}var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1) // 在启动goroutine前Add
go func(id int) {
defer wg.Done()
doWork(id)
}(i)
}
wg.Wait() // 阻塞直到计数器归零
// 注意事项:
// 1. Add必须在Wait之前调用
// 2. Add不能在goroutine内部调用(可能在Wait后执行)
// 3. 计数器不能变为负数(panic)
// 4. WaitGroup可以复用(Wait返回后可以再Add)sync.Once
Once确保某个操作只执行一次:
type Once struct {
done atomic.Uint32
m Mutex
}
func (o *Once) Do(f func()) {
// 快速路径:已经执行过
if o.done.Load() == 0 {
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done.Load() == 0 {
defer o.done.Store(1)
f() // 即使f panic,done也会被设置
}
}// 典型用法:单例模式
type Database struct {
conn *sql.DB
}
var (
dbInstance *Database
dbOnce sync.Once
)
func GetDB() *Database {
dbOnce.Do(func() {
conn, err := sql.Open("postgres", dsn)
if err != nil {
log.Fatal(err)
}
dbInstance = &Database{conn: conn}
})
return dbInstance
}
// Go 1.21+ 新增 OnceFunc/OnceValue/OnceValues
var initConfig = sync.OnceValue(func() *Config {
cfg, _ := loadConfig()
return cfg
})
config := initConfig() // 只加载一次sync.Pool
Pool是一个临时对象缓存池,用于减少GC压力:
var bufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
func processRequest(data []byte) {
buf := bufPool.Get().(*bytes.Buffer)
defer func() {
buf.Reset()
bufPool.Put(buf)
}()
buf.Write(data)
// 使用buf处理请求...
}注意:Pool中的对象可能在任何GC后被回收,不要存放需要持久保存的对象。
sync.Cond
Cond用于goroutine之间的条件等待通知:
type Queue struct {
mu sync.Mutex
cond *sync.Cond
items []interface{}
}
func NewQueue() *Queue {
q := &Queue{}
q.cond = sync.NewCond(&q.mu)
return q
}
func (q *Queue) Put(item interface{}) {
q.mu.Lock()
defer q.mu.Unlock()
q.items = append(q.items, item)
q.cond.Signal() // 通知一个等待者
}
func (q *Queue) Get() interface{} {
q.mu.Lock()
defer q.mu.Unlock()
for len(q.items) == 0 {
q.cond.Wait() // 释放锁并等待,被唤醒后重新获取锁
}
item := q.items[0]
q.items = q.items[1:]
return item
}
// Signal: 唤醒一个等待者
// Broadcast: 唤醒所有等待者选择指南
总结
| 原语 | 用途 | 关键特性 |
|---|---|---|
| Mutex | 互斥锁 | 正常/饥饿双模式,不可重入 |
| RWMutex | 读写锁 | 写优先,适合读多写少 |
| WaitGroup | 等待完成 | Add在go之前,Done在defer中 |
| Once | 一次性初始化 | 并发安全的单例 |
| Pool | 对象缓存 | per-P无锁,GC可回收 |
| Cond | 条件变量 | Signal/Broadcast,Wait需在循环中 |
正确使用同步原语是编写安全并发代码的基础。优先考虑channel,当channel不适合时(如保护共享状态),再使用sync包的原语。
贡献者
更新日志
2026/3/14 13:09
查看所有更新日志
9f6c2-feat: organize wiki content and refresh site setup于