Go 并发安全与陷阱
Go 的并发模型虽然强大,但使用不当会导致严重的并发问题。本文将深入探讨 Goroutine 泄漏、竞态条件、Channel 误用等常见陷阱,并提供实用的解决方案。
一、Goroutine 泄漏
1.1 什么是 Goroutine 泄漏
Goroutine 泄漏是指 Goroutine 启动后永远无法退出,导致内存和资源浪费:
// 泄漏示例
func leak() {
ch := make(chan int)
go func() {
ch <- 1 // 阻塞,无人接收
}()
// Goroutine 永远阻塞
}
// 后果:
// - 内存占用不释放
// - 文件描述符不关闭
// - 数据库连接不释放
1.2 常见泄漏场景
场景 1:Channel 无人接收
// ❌ 泄漏:发送后无人接收
func leak1() {
ch := make(chan int)
go func() {
ch <- 1 // 阻塞
}()
}
// ✅ 修复:使用缓冲或确保接收
func fixed1() {
ch := make(chan int, 1)
go func() {
ch <- 1 // 不阻塞
}()
<-ch // 接收
}
场景 2:忘记关闭 Channel
// ❌ 泄漏:range 永不退出
func leak2(ch chan int) {
go func() {
for v := range ch { // ch 不关闭,永不退出
process(v)
}
}()
}
// ✅ 修复:确保关闭 channel
func fixed2(ch chan int) {
go func() {
defer close(ch) // 完成后关闭
for i := 0; i < 10; i++ {
ch <- i
}
}()
for v := range ch { // ch 关闭后自动退出
process(v)
}
}
场景 3:未使用 context 控制
// ❌ 泄漏:无法退出
func leak3() {
go func() {
for {
doWork() // 永久循环
}
}()
}
// ✅ 修复:使用 context 控制退出
func fixed3(ctx context.Context) {
go func() {
for {
select {
case <-ctx.Done():
return // 收到退出信号
default:
doWork()
}
}
}()
}
场景 4:Timer 未停止
// ❌ 泄漏:timer goroutine 不退出
func leak4() {
timer := time.NewTimer(time.Hour)
doWork()
// timer 未停止,goroutine 不退出
}
// ✅ 修复:使用 defer 停止
func fixed4() {
timer := time.NewTimer(time.Hour)
defer timer.Stop()
doWork()
}
场景 5:Ticker 未停止
// ❌ 泄漏
func leak5() {
ticker := time.NewTicker(time.Second)
go func() {
for range ticker.C {
doWork()
}
}()
// ticker 永不关闭
}
// ✅ 修复
func fixed5(ctx context.Context) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
go func() {
for {
select {
case <-ticker.C:
doWork()
case <-ctx.Done():
return
}
}
}()
}
1.3 检测 Goroutine 泄漏
方法 1:使用 goroutine 数监控
func checkGoroutineLeak() {
start := runtime.NumGoroutine()
// 执行操作
doSomething()
// 等待一段时间
time.Sleep(time.Second)
end := runtime.NumGoroutine()
if end > start {
log.Printf("可能泄漏:%d -> %d", start, end)
}
}
方法 2:使用测试工具
// 使用 go.uber.org/goleak
import "go.uber.org/goleak"
func TestSomething(t *testing.T) {
defer goleak.VerifyToplevel(t)
// 执行测试
// 如果有泄漏,测试会失败
}
方法 3:pprof 分析
# 启动 pprof
go tool pprof http://localhost:6060/debug/pprof/goroutine
# 查看 goroutine 堆栈
(pprof) list main.leak
1.4 预防泄漏最佳实践
// 原则 1:始终使用 context 控制生命周期
func worker(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
doWork()
}
}
}
// 原则 2:确保 channel 有接收者
func producer(ctx context.Context, ch chan<- int) {
defer close(ch) // 确保关闭
for i := 0; i < 10; i++ {
select {
case <-ctx.Done():
return
case ch <- i:
}
}
}
// 原则 3:使用 defer 清理资源
func process() {
mu.Lock()
defer mu.Unlock()
// ...
}
// 原则 4:限制 Goroutine 数量
func processWithLimit(items []string, limit int) {
sem := make(chan struct{}, limit)
for _, item := range items {
sem <- struct{}{}
go func(it string) {
defer func() { <-sem }()
processItem(it)
}(item)
}
}
二、竞态条件(Race Condition)
2.1 什么是竞态条件
当多个 goroutine 同时访问共享数据,且至少有一个是写操作时,可能发生竞态条件:
// ❌ 竞态条件
var count int
func increment() {
count++ // 非原子操作:读 - 改 - 写
}
// 多个 goroutine 同时调用
for i := 0; i < 100; i++ {
go increment()
}
// 结果:count 可能小于 100
2.2 检测竞态条件
使用竞态检测器:
# 运行测试时检测
go test -race ./...
# 运行程序时检测
go run -race main.go
# 构建时启用
go build -race main.go
输出示例:
WARNING: DATA RACE
Write at 0x00c000014098 by goroutine 7:
main.increment()
/path/main.go:10 +0x39
Previous read at 0x00c000014098 by goroutine 6:
main.increment()
/path/main.go:9 +0x1f
2.3 常见竞态场景
场景 1:共享变量并发写
// ❌ 竞态
var m = make(map[string]int)
func update(key string, value int) {
m[key] = value // 多个 goroutine 同时写
}
// ✅ 修复:加锁
var mu sync.Mutex
var m = make(map[string]int)
func update(key string, value int) {
mu.Lock()
defer mu.Unlock()
m[key] = value
}
// ✅ 修复:使用 sync.Map
var m sync.Map
func update(key string, value int) {
m.Store(key, value)
}
场景 2:检查后使用(Check-Then-Act)
// ❌ 竞态
if _, exists := m[key]; !exists {
m[key] = value // 检查和写入之间有竞态
}
// ✅ 修复:加锁
mu.Lock()
if _, exists := m[key]; !exists {
m[key] = value
}
mu.Unlock()
// ✅ 修复:使用 LoadOrStore
m.LoadOrStore(key, value)
场景 3:读取切片长度后访问
// ❌ 竞态
func process(s []int) {
for i := 0; i < len(s); i++ {
// len(s) 可能变化
use(s[i])
}
}
// ✅ 修复:先复制
func process(s []int) {
mu.Lock()
copy := make([]int, len(s))
copy(copy, s)
mu.Unlock()
for i := 0; i < len(copy); i++ {
use(copy[i])
}
}
场景 4:单例模式
// ❌ 竞态
var instance *Singleton
func GetInstance() *Singleton {
if instance == nil {
instance = &Singleton{} // 多个 goroutine 可能同时创建
}
return instance
}
// ✅ 修复:使用 sync.Once
var (
instance *Singleton
once sync.Once
)
func GetInstance() *Singleton {
once.Do(func() {
instance = &Singleton{}
})
return instance
}
2.4 解决竞态的方法
方法 1:互斥锁
type SafeCounter struct {
mu sync.Mutex
count int
}
func (c *SafeCounter) Inc() {
c.mu.Lock()
defer c.mu.Unlock()
c.count++
}
func (c *SafeCounter) Get() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}
方法 2:原子操作
type AtomicCounter struct {
count int64
}
func (c *AtomicCounter) Inc() {
atomic.AddInt64(&c.count, 1)
}
func (c *AtomicCounter) Get() int64 {
return atomic.LoadInt64(&c.count)
}
方法 3:Channel 通信
type ChannelCounter struct {
ch chan int
}
func NewChannelCounter() *ChannelCounter {
c := &ChannelCounter{ch: make(chan int)}
go func() {
count := 0
for inc := range c.ch {
count += inc
}
}()
return c
}
func (c *ChannelCounter) Inc() {
c.ch <- 1
}
方法 4:Goroutine 本地数据
// 每个 goroutine 有自己的副本
func process(items []int) {
var wg sync.WaitGroup
for i := 0; i < len(items); i++ {
wg.Add(1)
go func(item int) {
defer wg.Done()
// 使用本地副本,无竞态
result := compute(item)
use(result)
}(items[i])
}
wg.Wait()
}
三、Channel 误用
3.1 向已关闭的 Channel 发送
// ❌ panic
ch := make(chan int)
close(ch)
ch <- 1 // panic: send on closed channel
// ✅ 修复:确保只关闭一次
var once sync.Once
closeOnce := func() {
once.Do(func() {
close(ch)
})
}
// ✅ 修复:使用 recover
func safeSend(ch chan int, value int) {
defer func() {
if r := recover(); r != nil {
log.Println("发送失败:", r)
}
}()
ch <- value
}
3.2 重复关闭 Channel
// ❌ panic
ch := make(chan int)
close(ch)
close(ch) // panic: close of closed channel
// ✅ 修复:使用 sync.Once
var once sync.Once
func safeClose(ch chan int) {
once.Do(func() {
close(ch)
})
}
// ✅ 修复:使用 recover
func safeClose(ch chan int) {
defer func() {
recover() // 忽略 panic
}()
close(ch)
}
3.3 nil Channel 永远阻塞
var ch chan int // nil
// 永远阻塞
ch <- 1 // 阻塞
<-ch // 阻塞
// 使用场景:动态禁用 case
func process(ch1, ch2 chan int) {
ch2 = nil // 禁用 ch2
for {
select {
case v := <-ch1:
handle(v)
case v := <-ch2: // ch2 为 nil,此 case 永不执行
handle(v)
}
}
}
3.4 缓冲区大小不当
// ❌ 问题:无缓冲导致阻塞
ch := make(chan int)
for i := 0; i < 10; i++ {
ch <- i // 第一个就阻塞
}
// ✅ 修复:使用缓冲
ch := make(chan int, 10)
for i := 0; i < 10; i++ {
ch <- i // 不阻塞
}
// ✅ 修复:使用 goroutine 接收
ch := make(chan int)
go func() {
for v := range ch {
process(v)
}
}()
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
3.5 忘记关闭 Channel
// ❌ 泄漏:range 永不退出
func producer(ch chan<- int) {
for i := 0; i < 10; i++ {
ch <- i
}
// 忘记 close(ch)
}
func consumer(ch <-chan int) {
for v := range ch { // 永不退出
process(v)
}
}
// ✅ 修复
func producer(ch chan<- int) {
defer close(ch) // 确保关闭
for i := 0; i < 10; i++ {
ch <- i
}
}
四、WaitGroup 误用
4.1 忘记调用 Done
// ❌ 泄漏:Wait 永不返回
var wg sync.WaitGroup
wg.Add(1)
go func() {
doWork()
// 忘记 wg.Done()
}()
wg.Wait() // 永远阻塞
// ✅ 修复:使用 defer
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
doWork()
}()
wg.Wait()
4.2 Add 在 Goroutine 内部
// ❌ 竞态条件
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
go func() {
wg.Add(1) // 可能在 Wait 之后调用
doWork()
wg.Done()
}()
}
wg.Wait()
// ✅ 修复:在外部 Add
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
doWork()
}()
}
wg.Wait()
4.3 WaitGroup 重用
// ❌ 错误:重用 WaitGroup
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
doWork()
}()
}
wg.Wait()
// 重用(计数器可能为负)
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
doWork()
}()
}
wg.Wait()
// ✅ 修复:每次使用新的 WaitGroup
func batch1() {
var wg sync.WaitGroup
// ...
wg.Wait()
}
func batch2() {
var wg sync.WaitGroup
// ...
wg.Wait()
}
五、Mutex 误用
5.1 锁拷贝
// ❌ 错误:Mutex 被拷贝
type Data struct {
mu sync.Mutex
value int
}
func copyData(d Data) Data {
return d // Mutex 被拷贝,未定义行为
}
// ✅ 修复:使用指针
func copyData(d *Data) *Data {
return &Data{value: d.value}
}
5.2 死锁
// ❌ 死锁:重复加锁
mu.Lock()
mu.Lock() // 死锁
// ❌ 死锁:锁顺序不一致
// Goroutine A
mu1.Lock()
mu2.Lock()
// Goroutine B
mu2.Lock()
mu1.Lock() // 可能死锁
// ✅ 修复:统一锁顺序
// 总是先 mu1 后 mu2
5.3 临界区过大
// ❌ 不推荐:临界区过大
func process() {
mu.Lock()
defer mu.Unlock()
data := loadData() // I/O 操作
result := compute(data) // 耗时计算
saveData(result) // I/O 操作
}
// ✅ 修复:缩小临界区
func process() {
mu.Lock()
data := loadData()
mu.Unlock()
result := compute(data) // 锁外执行
mu.Lock()
saveData(result)
mu.Unlock()
}
5.4 忘记解锁
// ❌ 错误:panic 导致未解锁
mu.Lock()
doSomething() // 可能 panic
mu.Unlock() // 未执行
// ✅ 修复:使用 defer
mu.Lock()
defer mu.Unlock()
doSomething()
六、并发最佳实践
6.1 设计原则
| 原则 | 说明 |
|---|---|
| 不要通过共享内存通信 | 使用 Channel 通信 |
| 通过通信共享内存 | Channel 传递数据所有权 |
| Goroutine 生命周期可控 | 使用 context 控制退出 |
| 避免 Goroutine 泄漏 | 确保所有 Goroutine 能退出 |
| 减少锁竞争 | 缩小临界区,使用细粒度锁 |
6.2 并发模式
模式 1:Worker Pool
func workerPool(ctx context.Context, jobs <-chan Job, results chan<- Job, n int) {
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case job, ok := <-jobs:
if !ok {
return
}
results <- processJob(job)
}
}
}()
}
wg.Wait()
close(results)
}
模式 2:Pipeline
func pipeline(ctx context.Context, in <-chan int) <-chan int {
// 阶段 1
stage1 := make(chan int)
go func() {
defer close(stage1)
for n := range in {
select {
case <-ctx.Done():
return
case stage1 <- n * 2:
}
}
}()
// 阶段 2
stage2 := make(chan int)
go func() {
defer close(stage2)
for n := range stage1 {
select {
case <-ctx.Done():
return
case stage2 <- n + 1:
}
}
}()
return stage2
}
模式 3:Fan-out/Fan-in
func fanOut(ctx context.Context, in <-chan int, n int) []<-chan int {
outs := make([]<-chan int, n)
for i := 0; i < n; i++ {
out := make(chan int)
go func(out chan<- int) {
defer close(out)
for n := range in {
select {
case <-ctx.Done():
return
case out <- n:
}
}
}(out)
outs[i] = out
}
return outs
}
func fanIn(ctx context.Context, ins ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
for _, in := range ins {
wg.Add(1)
go func(in <-chan int) {
defer wg.Done()
for n := range in {
select {
case <-ctx.Done():
return
case out <- n:
}
}
}(in)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
6.3 性能优化
优化 1:减少锁粒度
// 全局锁
var mu sync.Mutex
var data map[string]int
// 优化:分片锁
const shards = 256
var shardMu [shards]sync.Mutex
var shardData [shards]map[string]int
func get(key string) int {
idx := hash(key) % shards
shardMu[idx].Lock()
defer shardMu[idx].Unlock()
return shardData[idx][key]
}
优化 2:使用 RWMutex
// 读多写少场景
var rw sync.RWMutex
var cache map[string]string
func get(key string) string {
rw.RLock()
defer rw.RUnlock()
return cache[key]
}
func set(key, value string) {
rw.Lock()
defer rw.Unlock()
cache[key] = value
}
优化 3:使用原子操作
// Mutex: ~50 ns/op
type MutexCounter struct {
mu sync.Mutex
count int
}
func (c *MutexCounter) Inc() {
c.mu.Lock()
defer c.mu.Unlock()
c.count++
}
// Atomic: ~10 ns/op
type AtomicCounter struct {
count int64
}
func (c *AtomicCounter) Inc() {
atomic.AddInt64(&c.count, 1)
}
6.4 调试技巧
技巧 1:竞态检测
# 运行测试
go test -race ./...
# 运行程序
go run -race main.go
技巧 2:Goroutine 分析
# 查看 goroutine 数量
curl http://localhost:6060/debug/pprof/goroutine?debug=1
# 分析 goroutine 堆栈
go tool pprof http://localhost:6060/debug/pprof/goroutine
技巧 3:死锁检测
// 设置死锁检测
import _ "net/http/pprof"
func main() {
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
// 程序逻辑
}
七、检查清单
并发安全检查清单
- 所有 Goroutine 都能优雅退出
- Channel 在使用后关闭
- Timer/Ticker 使用 defer 停止
- 使用 context 控制生命周期
- 共享数据有适当的同步机制
- 无竞态条件(通过 -race 检测)
- Mutex 使用 defer 解锁
- WaitGroup 的 Add 在外部调用
- 无死锁风险(锁顺序一致)
- 临界区尽可能小
总结
Go 并发编程的常见陷阱:
| 陷阱类型 | 表现 | 解决方案 |
|---|---|---|
| Goroutine 泄漏 | 内存增长,goroutine 不退出 | 使用 context 控制 |
| 竞态条件 | 数据不一致,随机错误 | 加锁或原子操作 |
| Channel 误用 | panic 或阻塞 | 确保正确关闭 |
| WaitGroup 误用 | 永久阻塞 | defer Done() |
| Mutex 误用 | 死锁,未定义行为 | defer Unlock() |
核心原则:
- 始终控制 Goroutine 生命周期
- 使用 Channel 进行通信
- 避免共享内存,或适当同步
- 使用竞态检测器验证
掌握这些最佳实践,能帮助你写出更安全、更高效的并发代码。