Skip to content
清晨的一缕阳光
返回

Go 并发编程核心机制

Go 并发编程核心机制

一、Goroutine 原理

1.1 什么是 Goroutine

func main() {
    go sayHello()  // 启动一个 Goroutine
    fmt.Println("main")
}

func sayHello() {
    fmt.Println("hello")
}
特性Goroutine线程
内存占用2KB1MB
创建开销微秒级毫秒级
调度方式用户态 (M:N)内核态 (1:1)
通信方式Channel共享内存 + 锁

1.2 GMP 调度模型

G (Goroutine) → M (Machine) → P (Processor)
     ↓              ↓              ↓
   协程          系统线程        逻辑处理器
graph LR
    P1 --> M1
    P1 --> M2
    P2 --> M3
    G1 --> P1
    G2 --> P1
    G3 --> P2

1.3 调度过程

// 1. 创建 Goroutine
go func() {
    fmt.Println("hello")
}()

// 2. G 被放入 P 的本地队列
// 3. M 从 P 获取 G 并执行
// 4. G 遇到系统调用 → M 阻塞 → G 迁移到其他 M
// 5. G 执行完毕 → 回到 P 的队列等待回收

二、Channel 机制

2.1 Channel 基础

// 无缓冲 Channel (同步)
ch := make(chan int)
ch <- 1       // 发送,阻塞直到有人接收
v := <-ch     // 接收,阻塞直到有数据

// 有缓冲 Channel (异步)
ch := make(chan int, 10)
ch <- 1       // 不阻塞,直到缓冲区满
v := <-ch     // 不阻塞,直到缓冲区空

// 关闭 Channel
close(ch)
v, ok := <-ch // ok=false 表示 channel 已关闭

2.2 Channel 底层结构

type hchan struct {
    qcount   uint           // 队列中数据数量
    dataqsiz uint           // 缓冲区大小
    buf      unsafe.Pointer // 缓冲区指针
    elemsize uint16         // 元素大小
    closed   uint32         // 是否关闭
    sendx    uint           // 发送索引
    recvx    uint           // 接收索引
    recvq    waitq          // 接收等待队列
    sendq    waitq          // 发送等待队列
    lock     mutex          // 互斥锁
}

2.3 使用场景

// 1. 管道模式 (Pipeline)
func pipeline(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * 2
        }
        close(out)
    }()
    return out
}

// 2. 扇入扇出 (Fan-in/Fan-out)
func fanIn(ins ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    for _, in := range ins {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for n := range c {
                out <- n
            }
        }(in)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

// 3. 控制并发数
func worker(jobs <-chan int, results chan<- int) {
    for j := range jobs {
        results <- j * 2
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)
    
    // 启动 3 个 worker
    for w := 1; w <= 3; w++ {
        go worker(jobs, results)
    }
    
    // 发送任务
    for j := 1; j <= 5; j++ {
        jobs <- j
    }
    close(jobs)
    
    // 收集结果
    for r := 1; r <= 5; r++ {
        fmt.Println(<-results)
    }
}

三、select 多路复用

3.1 基础用法

select {
case msg1 := <-ch1:
    fmt.Println("received", msg1)
case msg2 := <-ch2:
    fmt.Println("received", msg2)
case ch3 <- data:
    fmt.Println("sent", data)
default:
    fmt.Println("no channel ready")
case <-time.After(time.Second):
    fmt.Println("timeout")
}

3.2 典型场景

// 1. 超时控制
func requestWithTimeout(url string, timeout time.Duration) ([]byte, error) {
    ch := make(chan []byte, 1)
    errCh := make(chan error, 1)
    
    go func() {
        resp, err := http.Get(url)
        if err != nil {
            errCh <- err
            return
        }
        ch <- resp.Body
    }()
    
    select {
    case data := <-ch:
        return data, nil
    case err := <-errCh:
        return nil, err
    case <-time.After(timeout):
        return nil, errors.New("request timeout")
    }
}

// 2. 优雅退出
func worker(done chan bool) {
    for {
        select {
        case <-time.After(100 * time.Millisecond):
            fmt.Println("working...")
        case <-done:
            fmt.Println("exit")
            return
        }
    }
}

四、sync 包

4.1 Mutex 互斥锁

type Counter struct {
    mu    sync.Mutex
    count int
}

func (c *Counter) Inc() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count++
}

func (c *Counter) Get() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.count
}

4.2 RWMutex 读写锁

type Cache struct {
    mu   sync.RWMutex
    data map[string]string
}

func (c *Cache) Get(key string) string {
    c.mu.RLock()      // 读锁,可并发
    defer c.mu.RUnlock()
    return c.data[key]
}

func (c *Cache) Set(key, value string) {
    c.mu.Lock()       // 写锁,互斥
    defer c.mu.Unlock()
    c.data[key] = value
}

4.3 WaitGroup 等待组

func main() {
    var wg sync.WaitGroup
    
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            fmt.Println("worker", id)
        }(i)
    }
    
    wg.Wait()  // 等待所有 Goroutine 完成
}

4.4 Once 单次执行

var (
    instance *Singleton
    once     sync.Once
)

func GetInstance() *Singleton {
    once.Do(func() {
        instance = &Singleton{}
    })
    return instance
}

4.5 Atomic 原子操作

var count int64

// 原子自增
atomic.AddInt64(&count, 1)

// 原子加载
v := atomic.LoadInt64(&count)

// 原子存储
atomic.StoreInt64(&count, 100)

// CAS 操作
atomic.CompareAndSwapInt64(&count, 0, 1)

五、并发陷阱

5.1 Goroutine 泄漏

// ❌ 泄漏:Goroutine 永远阻塞
func leak() {
    ch := make(chan int)
    go func() {
        ch <- 1  // 阻塞,无人接收
    }()
}

// ✅ 修复:使用缓冲或 select
func fixed() {
    ch := make(chan int, 1)
    go func() {
        select {
        case ch <- 1:
        case <-time.After(time.Second):
        }
    }()
}

5.2 竞态条件

// ❌ 竞态:多个 Goroutine 同时写
var m = make(map[string]int)
go func() { m["a"] = 1 }()
go func() { m["b"] = 2 }()

// ✅ 修复:加锁或使用 sync.Map
var mu sync.Mutex
var m = make(map[string]int)
go func() {
    mu.Lock()
    m["a"] = 1
    mu.Unlock()
}()

5.3 Channel 使用错误

// ❌ 向已关闭的 channel 发送
close(ch)
ch <- 1  // panic!

// ✅ 检查是否关闭
v, ok := <-ch
if !ok {
    fmt.Println("channel closed")
}

// ❌ 重复关闭 channel
close(ch)
close(ch)  // panic!

// ✅ 确保只关闭一次
var once sync.Once
once.Do(func() { close(ch) })

六、最佳实践

6.1 并发模式

模式场景实现
Worker Pool任务分发Channel + Goroutine
Pipeline流式处理多级 Channel
Fan-out并行处理多个 Goroutine 读同一 Channel
Fan-in结果合并多个 Channel 写入一个

6.2 设计原则

  1. 不要通过共享内存通信,通过通信共享内存
  2. Goroutine 生命周期要可控
  3. Channel 大小要合理(通常 1 或 0)
  4. 避免 Goroutine 泄漏
  5. 使用 context 控制超时和取消

6.3 Context 使用

func process(ctx context.Context, data string) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case result := <-doWork(data):
        return result
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
    if err := process(ctx, "data"); err != nil {
        fmt.Println("error:", err)
    }
}

总结

Go 并发编程核心要点:

  1. Goroutine:轻量级协程,GMP 调度模型
  2. Channel:类型安全的通信机制,遵循 CSP 模型
  3. select:多路复用,处理超时和取消
  4. sync 包:Mutex、RWMutex、WaitGroup 等同步原语
  5. 避免陷阱:Goroutine 泄漏、竞态条件、Channel 误用

Go 并发哲学:CSP 模型(Communicating Sequential Processes)


分享这篇文章到:

上一篇文章
焦虑的一代-成长的需求
下一篇文章
Redis 性能调优实战