深入浅出channel/select
go的编程哲学:不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存
channel
优雅的关闭channel
- 关闭一个已关闭的通道会引起Panic
- 将值发送到已关闭的通道会发生Panic
解决办法:
1 | package main |
问题:并发安全的问题
简单粗暴的方法
1 | func SafeClose(ch chan T) (justClosed bool) { |
带来的问题
recover性能影响,
使用场景有限
更多人采用的方式 sync.Once/
1 | type MyChannel struct { |
更优雅的方案
- N 个接收者,一个发送者,发送者通过关闭通道说不再发送
1 | package main |
关闭原则:发送者处关闭
1 | package main |
关闭原则:
- dataCh可以不必关闭,让系统进行垃圾回收。
- 引入额外的信号通道,以通知发送者停止发送。
M 个接收者,N 个发送者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128package main
import (
"log"
"math/rand"
"strconv"
"sync"
"time"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const MaxRandomNumber = 100000
const NumReceivers = 10
const NumSenders = 1000
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)
// ...
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// stopCh is an additional signal channel.
// Its sender is the moderator goroutine shown below.
// Its reveivers are all senders and receivers of dataCh.
toStop := make(chan string, 1)
// The channel toStop is used to notify the moderator
// to close the additional signal channel (stopCh).
// Its senders are any senders and receivers of dataCh.
// Its reveiver is the moderator goroutine shown below.
// It must be a buffered channel.
var stoppedBy string
// moderator
go func() {
stoppedBy = <-toStop
close(stopCh) //关闭stopCh来通知发送者
}()
// senders
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(MaxRandomNumber)
if value == 0 {
// Here, the try-send operation is to notify the
// moderator to close the additional signal channel.
select {
case toStop <- "sender#" + id: //发送者想停止也是通过toStop,接受者不需要停止
default:
}
return
}
// The try-receive operation here is to try to exit the
// sender goroutine as early as possible. Try-receive
// try-send select blocks are specially optimized by the
// standard Go compiler, so they are very efficient.
select {
case <-stopCh: //停止发送
return
default:
}
// Even if stopCh is closed, the first branch in this
// select block may be still not selected for some
// loops (and for ever in theory) if the send to dataCh
// is also non-blocking. If this is not acceptable,
// then the above try-receive operation is essential.
select {
case <-stopCh: //停止发送
return
case dataCh <- value:
}
}
}(strconv.Itoa(i))
}
// receivers
for i := 0; i < NumReceivers; i++ {
go func(id string) {
defer wgReceivers.Done()
for {
// Same as the sender goroutine, the try-receive
// operation here is to try to exit the receiver
// goroutine as early as possible.
select {
case <-stopCh:
return
default:
}
// Even if stopCh is closed, the first branch in this
// select block may be still not selected for some
// loops (and for ever in theory) if the receive from
// dataCh is also non-blocking. If this is not acceptable,
// then the above try-receive operation is essential.
select {
case <-stopCh:
return
case value := <-dataCh:
if value == MaxRandomNumber-1 {
// The same trick is used to notify
// the moderator to close the
// additional signal channel.
select {
case toStop <- "receiver#" + id: //接受者想停止,通过toSTOP
log.Println(id)
default:
}
return
}
// log.Println(value)
}
}
}(strconv.Itoa(i))
}
// ...
wgReceivers.Wait()
log.Println("stopped by", stoppedBy)
}关闭原则:
- 引入哨兵来保障toStop,通道并非一定要关闭
不到在接收端关闭通道,发送者如果只有一个,就在发送端关闭,关闭之后并不影响通道消息的接收
不要关闭多个并发下的channel
###源码分析
数据结构
1 | type hchan struct { |
通过两个游标(其实就是读取、接收数据的位置)来确定数据的位置,对应sendx/recvx
速度快,预先读取,类型唯一,大小固定
两个双向链表(sendq、recvq)和一个环状队列(buf 基于数组的指针存储)来实现的
初始化
1 | //typecheck.go |
####发送ch <- i(往channel写)
1 | //walk.go |
同步情况下
查看Hchan结构体是否为空,步骤1,即是否有因为读该管道而阻塞的goroutine。
情况1,没有等待的g,阻塞,
情况2,又等待的g,直接走1.x流程,从目标带走目标,两种情况都是不带缓冲的channel
带缓冲的情况:
情况3,buf未满,丢如buf,走2.x
情况4,buf满的情况下阻塞,等待唤醒,唤醒之后检查状态,走3.x
接收x <- ch(从channel读)
1 | func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { |
5种不同的情况:
- nil状态的chan,阻塞,申明未初始化的状态,初始化内存后继续
- 带缓存,无数据,被关闭的channel,直接返回
send 队列不为空的情况,recv处理两者情况
不带缓存的 ,send获取数据
带缓存 ,缓存已满 ,从队列获取头元素
- send为空,缓存队列不会空,直接从队列获取元素,移动头索引
- sned为空,缓存队列为空,将当前 G 加入接收队列中, 休眠
关闭closed
1 | func closechan(c *hchan) { |
上锁
设置状态,关闭状态
接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表
解锁
所有的 sudog 全都唤醒执行(发送的sender出panic,receiver正常)
select
定义:监听多个channel的读写事件,只能和通道连用
1 | func example() { |
使用规则和注意点:
如果select存在默认分支,select就不会被阻塞,默认分支只能有一个,默认分支可以任何位置。
没有默认分支,select被阻塞,直到有一个case表达式满足条件,这个时候select才会被唤醒执行;
Ps: 如果外层有for语句的,break是无法跳出的
1
2
3
4
5
6
7
8
9
10
11
12
13
14func example2() {
intChan := make(chan int, 1)
// 一秒后关闭通道。
time.AfterFunc(time.Second, func() {
close(intChan)
})
select {
case _, ok := <-intChan:
if !ok {
fmt.Println("通道关闭")
break
}
}
}select语句里面的case表达式求值顺序是从上到下,从左到右
- 多个case满足条件的时候,是随机选一个,伪随机算法
- 要关注表达式和语句本身是否有并发安全的问题。
源码分析
src/runtime/select.go
数据结构
1 | const ( |
源码执行顺序
/refect/value.go
1 | // Select executes a select operation described by the list of cases. |
runtime/select.go
1 | //rselect具体实现 |
前两步基本上都是初始化之类的,核心实现在selectgo中
1 | func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) { |
总结
初始化,核心就是把case实例打乱
锁住所有的channel
循环,查找准备就绪的channel,四种不同的情况处理
select
中的多个case
:caseNil
— 当前case
不包含任何的 Channel,就直接会被跳过;caseRecv
— 当前case
会从 Channel 中接收数据;- 如果当前 Channel 的
sendq
上有等待的 Goroutine 就会直接跳到recv
标签所在的代码段,从 Goroutine 中获取最新发送的数据; - 如果当前 Channel 的缓冲区不为空就会跳到 ` 标签处从缓冲区中获取数据;
- 如果当前 Channel 已经被关闭就会跳到
rclose
做一些清除的收尾工作;
- 如果当前 Channel 的
- caseSend — 当前 case 会向 Channel 发送数据;
- 如果当前 Channel 已经被关闭就会直接跳到
sclose
代码段; - 如果当前 Channel 的
recvq
上有等待的 Goroutine 就会跳到send
代码段向 Channel 直接发送数据;
- 如果当前 Channel 已经被关闭就会直接跳到
caseDefault 如果循环执行到了这种情况就表示前面的所有
case
都没有被执行,所以这里会直接解锁所有的 Channel 并退出selectgo
函数,这时也就意味着当前select
结构中的其他收发语句都是非阻塞的。
阻塞状态,进入下一个循环,加入c.sendq.dequeueSudoG(sglist)/c.recvq.dequeueSudoG(sglist)
sudog
结构体都会被串成链表附着在当前 Goroutine 上,在入队之后会调用gopark
函数挂起当前的 Goroutine 等待调度器的唤醒。gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
根据 lockOrder 遍历全部 case 的过程中,我们会先获取 Goroutine 接收到的参数 param,这个参数其实就是被唤醒的 sudog 结构,其他的全部释放掉releaseSudog(sglist)
跳到不同的状态执行