Skip to content
清晨的一缕阳光
返回

Go Channel 底层原理

Channel 底层原理

Channel 是 Go 并发编程的核心,理解其底层实现有助于正确使用和性能优化。

一、核心结构

1.1 hchan 定义

// runtime/chan.go
type hchan struct {
    qcount   uint           // 队列中数据数量
    dataqsiz uint           // 缓冲区大小
    buf      unsafe.Pointer // 缓冲区指针
    elemsize uint16         // 元素大小
    closed   uint32         // 是否关闭
    sendx    uint           // 发送索引
    recvx    uint           // 接收索引
    recvq    waitq          // 接收等待队列
    sendq    waitq          // 发送等待队列
    lock     mutex          // 互斥锁
}

// 等待队列
type waitq struct {
    first *sudog
    last  *sudog
}

// 等待中的 G
type sudog struct {
    g          *g
    next       *sudog
    prev       *sudog
    elem       unsafe.Pointer  // 数据指针
    waitlink   *sudog
    releasetime int64
    ticket     uint32
}

1.2 内存布局

hchan 内存布局(以 make(chan int, 5) 为例)

┌──────────────────────────────────────────┐
│ hchan 头 (固定 96 字节)                    │
│  qcount: 8 bytes                         │
│  dataqsiz: 8 bytes                       │
│  buf: 8 bytes                            │
│  elemsize: 2 bytes                       │
│  closed: 4 bytes                         │
│  sendx: 8 bytes                          │
│  recvx: 8 bytes                          │
│  recvq: 16 bytes                         │
│  sendq: 16 bytes                         │
│  lock: 16 bytes                          │
├──────────────────────────────────────────┤
│ 缓冲区 (5 * 4 = 20 字节)                   │
│  [0] [1] [2] [3] [4]                     │
├──────────────────────────────────────────┤
│ 等待队列 (动态)                            │
│  recvq: [G1, G2, ...]                    │
│  sendq: [G3, G4, ...]                    │
└──────────────────────────────────────────┘

1.3 创建 Channel

// make(chan int, 5)

// runtime/chan.go:makechan
func makechan(t *chantype, size int) *hchan {
    elem := t.elem
    
    // 1. 计算内存大小
    var c *hchan
    
    if size > 0 {
        // 分配 hchan + 缓冲区
        c = (*hchan)(mallocgc(unsafe.Sizeof(hchan{})+uintptr(size)*elem.size, nil, false))
        c.buf = add(unsafe.Pointer(c), unsafe.Sizeof(*c))
    } else {
        // 无缓冲 channel
        c = new(hchan)
    }
    
    // 2. 初始化字段
    c.dataqsiz = uint(size)
    c.elemsize = uint16(elem.size)
    c.closed = 0
    
    return c
}

二、发送流程

2.1 发送实现

// ch <- value

// runtime/chan.go:chansend
func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
    // 1. 加锁
    lock(&c.lock)
    
    // 2. 有等待的接收者,直接传递
    if sg := c.recvq.dequeue(); sg != nil {
        sendg(sg, ep)
        unlock(&c.lock)
        return true
    }
    
    // 3. 缓冲区未满,放入队列
    if c.qcount < c.dataqsiz {
        sendbuf(c, ep)
        unlock(&c.lock)
        return true
    }
    
    // 4. 非阻塞模式,返回 false
    if !block {
        unlock(&c.lock)
        return false
    }
    
    // 5. 缓冲区满,阻塞等待
    gp := getg()
    sg := acquireSudog()
    sg.g = gp
    sg.elem = ep
    c.sendq.enqueue(sg)
    
    // 6. 挂起 G
    gopark()
    
    // 7. 被唤醒后继续
    releaseSudog(sg)
    return true
}

// 发送到缓冲区
func sendbuf(c *hchan, ep unsafe.Pointer) {
    // 复制数据到缓冲区
    memmove(
        add(c.buf, c.sendx*c.elemsize),
        ep,
        c.elemsize,
    )
    c.sendx = (c.sendx + 1) % c.dataqsiz
    c.qcount++
}

2.2 直接传递

发送者                接收者
  │                     │
  │  ch <- value        │
  │────────────────────>│  <- value
  │                     │
  
直接传递流程:
1. 接收者在 recvq 等待
2. 发送者从 recvq 取出接收者
3. 直接复制数据到接收者栈
4. 唤醒接收者 G
5. 发送者继续执行

2.3 缓冲传递

Channel 缓冲区 (dataqsiz=5, qcount=3)
┌─────┬─────┬─────┬─────┬─────┐
│  10 │  20 │  30 │     │     │
└─────┴─────┴─────┴─────┴─────┘
  ↑               ↑
recvx=0         sendx=3

发送 40:
1. 检查 recvq 无等待者
2. 检查 qcount(3) < dataqsiz(5)
3. 复制到 buf[3]
4. sendx = (3+1) % 5 = 4
5. qcount = 4

三、接收流程

3.1 接收实现

// value := <-ch

// runtime/chan.go:chanrecv
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) {
    // 1. 加锁
    lock(&c.lock)
    
    // 2. 有等待的发送者,直接接收
    if sg := c.sendq.dequeue(); sg != nil {
        recvg(sg, ep)
        unlock(&c.lock)
        return
    }
    
    // 3. 缓冲区有数据,取出
    if c.qcount > 0 {
        recvbuf(c, ep)
        unlock(&c.lock)
        return
    }
    
    // 4. channel 已关闭
    if c.closed != 0 {
        // 返回零值
        if ep != nil {
            typedslicecopy(...)
        }
        unlock(&c.lock)
        return
    }
    
    // 5. 非阻塞模式,返回 false
    if !block {
        unlock(&c.lock)
        return false
    }
    
    // 6. 无数据,阻塞等待
    gp := getg()
    sg := acquireSudog()
    sg.g = gp
    sg.elem = ep
    c.recvq.enqueue(sg)
    
    // 7. 挂起 G
    gopark()
    
    // 8. 被唤醒后继续
    releaseSudog(sg)
}

// 从缓冲区接收
func recvbuf(c *hchan, ep unsafe.Pointer) {
    // 从缓冲区复制数据
    memmove(
        ep,
        add(c.buf, c.recvx*c.elemsize),
        c.elemsize,
    )
    c.recvx = (c.recvx + 1) % c.dataqsiz
    c.qcount--
}

3.2 非阻塞接收

// select {
// case v := <-ch:
//     // 处理 v
// default:
//     // 无数据
// }

// runtime/chan.go:chanrecvnb
func chanrecvnb(c *hchan, ep unsafe.Pointer) bool {
    // 调用 chanrecv,block=false
    return chanrecv(c, ep, false)
}

3.3 多路选择

select {
case v1 := <-ch1:
    // 处理 ch1
case v2 := <-ch2:
    // 处理 ch2
default:
    // 都无数据
}

// runtime/select.go:selectgo
func selectgo(sel *select) {
    // 1. 随机打乱 case 顺序(避免饥饿)
    shuffle(sel.cases)
    
    // 2. 尝试每个 case
    for i := 0; i < len(sel.cases); i++ {
        case_ := sel.cases[i]
        
        if case_.chan != nil {
            // 尝试发送/接收
            if trycase(case_) {
                return
            }
        }
    }
    
    // 3. 都不可用,阻塞等待
    for i := 0; i < len(sel.cases); i++ {
        case_ := sel.cases[i]
        enqueue_wait(case_)
    }
    
    gopark()
}

四、关闭 Channel

4.1 关闭实现

// close(ch)

// runtime/chan.go:chanclose
func chanclose(c *hchan) {
    // 1. 加锁
    lock(&c.lock)
    
    // 2. 检查是否已关闭
    if c.closed != 0 {
        unlock(&c.lock)
        panic("close of closed channel")
    }
    
    // 3. 标记为关闭
    c.closed = 1
    
    // 4. 唤醒所有等待的接收者
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        // 返回零值
        sg.elem = nil
        goready(sg.g)
    }
    
    // 5. 唤醒所有等待的发送者(会 panic)
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        // 发送者会 panic
        goready(sg.g)
    }
    
    unlock(&c.lock)
}

4.2 关闭后行为

ch := make(chan int, 3)
ch <- 1
ch <- 2
close(ch)

// 读取已关闭的 channel
<-ch  // 1
<-ch  // 2
<-ch  // 0 (零值)
<-ch  // 0 (零值)

// 写入已关闭的 channel
ch <- 3  // panic: send on closed channel

// 底层实现
// runtime/chan.go:chansend
func chansend(c *hchan, ...) {
    lock(&c.lock)
    
    if c.closed != 0 {
        unlock(&c.lock)
        panic("send on closed channel")
    }
    
    // ...
}

4.3 检测关闭

// 方式 1:comma-ok idiom
v, ok := <-ch
if !ok {
    // channel 已关闭
}

// 方式 2:range
for v := range ch {
    // channel 关闭后自动退出循环
}

// 底层实现
// runtime/chan.go:chanrecv
func chanrecv(c *hchan, ...) (received bool) {
    lock(&c.lock)
    
    if c.closed != 0 && c.qcount == 0 {
        unlock(&c.lock)
        return false  // ok = false
    }
    
    // ...
    return true  // ok = true
}

五、性能优化

5.1 无锁优化

// 无缓冲 channel 的直接传递
// 不需要锁,直接内存拷贝

// runtime/chan.go:sendg
func sendg(sg *sudog, ep unsafe.Pointer) {
    // 直接复制数据到接收者栈
    if sg.elem != nil {
        memmove(sg.elem, ep, elemsize)
    }
    
    // 唤醒接收者
    goready(sg.g)
}

// runtime/chan.go:recvg
func recvg(sg *sudog, ep unsafe.Pointer) {
    // 直接复制数据到接收者
    if ep != nil {
        memmove(ep, sg.elem, elemsize)
    }
    
    // 唤醒发送者
    goready(sg.g)
}

5.2 内存对齐

// hchan 结构内存对齐
type hchan struct {
    qcount   uint64  // 8 字节对齐
    dataqsiz uint64
    buf      unsafe.Pointer
    elemsize uint16
    closed   uint32
    sendx    uint64
    recvx    uint64
    recvq    waitq   // 16 字节
    sendq    waitq
    lock     mutex   // 16 字节
}

// 总大小:96 字节(64 位系统)

5.3 等待队列优化

// 使用链表而非数组
type waitq struct {
    first *sudog
    last  *sudog
}

// O(1) 入队
func (q *waitq) enqueue(sg *sudog) {
    sg.next = nil
    sg.prev = q.last
    
    if q.last != nil {
        q.last.next = sg
    } else {
        q.first = sg
    }
    
    q.last = sg
}

// O(1) 出队
func (q *waitq) dequeue() *sudog {
    sg := q.first
    
    if sg != nil {
        q.first = sg.next
        if q.first != nil {
            q.first.prev = nil
        } else {
            q.last = nil
        }
    }
    
    return sg
}

5.4 批处理优化

// 批量发送
func batchsend(c *hchan, eps []unsafe.Pointer) {
    lock(&c.lock)
    
    for _, ep := range eps {
        // 有等待的接收者,直接传递
        if sg := c.recvq.dequeue(); sg != nil {
            sendg(sg, ep)
            continue
        }
        
        // 缓冲区未满,放入队列
        if c.qcount < c.dataqsiz {
            sendbuf(c, ep)
        } else {
            // 缓冲区满,阻塞等待
            // ...
        }
    }
    
    unlock(&c.lock)
}

六、使用陷阱

6.1 Goroutine 泄漏

// ❌ 泄漏:接收者阻塞
ch := make(chan int)
go func() {
    value := <-ch  // 永远阻塞
    fmt.Println(value)
}()

// ✅ 修复:使用 select
go func() {
    select {
    case value := <-ch:
        fmt.Println(value)
    case <-time.After(time.Second):
        fmt.Println("timeout")
    }
}()

// ✅ 修复:带缓冲
ch := make(chan int, 1)
go func() {
    value := <-ch
    fmt.Println(value)
}()
ch <- 1  // 不会阻塞

6.2 死锁

// ❌ 死锁:互相等待
ch1 := make(chan int)
ch2 := make(chan int)

go func() {
    ch1 <- 1  // 阻塞,等待 ch2
    <-ch2
}()

go func() {
    ch2 <- 2  // 阻塞,等待 ch1
    <-ch1
}()

// ✅ 修复:使用缓冲
ch1 := make(chan int, 1)
ch2 := make(chan int, 1)

// ✅ 修复:使用 select
go func() {
    select {
    case ch1 <- 1:
        <-ch2
    case <-ch2:
        ch1 <- 1
    }
}()

6.3 竞态条件

// ❌ 竞态:不必要的锁
var mu sync.Mutex
ch := make(chan int)

go func() {
    mu.Lock()
    ch <- 1  // channel 本身线程安全
    mu.Unlock()
}()

// ✅ 修复:直接使用
ch := make(chan int)
go func() {
    ch <- 1  // 无需额外锁
}()

6.4 nil Channel

var ch chan int  // nil channel

// 发送到 nil channel:永远阻塞
ch <- 1  // deadlock

// 从 nil channel 接收:永远阻塞
<-ch  // deadlock

// 关闭 nil channel:panic
close(ch)  // panic: close of nil channel

// 使用场景:动态控制 select
ch1 := make(chan int)
ch2 := make(chan int)
var ch3 chan int  // nil

for {
    select {
    case v := <-ch1:
        // 处理 ch1
        if done {
            ch3 = nil  // 禁用 ch3 case
        }
    case v := <-ch2:
        // 处理 ch2
    case v := <-ch3:  // ch3 为 nil 时,此 case 被跳过
        // 处理 ch3
    }
}

七、总结

Channel 核心要点:

特性说明性能
无缓冲同步传递快,无锁(直接传递)
有缓冲异步传递中,需锁
直接传递发送者→接收者最快,零拷贝
缓冲传递发送者→缓冲区→接收者中等
关闭返回零值接收者立即唤醒

性能优化要点

  1. 优先使用无缓冲 channel(直接传递)
  2. 避免不必要的锁(channel 本身线程安全)
  3. 使用 select 控制流程
  4. 注意 nil channel 的行为

常见陷阱

  1. Goroutine 泄漏(忘记退出)
  2. 死锁(互相等待)
  3. 竞态条件(过度同步)
  4. nil channel 行为

Channel 是 Go 并发编程的核心,理解其底层实现有助于避免陷阱和优化性能。

延伸阅读


分享这篇文章到:

上一篇文章
索引设计与最佳实践
下一篇文章
B+ 树索引原理