Skip to content
清晨的一缕阳光
返回

Go GMP 调度模型详解

GMP 调度模型详解

GMP 是 Go 并发高性能的核心,理解其原理有助于编写更高效的并发代码。

一、核心概念

1.1 GMP 组成

G (Goroutine):

M (Machine):

P (Processor):

1.2 架构关系

┌─────────────────────────────────────────────┐
│  P1          P2          P3          P4     │
│  │           │           │           │      │
│  M1          M2          M3          M4     │
│  │           │           │           │      │
│  G1,G2,G3    G4,G5       G6          G7,G8  │
│                                              │
│  Global Queue: [G9, G10]                    │
│  Idle M: [M5, M6]                           │
└─────────────────────────────────────────────┘

1.3 源码定义

// runtime/runtime2.go

// G - Goroutine
struct g {
    stack       stack   // 栈
    m           *m      // 当前关联的 M
    goid        int64   // Goroutine ID
    sched       gobuf   // 调度信息
    atomicstatus uint32 // 状态
    // ... 更多字段
}

// M - Machine
struct m {
    g0      *g     // 调度器 G
    gsignal *g     // 信号处理 G
    curg    *g     // 当前用户 G
    p       puintptr // 关联的 P
    nextp   puintptr
    id      int32
    // ... 更多字段
}

// P - Processor
struct p {
    m           muintptr   // 关联的 M
    runqhead   uint32      // 队列头
    runqtail   uint32      // 队列尾
    runq       [256]guintptr // 本地队列
    runnext    guintptr    // 下一个 G
    status     uint32
    // ... 更多字段
}

二、调度流程

2.1 Goroutine 创建

go func() {
    fmt.Println("Hello")
}()

内部流程

// 1. 创建 G 对象
// runtime/proc.go:goexit
func newproc(siz int32, fn *funcval) {
    gp := newproc1(fn)
    
    // 2. 放入 P 的本地队列
    systemstack(func() {
        gp.m = getg().m
        gp.gopc = getg().pc
        gp.startpc = fn.fn
        
        // 3. 调度器选择 M 执行
        schedule()
    })
}

// runtime/proc.go:newproc1
func newproc1(fn *funcval) *g {
    // 分配 G 对象
    newg := malg(_StackMin)
    
    // 设置入口函数
    newg.sched.pc = getcallerpc()
    newg.sched.sp = newg.stack.hi
    newg.sched.g = guintptr(unsafe.Pointer(newg))
    
    // 设置状态为 runnable
    newg.atomicstatus = _Grunnable
    
    return newg
}

2.2 调度器启动

// runtime/proc.go:main
func main() {
    // 1. 创建 P
    for i := 0; i < gomaxprocs; i++ {
        p := allocp()
        
        // 2. 创建 M
        m := newm()
        
        // 3. 绑定 P 和 M
        m.p = p
        p.m = m
        
        // 4. 启动线程
        startm(m)
    }
    
    // 5. 启动主 G
    g := getg()
    g.m.g0 = g
    schedule()
}

// runtime/proc.go:startm
func startm(mp *m) {
    lock(&sched.lock)
    
    // 获取空闲 P
    pp := idlepget()
    if pp == nil {
        // 无空闲 P,将 M 放入空闲队列
        mp.p = 0
        mp.nextp = 0
        idlemput(mp)
        unlock(&sched.lock)
        return
    }
    
    unlock(&sched.lock)
    
    // 设置 P
    mp.p = pp
    pp.m = mp
    
    // 启动线程
    newm(nil, pp)
}

2.3 调度循环

// runtime/asm_amd64.s:goexit
TEXT ·goexit(SB),NOSPLIT,$0-0
    CALL    ·goexit1(SB)  // 调用 goexit1
    RET     0

// runtime/proc.go:goexit1
func goexit1() {
    // 1. 清理资源
    mcall(goexit0)
}

func goexit0(gp *g) {
    // 2. 标记 G 为 dead
    gp.status = _Gdead
    
    // 3. 释放 G
    gfput(getg().m.p, gp)
    
    // 4. 调度下一个 G
    schedule()
}

// runtime/proc.go:schedule
func schedule() {
    _g_ := getg()
    
    // 1. 从 P 获取 G
    gp := _g_.m.p.runqget()
    
    if gp == nil {
        // 2. 本地队列为空,尝试窃取
        gp = runqsteal()
    }
    
    if gp == nil {
        // 3. 从全局队列获取
        gp = globrunqget()
    }
    
    // 4. 执行 G
    execute(gp)
}

// runtime/asm_amd64.s:execute
TEXT ·execute(SB),NOSPLIT,$0
    MOVQ    ·gobuf+0(FP), BX  // 获取 G
    MOVQ    gobuf_sp(BX), SP  // 恢复栈指针
    MOVQ    gobuf_pc(BX), DX  // 恢复指令指针
    JMP     DX                // 跳转到 G 的指令

三、工作窃取

3.1 窃取机制

当 P1 的队列为空时:

  1. 从其他 P 窃取(随机选择)
  2. 从全局队列获取
  3. 从网络轮询器获取
  4. 休眠等待

3.2 窃取实现

// runtime/proc.go:runqsteal
func runqsteal(_p_, p2 *p) bool {
    // 1. 获取 P2 队列的一半
    n := p2.runqhead - p2.runqtail
    if n == 0 {
        return false
    }
    n = n / 2
    
    // 2. 批量窃取
    for i := uint32(0); i < n; i++ {
        // 从 P2 队列尾部取出
        g := p2.runq[p2.runqtail%uint32(len(p2.runq))]
        p2.runqtail++
        
        // 放入本地队列头部
        _p_.runq[_p_.runqhead%uint32(len(_p_.runq))] = g
        _p_.runqhead++
    }
    
    return n > 0
}

3.3 窃取优化

// runtime/proc.go:findrunnable
func findrunnable() *g {
    _g_ := getg()
    _p_ := _g_.m.p
    
    // 1. 限制窃取频率
    if _p_.schedtick%4 != 0 {
        // 每 4 次调度才尝试窃取
        return nil
    }
    
    // 2. 随机选择 P
    for i := 0; i < gomaxprocs; i++ {
        p2 := allp[rand.Intn(gomaxprocs)]
        
        if runqsteal(_p_, p2) {
            return _p_.runqget()
        }
    }
    
    // 3. 从全局队列获取
    return globrunqget()
}

四、调度策略

4.1 抢占式调度

基于时间的抢占

// runtime/proc.go:preemptone
func preemptone(_p_ *p) bool {
    mp := _p_.m
    
    // 获取当前运行的 G
    gp := mp.curg
    
    // 检查是否超时
    if gp.preempt {
        // 设置抢占标志
        gp.stackguard0 = stackPreempt
        return true
    }
    
    return false
}

基于函数调用的抢占

// runtime/asm_amd64.s:morestack
TEXT ·morestack(SB),NOSPLIT,$0
    // 在函数调用时检查抢占标志
    MOVQ    g_stackguard0(g), BX
    CMPQ    BX, SP
    JLS     morestack_call  // 如果栈不足或需要抢占
    
    // 正常函数调用
    CALL    ·newstack(SB)
    RET

4.2 公平调度

// runtime/proc.go:schedule
func schedule() {
    // 检查全局队列等待时间
    if sched.gqwait > 0 {
        // 优先调度全局队列的 G
        gp = globrunqget()
        if gp != nil {
            execute(gp)
            return
        }
    }
    
    // 正常调度流程
    // ...
}

4.3 手递手调度

// runtime/proc.go:handoffp
func handoffp(_p_ *p) {
    // 1. 获取空闲 M
    mp := getidle()
    
    if mp != nil {
        // 2. 直接传递 P
        mp.p = _p_
        _p_.m = mp
        
        // 3. 启动 M
        startm(mp)
    }
}

五、性能优化

5.1 本地队列优先

// runtime/proc.go:runqget
func runqget(_p_ *p) *g {
    // 1. 优先从本地队列获取(无锁)
    for {
        h := atomic.Load(&_p_.runqhead)
        t := atomic.Load(&_p_.runqtail)
        
        if t == h {
            // 队列为空
            break
        }
        
        g := _p_.runq[h%uint32(len(_p_.runq))]
        if atomic.Cas(&_p_.runqhead, h, h+1) {
            return g
        }
    }
    
    // 2. 本地队列为空,尝试窃取
    if g := runqsteal(); g != nil {
        return g
    }
    
    // 3. 从全局队列获取(需要锁)
    return globrunqget()
}

5.2 批量操作

// runtime/proc.go:runqgetBatch
func runqgetBatch(_p_ *p, batch []*g) int {
    n := 0
    
    for i := 0; i < len(batch) && n < 32; i++ {
        h := atomic.Load(&_p_.runqhead)
        t := atomic.Load(&_p_.runqtail)
        
        if t == h {
            break
        }
        
        batch[i] = _p_.runq[h%uint32(len(_p_.runq))]
        atomic.Cas(&_p_.runqhead, h, h+1)
        n++
    }
    
    return n
}

5.3 网络轮询器

// runtime/netpoll.go:netpoll
func netpoll(waitms int64) *g {
    // 1. 使用 epoll/kqueue 等待网络事件
    events := poller.wait(waitms)
    
    // 2. 唤醒对应的 G
    var gp *g
    for _, event := range events {
        gd := event.data.(*g)
        gd.sched.link = gp
        gp = gd
    }
    
    return gp
}

// runtime/proc.go:findrunnable
func findrunnable() *g {
    // 从网络轮询器获取 G
    if gp := netpoll(0); gp != nil {
        return gp
    }
    
    // ...
}

六、调试与监控

6.1 GODEBUG

# 查看调度信息
GODEBUG=schedtrace=1000 ./app

# 输出
SCHED 1000ms: gomaxprocs=4 idleprocs=2 threads=10 idlethreads=4 runqueue=4 [1 2 3 4]

# 查看 GC 信息
GODEBUG=gctrace=1 ./app

# 输出
gc 1 @0.010s 0%: 0.010+0.50+0.010 ms clock, ...

6.2 pprof

# CPU 分析
go test -cpuprofile=cpu.prof ./...
go tool pprof cpu.prof

# Goroutine 分析
go test -blockprofile=block.prof ./...
go tool pprof block.prof

# Web 界面
go tool pprof -http=:8080 cpu.prof

6.3 运行时统计

package main

import (
    "fmt"
    "runtime"
)

func main() {
    var m runtime.MemStats
    runtime.ReadMemStats(&m)
    
    fmt.Printf("Goroutines: %d\n", runtime.NumGoroutine())
    fmt.Printf("Threads: %d\n", m.NumGC)
    fmt.Printf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))
    
    // 详细统计
    fmt.Printf("Alloc = %v KB\n", m.Alloc/1024)
    fmt.Printf("TotalAlloc = %v KB\n", m.TotalAlloc/1024)
    fmt.Printf("Sys = %v KB\n", m.Sys/1024)
    fmt.Printf("NumGC = %v\n", m.NumGC)
}

七、最佳实践

7.1 GOMAXPROCS 设置

// 自动设置(默认)
runtime.GOMAXPROCS(0)

// 手动设置
runtime.GOMAXPROCS(4)

// 根据场景调整
// CPU 密集型:等于 CPU 核心数
// IO 密集型:可大于 CPU 核心数

7.2 避免 Goroutine 泄漏

// ✅ 推荐:使用 context
func worker(ctx context.Context, tasks <-chan Task) {
    for {
        select {
        case <-ctx.Done():
            return
        case task := <-tasks:
            process(task)
        }
    }
}

// ❌ 避免:无退出条件
func worker(tasks <-chan Task) {
    for task := range tasks {
        process(task)
    }
}

7.3 控制并发数

// 使用信号量
sem := make(chan struct{}, 10)

for i := 0; i < 100; i++ {
    sem <- struct{}{}
    go func() {
        defer func() { <-sem }()
        doWork()
    }()
}

// 使用 errgroup
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(10)

for i := 0; i < 100; i++ {
    g.Go(func() error {
        return doWork()
    })
}

7.4 性能分析

// 启动时开启 pprof
import _ "net/http/pprof"

func main() {
    go func() {
        http.ListenAndServe("localhost:6060", nil)
    }()
    
    // 业务逻辑
}

// 访问 http://localhost:6060/debug/pprof/

八、总结

GMP 模型核心要点:

组件作用特点
G协程轻量级、2KB 栈、动态增长
M线程系统线程、1:1 映射、最多 10000
P处理器管理队列、工作窃取、GOMAXPROCS
调度分配 G 到 M抢占式、公平、手递手

性能优化要点

  1. 本地队列优先(无锁)
  2. 工作窃取(负载均衡)
  3. 网络轮询器(IO 不阻塞)
  4. 批量操作(减少锁竞争)

GMP 模型是 Go 高性能并发的核心,理解其原理有助于编写高效的并发代码和排查性能问题。

延伸阅读


分享这篇文章到:

上一篇文章
B+ 树索引原理
下一篇文章
Go 错误处理最佳实践