GMP 调度模型详解
GMP 是 Go 并发高性能的核心,理解其原理有助于编写更高效的并发代码。
一、核心概念
1.1 GMP 组成
G (Goroutine):
- 协程,包含栈、指令指针
- 初始栈 2KB,可动态增长
- 用户态,轻量级
- 存储结构:
struct g
M (Machine):
- 系统线程,执行代码的载体
- 与 OS 线程 1:1 映射
- 默认最多 10000 个
- 存储结构:
struct m
P (Processor):
- 逻辑处理器,管理 G 队列
- 数量由
GOMAXPROCS决定 - 默认等于 CPU 核心数
- 存储结构:
struct p
1.2 架构关系
┌─────────────────────────────────────────────┐
│ P1 P2 P3 P4 │
│ │ │ │ │ │
│ M1 M2 M3 M4 │
│ │ │ │ │ │
│ G1,G2,G3 G4,G5 G6 G7,G8 │
│ │
│ Global Queue: [G9, G10] │
│ Idle M: [M5, M6] │
└─────────────────────────────────────────────┘
1.3 源码定义
// runtime/runtime2.go
// G - Goroutine
struct g {
stack stack // 栈
m *m // 当前关联的 M
goid int64 // Goroutine ID
sched gobuf // 调度信息
atomicstatus uint32 // 状态
// ... 更多字段
}
// M - Machine
struct m {
g0 *g // 调度器 G
gsignal *g // 信号处理 G
curg *g // 当前用户 G
p puintptr // 关联的 P
nextp puintptr
id int32
// ... 更多字段
}
// P - Processor
struct p {
m muintptr // 关联的 M
runqhead uint32 // 队列头
runqtail uint32 // 队列尾
runq [256]guintptr // 本地队列
runnext guintptr // 下一个 G
status uint32
// ... 更多字段
}
二、调度流程
2.1 Goroutine 创建
go func() {
fmt.Println("Hello")
}()
内部流程:
// 1. 创建 G 对象
// runtime/proc.go:goexit
func newproc(siz int32, fn *funcval) {
gp := newproc1(fn)
// 2. 放入 P 的本地队列
systemstack(func() {
gp.m = getg().m
gp.gopc = getg().pc
gp.startpc = fn.fn
// 3. 调度器选择 M 执行
schedule()
})
}
// runtime/proc.go:newproc1
func newproc1(fn *funcval) *g {
// 分配 G 对象
newg := malg(_StackMin)
// 设置入口函数
newg.sched.pc = getcallerpc()
newg.sched.sp = newg.stack.hi
newg.sched.g = guintptr(unsafe.Pointer(newg))
// 设置状态为 runnable
newg.atomicstatus = _Grunnable
return newg
}
2.2 调度器启动
// runtime/proc.go:main
func main() {
// 1. 创建 P
for i := 0; i < gomaxprocs; i++ {
p := allocp()
// 2. 创建 M
m := newm()
// 3. 绑定 P 和 M
m.p = p
p.m = m
// 4. 启动线程
startm(m)
}
// 5. 启动主 G
g := getg()
g.m.g0 = g
schedule()
}
// runtime/proc.go:startm
func startm(mp *m) {
lock(&sched.lock)
// 获取空闲 P
pp := idlepget()
if pp == nil {
// 无空闲 P,将 M 放入空闲队列
mp.p = 0
mp.nextp = 0
idlemput(mp)
unlock(&sched.lock)
return
}
unlock(&sched.lock)
// 设置 P
mp.p = pp
pp.m = mp
// 启动线程
newm(nil, pp)
}
2.3 调度循环
// runtime/asm_amd64.s:goexit
TEXT ·goexit(SB),NOSPLIT,$0-0
CALL ·goexit1(SB) // 调用 goexit1
RET 0
// runtime/proc.go:goexit1
func goexit1() {
// 1. 清理资源
mcall(goexit0)
}
func goexit0(gp *g) {
// 2. 标记 G 为 dead
gp.status = _Gdead
// 3. 释放 G
gfput(getg().m.p, gp)
// 4. 调度下一个 G
schedule()
}
// runtime/proc.go:schedule
func schedule() {
_g_ := getg()
// 1. 从 P 获取 G
gp := _g_.m.p.runqget()
if gp == nil {
// 2. 本地队列为空,尝试窃取
gp = runqsteal()
}
if gp == nil {
// 3. 从全局队列获取
gp = globrunqget()
}
// 4. 执行 G
execute(gp)
}
// runtime/asm_amd64.s:execute
TEXT ·execute(SB),NOSPLIT,$0
MOVQ ·gobuf+0(FP), BX // 获取 G
MOVQ gobuf_sp(BX), SP // 恢复栈指针
MOVQ gobuf_pc(BX), DX // 恢复指令指针
JMP DX // 跳转到 G 的指令
三、工作窃取
3.1 窃取机制
当 P1 的队列为空时:
- 从其他 P 窃取(随机选择)
- 从全局队列获取
- 从网络轮询器获取
- 休眠等待
3.2 窃取实现
// runtime/proc.go:runqsteal
func runqsteal(_p_, p2 *p) bool {
// 1. 获取 P2 队列的一半
n := p2.runqhead - p2.runqtail
if n == 0 {
return false
}
n = n / 2
// 2. 批量窃取
for i := uint32(0); i < n; i++ {
// 从 P2 队列尾部取出
g := p2.runq[p2.runqtail%uint32(len(p2.runq))]
p2.runqtail++
// 放入本地队列头部
_p_.runq[_p_.runqhead%uint32(len(_p_.runq))] = g
_p_.runqhead++
}
return n > 0
}
3.3 窃取优化
// runtime/proc.go:findrunnable
func findrunnable() *g {
_g_ := getg()
_p_ := _g_.m.p
// 1. 限制窃取频率
if _p_.schedtick%4 != 0 {
// 每 4 次调度才尝试窃取
return nil
}
// 2. 随机选择 P
for i := 0; i < gomaxprocs; i++ {
p2 := allp[rand.Intn(gomaxprocs)]
if runqsteal(_p_, p2) {
return _p_.runqget()
}
}
// 3. 从全局队列获取
return globrunqget()
}
四、调度策略
4.1 抢占式调度
基于时间的抢占:
// runtime/proc.go:preemptone
func preemptone(_p_ *p) bool {
mp := _p_.m
// 获取当前运行的 G
gp := mp.curg
// 检查是否超时
if gp.preempt {
// 设置抢占标志
gp.stackguard0 = stackPreempt
return true
}
return false
}
基于函数调用的抢占:
// runtime/asm_amd64.s:morestack
TEXT ·morestack(SB),NOSPLIT,$0
// 在函数调用时检查抢占标志
MOVQ g_stackguard0(g), BX
CMPQ BX, SP
JLS morestack_call // 如果栈不足或需要抢占
// 正常函数调用
CALL ·newstack(SB)
RET
4.2 公平调度
// runtime/proc.go:schedule
func schedule() {
// 检查全局队列等待时间
if sched.gqwait > 0 {
// 优先调度全局队列的 G
gp = globrunqget()
if gp != nil {
execute(gp)
return
}
}
// 正常调度流程
// ...
}
4.3 手递手调度
// runtime/proc.go:handoffp
func handoffp(_p_ *p) {
// 1. 获取空闲 M
mp := getidle()
if mp != nil {
// 2. 直接传递 P
mp.p = _p_
_p_.m = mp
// 3. 启动 M
startm(mp)
}
}
五、性能优化
5.1 本地队列优先
// runtime/proc.go:runqget
func runqget(_p_ *p) *g {
// 1. 优先从本地队列获取(无锁)
for {
h := atomic.Load(&_p_.runqhead)
t := atomic.Load(&_p_.runqtail)
if t == h {
// 队列为空
break
}
g := _p_.runq[h%uint32(len(_p_.runq))]
if atomic.Cas(&_p_.runqhead, h, h+1) {
return g
}
}
// 2. 本地队列为空,尝试窃取
if g := runqsteal(); g != nil {
return g
}
// 3. 从全局队列获取(需要锁)
return globrunqget()
}
5.2 批量操作
// runtime/proc.go:runqgetBatch
func runqgetBatch(_p_ *p, batch []*g) int {
n := 0
for i := 0; i < len(batch) && n < 32; i++ {
h := atomic.Load(&_p_.runqhead)
t := atomic.Load(&_p_.runqtail)
if t == h {
break
}
batch[i] = _p_.runq[h%uint32(len(_p_.runq))]
atomic.Cas(&_p_.runqhead, h, h+1)
n++
}
return n
}
5.3 网络轮询器
// runtime/netpoll.go:netpoll
func netpoll(waitms int64) *g {
// 1. 使用 epoll/kqueue 等待网络事件
events := poller.wait(waitms)
// 2. 唤醒对应的 G
var gp *g
for _, event := range events {
gd := event.data.(*g)
gd.sched.link = gp
gp = gd
}
return gp
}
// runtime/proc.go:findrunnable
func findrunnable() *g {
// 从网络轮询器获取 G
if gp := netpoll(0); gp != nil {
return gp
}
// ...
}
六、调试与监控
6.1 GODEBUG
# 查看调度信息
GODEBUG=schedtrace=1000 ./app
# 输出
SCHED 1000ms: gomaxprocs=4 idleprocs=2 threads=10 idlethreads=4 runqueue=4 [1 2 3 4]
# 查看 GC 信息
GODEBUG=gctrace=1 ./app
# 输出
gc 1 @0.010s 0%: 0.010+0.50+0.010 ms clock, ...
6.2 pprof
# CPU 分析
go test -cpuprofile=cpu.prof ./...
go tool pprof cpu.prof
# Goroutine 分析
go test -blockprofile=block.prof ./...
go tool pprof block.prof
# Web 界面
go tool pprof -http=:8080 cpu.prof
6.3 运行时统计
package main
import (
"fmt"
"runtime"
)
func main() {
var m runtime.MemStats
runtime.ReadMemStats(&m)
fmt.Printf("Goroutines: %d\n", runtime.NumGoroutine())
fmt.Printf("Threads: %d\n", m.NumGC)
fmt.Printf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))
// 详细统计
fmt.Printf("Alloc = %v KB\n", m.Alloc/1024)
fmt.Printf("TotalAlloc = %v KB\n", m.TotalAlloc/1024)
fmt.Printf("Sys = %v KB\n", m.Sys/1024)
fmt.Printf("NumGC = %v\n", m.NumGC)
}
七、最佳实践
7.1 GOMAXPROCS 设置
// 自动设置(默认)
runtime.GOMAXPROCS(0)
// 手动设置
runtime.GOMAXPROCS(4)
// 根据场景调整
// CPU 密集型:等于 CPU 核心数
// IO 密集型:可大于 CPU 核心数
7.2 避免 Goroutine 泄漏
// ✅ 推荐:使用 context
func worker(ctx context.Context, tasks <-chan Task) {
for {
select {
case <-ctx.Done():
return
case task := <-tasks:
process(task)
}
}
}
// ❌ 避免:无退出条件
func worker(tasks <-chan Task) {
for task := range tasks {
process(task)
}
}
7.3 控制并发数
// 使用信号量
sem := make(chan struct{}, 10)
for i := 0; i < 100; i++ {
sem <- struct{}{}
go func() {
defer func() { <-sem }()
doWork()
}()
}
// 使用 errgroup
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(10)
for i := 0; i < 100; i++ {
g.Go(func() error {
return doWork()
})
}
7.4 性能分析
// 启动时开启 pprof
import _ "net/http/pprof"
func main() {
go func() {
http.ListenAndServe("localhost:6060", nil)
}()
// 业务逻辑
}
// 访问 http://localhost:6060/debug/pprof/
八、总结
GMP 模型核心要点:
| 组件 | 作用 | 特点 |
|---|---|---|
| G | 协程 | 轻量级、2KB 栈、动态增长 |
| M | 线程 | 系统线程、1:1 映射、最多 10000 |
| P | 处理器 | 管理队列、工作窃取、GOMAXPROCS |
| 调度 | 分配 G 到 M | 抢占式、公平、手递手 |
性能优化要点:
- 本地队列优先(无锁)
- 工作窃取(负载均衡)
- 网络轮询器(IO 不阻塞)
- 批量操作(减少锁竞争)
GMP 模型是 Go 高性能并发的核心,理解其原理有助于编写高效的并发代码和排查性能问题。
延伸阅读: