Go 并发编程核心机制
一、Goroutine 原理
1.1 什么是 Goroutine
func main() {
go sayHello() // 启动一个 Goroutine
fmt.Println("main")
}
func sayHello() {
fmt.Println("hello")
}
| 特性 | Goroutine | 线程 |
|---|---|---|
| 内存占用 | 2KB | 1MB |
| 创建开销 | 微秒级 | 毫秒级 |
| 调度方式 | 用户态 (M:N) | 内核态 (1:1) |
| 通信方式 | Channel | 共享内存 + 锁 |
1.2 GMP 调度模型
G (Goroutine) → M (Machine) → P (Processor)
↓ ↓ ↓
协程 系统线程 逻辑处理器
graph LR
P1 --> M1
P1 --> M2
P2 --> M3
G1 --> P1
G2 --> P1
G3 --> P2
- G (Goroutine):协程,包含栈、指令指针等
- M (Machine):系统线程,执行代码的载体
- P (Processor):逻辑处理器,管理 G 队列
1.3 调度过程
// 1. 创建 Goroutine
go func() {
fmt.Println("hello")
}()
// 2. G 被放入 P 的本地队列
// 3. M 从 P 获取 G 并执行
// 4. G 遇到系统调用 → M 阻塞 → G 迁移到其他 M
// 5. G 执行完毕 → 回到 P 的队列等待回收
二、Channel 机制
2.1 Channel 基础
// 无缓冲 Channel (同步)
ch := make(chan int)
ch <- 1 // 发送,阻塞直到有人接收
v := <-ch // 接收,阻塞直到有数据
// 有缓冲 Channel (异步)
ch := make(chan int, 10)
ch <- 1 // 不阻塞,直到缓冲区满
v := <-ch // 不阻塞,直到缓冲区空
// 关闭 Channel
close(ch)
v, ok := <-ch // ok=false 表示 channel 已关闭
2.2 Channel 底层结构
type hchan struct {
qcount uint // 队列中数据数量
dataqsiz uint // 缓冲区大小
buf unsafe.Pointer // 缓冲区指针
elemsize uint16 // 元素大小
closed uint32 // 是否关闭
sendx uint // 发送索引
recvx uint // 接收索引
recvq waitq // 接收等待队列
sendq waitq // 发送等待队列
lock mutex // 互斥锁
}
2.3 使用场景
// 1. 管道模式 (Pipeline)
func pipeline(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * 2
}
close(out)
}()
return out
}
// 2. 扇入扇出 (Fan-in/Fan-out)
func fanIn(ins ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
for _, in := range ins {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for n := range c {
out <- n
}
}(in)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
// 3. 控制并发数
func worker(jobs <-chan int, results chan<- int) {
for j := range jobs {
results <- j * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
// 启动 3 个 worker
for w := 1; w <= 3; w++ {
go worker(jobs, results)
}
// 发送任务
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
// 收集结果
for r := 1; r <= 5; r++ {
fmt.Println(<-results)
}
}
三、select 多路复用
3.1 基础用法
select {
case msg1 := <-ch1:
fmt.Println("received", msg1)
case msg2 := <-ch2:
fmt.Println("received", msg2)
case ch3 <- data:
fmt.Println("sent", data)
default:
fmt.Println("no channel ready")
case <-time.After(time.Second):
fmt.Println("timeout")
}
3.2 典型场景
// 1. 超时控制
func requestWithTimeout(url string, timeout time.Duration) ([]byte, error) {
ch := make(chan []byte, 1)
errCh := make(chan error, 1)
go func() {
resp, err := http.Get(url)
if err != nil {
errCh <- err
return
}
ch <- resp.Body
}()
select {
case data := <-ch:
return data, nil
case err := <-errCh:
return nil, err
case <-time.After(timeout):
return nil, errors.New("request timeout")
}
}
// 2. 优雅退出
func worker(done chan bool) {
for {
select {
case <-time.After(100 * time.Millisecond):
fmt.Println("working...")
case <-done:
fmt.Println("exit")
return
}
}
}
四、sync 包
4.1 Mutex 互斥锁
type Counter struct {
mu sync.Mutex
count int
}
func (c *Counter) Inc() {
c.mu.Lock()
defer c.mu.Unlock()
c.count++
}
func (c *Counter) Get() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}
4.2 RWMutex 读写锁
type Cache struct {
mu sync.RWMutex
data map[string]string
}
func (c *Cache) Get(key string) string {
c.mu.RLock() // 读锁,可并发
defer c.mu.RUnlock()
return c.data[key]
}
func (c *Cache) Set(key, value string) {
c.mu.Lock() // 写锁,互斥
defer c.mu.Unlock()
c.data[key] = value
}
4.3 WaitGroup 等待组
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Println("worker", id)
}(i)
}
wg.Wait() // 等待所有 Goroutine 完成
}
4.4 Once 单次执行
var (
instance *Singleton
once sync.Once
)
func GetInstance() *Singleton {
once.Do(func() {
instance = &Singleton{}
})
return instance
}
4.5 Atomic 原子操作
var count int64
// 原子自增
atomic.AddInt64(&count, 1)
// 原子加载
v := atomic.LoadInt64(&count)
// 原子存储
atomic.StoreInt64(&count, 100)
// CAS 操作
atomic.CompareAndSwapInt64(&count, 0, 1)
五、并发陷阱
5.1 Goroutine 泄漏
// ❌ 泄漏:Goroutine 永远阻塞
func leak() {
ch := make(chan int)
go func() {
ch <- 1 // 阻塞,无人接收
}()
}
// ✅ 修复:使用缓冲或 select
func fixed() {
ch := make(chan int, 1)
go func() {
select {
case ch <- 1:
case <-time.After(time.Second):
}
}()
}
5.2 竞态条件
// ❌ 竞态:多个 Goroutine 同时写
var m = make(map[string]int)
go func() { m["a"] = 1 }()
go func() { m["b"] = 2 }()
// ✅ 修复:加锁或使用 sync.Map
var mu sync.Mutex
var m = make(map[string]int)
go func() {
mu.Lock()
m["a"] = 1
mu.Unlock()
}()
5.3 Channel 使用错误
// ❌ 向已关闭的 channel 发送
close(ch)
ch <- 1 // panic!
// ✅ 检查是否关闭
v, ok := <-ch
if !ok {
fmt.Println("channel closed")
}
// ❌ 重复关闭 channel
close(ch)
close(ch) // panic!
// ✅ 确保只关闭一次
var once sync.Once
once.Do(func() { close(ch) })
六、最佳实践
6.1 并发模式
| 模式 | 场景 | 实现 |
|---|---|---|
| Worker Pool | 任务分发 | Channel + Goroutine |
| Pipeline | 流式处理 | 多级 Channel |
| Fan-out | 并行处理 | 多个 Goroutine 读同一 Channel |
| Fan-in | 结果合并 | 多个 Channel 写入一个 |
6.2 设计原则
- 不要通过共享内存通信,通过通信共享内存
- Goroutine 生命周期要可控
- Channel 大小要合理(通常 1 或 0)
- 避免 Goroutine 泄漏
- 使用 context 控制超时和取消
6.3 Context 使用
func process(ctx context.Context, data string) error {
select {
case <-ctx.Done():
return ctx.Err()
case result := <-doWork(data):
return result
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := process(ctx, "data"); err != nil {
fmt.Println("error:", err)
}
}
总结
Go 并发编程核心要点:
- Goroutine:轻量级协程,GMP 调度模型
- Channel:类型安全的通信机制,遵循 CSP 模型
- select:多路复用,处理超时和取消
- sync 包:Mutex、RWMutex、WaitGroup 等同步原语
- 避免陷阱:Goroutine 泄漏、竞态条件、Channel 误用
Go 并发哲学:CSP 模型(Communicating Sequential Processes)