Go 从 0 到精通 · 第 27 课:select 与并发控制
学习定位:这是整套 Go 教程的第 27 课,也是阶段五(并发与工程阶段)的第三课。
前置要求:已经完成第 26 课,掌握了 channel 的创建、发送、接收和关闭。
本课目标:掌握 select 的语法和工作机制,能同时监听多个 channel,能实现超时控制和非阻塞操作,理解常见的 select 使用模式。
1. 本课你要解决的核心问题
上一课你学会了用 channel 在两个 goroutine 之间传递数据。但真实的并发场景不会这么简单:
你要同时等待 用户输入和超时信号——哪个先来就处理哪个
你要同时监听 多个服务的响应——任何一个返回就继续
你要做非阻塞 的 channel 操作——有数据就收,没有就跳过
这些场景都需要同时监听多个 channel ,这就是 select 干的事。
你可以把 select 理解为channel 版的 switch :switch 是根据值选择分支,select 是根据哪个 channel 准备好了来选择分支。
你需要搞明白以下问题:
select 的基本语法
多个 case 同时就绪时怎么选
怎么用 default 做非阻塞操作
怎么实现超时控制
怎么实现定时任务
常见的 select 使用模式
2. select 基本语法
2.1 长什么样
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 select {case v := <-ch1: fmt.Println("从 ch1 收到:" , v) case ch2 <- value: fmt.Println("发送到 ch2" ) case v, ok := <-ch3: if !ok { fmt.Println("ch3 已关闭" ) } default : fmt.Println("没有 channel 准备好" ) }
2.2 和 switch 的区别
特性
switch
select
选择依据
值匹配
channel 就绪状态
case 内容
表达式
channel 操作(发送或接收)
执行方式
从上到下匹配第一个
随机 选择一个就绪的 case
阻塞行为
不阻塞
没有 default 时会阻塞
最关键的区别 :switch 按顺序匹配,select 随机选择。
3. 第一个 select 示例
3.1 同时监听两个 channel
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package mainimport ( "fmt" "time" ) func main () { ch1 := make (chan string ) ch2 := make (chan string ) go func () { time.Sleep(1 * time.Second) ch1 <- "来自 ch1" }() go func () { time.Sleep(2 * time.Second) ch2 <- "来自 ch2" }() select { case msg := <-ch1: fmt.Println("收到:" , msg) case msg := <-ch2: fmt.Println("收到:" , msg) } }
输出:
ch1 在 1 秒后就绪,ch2 要 2 秒。select 会选择先就绪的那个 case,所以选了 ch1。
注意 :select 只执行一个 case 就结束了。如果你想持续监听,需要把 select 放在循环里。
3.2 多个 case 同时就绪——随机选择
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package mainimport "fmt" func main () { ch1 := make (chan string , 1 ) ch2 := make (chan string , 1 ) ch1 <- "A" ch2 <- "B" select { case msg := <-ch1: fmt.Println("选了 ch1:" , msg) case msg := <-ch2: fmt.Println("选了 ch2:" , msg) } }
运行多次,你会看到有时选 ch1,有时选 ch2。这是故意设计的 ——随机选择防止某个 channel 被饿死(永远得不到处理)。
4. select 在循环中使用
4.1 持续监听多个 channel
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 package mainimport ( "fmt" "time" ) func main () { tick := time.Tick(500 * time.Millisecond) boom := time.After(2 * time.Second) for { select { case <-tick: fmt.Println("tick." ) case <-boom: fmt.Println("BOOM!" ) return default : fmt.Println(" ." ) time.Sleep(100 * time.Millisecond) } } }
输出:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 . . . . tick. . . . . . tick. . . . . tick. . . . . . tick. BOOM!
这个例子展示了 select 的三种 case 类型:
定时触发 :time.Tick 返回一个 channel,定期发送信号
延时触发 :time.After 返回一个 channel,指定时间后发送一次信号
默认分支 :没有 channel 就绪时执行
4.2 监听多个数据源
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package mainimport ( "fmt" "sync" "time" ) func sensor (name string , interval time.Duration, ch chan <- string ) { for i := 1 ; i <= 3 ; i++ { time.Sleep(interval) ch <- fmt.Sprintf("[%s] 数据 #%d" , name, i) } } func main () { tempCh := make (chan string ) humidityCh := make (chan string ) go sensor("温度" , 300 *time.Millisecond, tempCh) go sensor("湿度" , 500 *time.Millisecond, humidityCh) var wg sync.WaitGroup wg.Add(1 ) go func () { defer wg.Done() timeout := time.After(2 * time.Second) for { select { case msg := <-tempCh: fmt.Println("收到:" , msg) case msg := <-humidityCh: fmt.Println("收到:" , msg) case <-timeout: fmt.Println("监听超时,退出" ) return } } }() wg.Wait() }
可能的输出:
1 2 3 4 5 6 7 收到: [温度] 数据 #1 收到: [湿度] 数据 #1 收到: [温度] 数据 #2 收到: [温度] 数据 #3 收到: [湿度] 数据 #2 收到: [湿度] 数据 #3 监听超时,退出
两个传感器以不同频率产生数据,select 不管谁先来,来了就处理。
5. 超时控制
5.1 time.After——最常用的超时方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 package mainimport ( "fmt" "time" ) func slowOperation () chan string { ch := make (chan string ) go func () { time.Sleep(3 * time.Second) ch <- "操作完成" }() return ch } func main () { result := slowOperation() select { case msg := <-result: fmt.Println("成功:" , msg) case <-time.After(2 * time.Second): fmt.Println("超时!操作太慢了" ) } }
输出:
操作需要 3 秒,但我们只等 2 秒。time.After(2 * time.Second) 返回一个 channel,2 秒后会发送一个时间值。select 谁先就绪就执行谁——超时先到,就走超时分支。
5.2 time.After 的原理
time.After(d) 做了这些事:
创建一个 chan time.Time
启动一个内部定时器
d 时间后往 channel 发送当前时间
返回这个 channel
所以 <-time.After(2 * time.Second) 本质上就是"等 2 秒后收到一个信号"。
5.3 每次操作独立超时
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 package mainimport ( "fmt" "math/rand" "time" ) func query (server string ) chan string { ch := make (chan string ) go func () { delay := time.Duration(100 +rand.Intn(500 )) * time.Millisecond time.Sleep(delay) ch <- fmt.Sprintf("%s: 响应耗时 %v" , server, delay) }() return ch } func main () { servers := []string {"服务器A" , "服务器B" , "服务器C" } for _, server := range servers { result := query(server) select { case msg := <-result: fmt.Println("成功 -" , msg) case <-time.After(300 * time.Millisecond): fmt.Printf("超时 - %s 没有在 300ms 内响应\n" , server) } } }
可能的输出:
1 2 3 成功 - 服务器A : 响应耗时 156ms 超时 - 服务器B 没有在 300ms 内响应 成功 - 服务器C : 响应耗时 234ms
每个请求有独立的 300ms 超时限制。
6. default 分支——非阻塞操作
6.1 没有 default 时
如果所有 case 都没有就绪,select 会阻塞等待 ,直到某个 case 就绪。
6.2 有 default 时
如果加了 default,所有 case 都没就绪时直接执行 default,不会阻塞 。
6.3 非阻塞接收
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 package mainimport "fmt" func main () { ch := make (chan int , 1 ) select { case v := <-ch: fmt.Println("收到:" , v) default : fmt.Println("没有数据可接收" ) } ch <- 42 select { case v := <-ch: fmt.Println("收到:" , v) default : fmt.Println("没有数据可接收" ) } }
输出:
6.4 非阻塞发送
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 package mainimport "fmt" func main () { ch := make (chan int , 1 ) ch <- 1 select { case ch <- 2 : fmt.Println("发送成功" ) default : fmt.Println("缓冲区满了,发送失败" ) } }
输出:
6.5 什么时候用 default
轮询 :循环中不断检查 channel,有就处理,没有就做其他事
避免阻塞 :尝试发送或接收,但不想被卡住
忙等待 :通常配合 time.Sleep,防止 CPU 空转
1 2 3 4 5 6 7 8 9 10 for { select { case msg := <-ch: handle(msg) default : time.Sleep(10 * time.Millisecond) } }
注意 :default 用不好会导致 CPU 空转(一直循环做 default,浪费 CPU)。如果不需要非阻塞,就不要加 default。
7. 常见使用模式
7.1 模式一:超时退出
1 2 3 4 5 6 select {case result := <-ch: fmt.Println("结果:" , result) case <-time.After(5 * time.Second): fmt.Println("超时" ) }
最常见的模式。等待结果,但不无限等。
7.2 模式二:定时任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 package mainimport ( "fmt" "time" ) func main () { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() done := time.After(5 * time.Second) for { select { case t := <-ticker.C: fmt.Println("定时任务执行:" , t.Format("15:04:05" )) case <-done: fmt.Println("结束" ) return } } }
输出:
1 2 3 4 5 6 定时任务执行: 14:30:01 定时任务执行: 14:30:02 定时任务执行: 14:30:03 定时任务执行: 14:30:04 定时任务执行: 14:30:05 结束
time.NewTicker vs time.Tick :
time.NewTicker 返回一个 *Ticker,可以调 Stop() 停止,用完必须停止
time.Tick 是简写版,返回的 channel 无法停止,会一直占用资源
在生产代码中用 NewTicker,在简单示例中可以用 Tick
7.3 模式三:退出信号
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package mainimport ( "fmt" "time" ) func worker (quit <-chan struct {}) { for { select { case <-quit: fmt.Println("Worker: 收到退出信号,清理中..." ) fmt.Println("Worker: 已退出" ) return default : fmt.Println("Worker: 工作中..." ) time.Sleep(500 * time.Millisecond) } } } func main () { quit := make (chan struct {}) go worker(quit) time.Sleep(2 * time.Second) fmt.Println("Main: 发送退出信号" ) close (quit) time.Sleep(500 * time.Millisecond) }
输出:
1 2 3 4 5 6 7 Worker: 工作中...Worker: 工作中...Worker: 工作中...Worker: 工作中...Main: 发送退出信号Worker: 收到退出信号,清理中...Worker: 已退出
用 close(quit) 发送退出信号。关闭 channel 后,<-quit 立即返回零值(上一课讲的),所以所有在 <-quit 上等待的 goroutine 都会被唤醒。
7.4 模式四:多个服务竞速
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 package mainimport ( "fmt" "math/rand" "time" ) func queryServer (server string ) <-chan string { ch := make (chan string ) go func () { delay := time.Duration(100 +rand.Intn(900 )) * time.Millisecond time.Sleep(delay) ch <- fmt.Sprintf("%s (耗时 %v)" , server, delay) }() return ch } func main () { select { case result := <-queryServer("主服务器" ): fmt.Println("使用:" , result) case result := <-queryServer("备用服务器1" ): fmt.Println("使用:" , result) case result := <-queryServer("备用服务器2" ): fmt.Println("使用:" , result) case <-time.After(1 * time.Second): fmt.Println("所有服务器都超时了" ) } }
同时发出三个请求,谁先响应就用谁的结果。这在分布式系统中很常见——通过冗余请求降低延迟。
7.5 模式五:心跳检测
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package mainimport ( "fmt" "time" ) func startWorker (heartbeat chan <- struct {}) { go func () { for i := 0 ; ; i++ { time.Sleep(500 * time.Millisecond) select { case heartbeat <- struct {}{}: default : } if i == 5 { fmt.Println("Worker: 我卡住了..." ) time.Sleep(10 * time.Second) } } }() } func main () { heartbeat := make (chan struct {}, 1 ) startWorker(heartbeat) for { select { case <-heartbeat: fmt.Println("收到心跳,Worker 正常" ) case <-time.After(2 * time.Second): fmt.Println("超过 2 秒没有心跳,Worker 可能挂了!" ) return } } }
输出:
1 2 3 4 5 6 7 收到心跳,Worker 正常 收到心跳,Worker 正常 收到心跳,Worker 正常 收到心跳,Worker 正常 收到心跳,Worker 正常 收到心跳,Worker 正常 超过 2 秒没有心跳,Worker 可能挂了!
Worker 定期发送心跳,监控方用 select + time.After 检测。如果超过 2 秒没有心跳,就判定 Worker 异常。
8. 空 select
空的 select 没有任何 case,所以永远不会有就绪的 case,主 goroutine 会永久阻塞 。
什么时候用?当你的所有逻辑都在 goroutine 中运行,主 goroutine 只需要保持程序不退出:
1 2 3 4 5 6 7 func main () { go server1() go server2() go server3() select {} }
不过在生产环境中,通常用信号监听(比如等待 Ctrl+C)来代替空 select。
9. select 的注意事项
9.1 select 不会 fall through
和 switch 一样,select 只执行一个 case,不会像 C 语言那样"掉下去"执行下一个 case。
9.2 case 里的表达式在 select 开始时求值
1 2 3 4 5 6 7 select {case v := <-getChannel(): fmt.Println(v) case ch <- getValue(): fmt.Println("sent" ) }
9.3 for-select 退出要用 return 或标签
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 for { select { case <-quit: break } } for { select { case <-quit: return } } Loop: for { select { case <-quit: break Loop } }
这是 Go 并发编程中一个常见的坑:在 for-select 中,break 默认只跳出 select,不跳出外层的 for。如果想跳出循环,要用 return 或带标签的 break。
10. 实战综合示例
10.1 限时任务处理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 package mainimport ( "fmt" "math/rand" "time" ) type Job struct { ID int Data string } type Result struct { JobID int Value string } func processJob (job Job) Result { delay := time.Duration(50 +rand.Intn(200 )) * time.Millisecond time.Sleep(delay) return Result{ JobID: job.ID, Value: fmt.Sprintf("处理结果: %s (耗时 %v)" , job.Data, delay), } } func main () { jobs := make (chan Job, 10 ) results := make (chan Result, 10 ) for w := 1 ; w <= 3 ; w++ { go func (workerID int ) { for job := range jobs { fmt.Printf("Worker %d 处理任务 #%d\n" , workerID, job.ID) results <- processJob(job) } }(w) } totalJobs := 8 for i := 1 ; i <= totalJobs; i++ { jobs <- Job{ID: i, Data: fmt.Sprintf("任务数据_%d" , i)} } close (jobs) timeout := time.After(3 * time.Second) collected := 0 for collected < totalJobs { select { case r := <-results: fmt.Printf(" 结果: %s\n" , r.Value) collected++ case <-timeout: fmt.Printf("超时!只收到 %d/%d 个结果\n" , collected, totalJobs) return } } fmt.Printf("全部完成!收到 %d/%d 个结果\n" , collected, totalJobs) }
10.2 多路数据合并
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 package mainimport ( "fmt" "time" ) func merge (channels ...<-chan string ) <-chan string { out := make (chan string ) remaining := len (channels) done := make (chan struct {}) for _, ch := range channels { go func (c <-chan string ) { for v := range c { out <- v } done <- struct {}{} }(ch) } go func () { for i := 0 ; i < remaining; i++ { <-done } close (out) }() return out } func dataSource (name string , count int , interval time.Duration) <-chan string { ch := make (chan string ) go func () { for i := 1 ; i <= count; i++ { time.Sleep(interval) ch <- fmt.Sprintf("[%s] 数据 %d" , name, i) } close (ch) }() return ch } func main () { src1 := dataSource("快" , 5 , 100 *time.Millisecond) src2 := dataSource("中" , 3 , 250 *time.Millisecond) src3 := dataSource("慢" , 2 , 400 *time.Millisecond) merged := merge(src1, src2, src3) for msg := range merged { fmt.Println(msg) } fmt.Println("所有数据源已完成" ) }
这个 merge 函数把多个 channel 合并成一个,是 Go 并发编程中很经典的工具。
11. 常见坑总结
11.1 循环中的 time.After 会泄漏
1 2 3 4 5 6 7 8 9 10 for { select { case msg := <-ch: handle(msg) case <-time.After(5 * time.Second): fmt.Println("超时" ) return } }
每次循环都会调用 time.After,创建一个新的定时器。如果 ch 频繁有数据,旧的定时器还没触发就被丢弃了,但它们仍然在内存中直到触发时间到。在高频循环中会造成内存泄漏。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 timer := time.NewTimer(5 * time.Second) defer timer.Stop()for { select { case msg := <-ch: handle(msg) if !timer.Stop() { <-timer.C } timer.Reset(5 * time.Second) case <-timer.C: fmt.Println("超时" ) return } }
简单规则 :time.After 适合一次性的超时。循环中反复超时,用 time.NewTimer。
11.2 for-select 中 break 的陷阱
1 2 3 4 5 6 7 8 9 10 11 for { select { case <-quit: break case msg := <-ch: fmt.Println(msg) } }
前面第 9.3 节详细讲过,再强调一遍——这是非常高频的 bug。
11.3 忘记 default 导致意外阻塞
1 2 3 4 5 6 7 select {case ch <- data: fmt.Println("发送成功" ) }
需要非阻塞操作时一定要加 default。
11.4 default 导致 CPU 空转
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 for { select { case msg := <-ch: handle(msg) default : } } for { select { case msg := <-ch: handle(msg) default : time.Sleep(10 * time.Millisecond) } } for msg := range ch { handle(msg) }
11.5 Ticker 不停止会泄漏
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func f () { ticker := time.NewTicker(time.Second) for range ticker.C { if done { return } } } func f () { ticker := time.NewTicker(time.Second) defer ticker.Stop() }
12. 本课练习
练习 1:超时查询
要求:
写一个函数模拟数据库查询,随机耗时 1~5 秒
主 goroutine 调用这个查询,但最多等 3 秒
如果在 3 秒内返回,打印结果
如果超时,打印"查询超时"
练习 2:优先级 channel
要求:
创建两个 channel:highPriority 和 lowPriority
两个 goroutine 分别向两个 channel 发送数据
用 select 接收数据,当两个都有数据时,优先处理 highPriority
提示:可以用嵌套 select 或先尝试非阻塞接收高优先级。
练习 3:倒计时器
要求:
实现一个 10 秒倒计时
每秒打印剩余秒数
期间用户可以输入 “stop” 来提前停止
倒计时结束打印 “发射!”,提前停止打印 “已取消”
提示:用一个 goroutine 读用户输入,通过 channel 传递给 select。
练习 4:限速器
要求:
写一个函数,接收请求但限制为每秒最多 3 个
用 time.Ticker 实现限速
超出速率的请求等待,超过 5 秒等不到就丢弃
练习 5:多源合并排序
要求:
三个 goroutine 各自生成一组有序数字,发送到各自的 channel
写一个函数,用 select 从三个 channel 中接收,合并成一个有序输出
提示:这是归并排序中"合并"步骤的并发版本。
13. 自测题
13.1 概念题
select 和 switch 最大的区别是什么?
多个 case 同时就绪时,select 怎么选择?为什么这样设计?
select 中 default 的作用是什么?什么时候该用,什么时候不该用?
time.After 返回的是什么?它的原理是什么?
for-select 中 break 跳出的是什么?怎么跳出外层循环?
time.NewTicker 和 time.Tick 有什么区别?为什么推荐用 NewTicker?
空 select {} 会怎样?什么时候用?
在循环中使用 time.After 有什么问题?
13.2 代码阅读题
预测以下代码的行为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 package mainimport ( "fmt" "time" ) func main () { ch := make (chan int , 1 ) go func () { for i := 1 ; ; i++ { ch <- i time.Sleep(300 * time.Millisecond) } }() for { select { case v := <-ch: fmt.Println("收到:" , v) case <-time.After(1 * time.Second): fmt.Println("超时退出" ) return } } }
点击查看答案
程序会持续输出:
永远不会超时退出 。
解释:
goroutine 每 300ms 发送一个数字
select 每次循环时创建一个新的 time.After(1 * time.Second)
因为每 300ms 就能从 ch 收到数据,time.After 的 1 秒还没到就被新一轮循环替换了
每次循环都是新的 time.After,所以超时永远不会触发
如果想要"1 秒内没有任何数据就超时",应该把 time.After 或 time.NewTimer 放在循环外面 ,或者在收到数据后重置定时器。
14. 本课总结
这一课你学到了 select——Go 并发编程中的多路复用器。
场景
做法
同时监听多个 channel
select + 多个 case
超时控制
case <-time.After(d)
定时任务
time.NewTicker + select
非阻塞操作
select + default
退出信号
case <-quit,quit 用 close 触发
多服务竞速
多个 case 接收,先到先用
持续监听
for { select { ... } }
最重要的三件事:
select 监听多个 channel,谁先就绪执行谁——多个同时就绪则随机选
time.After 配合 select 实现超时——这是 Go 中最常见的超时方式
for-select 中 break 只跳出 select——要退出循环用 return 或标签
15. 下一课预告
到目前为止你学了 goroutine、channel、select,已经能写不少并发程序了。但有一类问题还没解决:多个 goroutine 访问同一个共享资源怎么办?
比如多个 goroutine 同时往一个 map 里写数据,程序直接崩溃。多个 goroutine 同时修改一个计数器,结果不对。这些都是并发安全 问题。
下一课:同步原语
会重点讲:
sync.Mutex——互斥锁,保护共享资源
sync.RWMutex——读写锁,读多写少的场景
sync.Once——只执行一次
什么是数据竞争,怎么用 go run -race 检测
什么时候用锁,什么时候用 channel
学完下一课,你就能写出安全可靠的并发程序了。