Go 从 0 到精通 · 第 30 课:并发模式实战
学习定位:这是整套 Go 教程的第 30 课,也是阶段五(并发与工程阶段)的第六课。
前置要求:已经完成第 29 课,掌握了 context 与取消机制。需要熟练运用 goroutine、channel、select、WaitGroup 和 context。
本课目标:掌握 Go 中最常见的并发设计模式——生产者-消费者、Worker Pool、Pipeline、Fan-out/Fan-in,能根据实际需求选择合适的并发模式,写出结构清晰、可维护的并发程序。
1. 本课你要解决的核心问题
前面五课你学了 Go 并发编程的所有工具:goroutine、channel、select、锁、context。但工具只是工具,怎么把它们组合起来解决实际问题 ,才是关键。
就像你知道锤子、锯子、螺丝刀怎么用,但让你做一把椅子,你不知道从哪下手。并发模式就是"椅子的图纸"——告诉你在什么场景下,怎么组合这些工具。
这一课讲四个最常用的并发模式:
模式
解决什么问题
类比
生产者-消费者
一方产生数据,一方处理数据
工厂流水线
Worker Pool
大量任务需要限制并发数
银行柜台窗口
Pipeline
多步处理,步骤之间串联
汽车装配线
Fan-out / Fan-in
一个任务拆给多人做,最后汇总
考试阅卷
学完这一课,你就能写出结构清晰、可维护的并发程序了。
2. 模式一:生产者-消费者
2.1 解决什么问题
一方不断产生 数据,另一方不断处理 数据。两方速度可能不同,需要解耦。
现实例子:
日志系统:应用程序不断产生日志(生产者),后台服务写入文件(消费者)
消息队列:用户下单(生产者),后台处理订单(消费者)
爬虫:URL 发现器不断找到新链接(生产者),下载器去抓取(消费者)
2.2 核心结构
1 生产者 goroutine → channel → 消费者 goroutine
channel 是两者之间的缓冲区。生产者只管往 channel 里塞,消费者只管从 channel 里取。
2.3 完整示例:日志处理系统
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 package mainimport ( "fmt" "math/rand" "sync" "time" ) type LogEntry struct { Timestamp time.Time Level string Message string } func logProducer (logs chan <- LogEntry, wg *sync.WaitGroup) { defer wg.Done() levels := []string {"INFO" , "WARN" , "ERROR" } messages := []string { "用户登录" , "查询数据库" , "响应超时" , "文件未找到" , "缓存命中" , "连接断开" , } for i := 0 ; i < 20 ; i++ { entry := LogEntry{ Timestamp: time.Now(), Level: levels[rand.Intn(len (levels))], Message: messages[rand.Intn(len (messages))], } logs <- entry time.Sleep(time.Duration(50 +rand.Intn(100 )) * time.Millisecond) } } func logConsumer (id int , logs <-chan LogEntry, wg *sync.WaitGroup) { defer wg.Done() count := 0 for entry := range logs { count++ fmt.Printf("[消费者%d] %s [%s] %s\n" , id, entry.Timestamp.Format("15:04:05.000" ), entry.Level, entry.Message, ) time.Sleep(time.Duration(100 +rand.Intn(150 )) * time.Millisecond) } fmt.Printf("[消费者%d] 退出,共处理 %d 条日志\n" , id, count) } func main () { logs := make (chan LogEntry, 10 ) var producerWg sync.WaitGroup var consumerWg sync.WaitGroup for i := 0 ; i < 2 ; i++ { producerWg.Add(1 ) go logProducer(logs, &producerWg) } for i := 1 ; i <= 3 ; i++ { consumerWg.Add(1 ) go logConsumer(i, logs, &consumerWg) } producerWg.Wait() close (logs) consumerWg.Wait() fmt.Println("日志处理完成" ) }
2.4 关键设计点
两组 WaitGroup :生产者和消费者各一组。流程是:
1 2 3 4 5 1. 所有生产者启动2. 所有消费者启动3. 等所有生产者完成 → close(channel)4. 消费者的 range 循环自然结束5. 等所有消费者完成
缓冲区大小 :make(chan LogEntry, 10) 允许生产者先跑一段。如果消费者暂时处理不过来,生产者不会立刻被阻塞(最多缓存 10 条)。缓冲区满了,生产者自动限速。
2.5 什么时候用
数据产生和处理的速度不同
需要解耦生产和消费逻辑
需要多个消费者并行处理
3. 模式二:Worker Pool(工作池)
3.1 解决什么问题
你有大量任务要处理,但不想每个任务启动一个 goroutine(可能有十万个任务)。你想控制并发数 ——比如最多同时处理 5 个任务。
现实例子:
银行柜台:客户很多,但只有 5 个窗口
数据库连接池:连接数有限,不能无限创建
批量下载:同时下载太多会被限流
3.2 核心结构
1 2 3 4 ┌── Worker 1 ──┐ 任务 → jobs channel ├── Worker 2 ──├→ results channel → 收集 ├── Worker 3 ──┤ └── Worker N ──┘
固定数量的 worker goroutine 从 jobs channel 中取任务,处理后把结果发到 results channel。
3.3 完整示例:批量图片处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 package mainimport ( "context" "fmt" "math/rand" "sync" "time" ) type Job struct { ID int FileName string } type Result struct { JobID int FileName string Size int Duration time.Duration Error error } func worker (ctx context.Context, id int , jobs <-chan Job, results chan <- Result, wg *sync.WaitGroup) { defer wg.Done() for { select { case <-ctx.Done(): fmt.Printf("Worker %d: 收到取消信号,退出\n" , id) return case job, ok := <-jobs: if !ok { fmt.Printf("Worker %d: 任务队列关闭,退出\n" , id) return } start := time.Now() time.Sleep(time.Duration(100 +rand.Intn(400 )) * time.Millisecond) size := 1024 + rand.Intn(4096 ) results <- Result{ JobID: job.ID, FileName: job.FileName, Size: size, Duration: time.Since(start), } } } } func main () { ctx, cancel := context.WithTimeout(context.Background(), 10 *time.Second) defer cancel() const numWorkers = 3 const numJobs = 15 jobs := make (chan Job, numJobs) results := make (chan Result, numJobs) var wg sync.WaitGroup for i := 1 ; i <= numWorkers; i++ { wg.Add(1 ) go worker(ctx, i, jobs, results, &wg) } for i := 1 ; i <= numJobs; i++ { jobs <- Job{ ID: i, FileName: fmt.Sprintf("image_%03d.jpg" , i), } } close (jobs) go func () { wg.Wait() close (results) }() totalTime := time.Now() successCount := 0 for r := range results { successCount++ fmt.Printf(" 完成: %s (%dKB, 耗时 %v)\n" , r.FileName, r.Size/1024 , r.Duration) } fmt.Printf("\n处理完成: %d/%d 个文件, 总耗时 %v\n" , successCount, numJobs, time.Since(totalTime)) fmt.Printf("Worker 数量: %d, 平均每个 Worker 处理 %d 个任务\n" , numWorkers, numJobs/numWorkers) }
3.4 关键设计点
固定 Worker 数量 :不管有多少任务,始终只有 numWorkers 个 goroutine 在工作。这是 Worker Pool 的核心——控制并发度 。
jobs channel 作为任务队列 :所有 worker 从同一个 channel 取任务。Go 的 channel 保证每个任务只被一个 worker 拿到。
results channel 收集结果 :用一个单独的 goroutine 等 worker 全部完成后关闭 results,然后主 goroutine 用 range 收集所有结果。
配合 context :支持超时取消,避免任务处理太久时整个程序卡住。
3.5 通用 Worker Pool 模板
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func RunWorkerPool (ctx context.Context, numWorkers int , jobs <-chan Job) <-chan Result { results := make (chan Result) var wg sync.WaitGroup for i := 0 ; i < numWorkers; i++ { wg.Add(1 ) go func () { defer wg.Done() for job := range jobs { select { case <-ctx.Done(): return case results <- process(job): } } }() } go func () { wg.Wait() close (results) }() return results }
这个模板可以直接复用:传入 worker 数量和 jobs channel,返回 results channel。
3.6 什么时候用
任务量大,但需要限制并发数
每个任务互相独立,不需要特定顺序
需要控制资源使用(CPU、网络连接、数据库连接)
4. 模式三:Pipeline(流水线)
4.1 解决什么问题
数据需要经过多个处理步骤,每个步骤做不同的事。步骤之间用 channel 串联,每个步骤独立并发运行。
现实例子:
汽车装配线:焊接 → 喷漆 → 装配 → 检测
数据处理:读取 → 解析 → 过滤 → 转换 → 存储
视频处理:解码 → 滤镜 → 编码 → 输出
4.2 核心结构
1 阶段1 → channel → 阶段2 → channel → 阶段3 → channel → 输出
每个阶段是一个 goroutine(或一组 goroutine),从输入 channel 读数据,处理后写到输出 channel。
4.3 完整示例:订单处理流水线
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 package mainimport ( "context" "fmt" "math/rand" "time" ) type Order struct { ID int Product string Quantity int Price float64 Status string } func receiveOrders (ctx context.Context, orderCount int ) <-chan Order { out := make (chan Order) go func () { defer close (out) products := []string {"手机" , "笔记本" , "耳机" , "平板" , "键盘" } for i := 1 ; i <= orderCount; i++ { order := Order{ ID: i, Product: products[rand.Intn(len (products))], Quantity: 1 + rand.Intn(5 ), Price: float64 (50 +rand.Intn(950 )) + 0.9 , Status: "已接收" , } select { case out <- order: fmt.Printf("[接收] 订单 #%d: %s x%d\n" , order.ID, order.Product, order.Quantity) case <-ctx.Done(): return } time.Sleep(100 * time.Millisecond) } }() return out } func validateOrders (ctx context.Context, orders <-chan Order) <-chan Order { out := make (chan Order) go func () { defer close (out) for order := range orders { select { case <-ctx.Done(): return default : } time.Sleep(50 * time.Millisecond) if order.Quantity > 3 { fmt.Printf("[验证] 订单 #%d: 拒绝(数量过多: %d)\n" , order.ID, order.Quantity) continue } order.Status = "已验证" fmt.Printf("[验证] 订单 #%d: 通过\n" , order.ID) out <- order } }() return out } func calculatePrice (ctx context.Context, orders <-chan Order) <-chan Order { out := make (chan Order) go func () { defer close (out) for order := range orders { select { case <-ctx.Done(): return default : } time.Sleep(80 * time.Millisecond) order.Price = order.Price * float64 (order.Quantity) if order.Price > 500 { order.Price *= 0.9 } order.Status = "已定价" fmt.Printf("[定价] 订单 #%d: ¥%.2f\n" , order.ID, order.Price) out <- order } }() return out } func completeOrders (ctx context.Context, orders <-chan Order) <-chan Order { out := make (chan Order) go func () { defer close (out) for order := range orders { select { case <-ctx.Done(): return default : } time.Sleep(60 * time.Millisecond) order.Status = "已完成" fmt.Printf("[完成] 订单 #%d: %s x%d = ¥%.2f\n" , order.ID, order.Product, order.Quantity, order.Price) out <- order } }() return out } func main () { ctx, cancel := context.WithTimeout(context.Background(), 10 *time.Second) defer cancel() orders := receiveOrders(ctx, 8 ) validated := validateOrders(ctx, orders) priced := calculatePrice(ctx, validated) completed := completeOrders(ctx, priced) fmt.Println("\n========== 订单处理流水线 ==========\n" ) var totalRevenue float64 count := 0 for order := range completed { totalRevenue += order.Price count++ } fmt.Printf("\n========== 汇总 ==========\n" ) fmt.Printf("完成订单: %d 个\n" , count) fmt.Printf("总收入: ¥%.2f\n" , totalRevenue) }
4.4 关键设计点
每个阶段返回 <-chan Order :这是 Pipeline 的标准写法。每个函数创建 channel、启动 goroutine、返回只读 channel。调用方像搭积木一样串起来:
1 2 3 4 orders := receiveOrders(ctx, 8 ) validated := validateOrders(ctx, orders) priced := calculatePrice(ctx, validated) completed := completeOrders(ctx, priced)
阶段可以过滤数据 :validateOrders 拒绝了不合格的订单(continue 跳过,不传递给下一阶段)。流水线中每个阶段的输出不一定等于输入。
每个阶段关闭自己的输出 channel :当输入 channel 关闭且处理完最后一个元素后,defer close(out) 关闭输出。信号像多米诺骨牌一样传递:第一个阶段关闭 → 第二个收到信号处理完剩余后关闭 → 依次传递。
4.5 什么时候用
数据需要经过多个有序的处理步骤
各步骤的处理逻辑可以独立
需要步骤之间的流控(channel 缓冲)
5. 模式四:Fan-out / Fan-in(扇出 / 扇入)
5.1 解决什么问题
Fan-out(扇出) :一个任务拆成多份,分配给多个 goroutine 并行处理。
Fan-in(扇入) :多个 goroutine 的结果合并成一个 channel。
两者通常一起使用。
现实例子:
考试阅卷:一叠卷子分给多个老师批(扇出),批完后汇总分数(扇入)
搜索引擎:一个查询发给多个索引分片(扇出),结果合并排序(扇入)
文件处理:把大文件分块,多个 goroutine 各自处理一块,最后合并
5.2 核心结构
1 2 3 4 ┌── goroutine 1 ──┐ 一个 input channel ─├── goroutine 2 ──├─ merge ─→ 一个 output channel └── goroutine 3 ──┘ (Fan-out ) (Fan-in )
5.3 Fan-in 工具函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func fanIn (ctx context.Context, channels ...<-chan Result) <-chan Result { out := make (chan Result) var wg sync.WaitGroup for _, ch := range channels { wg.Add(1 ) go func (c <-chan Result) { defer wg.Done() for v := range c { select { case out <- v: case <-ctx.Done(): return } } }(ch) } go func () { wg.Wait() close (out) }() return out }
这个函数把多个 channel 合并成一个。每个输入 channel 对应一个 goroutine,所有数据汇聚到 out。
5.4 完整示例:并行文件搜索
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 package mainimport ( "context" "fmt" "math/rand" "strings" "sync" "time" ) type SearchResult struct { WorkerID int FileName string Line int Content string } func searchWorker (ctx context.Context, id int , files []string , keyword string ) <-chan SearchResult { out := make (chan SearchResult) go func () { defer close (out) for _, file := range files { select { case <-ctx.Done(): return default : } time.Sleep(time.Duration(50 +rand.Intn(150 )) * time.Millisecond) if rand.Intn(3 ) == 0 { result := SearchResult{ WorkerID: id, FileName: file, Line: rand.Intn(100 ) + 1 , Content: fmt.Sprintf("... %s ..." , keyword), } select { case out <- result: case <-ctx.Done(): return } } } }() return out } func merge (ctx context.Context, channels ...<-chan SearchResult) <-chan SearchResult { out := make (chan SearchResult) var wg sync.WaitGroup for _, ch := range channels { wg.Add(1 ) go func (c <-chan SearchResult) { defer wg.Done() for v := range c { select { case out <- v: case <-ctx.Done(): return } } }(ch) } go func () { wg.Wait() close (out) }() return out } func main () { ctx, cancel := context.WithTimeout(context.Background(), 5 *time.Second) defer cancel() allFiles := make ([]string , 30 ) for i := range allFiles { dirs := []string {"src" , "pkg" , "internal" , "cmd" } exts := []string {".go" , ".txt" , ".md" , ".json" } allFiles[i] = fmt.Sprintf("%s/file_%02d%s" , dirs[rand.Intn(len (dirs))], i, exts[rand.Intn(len (exts))]) } keyword := "TODO" numWorkers := 3 chunkSize := (len (allFiles) + numWorkers - 1 ) / numWorkers var workerChannels []<-chan SearchResult for i := 0 ; i < numWorkers; i++ { start := i * chunkSize end := start + chunkSize if end > len (allFiles) { end = len (allFiles) } chunk := allFiles[start:end] fmt.Printf("Worker %d: 负责 %d 个文件 (%s ~ %s)\n" , i+1 , len (chunk), chunk[0 ], chunk[len (chunk)-1 ]) ch := searchWorker(ctx, i+1 , chunk, keyword) workerChannels = append (workerChannels, ch) } results := merge(ctx, workerChannels...) fmt.Printf("\n搜索 \"%s\"...\n\n" , keyword) count := 0 for r := range results { count++ fmt.Printf(" [Worker%d] %s:%d %s\n" , r.WorkerID, r.FileName, r.Line, r.Content) } if count == 0 { fmt.Println(" 没有找到匹配" ) } else { fmt.Printf("\n共找到 %d 处匹配\n" , count) } _ = strings.Contains }
5.5 关键设计点
Fan-out :把 30 个文件平均分成 3 份,每个 worker 处理 10 个。任务分配在主 goroutine 中完成。
Fan-in :merge 函数把 3 个 worker 的输出 channel 合并成一个。主 goroutine 只需要从合并后的 channel 中读取,不用关心数据来自哪个 worker。
结果按完成顺序返回 :先处理完的先输出,不需要等所有 worker 都完成。
5.6 什么时候用
一个大任务可以拆成多个独立的小任务
需要并行加速处理
结果需要汇总到一处
6. 模式对比与选择
6.1 四种模式的对比
模式
数据流向
goroutine 数量
适用场景
生产者-消费者
单向:生产 → 消费
固定
解耦生产和消费
Worker Pool
任务 → 多 Worker → 结果
固定
限制并发数
Pipeline
线性:A → B → C → D
每阶段一个
多步骤顺序处理
Fan-out/Fan-in
分散 → 汇聚
动态
并行加速
6.2 怎么选
1 2 3 4 5 6 7 8 9 10 11 需求:大量任务,限制并发数? → Worker Pool 需求:数据经过多个处理步骤? → Pipeline 需求:一个大任务拆成多个并行子任务? → Fan-out / Fan-in 需求:一方产生数据,一方消费? → 生产者-消费者
实际项目中经常组合使用。比如 Pipeline 的某个阶段内部用 Worker Pool 来并行处理。
7. 综合示例:数据处理管道
把多种模式组合起来:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 package mainimport ( "context" "fmt" "math/rand" "sync" "time" ) type Record struct { ID int Value int Valid bool Result int } func generate (ctx context.Context, count int ) <-chan Record { out := make (chan Record) go func () { defer close (out) for i := 1 ; i <= count; i++ { record := Record{ID: i, Value: rand.Intn(100 )} select { case out <- record: case <-ctx.Done(): return } } }() return out } func validate (ctx context.Context, in <-chan Record, numWorkers int ) <-chan Record { out := make (chan Record) var wg sync.WaitGroup for i := 0 ; i < numWorkers; i++ { wg.Add(1 ) go func () { defer wg.Done() for record := range in { select { case <-ctx.Done(): return default : } time.Sleep(20 * time.Millisecond) record.Valid = record.Value >= 10 out <- record } }() } go func () { wg.Wait() close (out) }() return out } func filter (ctx context.Context, in <-chan Record) <-chan Record { out := make (chan Record) go func () { defer close (out) for record := range in { select { case <-ctx.Done(): return default : } if record.Valid { out <- record } } }() return out } func compute (ctx context.Context, in <-chan Record, numWorkers int ) <-chan Record { out := make (chan Record) var wg sync.WaitGroup for i := 0 ; i < numWorkers; i++ { wg.Add(1 ) go func () { defer wg.Done() for record := range in { select { case <-ctx.Done(): return default : } time.Sleep(30 * time.Millisecond) record.Result = record.Value * record.Value out <- record } }() } go func () { wg.Wait() close (out) }() return out } func main () { ctx, cancel := context.WithTimeout(context.Background(), 10 *time.Second) defer cancel() start := time.Now() records := generate(ctx, 50 ) validated := validate(ctx, records, 3 ) filtered := filter(ctx, validated) computed := compute(ctx, filtered, 2 ) total := 0 count := 0 for record := range computed { total += record.Result count++ } fmt.Printf("处理完成: %d 条有效记录,总和 = %d\n" , count, total) fmt.Printf("总耗时: %v\n" , time.Since(start)) }
这个例子展示了 Pipeline + Worker Pool 的组合:
整体是 Pipeline(四个阶段串联)
验证阶段和计算阶段内部各自用了 Worker Pool(多个 goroutine 并行处理)
全程配合 context,支持超时取消
8. 常见坑总结
8.1 goroutine 泄漏
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 go func () { for record := range in { out <- process(record) } }() go func () { for record := range in { select { case out <- process(record): case <-ctx.Done(): return } } }()
Pipeline 的每一个 goroutine 中,发送和接收都要配合 ctx.Done() 。不然一旦下游取消,上游就永远阻塞。
8.2 忘记关闭 channel
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 for i := 0 ; i < n; i++ { go func () { for job := range jobs { results <- process(job) } }() } go func () { wg.Wait() close (results) }()
8.3 Worker Pool 中 Worker 数量选择
1 2 3 CPU 密集型任务:Worker 数 ≈ CPU 核心数IO 密集型任务:Worker 数可以多一些(核心数 × 2 ~10 ) 不确定:从 runtime. NumCPU() 开始,根据实测调整
1 2 3 import "runtime" numWorkers := runtime.NumCPU()
8.4 Pipeline 阶段顺序有依赖
Pipeline 的每个阶段必须按顺序串联。如果阶段 B 依赖阶段 A 的输出,不能把它们并行执行。但阶段内部可以用 Worker Pool 来并行。
9. 本课练习
练习 1:文件下载器
要求:
给定 20 个 URL(模拟)
用 Worker Pool 模式,最多同时下载 5 个
每个下载任务模拟随机耗时 200ms~1000ms
收集并打印每个 URL 的下载结果(成功/失败、耗时、大小)
练习 2:日志分析流水线
要求:
阶段 1:读取日志行(模拟 100 条)
阶段 2:解析日志(提取时间、级别、消息)
阶段 3:过滤(只保留 ERROR 和 WARN)
阶段 4:统计(按级别计数)
用 Pipeline 模式串联
练习 3:并行单词计数
要求:
给定 10 段文本(模拟 10 个文件)
用 Fan-out 把每段文本交给一个 goroutine 统计单词频率
用 Fan-in 合并所有结果
输出总的单词频率 Top 10
练习 4:带限速的 API 调用
要求:
模拟调用一个 API 100 次
限速:每秒最多 10 次请求
用 Worker Pool + time.Ticker 实现
总超时 15 秒
练习 5:组合模式
要求:
设计一个"电商订单处理系统"
阶段 1:接收订单(生产者)
阶段 2:验证库存(Worker Pool,3 个 worker)
阶段 3:计算价格(Pipeline)
阶段 4:发送通知(Fan-out 给邮件和短信两个 channel,Fan-in 合并确认)
10. 自测题
10.1 概念题
生产者-消费者模式中,channel 缓冲区的作用是什么?
Worker Pool 模式的核心目的是什么?Worker 数量怎么选?
Pipeline 模式中,每个阶段为什么要返回 <-chan T 而不是 chan T?
Fan-in 是怎么实现的?为什么需要一个单独的 goroutine 来关闭输出 channel?
为什么 Pipeline 的每个 goroutine 里发送和接收都要配合 ctx.Done()?
Worker Pool 和 Fan-out 有什么区别?
怎么避免 Pipeline 中的 goroutine 泄漏?
四种模式分别适合什么场景?
10.2 代码阅读题
以下代码存在一个问题,找出来:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 func process (in <-chan int ) <-chan int { out := make (chan int ) go func () { for v := range in { out <- v * 2 } }() return out } func main () { ch := make (chan int ) go func () { for i := 0 ; i < 5 ; i++ { ch <- i } close (ch) }() result := process(ch) for v := range result { fmt.Println(v) } }
点击查看答案
问题:out channel 永远不会被关闭 。
process 函数中的 goroutine 在 for range in 结束后退出了,但没有 close(out)。主 goroutine 中的 for v := range result 会在收到 0、2、4、6、8 后永远阻塞等待,最终报 deadlock。
修复:
1 2 3 4 5 6 go func () { defer close (out) for v := range in { out <- v * 2 } }()
这是 Pipeline 模式中最常见的 bug——忘记关闭输出 channel。
11. 本课总结
这一课你学到了 Go 中最常用的四种并发设计模式。
模式
核心结构
关键 channel
生产者-消费者
生产 → buffer → 消费
一个数据 channel
Worker Pool
任务 → N 个 Worker → 结果
jobs + results
Pipeline
A → B → C → D
每个阶段之间一个 channel
Fan-out/Fan-in
拆分 → 并行 → 合并
多个输入 channel → merge → 一个输出
最重要的三件事:
Worker Pool 控制并发度——不是 goroutine 越多越好,资源有限就要限制
Pipeline 每个阶段返回只读 channel 并 defer close(out)——这是不泄漏的前提
所有 goroutine 的发送和接收都配合 ctx.Done()——确保取消时整个管道能干净退出
12. 下一课预告
到这里你已经掌握了 Go 并发编程的核心内容。下面进入工程实践部分。
下一课:测试基础
会重点讲:
Go 的 testing 包怎么用
单元测试的写法和命名约定
表驱动测试——Go 测试的经典范式
go test 命令的常用参数
测试覆盖率
学完下一课,你就能为自己写的代码加上测试,提高代码质量了。