Go 从 0 到精通 · 第 27 课:select 与并发控制

学习定位:这是整套 Go 教程的第 27 课,也是阶段五(并发与工程阶段)的第三课。
前置要求:已经完成第 26 课,掌握了 channel 的创建、发送、接收和关闭。
本课目标:掌握 select 的语法和工作机制,能同时监听多个 channel,能实现超时控制和非阻塞操作,理解常见的 select 使用模式。


1. 本课你要解决的核心问题

上一课你学会了用 channel 在两个 goroutine 之间传递数据。但真实的并发场景不会这么简单:

  • 你要同时等待用户输入和超时信号——哪个先来就处理哪个
  • 你要同时监听多个服务的响应——任何一个返回就继续
  • 你要做非阻塞的 channel 操作——有数据就收,没有就跳过

这些场景都需要同时监听多个 channel,这就是 select 干的事。

你可以把 select 理解为channel 版的 switchswitch 是根据值选择分支,select 是根据哪个 channel 准备好了来选择分支。

你需要搞明白以下问题:

  • select 的基本语法
  • 多个 case 同时就绪时怎么选
  • 怎么用 default 做非阻塞操作
  • 怎么实现超时控制
  • 怎么实现定时任务
  • 常见的 select 使用模式

2. select 基本语法

2.1 长什么样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
select {
case v := <-ch1:
// ch1 有数据可接收时执行
fmt.Println("从 ch1 收到:", v)
case ch2 <- value:
// ch2 可以发送数据时执行
fmt.Println("发送到 ch2")
case v, ok := <-ch3:
// ch3 有数据或已关闭时执行
if !ok {
fmt.Println("ch3 已关闭")
}
default:
// 所有 case 都不就绪时执行
fmt.Println("没有 channel 准备好")
}

2.2 和 switch 的区别

特性 switch select
选择依据 值匹配 channel 就绪状态
case 内容 表达式 channel 操作(发送或接收)
执行方式 从上到下匹配第一个 随机选择一个就绪的 case
阻塞行为 不阻塞 没有 default 时会阻塞

最关键的区别switch 按顺序匹配,select 随机选择。


3. 第一个 select 示例

3.1 同时监听两个 channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
"fmt"
"time"
)

func main() {
ch1 := make(chan string)
ch2 := make(chan string)

go func() {
time.Sleep(1 * time.Second)
ch1 <- "来自 ch1"
}()

go func() {
time.Sleep(2 * time.Second)
ch2 <- "来自 ch2"
}()

// 等待第一个返回的结果
select {
case msg := <-ch1:
fmt.Println("收到:", msg)
case msg := <-ch2:
fmt.Println("收到:", msg)
}
}

输出:

1
收到: 来自 ch1

ch1 在 1 秒后就绪,ch2 要 2 秒。select 会选择先就绪的那个 case,所以选了 ch1。

注意select 只执行一个 case 就结束了。如果你想持续监听,需要把 select 放在循环里。

3.2 多个 case 同时就绪——随机选择

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package main

import "fmt"

func main() {
ch1 := make(chan string, 1)
ch2 := make(chan string, 1)

ch1 <- "A"
ch2 <- "B"

// 两个 case 都就绪了,select 会随机选一个
select {
case msg := <-ch1:
fmt.Println("选了 ch1:", msg)
case msg := <-ch2:
fmt.Println("选了 ch2:", msg)
}
}

运行多次,你会看到有时选 ch1,有时选 ch2。这是故意设计的——随机选择防止某个 channel 被饿死(永远得不到处理)。


4. select 在循环中使用

4.1 持续监听多个 channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
"fmt"
"time"
)

func main() {
tick := time.Tick(500 * time.Millisecond) // 每 500ms 触发一次
boom := time.After(2 * time.Second) // 2 秒后触发一次

for {
select {
case <-tick:
fmt.Println("tick.")
case <-boom:
fmt.Println("BOOM!")
return // 退出整个程序
default:
fmt.Println(" .")
time.Sleep(100 * time.Millisecond)
}
}
}

输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    .
.
.
.
tick.
.
.
.
.
.
tick.
.
.
.
.
tick.
.
.
.
.
.
tick.
BOOM!

这个例子展示了 select 的三种 case 类型:

  • 定时触发time.Tick 返回一个 channel,定期发送信号
  • 延时触发time.After 返回一个 channel,指定时间后发送一次信号
  • 默认分支:没有 channel 就绪时执行

4.2 监听多个数据源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (
"fmt"
"sync"
"time"
)

func sensor(name string, interval time.Duration, ch chan<- string) {
for i := 1; i <= 3; i++ {
time.Sleep(interval)
ch <- fmt.Sprintf("[%s] 数据 #%d", name, i)
}
}

func main() {
tempCh := make(chan string)
humidityCh := make(chan string)

go sensor("温度", 300*time.Millisecond, tempCh)
go sensor("湿度", 500*time.Millisecond, humidityCh)

var wg sync.WaitGroup
wg.Add(1)

go func() {
defer wg.Done()
timeout := time.After(2 * time.Second)
for {
select {
case msg := <-tempCh:
fmt.Println("收到:", msg)
case msg := <-humidityCh:
fmt.Println("收到:", msg)
case <-timeout:
fmt.Println("监听超时,退出")
return
}
}
}()

wg.Wait()
}

可能的输出:

1
2
3
4
5
6
7
收到: [温度] 数据 #1
收到: [湿度] 数据 #1
收到: [温度] 数据 #2
收到: [温度] 数据 #3
收到: [湿度] 数据 #2
收到: [湿度] 数据 #3
监听超时,退出

两个传感器以不同频率产生数据,select 不管谁先来,来了就处理。


5. 超时控制

5.1 time.After——最常用的超时方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"fmt"
"time"
)

func slowOperation() chan string {
ch := make(chan string)
go func() {
time.Sleep(3 * time.Second) // 模拟一个很慢的操作
ch <- "操作完成"
}()
return ch
}

func main() {
result := slowOperation()

select {
case msg := <-result:
fmt.Println("成功:", msg)
case <-time.After(2 * time.Second):
fmt.Println("超时!操作太慢了")
}
}

输出:

1
超时!操作太慢了

操作需要 3 秒,但我们只等 2 秒。time.After(2 * time.Second) 返回一个 channel,2 秒后会发送一个时间值。select 谁先就绪就执行谁——超时先到,就走超时分支。

5.2 time.After 的原理

time.After(d) 做了这些事:

  1. 创建一个 chan time.Time
  2. 启动一个内部定时器
  3. d 时间后往 channel 发送当前时间
  4. 返回这个 channel

所以 <-time.After(2 * time.Second) 本质上就是"等 2 秒后收到一个信号"。

5.3 每次操作独立超时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"
"math/rand"
"time"
)

func query(server string) chan string {
ch := make(chan string)
go func() {
// 模拟响应时间 100ms ~ 600ms
delay := time.Duration(100+rand.Intn(500)) * time.Millisecond
time.Sleep(delay)
ch <- fmt.Sprintf("%s: 响应耗时 %v", server, delay)
}()
return ch
}

func main() {
servers := []string{"服务器A", "服务器B", "服务器C"}

for _, server := range servers {
result := query(server)

select {
case msg := <-result:
fmt.Println("成功 -", msg)
case <-time.After(300 * time.Millisecond):
fmt.Printf("超时 - %s 没有在 300ms 内响应\n", server)
}
}
}

可能的输出:

1
2
3
成功 - 服务器A: 响应耗时 156ms
超时 - 服务器B 没有在 300ms 内响应
成功 - 服务器C: 响应耗时 234ms

每个请求有独立的 300ms 超时限制。


6. default 分支——非阻塞操作

6.1 没有 default

如果所有 case 都没有就绪,select阻塞等待,直到某个 case 就绪。

6.2 有 default

如果加了 default,所有 case 都没就绪时直接执行 default不会阻塞

6.3 非阻塞接收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import "fmt"

func main() {
ch := make(chan int, 1)

// 非阻塞接收:有数据就收,没有就跳过
select {
case v := <-ch:
fmt.Println("收到:", v)
default:
fmt.Println("没有数据可接收")
}

ch <- 42

// 现在有数据了
select {
case v := <-ch:
fmt.Println("收到:", v)
default:
fmt.Println("没有数据可接收")
}
}

输出:

1
2
没有数据可接收
收到: 42

6.4 非阻塞发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package main

import "fmt"

func main() {
ch := make(chan int, 1)
ch <- 1 // 缓冲区满了

// 非阻塞发送:能发就发,不能发就跳过
select {
case ch <- 2:
fmt.Println("发送成功")
default:
fmt.Println("缓冲区满了,发送失败")
}
}

输出:

1
缓冲区满了,发送失败

6.5 什么时候用 default

  • 轮询:循环中不断检查 channel,有就处理,没有就做其他事
  • 避免阻塞:尝试发送或接收,但不想被卡住
  • 忙等待:通常配合 time.Sleep,防止 CPU 空转
1
2
3
4
5
6
7
8
9
10
// 轮询模式
for {
select {
case msg := <-ch:
handle(msg)
default:
// 没有消息,做其他事或短暂休息
time.Sleep(10 * time.Millisecond)
}
}

注意default 用不好会导致 CPU 空转(一直循环做 default,浪费 CPU)。如果不需要非阻塞,就不要加 default


7. 常见使用模式

7.1 模式一:超时退出

1
2
3
4
5
6
select {
case result := <-ch:
fmt.Println("结果:", result)
case <-time.After(5 * time.Second):
fmt.Println("超时")
}

最常见的模式。等待结果,但不无限等。

7.2 模式二:定时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"fmt"
"time"
)

func main() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop() // 不用了要 Stop,防止泄漏

done := time.After(5 * time.Second)

for {
select {
case t := <-ticker.C:
fmt.Println("定时任务执行:", t.Format("15:04:05"))
case <-done:
fmt.Println("结束")
return
}
}
}

输出:

1
2
3
4
5
6
定时任务执行: 14:30:01
定时任务执行: 14:30:02
定时任务执行: 14:30:03
定时任务执行: 14:30:04
定时任务执行: 14:30:05
结束

time.NewTicker vs time.Tick

  • time.NewTicker 返回一个 *Ticker,可以调 Stop() 停止,用完必须停止
  • time.Tick 是简写版,返回的 channel 无法停止,会一直占用资源
  • 在生产代码中用 NewTicker,在简单示例中可以用 Tick

7.3 模式三:退出信号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
"fmt"
"time"
)

func worker(quit <-chan struct{}) {
for {
select {
case <-quit:
fmt.Println("Worker: 收到退出信号,清理中...")
// 做清理工作
fmt.Println("Worker: 已退出")
return
default:
fmt.Println("Worker: 工作中...")
time.Sleep(500 * time.Millisecond)
}
}
}

func main() {
quit := make(chan struct{})

go worker(quit)

// 让 worker 工作 2 秒
time.Sleep(2 * time.Second)

// 发送退出信号
fmt.Println("Main: 发送退出信号")
close(quit)

// 等一下让 worker 完成清理
time.Sleep(500 * time.Millisecond)
}

输出:

1
2
3
4
5
6
7
Worker: 工作中...
Worker: 工作中...
Worker: 工作中...
Worker: 工作中...
Main: 发送退出信号
Worker: 收到退出信号,清理中...
Worker: 已退出

close(quit) 发送退出信号。关闭 channel 后,<-quit 立即返回零值(上一课讲的),所以所有在 <-quit 上等待的 goroutine 都会被唤醒。

7.4 模式四:多个服务竞速

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"fmt"
"math/rand"
"time"
)

func queryServer(server string) <-chan string {
ch := make(chan string)
go func() {
delay := time.Duration(100+rand.Intn(900)) * time.Millisecond
time.Sleep(delay)
ch <- fmt.Sprintf("%s (耗时 %v)", server, delay)
}()
return ch
}

func main() {
// 同时请求三个服务器,谁先响应用谁的结果
select {
case result := <-queryServer("主服务器"):
fmt.Println("使用:", result)
case result := <-queryServer("备用服务器1"):
fmt.Println("使用:", result)
case result := <-queryServer("备用服务器2"):
fmt.Println("使用:", result)
case <-time.After(1 * time.Second):
fmt.Println("所有服务器都超时了")
}
}

同时发出三个请求,谁先响应就用谁的结果。这在分布式系统中很常见——通过冗余请求降低延迟。

7.5 模式五:心跳检测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"fmt"
"time"
)

func startWorker(heartbeat chan<- struct{}) {
go func() {
for i := 0; ; i++ {
// 每 500ms 发送一次心跳
time.Sleep(500 * time.Millisecond)
select {
case heartbeat <- struct{}{}:
default:
// 心跳 channel 满了就跳过,不阻塞
}

// 模拟:第 6 次后 worker "卡住" 了
if i == 5 {
fmt.Println("Worker: 我卡住了...")
time.Sleep(10 * time.Second)
}
}
}()
}

func main() {
heartbeat := make(chan struct{}, 1)
startWorker(heartbeat)

for {
select {
case <-heartbeat:
fmt.Println("收到心跳,Worker 正常")
case <-time.After(2 * time.Second):
fmt.Println("超过 2 秒没有心跳,Worker 可能挂了!")
return
}
}
}

输出:

1
2
3
4
5
6
7
收到心跳,Worker 正常
收到心跳,Worker 正常
收到心跳,Worker 正常
收到心跳,Worker 正常
收到心跳,Worker 正常
收到心跳,Worker 正常
超过 2 秒没有心跳,Worker 可能挂了!

Worker 定期发送心跳,监控方用 select + time.After 检测。如果超过 2 秒没有心跳,就判定 Worker 异常。


8. 空 select

1
select {}  // 永久阻塞

空的 select 没有任何 case,所以永远不会有就绪的 case,主 goroutine 会永久阻塞

什么时候用?当你的所有逻辑都在 goroutine 中运行,主 goroutine 只需要保持程序不退出:

1
2
3
4
5
6
7
func main() {
go server1()
go server2()
go server3()

select {} // 主 goroutine 永远等待,让其他 goroutine 运行
}

不过在生产环境中,通常用信号监听(比如等待 Ctrl+C)来代替空 select。


9. select 的注意事项

9.1 select 不会 fall through

switch 一样,select 只执行一个 case,不会像 C 语言那样"掉下去"执行下一个 case。

9.2 case 里的表达式在 select 开始时求值

1
2
3
4
5
6
7
// 每个 case 的 channel 表达式在 select 开始时就确定了
select {
case v := <-getChannel(): // getChannel() 在 select 开始时就调用了
fmt.Println(v)
case ch <- getValue(): // getValue() 在 select 开始时就调用了
fmt.Println("sent")
}

9.3 for-select 退出要用 return 或标签

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 错误:break 只跳出 select,不跳出 for
for {
select {
case <-quit:
break // 这只跳出 select,循环继续!
}
}

// 方法一:用 return
for {
select {
case <-quit:
return // 跳出整个函数
}
}

// 方法二:用标签
Loop:
for {
select {
case <-quit:
break Loop // 跳出标签对应的 for 循环
}
}
// 代码继续执行...

这是 Go 并发编程中一个常见的坑:在 for-select 中,break 默认只跳出 select,不跳出外层的 for。如果想跳出循环,要用 return 或带标签的 break


10. 实战综合示例

10.1 限时任务处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package main

import (
"fmt"
"math/rand"
"time"
)

type Job struct {
ID int
Data string
}

type Result struct {
JobID int
Value string
}

func processJob(job Job) Result {
// 模拟处理耗时
delay := time.Duration(50+rand.Intn(200)) * time.Millisecond
time.Sleep(delay)
return Result{
JobID: job.ID,
Value: fmt.Sprintf("处理结果: %s (耗时 %v)", job.Data, delay),
}
}

func main() {
jobs := make(chan Job, 10)
results := make(chan Result, 10)

// 启动 3 个 worker
for w := 1; w <= 3; w++ {
go func(workerID int) {
for job := range jobs {
fmt.Printf("Worker %d 处理任务 #%d\n", workerID, job.ID)
results <- processJob(job)
}
}(w)
}

// 发送 8 个任务
totalJobs := 8
for i := 1; i <= totalJobs; i++ {
jobs <- Job{ID: i, Data: fmt.Sprintf("任务数据_%d", i)}
}
close(jobs)

// 收集结果,最多等 3 秒
timeout := time.After(3 * time.Second)
collected := 0

for collected < totalJobs {
select {
case r := <-results:
fmt.Printf(" 结果: %s\n", r.Value)
collected++
case <-timeout:
fmt.Printf("超时!只收到 %d/%d 个结果\n", collected, totalJobs)
return
}
}

fmt.Printf("全部完成!收到 %d/%d 个结果\n", collected, totalJobs)
}

10.2 多路数据合并

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package main

import (
"fmt"
"time"
)

// merge 把多个 channel 合并成一个
func merge(channels ...<-chan string) <-chan string {
out := make(chan string)

// 为每个输入 channel 启动一个 goroutine
remaining := len(channels)
done := make(chan struct{})

for _, ch := range channels {
go func(c <-chan string) {
for v := range c {
out <- v
}
done <- struct{}{}
}(ch)
}

// 等所有输入 channel 关闭后,关闭输出 channel
go func() {
for i := 0; i < remaining; i++ {
<-done
}
close(out)
}()

return out
}

func dataSource(name string, count int, interval time.Duration) <-chan string {
ch := make(chan string)
go func() {
for i := 1; i <= count; i++ {
time.Sleep(interval)
ch <- fmt.Sprintf("[%s] 数据 %d", name, i)
}
close(ch)
}()
return ch
}

func main() {
// 三个不同频率的数据源
src1 := dataSource("快", 5, 100*time.Millisecond)
src2 := dataSource("中", 3, 250*time.Millisecond)
src3 := dataSource("慢", 2, 400*time.Millisecond)

// 合并成一个 channel
merged := merge(src1, src2, src3)

// 从合并后的 channel 接收所有数据
for msg := range merged {
fmt.Println(msg)
}

fmt.Println("所有数据源已完成")
}

这个 merge 函数把多个 channel 合并成一个,是 Go 并发编程中很经典的工具。


11. 常见坑总结

11.1 循环中的 time.After 会泄漏

1
2
3
4
5
6
7
8
9
10
// 有问题:每次循环都创建一个新的 timer,旧的不会被回收
for {
select {
case msg := <-ch:
handle(msg)
case <-time.After(5 * time.Second): // 每次循环创建新的!
fmt.Println("超时")
return
}
}

每次循环都会调用 time.After,创建一个新的定时器。如果 ch 频繁有数据,旧的定时器还没触发就被丢弃了,但它们仍然在内存中直到触发时间到。在高频循环中会造成内存泄漏。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 正确:用 time.NewTimer 并手动重置
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()

for {
select {
case msg := <-ch:
handle(msg)
// 重置定时器
if !timer.Stop() {
<-timer.C
}
timer.Reset(5 * time.Second)
case <-timer.C:
fmt.Println("超时")
return
}
}

简单规则time.After 适合一次性的超时。循环中反复超时,用 time.NewTimer

11.2 for-selectbreak 的陷阱

1
2
3
4
5
6
7
8
9
10
11
// 错误:break 只跳出 select
for {
select {
case <-quit:
break // 不会退出 for 循环!
case msg := <-ch:
fmt.Println(msg)
}
}

// 正确:用 return 或标签

前面第 9.3 节详细讲过,再强调一遍——这是非常高频的 bug。

11.3 忘记 default 导致意外阻塞

1
2
3
4
5
6
7
// 本意是"尝试发送,不行就算了"
// 但忘了 default,如果 ch 满了就会永久阻塞
select {
case ch <- data:
fmt.Println("发送成功")
// 忘了 default!
}

需要非阻塞操作时一定要加 default

11.4 default 导致 CPU 空转

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 有 default 的空循环会疯狂消耗 CPU
for {
select {
case msg := <-ch:
handle(msg)
default:
// 什么都不做,CPU 一直在这转圈
}
}

// 加个 sleep 缓解
for {
select {
case msg := <-ch:
handle(msg)
default:
time.Sleep(10 * time.Millisecond)
}
}

// 更好的做法:不用 default,让 select 阻塞等待
for msg := range ch {
handle(msg)
}

11.5 Ticker 不停止会泄漏

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 有问题:ticker 永远不会停止
func f() {
ticker := time.NewTicker(time.Second)
for range ticker.C {
// ...
if done {
return // 函数返回了,但 ticker 还在运行
}
}
}

// 正确:defer 停止
func f() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop() // 函数返回时停止 ticker
// ...
}

12. 本课练习

练习 1:超时查询

要求:

  • 写一个函数模拟数据库查询,随机耗时 1~5 秒
  • 主 goroutine 调用这个查询,但最多等 3 秒
  • 如果在 3 秒内返回,打印结果
  • 如果超时,打印"查询超时"

练习 2:优先级 channel

要求:

  • 创建两个 channel:highPrioritylowPriority
  • 两个 goroutine 分别向两个 channel 发送数据
  • select 接收数据,当两个都有数据时,优先处理 highPriority

提示:可以用嵌套 select 或先尝试非阻塞接收高优先级。


练习 3:倒计时器

要求:

  • 实现一个 10 秒倒计时
  • 每秒打印剩余秒数
  • 期间用户可以输入 “stop” 来提前停止
  • 倒计时结束打印 “发射!”,提前停止打印 “已取消”

提示:用一个 goroutine 读用户输入,通过 channel 传递给 select。


练习 4:限速器

要求:

  • 写一个函数,接收请求但限制为每秒最多 3 个
  • time.Ticker 实现限速
  • 超出速率的请求等待,超过 5 秒等不到就丢弃

练习 5:多源合并排序

要求:

  • 三个 goroutine 各自生成一组有序数字,发送到各自的 channel
  • 写一个函数,用 select 从三个 channel 中接收,合并成一个有序输出

提示:这是归并排序中"合并"步骤的并发版本。


13. 自测题

13.1 概念题

  1. selectswitch 最大的区别是什么?
  2. 多个 case 同时就绪时,select 怎么选择?为什么这样设计?
  3. selectdefault 的作用是什么?什么时候该用,什么时候不该用?
  4. time.After 返回的是什么?它的原理是什么?
  5. for-selectbreak 跳出的是什么?怎么跳出外层循环?
  6. time.NewTickertime.Tick 有什么区别?为什么推荐用 NewTicker
  7. select {} 会怎样?什么时候用?
  8. 在循环中使用 time.After 有什么问题?

13.2 代码阅读题

预测以下代码的行为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
"fmt"
"time"
)

func main() {
ch := make(chan int, 1)

go func() {
for i := 1; ; i++ {
ch <- i
time.Sleep(300 * time.Millisecond)
}
}()

for {
select {
case v := <-ch:
fmt.Println("收到:", v)
case <-time.After(1 * time.Second):
fmt.Println("超时退出")
return
}
}
}
点击查看答案

程序会持续输出:

1
2
3
4
收到: 1
收到: 2
收到: 3
...

永远不会超时退出

解释:

  1. goroutine 每 300ms 发送一个数字
  2. select 每次循环时创建一个新的 time.After(1 * time.Second)
  3. 因为每 300ms 就能从 ch 收到数据,time.After 的 1 秒还没到就被新一轮循环替换了
  4. 每次循环都是新的 time.After,所以超时永远不会触发

如果想要"1 秒内没有任何数据就超时",应该把 time.Aftertime.NewTimer 放在循环外面,或者在收到数据后重置定时器。


14. 本课总结

这一课你学到了 select——Go 并发编程中的多路复用器。

场景 做法
同时监听多个 channel select + 多个 case
超时控制 case <-time.After(d)
定时任务 time.NewTicker + select
非阻塞操作 select + default
退出信号 case <-quit,quit 用 close 触发
多服务竞速 多个 case 接收,先到先用
持续监听 for { select { ... } }

最重要的三件事:

  1. select 监听多个 channel,谁先就绪执行谁——多个同时就绪则随机选
  2. time.After 配合 select 实现超时——这是 Go 中最常见的超时方式
  3. for-selectbreak 只跳出 select——要退出循环用 return 或标签

15. 下一课预告

到目前为止你学了 goroutine、channel、select,已经能写不少并发程序了。但有一类问题还没解决:多个 goroutine 访问同一个共享资源怎么办?

比如多个 goroutine 同时往一个 map 里写数据,程序直接崩溃。多个 goroutine 同时修改一个计数器,结果不对。这些都是并发安全问题。

下一课:同步原语

会重点讲:

  • sync.Mutex——互斥锁,保护共享资源
  • sync.RWMutex——读写锁,读多写少的场景
  • sync.Once——只执行一次
  • 什么是数据竞争,怎么用 go run -race 检测
  • 什么时候用锁,什么时候用 channel

学完下一课,你就能写出安全可靠的并发程序了。