Skip to content
清晨的一缕阳光
返回

Go 并发安全与陷阱

Go 并发安全与陷阱

Go 的并发模型虽然强大,但使用不当会导致严重的并发问题。本文将深入探讨 Goroutine 泄漏、竞态条件、Channel 误用等常见陷阱,并提供实用的解决方案。

一、Goroutine 泄漏

1.1 什么是 Goroutine 泄漏

Goroutine 泄漏是指 Goroutine 启动后永远无法退出,导致内存和资源浪费:

// 泄漏示例
func leak() {
    ch := make(chan int)
    go func() {
        ch <- 1  // 阻塞,无人接收
    }()
    // Goroutine 永远阻塞
}

// 后果:
// - 内存占用不释放
// - 文件描述符不关闭
// - 数据库连接不释放

1.2 常见泄漏场景

场景 1:Channel 无人接收

// ❌ 泄漏:发送后无人接收
func leak1() {
    ch := make(chan int)
    go func() {
        ch <- 1  // 阻塞
    }()
}

// ✅ 修复:使用缓冲或确保接收
func fixed1() {
    ch := make(chan int, 1)
    go func() {
        ch <- 1  // 不阻塞
    }()
    <-ch  // 接收
}

场景 2:忘记关闭 Channel

// ❌ 泄漏:range 永不退出
func leak2(ch chan int) {
    go func() {
        for v := range ch {  // ch 不关闭,永不退出
            process(v)
        }
    }()
}

// ✅ 修复:确保关闭 channel
func fixed2(ch chan int) {
    go func() {
        defer close(ch)  // 完成后关闭
        for i := 0; i < 10; i++ {
            ch <- i
        }
    }()
    
    for v := range ch {  // ch 关闭后自动退出
        process(v)
    }
}

场景 3:未使用 context 控制

// ❌ 泄漏:无法退出
func leak3() {
    go func() {
        for {
            doWork()  // 永久循环
        }
    }()
}

// ✅ 修复:使用 context 控制退出
func fixed3(ctx context.Context) {
    go func() {
        for {
            select {
            case <-ctx.Done():
                return  // 收到退出信号
            default:
                doWork()
            }
        }
    }()
}

场景 4:Timer 未停止

// ❌ 泄漏:timer goroutine 不退出
func leak4() {
    timer := time.NewTimer(time.Hour)
    doWork()
    // timer 未停止,goroutine 不退出
}

// ✅ 修复:使用 defer 停止
func fixed4() {
    timer := time.NewTimer(time.Hour)
    defer timer.Stop()
    doWork()
}

场景 5:Ticker 未停止

// ❌ 泄漏
func leak5() {
    ticker := time.NewTicker(time.Second)
    go func() {
        for range ticker.C {
            doWork()
        }
    }()
    // ticker 永不关闭
}

// ✅ 修复
func fixed5(ctx context.Context) {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    
    go func() {
        for {
            select {
            case <-ticker.C:
                doWork()
            case <-ctx.Done():
                return
            }
        }
    }()
}

1.3 检测 Goroutine 泄漏

方法 1:使用 goroutine 数监控

func checkGoroutineLeak() {
    start := runtime.NumGoroutine()
    
    // 执行操作
    doSomething()
    
    // 等待一段时间
    time.Sleep(time.Second)
    
    end := runtime.NumGoroutine()
    
    if end > start {
        log.Printf("可能泄漏:%d -> %d", start, end)
    }
}

方法 2:使用测试工具

// 使用 go.uber.org/goleak
import "go.uber.org/goleak"

func TestSomething(t *testing.T) {
    defer goleak.VerifyToplevel(t)
    
    // 执行测试
    // 如果有泄漏,测试会失败
}

方法 3:pprof 分析

# 启动 pprof
go tool pprof http://localhost:6060/debug/pprof/goroutine

# 查看 goroutine 堆栈
(pprof) list main.leak

1.4 预防泄漏最佳实践

// 原则 1:始终使用 context 控制生命周期
func worker(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        default:
            doWork()
        }
    }
}

// 原则 2:确保 channel 有接收者
func producer(ctx context.Context, ch chan<- int) {
    defer close(ch)  // 确保关闭
    for i := 0; i < 10; i++ {
        select {
        case <-ctx.Done():
            return
        case ch <- i:
        }
    }
}

// 原则 3:使用 defer 清理资源
func process() {
    mu.Lock()
    defer mu.Unlock()
    // ...
}

// 原则 4:限制 Goroutine 数量
func processWithLimit(items []string, limit int) {
    sem := make(chan struct{}, limit)
    for _, item := range items {
        sem <- struct{}{}
        go func(it string) {
            defer func() { <-sem }()
            processItem(it)
        }(item)
    }
}

二、竞态条件(Race Condition)

2.1 什么是竞态条件

当多个 goroutine 同时访问共享数据,且至少有一个是写操作时,可能发生竞态条件:

// ❌ 竞态条件
var count int

func increment() {
    count++  // 非原子操作:读 - 改 - 写
}

// 多个 goroutine 同时调用
for i := 0; i < 100; i++ {
    go increment()
}

// 结果:count 可能小于 100

2.2 检测竞态条件

使用竞态检测器

# 运行测试时检测
go test -race ./...

# 运行程序时检测
go run -race main.go

# 构建时启用
go build -race main.go

输出示例

WARNING: DATA RACE
Write at 0x00c000014098 by goroutine 7:
  main.increment()
      /path/main.go:10 +0x39

Previous read at 0x00c000014098 by goroutine 6:
  main.increment()
      /path/main.go:9 +0x1f

2.3 常见竞态场景

场景 1:共享变量并发写

// ❌ 竞态
var m = make(map[string]int)

func update(key string, value int) {
    m[key] = value  // 多个 goroutine 同时写
}

// ✅ 修复:加锁
var mu sync.Mutex
var m = make(map[string]int)

func update(key string, value int) {
    mu.Lock()
    defer mu.Unlock()
    m[key] = value
}

// ✅ 修复:使用 sync.Map
var m sync.Map

func update(key string, value int) {
    m.Store(key, value)
}

场景 2:检查后使用(Check-Then-Act)

// ❌ 竞态
if _, exists := m[key]; !exists {
    m[key] = value  // 检查和写入之间有竞态
}

// ✅ 修复:加锁
mu.Lock()
if _, exists := m[key]; !exists {
    m[key] = value
}
mu.Unlock()

// ✅ 修复:使用 LoadOrStore
m.LoadOrStore(key, value)

场景 3:读取切片长度后访问

// ❌ 竞态
func process(s []int) {
    for i := 0; i < len(s); i++ {
        // len(s) 可能变化
        use(s[i])
    }
}

// ✅ 修复:先复制
func process(s []int) {
    mu.Lock()
    copy := make([]int, len(s))
    copy(copy, s)
    mu.Unlock()
    
    for i := 0; i < len(copy); i++ {
        use(copy[i])
    }
}

场景 4:单例模式

// ❌ 竞态
var instance *Singleton

func GetInstance() *Singleton {
    if instance == nil {
        instance = &Singleton{}  // 多个 goroutine 可能同时创建
    }
    return instance
}

// ✅ 修复:使用 sync.Once
var (
    instance *Singleton
    once     sync.Once
)

func GetInstance() *Singleton {
    once.Do(func() {
        instance = &Singleton{}
    })
    return instance
}

2.4 解决竞态的方法

方法 1:互斥锁

type SafeCounter struct {
    mu    sync.Mutex
    count int
}

func (c *SafeCounter) Inc() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count++
}

func (c *SafeCounter) Get() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.count
}

方法 2:原子操作

type AtomicCounter struct {
    count int64
}

func (c *AtomicCounter) Inc() {
    atomic.AddInt64(&c.count, 1)
}

func (c *AtomicCounter) Get() int64 {
    return atomic.LoadInt64(&c.count)
}

方法 3:Channel 通信

type ChannelCounter struct {
    ch chan int
}

func NewChannelCounter() *ChannelCounter {
    c := &ChannelCounter{ch: make(chan int)}
    go func() {
        count := 0
        for inc := range c.ch {
            count += inc
        }
    }()
    return c
}

func (c *ChannelCounter) Inc() {
    c.ch <- 1
}

方法 4:Goroutine 本地数据

// 每个 goroutine 有自己的副本
func process(items []int) {
    var wg sync.WaitGroup
    
    for i := 0; i < len(items); i++ {
        wg.Add(1)
        go func(item int) {
            defer wg.Done()
            // 使用本地副本,无竞态
            result := compute(item)
            use(result)
        }(items[i])
    }
    
    wg.Wait()
}

三、Channel 误用

3.1 向已关闭的 Channel 发送

// ❌ panic
ch := make(chan int)
close(ch)
ch <- 1  // panic: send on closed channel

// ✅ 修复:确保只关闭一次
var once sync.Once
closeOnce := func() {
    once.Do(func() {
        close(ch)
    })
}

// ✅ 修复:使用 recover
func safeSend(ch chan int, value int) {
    defer func() {
        if r := recover(); r != nil {
            log.Println("发送失败:", r)
        }
    }()
    ch <- value
}

3.2 重复关闭 Channel

// ❌ panic
ch := make(chan int)
close(ch)
close(ch)  // panic: close of closed channel

// ✅ 修复:使用 sync.Once
var once sync.Once
func safeClose(ch chan int) {
    once.Do(func() {
        close(ch)
    })
}

// ✅ 修复:使用 recover
func safeClose(ch chan int) {
    defer func() {
        recover()  // 忽略 panic
    }()
    close(ch)
}

3.3 nil Channel 永远阻塞

var ch chan int  // nil

// 永远阻塞
ch <- 1    // 阻塞
<-ch       // 阻塞

// 使用场景:动态禁用 case
func process(ch1, ch2 chan int) {
    ch2 = nil  // 禁用 ch2
    for {
        select {
        case v := <-ch1:
            handle(v)
        case v := <-ch2:  // ch2 为 nil,此 case 永不执行
            handle(v)
        }
    }
}

3.4 缓冲区大小不当

// ❌ 问题:无缓冲导致阻塞
ch := make(chan int)
for i := 0; i < 10; i++ {
    ch <- i  // 第一个就阻塞
}

// ✅ 修复:使用缓冲
ch := make(chan int, 10)
for i := 0; i < 10; i++ {
    ch <- i  // 不阻塞
}

// ✅ 修复:使用 goroutine 接收
ch := make(chan int)
go func() {
    for v := range ch {
        process(v)
    }
}()
for i := 0; i < 10; i++ {
    ch <- i
}
close(ch)

3.5 忘记关闭 Channel

// ❌ 泄漏:range 永不退出
func producer(ch chan<- int) {
    for i := 0; i < 10; i++ {
        ch <- i
    }
    // 忘记 close(ch)
}

func consumer(ch <-chan int) {
    for v := range ch {  // 永不退出
        process(v)
    }
}

// ✅ 修复
func producer(ch chan<- int) {
    defer close(ch)  // 确保关闭
    for i := 0; i < 10; i++ {
        ch <- i
    }
}

四、WaitGroup 误用

4.1 忘记调用 Done

// ❌ 泄漏:Wait 永不返回
var wg sync.WaitGroup
wg.Add(1)
go func() {
    doWork()
    // 忘记 wg.Done()
}()
wg.Wait()  // 永远阻塞

// ✅ 修复:使用 defer
var wg sync.WaitGroup
wg.Add(1)
go func() {
    defer wg.Done()
    doWork()
}()
wg.Wait()

4.2 Add 在 Goroutine 内部

// ❌ 竞态条件
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
    go func() {
        wg.Add(1)  // 可能在 Wait 之后调用
        doWork()
        wg.Done()
    }()
}
wg.Wait()

// ✅ 修复:在外部 Add
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        doWork()
    }()
}
wg.Wait()

4.3 WaitGroup 重用

// ❌ 错误:重用 WaitGroup
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        doWork()
    }()
}
wg.Wait()

// 重用(计数器可能为负)
for i := 0; i < 10; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        doWork()
    }()
}
wg.Wait()

// ✅ 修复:每次使用新的 WaitGroup
func batch1() {
    var wg sync.WaitGroup
    // ...
    wg.Wait()
}

func batch2() {
    var wg sync.WaitGroup
    // ...
    wg.Wait()
}

五、Mutex 误用

5.1 锁拷贝

// ❌ 错误:Mutex 被拷贝
type Data struct {
    mu    sync.Mutex
    value int
}

func copyData(d Data) Data {
    return d  // Mutex 被拷贝,未定义行为
}

// ✅ 修复:使用指针
func copyData(d *Data) *Data {
    return &Data{value: d.value}
}

5.2 死锁

// ❌ 死锁:重复加锁
mu.Lock()
mu.Lock()  // 死锁

// ❌ 死锁:锁顺序不一致
// Goroutine A
mu1.Lock()
mu2.Lock()

// Goroutine B
mu2.Lock()
mu1.Lock()  // 可能死锁

// ✅ 修复:统一锁顺序
// 总是先 mu1 后 mu2

5.3 临界区过大

// ❌ 不推荐:临界区过大
func process() {
    mu.Lock()
    defer mu.Unlock()
    
    data := loadData()      // I/O 操作
    result := compute(data) // 耗时计算
    saveData(result)        // I/O 操作
}

// ✅ 修复:缩小临界区
func process() {
    mu.Lock()
    data := loadData()
    mu.Unlock()
    
    result := compute(data)  // 锁外执行
    
    mu.Lock()
    saveData(result)
    mu.Unlock()
}

5.4 忘记解锁

// ❌ 错误:panic 导致未解锁
mu.Lock()
doSomething()  // 可能 panic
mu.Unlock()    // 未执行

// ✅ 修复:使用 defer
mu.Lock()
defer mu.Unlock()
doSomething()

六、并发最佳实践

6.1 设计原则

原则说明
不要通过共享内存通信使用 Channel 通信
通过通信共享内存Channel 传递数据所有权
Goroutine 生命周期可控使用 context 控制退出
避免 Goroutine 泄漏确保所有 Goroutine 能退出
减少锁竞争缩小临界区,使用细粒度锁

6.2 并发模式

模式 1:Worker Pool

func workerPool(ctx context.Context, jobs <-chan Job, results chan<- Job, n int) {
    var wg sync.WaitGroup
    
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                case job, ok := <-jobs:
                    if !ok {
                        return
                    }
                    results <- processJob(job)
                }
            }
        }()
    }
    
    wg.Wait()
    close(results)
}

模式 2:Pipeline

func pipeline(ctx context.Context, in <-chan int) <-chan int {
    // 阶段 1
    stage1 := make(chan int)
    go func() {
        defer close(stage1)
        for n := range in {
            select {
            case <-ctx.Done():
                return
            case stage1 <- n * 2:
            }
        }
    }()
    
    // 阶段 2
    stage2 := make(chan int)
    go func() {
        defer close(stage2)
        for n := range stage1 {
            select {
            case <-ctx.Done():
                return
            case stage2 <- n + 1:
            }
        }
    }()
    
    return stage2
}

模式 3:Fan-out/Fan-in

func fanOut(ctx context.Context, in <-chan int, n int) []<-chan int {
    outs := make([]<-chan int, n)
    for i := 0; i < n; i++ {
        out := make(chan int)
        go func(out chan<- int) {
            defer close(out)
            for n := range in {
                select {
                case <-ctx.Done():
                    return
                case out <- n:
                }
            }
        }(out)
        outs[i] = out
    }
    return outs
}

func fanIn(ctx context.Context, ins ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    
    for _, in := range ins {
        wg.Add(1)
        go func(in <-chan int) {
            defer wg.Done()
            for n := range in {
                select {
                case <-ctx.Done():
                    return
                case out <- n:
                }
            }
        }(in)
    }
    
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

6.3 性能优化

优化 1:减少锁粒度

// 全局锁
var mu sync.Mutex
var data map[string]int

// 优化:分片锁
const shards = 256
var shardMu [shards]sync.Mutex
var shardData [shards]map[string]int

func get(key string) int {
    idx := hash(key) % shards
    shardMu[idx].Lock()
    defer shardMu[idx].Unlock()
    return shardData[idx][key]
}

优化 2:使用 RWMutex

// 读多写少场景
var rw sync.RWMutex
var cache map[string]string

func get(key string) string {
    rw.RLock()
    defer rw.RUnlock()
    return cache[key]
}

func set(key, value string) {
    rw.Lock()
    defer rw.Unlock()
    cache[key] = value
}

优化 3:使用原子操作

// Mutex: ~50 ns/op
type MutexCounter struct {
    mu    sync.Mutex
    count int
}

func (c *MutexCounter) Inc() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count++
}

// Atomic: ~10 ns/op
type AtomicCounter struct {
    count int64
}

func (c *AtomicCounter) Inc() {
    atomic.AddInt64(&c.count, 1)
}

6.4 调试技巧

技巧 1:竞态检测

# 运行测试
go test -race ./...

# 运行程序
go run -race main.go

技巧 2:Goroutine 分析

# 查看 goroutine 数量
curl http://localhost:6060/debug/pprof/goroutine?debug=1

# 分析 goroutine 堆栈
go tool pprof http://localhost:6060/debug/pprof/goroutine

技巧 3:死锁检测

// 设置死锁检测
import _ "net/http/pprof"

func main() {
    go func() {
        log.Println(http.ListenAndServe("localhost:6060", nil))
    }()
    
    // 程序逻辑
}

七、检查清单

并发安全检查清单

总结

Go 并发编程的常见陷阱:

陷阱类型表现解决方案
Goroutine 泄漏内存增长,goroutine 不退出使用 context 控制
竞态条件数据不一致,随机错误加锁或原子操作
Channel 误用panic 或阻塞确保正确关闭
WaitGroup 误用永久阻塞defer Done()
Mutex 误用死锁,未定义行为defer Unlock()

核心原则

  1. 始终控制 Goroutine 生命周期
  2. 使用 Channel 进行通信
  3. 避免共享内存,或适当同步
  4. 使用竞态检测器验证

掌握这些最佳实践,能帮助你写出更安全、更高效的并发代码。

参考资料


分享这篇文章到:

上一篇文章
Queue 并发队列详解
下一篇文章
MySQL 死锁分析与解决