Go 从 0 到精通 · 第 30 课:并发模式实战

学习定位:这是整套 Go 教程的第 30 课,也是阶段五(并发与工程阶段)的第六课。
前置要求:已经完成第 29 课,掌握了 context 与取消机制。需要熟练运用 goroutine、channel、select、WaitGroup 和 context。
本课目标:掌握 Go 中最常见的并发设计模式——生产者-消费者、Worker Pool、Pipeline、Fan-out/Fan-in,能根据实际需求选择合适的并发模式,写出结构清晰、可维护的并发程序。


1. 本课你要解决的核心问题

前面五课你学了 Go 并发编程的所有工具:goroutine、channel、select、锁、context。但工具只是工具,怎么把它们组合起来解决实际问题,才是关键。

就像你知道锤子、锯子、螺丝刀怎么用,但让你做一把椅子,你不知道从哪下手。并发模式就是"椅子的图纸"——告诉你在什么场景下,怎么组合这些工具。

这一课讲四个最常用的并发模式:

模式 解决什么问题 类比
生产者-消费者 一方产生数据,一方处理数据 工厂流水线
Worker Pool 大量任务需要限制并发数 银行柜台窗口
Pipeline 多步处理,步骤之间串联 汽车装配线
Fan-out / Fan-in 一个任务拆给多人做,最后汇总 考试阅卷

学完这一课,你就能写出结构清晰、可维护的并发程序了。


2. 模式一:生产者-消费者

2.1 解决什么问题

一方不断产生数据,另一方不断处理数据。两方速度可能不同,需要解耦。

现实例子:

  • 日志系统:应用程序不断产生日志(生产者),后台服务写入文件(消费者)
  • 消息队列:用户下单(生产者),后台处理订单(消费者)
  • 爬虫:URL 发现器不断找到新链接(生产者),下载器去抓取(消费者)

2.2 核心结构

1
生产者 goroutine → channel → 消费者 goroutine

channel 是两者之间的缓冲区。生产者只管往 channel 里塞,消费者只管从 channel 里取。

2.3 完整示例:日志处理系统

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package main

import (
"fmt"
"math/rand"
"sync"
"time"
)

type LogEntry struct {
Timestamp time.Time
Level string
Message string
}

// 生产者:模拟应用程序产生日志
func logProducer(logs chan<- LogEntry, wg *sync.WaitGroup) {
defer wg.Done()

levels := []string{"INFO", "WARN", "ERROR"}
messages := []string{
"用户登录",
"查询数据库",
"响应超时",
"文件未找到",
"缓存命中",
"连接断开",
}

for i := 0; i < 20; i++ {
entry := LogEntry{
Timestamp: time.Now(),
Level: levels[rand.Intn(len(levels))],
Message: messages[rand.Intn(len(messages))],
}
logs <- entry
time.Sleep(time.Duration(50+rand.Intn(100)) * time.Millisecond)
}
}

// 消费者:处理日志(写文件、发告警等)
func logConsumer(id int, logs <-chan LogEntry, wg *sync.WaitGroup) {
defer wg.Done()

count := 0
for entry := range logs {
count++
fmt.Printf("[消费者%d] %s [%s] %s\n",
id,
entry.Timestamp.Format("15:04:05.000"),
entry.Level,
entry.Message,
)

// 模拟处理耗时
time.Sleep(time.Duration(100+rand.Intn(150)) * time.Millisecond)
}
fmt.Printf("[消费者%d] 退出,共处理 %d 条日志\n", id, count)
}

func main() {
logs := make(chan LogEntry, 10) // 缓冲区 10 条

var producerWg sync.WaitGroup
var consumerWg sync.WaitGroup

// 启动 2 个生产者
for i := 0; i < 2; i++ {
producerWg.Add(1)
go logProducer(logs, &producerWg)
}

// 启动 3 个消费者
for i := 1; i <= 3; i++ {
consumerWg.Add(1)
go logConsumer(i, logs, &consumerWg)
}

// 等所有生产者完成后关闭 channel
producerWg.Wait()
close(logs)

// 等所有消费者处理完
consumerWg.Wait()
fmt.Println("日志处理完成")
}

2.4 关键设计点

两组 WaitGroup:生产者和消费者各一组。流程是:

1
2
3
4
5
1. 所有生产者启动
2. 所有消费者启动
3. 等所有生产者完成 → close(channel)
4. 消费者的 range 循环自然结束
5. 等所有消费者完成

缓冲区大小make(chan LogEntry, 10) 允许生产者先跑一段。如果消费者暂时处理不过来,生产者不会立刻被阻塞(最多缓存 10 条)。缓冲区满了,生产者自动限速。

2.5 什么时候用

  • 数据产生和处理的速度不同
  • 需要解耦生产和消费逻辑
  • 需要多个消费者并行处理

3. 模式二:Worker Pool(工作池)

3.1 解决什么问题

你有大量任务要处理,但不想每个任务启动一个 goroutine(可能有十万个任务)。你想控制并发数——比如最多同时处理 5 个任务。

现实例子:

  • 银行柜台:客户很多,但只有 5 个窗口
  • 数据库连接池:连接数有限,不能无限创建
  • 批量下载:同时下载太多会被限流

3.2 核心结构

1
2
3
4
                    ┌── Worker 1 ──┐
任务 → jobs channel ├── Worker 2 ──├→ results channel → 收集
├── Worker 3 ──┤
└── Worker N ──┘

固定数量的 worker goroutine 从 jobs channel 中取任务,处理后把结果发到 results channel。

3.3 完整示例:批量图片处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package main

import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)

type Job struct {
ID int
FileName string
}

type Result struct {
JobID int
FileName string
Size int
Duration time.Duration
Error error
}

// worker 从 jobs 中取任务,处理后发到 results
func worker(ctx context.Context, id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()

for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d: 收到取消信号,退出\n", id)
return
case job, ok := <-jobs:
if !ok {
fmt.Printf("Worker %d: 任务队列关闭,退出\n", id)
return
}

// 处理任务
start := time.Now()
time.Sleep(time.Duration(100+rand.Intn(400)) * time.Millisecond)
size := 1024 + rand.Intn(4096)

results <- Result{
JobID: job.ID,
FileName: job.FileName,
Size: size,
Duration: time.Since(start),
}
}
}
}

func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

const numWorkers = 3
const numJobs = 15

jobs := make(chan Job, numJobs)
results := make(chan Result, numJobs)

// 启动 worker pool
var wg sync.WaitGroup
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(ctx, i, jobs, results, &wg)
}

// 发送任务
for i := 1; i <= numJobs; i++ {
jobs <- Job{
ID: i,
FileName: fmt.Sprintf("image_%03d.jpg", i),
}
}
close(jobs)

// 用一个 goroutine 等待所有 worker 完成后关闭 results
go func() {
wg.Wait()
close(results)
}()

// 收集结果
totalTime := time.Now()
successCount := 0
for r := range results {
successCount++
fmt.Printf(" 完成: %s (%dKB, 耗时 %v)\n",
r.FileName, r.Size/1024, r.Duration)
}

fmt.Printf("\n处理完成: %d/%d 个文件, 总耗时 %v\n",
successCount, numJobs, time.Since(totalTime))
fmt.Printf("Worker 数量: %d, 平均每个 Worker 处理 %d 个任务\n",
numWorkers, numJobs/numWorkers)
}

3.4 关键设计点

固定 Worker 数量:不管有多少任务,始终只有 numWorkers 个 goroutine 在工作。这是 Worker Pool 的核心——控制并发度

jobs channel 作为任务队列:所有 worker 从同一个 channel 取任务。Go 的 channel 保证每个任务只被一个 worker 拿到。

results channel 收集结果:用一个单独的 goroutine 等 worker 全部完成后关闭 results,然后主 goroutine 用 range 收集所有结果。

配合 context:支持超时取消,避免任务处理太久时整个程序卡住。

3.5 通用 Worker Pool 模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func RunWorkerPool(ctx context.Context, numWorkers int, jobs <-chan Job) <-chan Result {
results := make(chan Result)

var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range jobs {
select {
case <-ctx.Done():
return
case results <- process(job):
}
}
}()
}

go func() {
wg.Wait()
close(results)
}()

return results
}

这个模板可以直接复用:传入 worker 数量和 jobs channel,返回 results channel。

3.6 什么时候用

  • 任务量大,但需要限制并发数
  • 每个任务互相独立,不需要特定顺序
  • 需要控制资源使用(CPU、网络连接、数据库连接)

4. 模式三:Pipeline(流水线)

4.1 解决什么问题

数据需要经过多个处理步骤,每个步骤做不同的事。步骤之间用 channel 串联,每个步骤独立并发运行。

现实例子:

  • 汽车装配线:焊接 → 喷漆 → 装配 → 检测
  • 数据处理:读取 → 解析 → 过滤 → 转换 → 存储
  • 视频处理:解码 → 滤镜 → 编码 → 输出

4.2 核心结构

1
阶段1 → channel → 阶段2 → channel → 阶段3 → channel → 输出

每个阶段是一个 goroutine(或一组 goroutine),从输入 channel 读数据,处理后写到输出 channel。

4.3 完整示例:订单处理流水线

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package main

import (
"context"
"fmt"
"math/rand"
"time"
)

type Order struct {
ID int
Product string
Quantity int
Price float64
Status string
}

// 阶段 1:接收订单
func receiveOrders(ctx context.Context, orderCount int) <-chan Order {
out := make(chan Order)
go func() {
defer close(out)
products := []string{"手机", "笔记本", "耳机", "平板", "键盘"}
for i := 1; i <= orderCount; i++ {
order := Order{
ID: i,
Product: products[rand.Intn(len(products))],
Quantity: 1 + rand.Intn(5),
Price: float64(50+rand.Intn(950)) + 0.9,
Status: "已接收",
}
select {
case out <- order:
fmt.Printf("[接收] 订单 #%d: %s x%d\n", order.ID, order.Product, order.Quantity)
case <-ctx.Done():
return
}
time.Sleep(100 * time.Millisecond)
}
}()
return out
}

// 阶段 2:验证订单
func validateOrders(ctx context.Context, orders <-chan Order) <-chan Order {
out := make(chan Order)
go func() {
defer close(out)
for order := range orders {
select {
case <-ctx.Done():
return
default:
}

time.Sleep(50 * time.Millisecond)

// 模拟验证:数量超过 3 的拒绝
if order.Quantity > 3 {
fmt.Printf("[验证] 订单 #%d: 拒绝(数量过多: %d)\n", order.ID, order.Quantity)
continue // 不传递给下一个阶段
}

order.Status = "已验证"
fmt.Printf("[验证] 订单 #%d: 通过\n", order.ID)
out <- order
}
}()
return out
}

// 阶段 3:计算价格
func calculatePrice(ctx context.Context, orders <-chan Order) <-chan Order {
out := make(chan Order)
go func() {
defer close(out)
for order := range orders {
select {
case <-ctx.Done():
return
default:
}

time.Sleep(80 * time.Millisecond)

order.Price = order.Price * float64(order.Quantity)
// 满 500 打九折
if order.Price > 500 {
order.Price *= 0.9
}
order.Status = "已定价"
fmt.Printf("[定价] 订单 #%d: ¥%.2f\n", order.ID, order.Price)
out <- order
}
}()
return out
}

// 阶段 4:完成订单
func completeOrders(ctx context.Context, orders <-chan Order) <-chan Order {
out := make(chan Order)
go func() {
defer close(out)
for order := range orders {
select {
case <-ctx.Done():
return
default:
}

time.Sleep(60 * time.Millisecond)

order.Status = "已完成"
fmt.Printf("[完成] 订单 #%d: %s x%d = ¥%.2f\n",
order.ID, order.Product, order.Quantity, order.Price)
out <- order
}
}()
return out
}

func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// 组装流水线
orders := receiveOrders(ctx, 8)
validated := validateOrders(ctx, orders)
priced := calculatePrice(ctx, validated)
completed := completeOrders(ctx, priced)

// 消费最终结果
fmt.Println("\n========== 订单处理流水线 ==========\n")

var totalRevenue float64
count := 0
for order := range completed {
totalRevenue += order.Price
count++
}

fmt.Printf("\n========== 汇总 ==========\n")
fmt.Printf("完成订单: %d 个\n", count)
fmt.Printf("总收入: ¥%.2f\n", totalRevenue)
}

4.4 关键设计点

每个阶段返回 <-chan Order:这是 Pipeline 的标准写法。每个函数创建 channel、启动 goroutine、返回只读 channel。调用方像搭积木一样串起来:

1
2
3
4
orders := receiveOrders(ctx, 8)
validated := validateOrders(ctx, orders)
priced := calculatePrice(ctx, validated)
completed := completeOrders(ctx, priced)

阶段可以过滤数据validateOrders 拒绝了不合格的订单(continue 跳过,不传递给下一阶段)。流水线中每个阶段的输出不一定等于输入。

每个阶段关闭自己的输出 channel:当输入 channel 关闭且处理完最后一个元素后,defer close(out) 关闭输出。信号像多米诺骨牌一样传递:第一个阶段关闭 → 第二个收到信号处理完剩余后关闭 → 依次传递。

4.5 什么时候用

  • 数据需要经过多个有序的处理步骤
  • 各步骤的处理逻辑可以独立
  • 需要步骤之间的流控(channel 缓冲)

5. 模式四:Fan-out / Fan-in(扇出 / 扇入)

5.1 解决什么问题

Fan-out(扇出):一个任务拆成多份,分配给多个 goroutine 并行处理。
Fan-in(扇入):多个 goroutine 的结果合并成一个 channel。

两者通常一起使用。

现实例子:

  • 考试阅卷:一叠卷子分给多个老师批(扇出),批完后汇总分数(扇入)
  • 搜索引擎:一个查询发给多个索引分片(扇出),结果合并排序(扇入)
  • 文件处理:把大文件分块,多个 goroutine 各自处理一块,最后合并

5.2 核心结构

1
2
3
4
                    ┌── goroutine 1 ──┐
一个 input channel ─├── goroutine 2 ──├─ merge ─→ 一个 output channel
└── goroutine 3 ──┘
(Fan-out) (Fan-in)

5.3 Fan-in 工具函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func fanIn(ctx context.Context, channels ...<-chan Result) <-chan Result {
out := make(chan Result)
var wg sync.WaitGroup

for _, ch := range channels {
wg.Add(1)
go func(c <-chan Result) {
defer wg.Done()
for v := range c {
select {
case out <- v:
case <-ctx.Done():
return
}
}
}(ch)
}

go func() {
wg.Wait()
close(out)
}()

return out
}

这个函数把多个 channel 合并成一个。每个输入 channel 对应一个 goroutine,所有数据汇聚到 out

5.4 完整示例:并行文件搜索

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package main

import (
"context"
"fmt"
"math/rand"
"strings"
"sync"
"time"
)

type SearchResult struct {
WorkerID int
FileName string
Line int
Content string
}

// 模拟搜索一批文件
func searchWorker(ctx context.Context, id int, files []string, keyword string) <-chan SearchResult {
out := make(chan SearchResult)
go func() {
defer close(out)
for _, file := range files {
select {
case <-ctx.Done():
return
default:
}

// 模拟文件搜索
time.Sleep(time.Duration(50+rand.Intn(150)) * time.Millisecond)

// 模拟随机找到匹配
if rand.Intn(3) == 0 {
result := SearchResult{
WorkerID: id,
FileName: file,
Line: rand.Intn(100) + 1,
Content: fmt.Sprintf("... %s ...", keyword),
}
select {
case out <- result:
case <-ctx.Done():
return
}
}
}
}()
return out
}

// Fan-in:合并多个 channel
func merge(ctx context.Context, channels ...<-chan SearchResult) <-chan SearchResult {
out := make(chan SearchResult)
var wg sync.WaitGroup

for _, ch := range channels {
wg.Add(1)
go func(c <-chan SearchResult) {
defer wg.Done()
for v := range c {
select {
case out <- v:
case <-ctx.Done():
return
}
}
}(ch)
}

go func() {
wg.Wait()
close(out)
}()

return out
}

func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// 模拟 30 个文件
allFiles := make([]string, 30)
for i := range allFiles {
dirs := []string{"src", "pkg", "internal", "cmd"}
exts := []string{".go", ".txt", ".md", ".json"}
allFiles[i] = fmt.Sprintf("%s/file_%02d%s",
dirs[rand.Intn(len(dirs))], i, exts[rand.Intn(len(exts))])
}

keyword := "TODO"
numWorkers := 3

// Fan-out:把文件分配给多个 worker
chunkSize := (len(allFiles) + numWorkers - 1) / numWorkers
var workerChannels []<-chan SearchResult

for i := 0; i < numWorkers; i++ {
start := i * chunkSize
end := start + chunkSize
if end > len(allFiles) {
end = len(allFiles)
}

chunk := allFiles[start:end]
fmt.Printf("Worker %d: 负责 %d 个文件 (%s ~ %s)\n",
i+1, len(chunk), chunk[0], chunk[len(chunk)-1])

ch := searchWorker(ctx, i+1, chunk, keyword)
workerChannels = append(workerChannels, ch)
}

// Fan-in:合并所有 worker 的结果
results := merge(ctx, workerChannels...)

// 收集结果
fmt.Printf("\n搜索 \"%s\"...\n\n", keyword)
count := 0
for r := range results {
count++
fmt.Printf(" [Worker%d] %s:%d %s\n",
r.WorkerID, r.FileName, r.Line, r.Content)
}

if count == 0 {
fmt.Println(" 没有找到匹配")
} else {
fmt.Printf("\n共找到 %d 处匹配\n", count)
}

_ = strings.Contains // 避免 import 警告
}

5.5 关键设计点

Fan-out:把 30 个文件平均分成 3 份,每个 worker 处理 10 个。任务分配在主 goroutine 中完成。

Fan-inmerge 函数把 3 个 worker 的输出 channel 合并成一个。主 goroutine 只需要从合并后的 channel 中读取,不用关心数据来自哪个 worker。

结果按完成顺序返回:先处理完的先输出,不需要等所有 worker 都完成。

5.6 什么时候用

  • 一个大任务可以拆成多个独立的小任务
  • 需要并行加速处理
  • 结果需要汇总到一处

6. 模式对比与选择

6.1 四种模式的对比

模式 数据流向 goroutine 数量 适用场景
生产者-消费者 单向:生产 → 消费 固定 解耦生产和消费
Worker Pool 任务 → 多 Worker → 结果 固定 限制并发数
Pipeline 线性:A → B → C → D 每阶段一个 多步骤顺序处理
Fan-out/Fan-in 分散 → 汇聚 动态 并行加速

6.2 怎么选

1
2
3
4
5
6
7
8
9
10
11
需求:大量任务,限制并发数?
→ Worker Pool

需求:数据经过多个处理步骤?
→ Pipeline

需求:一个大任务拆成多个并行子任务?
→ Fan-out / Fan-in

需求:一方产生数据,一方消费?
→ 生产者-消费者

实际项目中经常组合使用。比如 Pipeline 的某个阶段内部用 Worker Pool 来并行处理。


7. 综合示例:数据处理管道

把多种模式组合起来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package main

import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)

type Record struct {
ID int
Value int
Valid bool
Result int
}

// 阶段 1:生成数据(生产者)
func generate(ctx context.Context, count int) <-chan Record {
out := make(chan Record)
go func() {
defer close(out)
for i := 1; i <= count; i++ {
record := Record{ID: i, Value: rand.Intn(100)}
select {
case out <- record:
case <-ctx.Done():
return
}
}
}()
return out
}

// 阶段 2:验证(Pipeline 阶段,内部用 Worker Pool 加速)
func validate(ctx context.Context, in <-chan Record, numWorkers int) <-chan Record {
out := make(chan Record)

var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for record := range in {
select {
case <-ctx.Done():
return
default:
}
time.Sleep(20 * time.Millisecond) // 模拟验证
record.Valid = record.Value >= 10 // 值 >= 10 才有效
out <- record
}
}()
}

go func() {
wg.Wait()
close(out)
}()

return out
}

// 阶段 3:过滤(只保留有效记录)
func filter(ctx context.Context, in <-chan Record) <-chan Record {
out := make(chan Record)
go func() {
defer close(out)
for record := range in {
select {
case <-ctx.Done():
return
default:
}
if record.Valid {
out <- record
}
}
}()
return out
}

// 阶段 4:计算(Worker Pool)
func compute(ctx context.Context, in <-chan Record, numWorkers int) <-chan Record {
out := make(chan Record)

var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for record := range in {
select {
case <-ctx.Done():
return
default:
}
time.Sleep(30 * time.Millisecond)
record.Result = record.Value * record.Value
out <- record
}
}()
}

go func() {
wg.Wait()
close(out)
}()

return out
}

func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

start := time.Now()

// 组装管道:生成 → 验证(3 workers) → 过滤 → 计算(2 workers)
records := generate(ctx, 50)
validated := validate(ctx, records, 3)
filtered := filter(ctx, validated)
computed := compute(ctx, filtered, 2)

// 收集最终结果
total := 0
count := 0
for record := range computed {
total += record.Result
count++
}

fmt.Printf("处理完成: %d 条有效记录,总和 = %d\n", count, total)
fmt.Printf("总耗时: %v\n", time.Since(start))
}

这个例子展示了 Pipeline + Worker Pool 的组合:

  • 整体是 Pipeline(四个阶段串联)
  • 验证阶段和计算阶段内部各自用了 Worker Pool(多个 goroutine 并行处理)
  • 全程配合 context,支持超时取消

8. 常见坑总结

8.1 goroutine 泄漏

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 坏:如果 ctx 取消了,往 out 发送会阻塞(没人收了)
go func() {
for record := range in {
out <- process(record) // 如果消费者已经退出,这里永远阻塞
}
}()

// 好:每次发送都检查 ctx
go func() {
for record := range in {
select {
case out <- process(record):
case <-ctx.Done():
return
}
}
}()

Pipeline 的每一个 goroutine 中,发送和接收都要配合 ctx.Done()。不然一旦下游取消,上游就永远阻塞。

8.2 忘记关闭 channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 坏:Worker Pool 中忘了关闭输出 channel
for i := 0; i < n; i++ {
go func() {
for job := range jobs {
results <- process(job)
}
}()
}
// results 永远不会关闭,下游的 range 永远不结束

// 好:用 WaitGroup + 单独的 goroutine 关闭
go func() {
wg.Wait()
close(results)
}()

8.3 Worker Pool 中 Worker 数量选择

1
2
3
CPU 密集型任务:Worker 数 ≈ CPU 核心数
IO 密集型任务:Worker 数可以多一些(核心数 × 2~10
不确定:从 runtime.NumCPU() 开始,根据实测调整
1
2
3
import "runtime"

numWorkers := runtime.NumCPU() // CPU 核心数

8.4 Pipeline 阶段顺序有依赖

Pipeline 的每个阶段必须按顺序串联。如果阶段 B 依赖阶段 A 的输出,不能把它们并行执行。但阶段内部可以用 Worker Pool 来并行。


9. 本课练习

练习 1:文件下载器

要求:

  • 给定 20 个 URL(模拟)
  • 用 Worker Pool 模式,最多同时下载 5 个
  • 每个下载任务模拟随机耗时 200ms~1000ms
  • 收集并打印每个 URL 的下载结果(成功/失败、耗时、大小)

练习 2:日志分析流水线

要求:

  • 阶段 1:读取日志行(模拟 100 条)
  • 阶段 2:解析日志(提取时间、级别、消息)
  • 阶段 3:过滤(只保留 ERROR 和 WARN)
  • 阶段 4:统计(按级别计数)
  • 用 Pipeline 模式串联

练习 3:并行单词计数

要求:

  • 给定 10 段文本(模拟 10 个文件)
  • 用 Fan-out 把每段文本交给一个 goroutine 统计单词频率
  • 用 Fan-in 合并所有结果
  • 输出总的单词频率 Top 10

练习 4:带限速的 API 调用

要求:

  • 模拟调用一个 API 100 次
  • 限速:每秒最多 10 次请求
  • 用 Worker Pool + time.Ticker 实现
  • 总超时 15 秒

练习 5:组合模式

要求:

  • 设计一个"电商订单处理系统"
  • 阶段 1:接收订单(生产者)
  • 阶段 2:验证库存(Worker Pool,3 个 worker)
  • 阶段 3:计算价格(Pipeline)
  • 阶段 4:发送通知(Fan-out 给邮件和短信两个 channel,Fan-in 合并确认)

10. 自测题

10.1 概念题

  1. 生产者-消费者模式中,channel 缓冲区的作用是什么?
  2. Worker Pool 模式的核心目的是什么?Worker 数量怎么选?
  3. Pipeline 模式中,每个阶段为什么要返回 <-chan T 而不是 chan T
  4. Fan-in 是怎么实现的?为什么需要一个单独的 goroutine 来关闭输出 channel?
  5. 为什么 Pipeline 的每个 goroutine 里发送和接收都要配合 ctx.Done()
  6. Worker Pool 和 Fan-out 有什么区别?
  7. 怎么避免 Pipeline 中的 goroutine 泄漏?
  8. 四种模式分别适合什么场景?

10.2 代码阅读题

以下代码存在一个问题,找出来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func process(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for v := range in {
out <- v * 2
}
}()
return out
}

func main() {
ch := make(chan int)
go func() {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch)
}()

result := process(ch)
for v := range result {
fmt.Println(v)
}
}
点击查看答案

问题:out channel 永远不会被关闭

process 函数中的 goroutine 在 for range in 结束后退出了,但没有 close(out)。主 goroutine 中的 for v := range result 会在收到 0、2、4、6、8 后永远阻塞等待,最终报 deadlock。

修复:

1
2
3
4
5
6
go func() {
defer close(out) // 加上这一行
for v := range in {
out <- v * 2
}
}()

这是 Pipeline 模式中最常见的 bug——忘记关闭输出 channel。


11. 本课总结

这一课你学到了 Go 中最常用的四种并发设计模式。

模式 核心结构 关键 channel
生产者-消费者 生产 → buffer → 消费 一个数据 channel
Worker Pool 任务 → N 个 Worker → 结果 jobs + results
Pipeline A → B → C → D 每个阶段之间一个 channel
Fan-out/Fan-in 拆分 → 并行 → 合并 多个输入 channel → merge → 一个输出

最重要的三件事:

  1. Worker Pool 控制并发度——不是 goroutine 越多越好,资源有限就要限制
  2. Pipeline 每个阶段返回只读 channel 并 defer close(out)——这是不泄漏的前提
  3. 所有 goroutine 的发送和接收都配合 ctx.Done()——确保取消时整个管道能干净退出

12. 下一课预告

到这里你已经掌握了 Go 并发编程的核心内容。下面进入工程实践部分。

下一课:测试基础

会重点讲:

  • Go 的 testing 包怎么用
  • 单元测试的写法和命名约定
  • 表驱动测试——Go 测试的经典范式
  • go test 命令的常用参数
  • 测试覆盖率

学完下一课,你就能为自己写的代码加上测试,提高代码质量了。