Channel 底层原理
Channel 是 Go 并发编程的核心,理解其底层实现有助于正确使用和性能优化。
一、核心结构
1.1 hchan 定义
// runtime/chan.go
type hchan struct {
qcount uint // 队列中数据数量
dataqsiz uint // 缓冲区大小
buf unsafe.Pointer // 缓冲区指针
elemsize uint16 // 元素大小
closed uint32 // 是否关闭
sendx uint // 发送索引
recvx uint // 接收索引
recvq waitq // 接收等待队列
sendq waitq // 发送等待队列
lock mutex // 互斥锁
}
// 等待队列
type waitq struct {
first *sudog
last *sudog
}
// 等待中的 G
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // 数据指针
waitlink *sudog
releasetime int64
ticket uint32
}
1.2 内存布局
hchan 内存布局(以 make(chan int, 5) 为例)
┌──────────────────────────────────────────┐
│ hchan 头 (固定 96 字节) │
│ qcount: 8 bytes │
│ dataqsiz: 8 bytes │
│ buf: 8 bytes │
│ elemsize: 2 bytes │
│ closed: 4 bytes │
│ sendx: 8 bytes │
│ recvx: 8 bytes │
│ recvq: 16 bytes │
│ sendq: 16 bytes │
│ lock: 16 bytes │
├──────────────────────────────────────────┤
│ 缓冲区 (5 * 4 = 20 字节) │
│ [0] [1] [2] [3] [4] │
├──────────────────────────────────────────┤
│ 等待队列 (动态) │
│ recvq: [G1, G2, ...] │
│ sendq: [G3, G4, ...] │
└──────────────────────────────────────────┘
1.3 创建 Channel
// make(chan int, 5)
// runtime/chan.go:makechan
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 1. 计算内存大小
var c *hchan
if size > 0 {
// 分配 hchan + 缓冲区
c = (*hchan)(mallocgc(unsafe.Sizeof(hchan{})+uintptr(size)*elem.size, nil, false))
c.buf = add(unsafe.Pointer(c), unsafe.Sizeof(*c))
} else {
// 无缓冲 channel
c = new(hchan)
}
// 2. 初始化字段
c.dataqsiz = uint(size)
c.elemsize = uint16(elem.size)
c.closed = 0
return c
}
二、发送流程
2.1 发送实现
// ch <- value
// runtime/chan.go:chansend
func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
// 1. 加锁
lock(&c.lock)
// 2. 有等待的接收者,直接传递
if sg := c.recvq.dequeue(); sg != nil {
sendg(sg, ep)
unlock(&c.lock)
return true
}
// 3. 缓冲区未满,放入队列
if c.qcount < c.dataqsiz {
sendbuf(c, ep)
unlock(&c.lock)
return true
}
// 4. 非阻塞模式,返回 false
if !block {
unlock(&c.lock)
return false
}
// 5. 缓冲区满,阻塞等待
gp := getg()
sg := acquireSudog()
sg.g = gp
sg.elem = ep
c.sendq.enqueue(sg)
// 6. 挂起 G
gopark()
// 7. 被唤醒后继续
releaseSudog(sg)
return true
}
// 发送到缓冲区
func sendbuf(c *hchan, ep unsafe.Pointer) {
// 复制数据到缓冲区
memmove(
add(c.buf, c.sendx*c.elemsize),
ep,
c.elemsize,
)
c.sendx = (c.sendx + 1) % c.dataqsiz
c.qcount++
}
2.2 直接传递
发送者 接收者
│ │
│ ch <- value │
│────────────────────>│ <- value
│ │
直接传递流程:
1. 接收者在 recvq 等待
2. 发送者从 recvq 取出接收者
3. 直接复制数据到接收者栈
4. 唤醒接收者 G
5. 发送者继续执行
2.3 缓冲传递
Channel 缓冲区 (dataqsiz=5, qcount=3)
┌─────┬─────┬─────┬─────┬─────┐
│ 10 │ 20 │ 30 │ │ │
└─────┴─────┴─────┴─────┴─────┘
↑ ↑
recvx=0 sendx=3
发送 40:
1. 检查 recvq 无等待者
2. 检查 qcount(3) < dataqsiz(5)
3. 复制到 buf[3]
4. sendx = (3+1) % 5 = 4
5. qcount = 4
三、接收流程
3.1 接收实现
// value := <-ch
// runtime/chan.go:chanrecv
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) {
// 1. 加锁
lock(&c.lock)
// 2. 有等待的发送者,直接接收
if sg := c.sendq.dequeue(); sg != nil {
recvg(sg, ep)
unlock(&c.lock)
return
}
// 3. 缓冲区有数据,取出
if c.qcount > 0 {
recvbuf(c, ep)
unlock(&c.lock)
return
}
// 4. channel 已关闭
if c.closed != 0 {
// 返回零值
if ep != nil {
typedslicecopy(...)
}
unlock(&c.lock)
return
}
// 5. 非阻塞模式,返回 false
if !block {
unlock(&c.lock)
return false
}
// 6. 无数据,阻塞等待
gp := getg()
sg := acquireSudog()
sg.g = gp
sg.elem = ep
c.recvq.enqueue(sg)
// 7. 挂起 G
gopark()
// 8. 被唤醒后继续
releaseSudog(sg)
}
// 从缓冲区接收
func recvbuf(c *hchan, ep unsafe.Pointer) {
// 从缓冲区复制数据
memmove(
ep,
add(c.buf, c.recvx*c.elemsize),
c.elemsize,
)
c.recvx = (c.recvx + 1) % c.dataqsiz
c.qcount--
}
3.2 非阻塞接收
// select {
// case v := <-ch:
// // 处理 v
// default:
// // 无数据
// }
// runtime/chan.go:chanrecvnb
func chanrecvnb(c *hchan, ep unsafe.Pointer) bool {
// 调用 chanrecv,block=false
return chanrecv(c, ep, false)
}
3.3 多路选择
select {
case v1 := <-ch1:
// 处理 ch1
case v2 := <-ch2:
// 处理 ch2
default:
// 都无数据
}
// runtime/select.go:selectgo
func selectgo(sel *select) {
// 1. 随机打乱 case 顺序(避免饥饿)
shuffle(sel.cases)
// 2. 尝试每个 case
for i := 0; i < len(sel.cases); i++ {
case_ := sel.cases[i]
if case_.chan != nil {
// 尝试发送/接收
if trycase(case_) {
return
}
}
}
// 3. 都不可用,阻塞等待
for i := 0; i < len(sel.cases); i++ {
case_ := sel.cases[i]
enqueue_wait(case_)
}
gopark()
}
四、关闭 Channel
4.1 关闭实现
// close(ch)
// runtime/chan.go:chanclose
func chanclose(c *hchan) {
// 1. 加锁
lock(&c.lock)
// 2. 检查是否已关闭
if c.closed != 0 {
unlock(&c.lock)
panic("close of closed channel")
}
// 3. 标记为关闭
c.closed = 1
// 4. 唤醒所有等待的接收者
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
// 返回零值
sg.elem = nil
goready(sg.g)
}
// 5. 唤醒所有等待的发送者(会 panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
// 发送者会 panic
goready(sg.g)
}
unlock(&c.lock)
}
4.2 关闭后行为
ch := make(chan int, 3)
ch <- 1
ch <- 2
close(ch)
// 读取已关闭的 channel
<-ch // 1
<-ch // 2
<-ch // 0 (零值)
<-ch // 0 (零值)
// 写入已关闭的 channel
ch <- 3 // panic: send on closed channel
// 底层实现
// runtime/chan.go:chansend
func chansend(c *hchan, ...) {
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic("send on closed channel")
}
// ...
}
4.3 检测关闭
// 方式 1:comma-ok idiom
v, ok := <-ch
if !ok {
// channel 已关闭
}
// 方式 2:range
for v := range ch {
// channel 关闭后自动退出循环
}
// 底层实现
// runtime/chan.go:chanrecv
func chanrecv(c *hchan, ...) (received bool) {
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
return false // ok = false
}
// ...
return true // ok = true
}
五、性能优化
5.1 无锁优化
// 无缓冲 channel 的直接传递
// 不需要锁,直接内存拷贝
// runtime/chan.go:sendg
func sendg(sg *sudog, ep unsafe.Pointer) {
// 直接复制数据到接收者栈
if sg.elem != nil {
memmove(sg.elem, ep, elemsize)
}
// 唤醒接收者
goready(sg.g)
}
// runtime/chan.go:recvg
func recvg(sg *sudog, ep unsafe.Pointer) {
// 直接复制数据到接收者
if ep != nil {
memmove(ep, sg.elem, elemsize)
}
// 唤醒发送者
goready(sg.g)
}
5.2 内存对齐
// hchan 结构内存对齐
type hchan struct {
qcount uint64 // 8 字节对齐
dataqsiz uint64
buf unsafe.Pointer
elemsize uint16
closed uint32
sendx uint64
recvx uint64
recvq waitq // 16 字节
sendq waitq
lock mutex // 16 字节
}
// 总大小:96 字节(64 位系统)
5.3 等待队列优化
// 使用链表而非数组
type waitq struct {
first *sudog
last *sudog
}
// O(1) 入队
func (q *waitq) enqueue(sg *sudog) {
sg.next = nil
sg.prev = q.last
if q.last != nil {
q.last.next = sg
} else {
q.first = sg
}
q.last = sg
}
// O(1) 出队
func (q *waitq) dequeue() *sudog {
sg := q.first
if sg != nil {
q.first = sg.next
if q.first != nil {
q.first.prev = nil
} else {
q.last = nil
}
}
return sg
}
5.4 批处理优化
// 批量发送
func batchsend(c *hchan, eps []unsafe.Pointer) {
lock(&c.lock)
for _, ep := range eps {
// 有等待的接收者,直接传递
if sg := c.recvq.dequeue(); sg != nil {
sendg(sg, ep)
continue
}
// 缓冲区未满,放入队列
if c.qcount < c.dataqsiz {
sendbuf(c, ep)
} else {
// 缓冲区满,阻塞等待
// ...
}
}
unlock(&c.lock)
}
六、使用陷阱
6.1 Goroutine 泄漏
// ❌ 泄漏:接收者阻塞
ch := make(chan int)
go func() {
value := <-ch // 永远阻塞
fmt.Println(value)
}()
// ✅ 修复:使用 select
go func() {
select {
case value := <-ch:
fmt.Println(value)
case <-time.After(time.Second):
fmt.Println("timeout")
}
}()
// ✅ 修复:带缓冲
ch := make(chan int, 1)
go func() {
value := <-ch
fmt.Println(value)
}()
ch <- 1 // 不会阻塞
6.2 死锁
// ❌ 死锁:互相等待
ch1 := make(chan int)
ch2 := make(chan int)
go func() {
ch1 <- 1 // 阻塞,等待 ch2
<-ch2
}()
go func() {
ch2 <- 2 // 阻塞,等待 ch1
<-ch1
}()
// ✅ 修复:使用缓冲
ch1 := make(chan int, 1)
ch2 := make(chan int, 1)
// ✅ 修复:使用 select
go func() {
select {
case ch1 <- 1:
<-ch2
case <-ch2:
ch1 <- 1
}
}()
6.3 竞态条件
// ❌ 竞态:不必要的锁
var mu sync.Mutex
ch := make(chan int)
go func() {
mu.Lock()
ch <- 1 // channel 本身线程安全
mu.Unlock()
}()
// ✅ 修复:直接使用
ch := make(chan int)
go func() {
ch <- 1 // 无需额外锁
}()
6.4 nil Channel
var ch chan int // nil channel
// 发送到 nil channel:永远阻塞
ch <- 1 // deadlock
// 从 nil channel 接收:永远阻塞
<-ch // deadlock
// 关闭 nil channel:panic
close(ch) // panic: close of nil channel
// 使用场景:动态控制 select
ch1 := make(chan int)
ch2 := make(chan int)
var ch3 chan int // nil
for {
select {
case v := <-ch1:
// 处理 ch1
if done {
ch3 = nil // 禁用 ch3 case
}
case v := <-ch2:
// 处理 ch2
case v := <-ch3: // ch3 为 nil 时,此 case 被跳过
// 处理 ch3
}
}
七、总结
Channel 核心要点:
| 特性 | 说明 | 性能 |
|---|---|---|
| 无缓冲 | 同步传递 | 快,无锁(直接传递) |
| 有缓冲 | 异步传递 | 中,需锁 |
| 直接传递 | 发送者→接收者 | 最快,零拷贝 |
| 缓冲传递 | 发送者→缓冲区→接收者 | 中等 |
| 关闭 | 返回零值 | 接收者立即唤醒 |
性能优化要点:
- 优先使用无缓冲 channel(直接传递)
- 避免不必要的锁(channel 本身线程安全)
- 使用 select 控制流程
- 注意 nil channel 的行为
常见陷阱:
- Goroutine 泄漏(忘记退出)
- 死锁(互相等待)
- 竞态条件(过度同步)
- nil channel 行为
Channel 是 Go 并发编程的核心,理解其底层实现有助于避免陷阱和优化性能。
延伸阅读: