深入浅出channel/select

深入浅出channel/select

go的编程哲学:不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存

channel

优雅的关闭channel

  1. 关闭一个已关闭的通道会引起Panic
  2. 将值发送到已关闭的通道会发生Panic

解决办法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import "fmt"

func IsClosed(ch <-chan int) bool {
select {
case <-ch:
return true
default:
}
return false
}

func main() {
c := make(chan int)
fmt.Println(IsClosed(c)) // false
close(c)
fmt.Println(IsClosed(c)) // true
}

API server listening at: 127.0.0.1:8248
false
true

问题:并发安全的问题

简单粗暴的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func SafeClose(ch chan T) (justClosed bool) {
defer func() {
if recover() != nil {
// The return result can be altered
// in a defer function call.
justClosed = false
}
}()

// assume ch != nil here.
close(ch) // panic if ch is closed
return true // <=> justClosed = true; return
}

func SafeSend(ch chan T, value T) (closed bool) {
defer func() {
if recover() != nil {
closed = true
}
}()

ch <- value // panic if ch is closed
return false // <=> closed = false; return
}

带来的问题

  1. recover性能影响,

  2. 使用场景有限

更多人采用的方式 sync.Once/

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
type MyChannel struct {
C chan T
once sync.Once
}

func NewMyChannel() *MyChannel {
return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
mc.once.Do(func() {
close(mc.C)
})
}
//利用sync.Once只能执行一次的特性。

/////////////////

type MyChannel struct {
C chan T
closed bool
mutex sync.Mutex
}

func NewMyChannel() *MyChannel {
return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
mc.mutex.Lock()
defer mc.mutex.Unlock()
if !mc.closed {
close(mc.C)
mc.closed = true
}
}

func (mc *MyChannel) IsClosed() bool {
mc.mutex.Lock()
defer mc.mutex.Unlock()
return mc.closed
}
//加锁加状态的方式

更优雅的方案

  1. N 个接收者,一个发送者,发送者通过关闭通道说不再发送
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package main

import (
"log"
"math/rand"
"sync"
"time"
)

func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)

// ...
const MaxRandomNumber = 100000
const NumReceivers = 100

wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)

// 100 缓冲 int
dataCh := make(chan int, 100)

// the sender
go func() {
for {
if value := rand.Intn(MaxRandomNumber); value == 0 {
// The only sender can close the channel safely.
close(dataCh)
return
} else {
dataCh <- value
}
}
}()

// receivers
for i := 0; i < NumReceivers; i++ {
go func() {
defer wgReceivers.Done()
// Receive values until dataCh is closed and
// the value buffer queue of dataCh is empty.
for value := range dataCh {
log.Println(value)
}
}()
}
wgReceivers.Wait()
}

关闭原则:发送者处关闭

  1. 一个接收者,N个发送者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package main

import (
"log"
"math/rand"
"sync"
"time"
)

func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)

// ...
const MaxRandomNumber = 100000
const NumSenders = 1000

wgReceivers := sync.WaitGroup{}
wgReceivers.Add(1)

// ...
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// stopCh is an additional signal channel.
// Its sender is the receiver of channel dataCh.
// Its reveivers are the senders of channel dataCh.

// senders
for i := 0; i < NumSenders; i++ {
go func() {
for {
select {
case <-stopCh:
return
case dataCh <- rand.Intn(MaxRandomNumber):
}
}
}()
}

// the receiver
go func() {
defer wgReceivers.Done()

for value := range dataCh {
if value == MaxRandomNumber-1 {
close(stopCh)
return
}
log.Println(value)

}
}()
wgReceivers.Wait()
}

关闭原则:

  1. dataCh可以不必关闭,让系统进行垃圾回收。
  2. 引入额外的信号通道,以通知发送者停止发送。
  1. M 个接收者,N 个发送者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    package main

    import (
    "log"
    "math/rand"
    "strconv"
    "sync"
    "time"
    )

    func main() {
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)

    // ...
    const MaxRandomNumber = 100000
    const NumReceivers = 10
    const NumSenders = 1000

    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(NumReceivers)

    // ...
    dataCh := make(chan int, 100)
    stopCh := make(chan struct{})
    // stopCh is an additional signal channel.
    // Its sender is the moderator goroutine shown below.
    // Its reveivers are all senders and receivers of dataCh.
    toStop := make(chan string, 1)
    // The channel toStop is used to notify the moderator
    // to close the additional signal channel (stopCh).
    // Its senders are any senders and receivers of dataCh.
    // Its reveiver is the moderator goroutine shown below.
    // It must be a buffered channel.

    var stoppedBy string

    // moderator
    go func() {
    stoppedBy = <-toStop
    close(stopCh) //关闭stopCh来通知发送者
    }()

    // senders
    for i := 0; i < NumSenders; i++ {
    go func(id string) {
    for {
    value := rand.Intn(MaxRandomNumber)
    if value == 0 {
    // Here, the try-send operation is to notify the
    // moderator to close the additional signal channel.
    select {
    case toStop <- "sender#" + id: //发送者想停止也是通过toStop,接受者不需要停止
    default:
    }
    return
    }

    // The try-receive operation here is to try to exit the
    // sender goroutine as early as possible. Try-receive
    // try-send select blocks are specially optimized by the
    // standard Go compiler, so they are very efficient.
    select {
    case <-stopCh: //停止发送
    return
    default:
    }

    // Even if stopCh is closed, the first branch in this
    // select block may be still not selected for some
    // loops (and for ever in theory) if the send to dataCh
    // is also non-blocking. If this is not acceptable,
    // then the above try-receive operation is essential.
    select {
    case <-stopCh: //停止发送
    return
    case dataCh <- value:
    }
    }
    }(strconv.Itoa(i))
    }

    // receivers
    for i := 0; i < NumReceivers; i++ {
    go func(id string) {
    defer wgReceivers.Done()

    for {
    // Same as the sender goroutine, the try-receive
    // operation here is to try to exit the receiver
    // goroutine as early as possible.
    select {
    case <-stopCh:
    return
    default:
    }

    // Even if stopCh is closed, the first branch in this
    // select block may be still not selected for some
    // loops (and for ever in theory) if the receive from
    // dataCh is also non-blocking. If this is not acceptable,
    // then the above try-receive operation is essential.
    select {
    case <-stopCh:
    return
    case value := <-dataCh:
    if value == MaxRandomNumber-1 {
    // The same trick is used to notify
    // the moderator to close the
    // additional signal channel.
    select {
    case toStop <- "receiver#" + id: //接受者想停止,通过toSTOP
    log.Println(id)
    default:
    }
    return
    }

    // log.Println(value)
    }
    }
    }(strconv.Itoa(i))
    }

    // ...
    wgReceivers.Wait()
    log.Println("stopped by", stoppedBy)
    }

    关闭原则:

    1. 引入哨兵来保障toStop,通道并非一定要关闭
  1. 不到在接收端关闭通道,发送者如果只有一个,就在发送端关闭,关闭之后并不影响通道消息的接收

  2. 不要关闭多个并发下的channel

###源码分析

数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
type hchan struct {
qcount uint // 队列中的数据个数
dataqsiz uint //缓冲期数据大小
buf unsafe.Pointer // 指向缓冲期数据指针
elemsize uint16 //channel大小
closed uint32 //状态
elemtype *_type // channel类型
sendx uint // 标识当前 Channel 送的已经处理到了数组中的哪个位置。
recvx uint // 标识当前 Channel 接收已经处理到了数组中的哪个位置。
recvq waitq // 存储当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表
sendq waitq // list of send waiters

// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}

//sudog的双向链表
type waitq struct {
first *sudog //表示一个在等待列表中的 Goroutine,对g的封装
last *sudog
}

type sudog struct {
g *g

// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)

// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.

acquiretime int64
releasetime int64
ticket uint32
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}

通过两个游标(其实就是读取、接收数据的位置)来确定数据的位置,对应sendx/recvx

速度快,预先读取,类型唯一,大小固定

两个双向链表(sendq、recvq)和一个环状队列(buf 基于数组的指针存储)来实现的

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
//typecheck.go
func typecheck1(n *Node, top int) (res *Node) case OMAKE: n.Op = OMAKECHAN
//walk.go
func walkexpr(n *Node, init *Nodes) *Node case OMAKECHAN: makechan

func makechan(t *chantype, size int) *hchan {
elem := t.elem

// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}

mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}

// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
//缓冲区大小为0
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.kind&kindNoPointers != 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
//连续内存空间
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
//指针类型
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}

c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)

if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}

####发送ch <- i(往channel写)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
//walk.go  
func walkexpr(n *Node, init *Nodes) *Node case OSEND: chansend

//chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
// 如果不是阻塞模式,就直接退出
if !block {
return false
}
// 情况1:当 chan 为 nil时, 阻塞,让出协程
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}

if debugChan {
print("chansend: chan=", c, "\n")
}

if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}

// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation.
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}

var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}

lock(&c.lock) //合法性检查完,锁住整个channel

//向关闭的channel发送,抛出panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}

//情况2:如果当有 G 在接受队列上等待时,直接将消息发送给 G,
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
// send中有个goready(gp, skip+1)可以唤醒 G
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}

//情况3缓存队列未满,则将消息复制到缓存队列上
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
// 通过sendx确定可放入数据的位置, qp其实就是当前这条数据放入的位置
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
//底层依赖于内存屏障来进行数据的拷贝操作
typedmemmove(c.elemtype, qp, ep)
c.sendx++
// 这里面就是环状队列实现的关键点, 当发送指针等于队列长度就移位到0
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
//解锁
unlock(&c.lock)
return true
}
// 如果不是阻塞模式,就直接退出,什么时候会走到这里?,感觉是打了一个补丁
if !block {
unlock(&c.lock)
return false
}

// Block on the channel. Some receiver will complete our operation for us.
// 获取当前的goroutine
//情况4: 缓存队列已满,将 G 加入到 send队列中
gp := getg()
// 从当前m里面获取一个sudog,注意m里面如果不存在就会从当前m对应的p里面获取,如果还没有,就创建一个新的
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// 将当前goroutine放入到sendq队列中,然后调用goparkunlock,通过gopark将当前goroutine暂停,然后释放锁
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)

// someone woke us up.
// 被唤醒, 检查是否是被closechan唤醒,如果是就抛出异常,
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
//释放G
releaseSudog(mysg)
return true
}

同步情况下

查看Hchan结构体是否为空,步骤1,即是否有因为读该管道而阻塞的goroutine。

情况1,没有等待的g,阻塞,

情况2,又等待的g,直接走1.x流程,从目标带走目标,两种情况都是不带缓冲的channel

带缓冲的情况:

情况3,buf未满,丢如buf,走2.x

情况4,buf满的情况下阻塞,等待唤醒,唤醒之后检查状态,走3.x

接收x <- ch(从channel读)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.

if debugChan {
print("chanrecv: chan=", c, "\n")
}
//nil状态的chan,阻塞
if c == nil {
// 如果不阻塞,直接返回 (false, false)
if !block {
return
}
//阻塞,nil的chan,挂起goroutine
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}

// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not ready for receiving, we observe that the
// channel is not closed. Each of these observations is a single word-sized read
// (first c.sendq.first or c.qcount, and second c.closed).
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
//
// The order of operations is important here: reversing the operations can lead to
// incorrect behavior when racing with a close.
//带缓存,无数据,未关闭channel,直接返回(false, false)
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}

var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
//同步锁
lock(&c.lock)

//向close且为空的chan中获取消息时,返回空值。
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}

//send 队列不为空,什么时候send队列不为空呢? 只有两种可能 1. 无缓存chan 2. 缓存队列已满
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
// CASE 2.1: 无缓存chan,直接进行内存拷贝(从 sender goroutine -> receiver goroutine)
// CASE 2.2: 缓存队列已满, 从队列获取头元素,唤醒send 将其消息加入队列尾部
//(由于是环线队列,所以尾部和头部是同一位置)移动recvx
// 同样在recv中有goready(gp, skip+1)可以唤醒G
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}

// 缓存队列不会空,直接从队列获取元素,移动头索引
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}

if !block {
unlock(&c.lock)
return false, false
}

// no sender available: block on this channel.
// 缓存队列为空, 将当前 G 加入接收队列中, 休眠
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil

// 将当前 G 加入 接收队列中
c.recvq.enqueue(mysg)
// 休眠
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}

// recv processes a receive operation on a full channel c.
// There are 2 parts:
// 1) The value sent by the sender sg is put into the channel
// and the sender is woken up to go on its merry way.
// 2) The value received by the receiver (the current G) is
// written to ep.
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}

5种不同的情况:

  1. nil状态的chan,阻塞,申明未初始化的状态,初始化内存后继续
  2. 带缓存,无数据,被关闭的channel,直接返回
  3. send 队列不为空的情况,recv处理两者情况

    1. 不带缓存的 ,send获取数据

    2. 带缓存 ,缓存已满 ,从队列获取头元素

  4. send为空,缓存队列不会空,直接从队列获取元素,移动头索引
  5. sned为空,缓存队列为空,将当前 G 加入接收队列中, 休眠

关闭closed

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
func closechan(c *hchan) {
// 关闭一个 nil channel,panic
if c == nil {
panic(plainError("close of nil channel"))
}
//上锁
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}

if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
//修改状态
c.closed = 1

var glist gList

// release all readers
// 将 channel 所有等待接收队列的里 sudog 释放
for {
// 从接收队列里出队一个 sudog
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}

// release all writers (they will panic)
// 将 channel 等待发送队列里的 sudog 释放
// 如果存在,这些 goroutine 将会 panic,想关闭的chan发送数据
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)

// Ready all Gs now that we've dropped the channel lock.
// 遍历链表,唤醒
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
  1. 上锁

  2. 设置状态,关闭状态

  3. 接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表

  4. 解锁

  5. 所有的 sudog 全都唤醒执行(发送的sender出panic,receiver正常)

select

定义:监听多个channel的读写事件,只能和通道连用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func example() {
// 准备好几个通道。
intChannels := [3]chan int{
make(chan int, 1),
make(chan int, 1),
make(chan int, 1),
}
// 随机选择一个通道,并向它发送元素值。
rand.Seed(time.Now().Unix())
index := rand.Intn(3)
fmt.Printf("The index: %d\n", index)
intChannels[index] <- index
// 哪一个通道中有可取的元素值,哪个对应的分支就会被执行。
select {
default:
fmt.Println("default")
case <-intChannels[0]:
fmt.Println("11111111111")
case <-intChannels[1]:
fmt.Println("2222222222")
case elem := <-intChannels[2]:
fmt.Printf("3333333")

}
}

使用规则和注意点:

  1. 如果select存在默认分支,select就不会被阻塞,默认分支只能有一个,默认分支可以任何位置。

  2. 没有默认分支,select被阻塞,直到有一个case表达式满足条件,这个时候select才会被唤醒执行;

    Ps: 如果外层有for语句的,break是无法跳出的

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    func example2() {
    intChan := make(chan int, 1)
    // 一秒后关闭通道。
    time.AfterFunc(time.Second, func() {
    close(intChan)
    })
    select {
    case _, ok := <-intChan:
    if !ok {
    fmt.Println("通道关闭")
    break
    }
    }
    }
  3. select语句里面的case表达式求值顺序是从上到下,从左到右

  4. 多个case满足条件的时候,是随机选一个,伪随机算法
  5. 要关注表达式和语句本身是否有并发安全的问题。

源码分析

src/runtime/select.go

数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
const (
selectDir = iota /// 0 :表示case 为nil;在send 或者 recv 发生在一个 nil channel 上,通道如果已经关闭,屏蔽分支,设置为nil
selectSend // case Chan <- Send 接收通道
selectRecv // case <-Chan: 发送通道
selectDefault // default
)

//选择器的核心结构
type runtimeSelect struct {
dir selectDir
typ unsafe.Pointer // channel type (not used here)
ch *hchan // 指向每个case里面的chan操作
val unsafe.Pointer // 发送或者接收数据缓冲区
}

//select每一条case的描述
type scase struct {
c *hchan // 当前case所对应的chan引用,一个case只对应一个chan
elem unsafe.Pointer // 读或者些的缓冲区数据指针
kind uint16 //类型读,写,默认
pc uintptr // 用于指示当前将要执行的下一条机器指令的内存地址
releasetime int64
}

源码执行顺序

/refect/value.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// Select executes a select operation described by the list of cases.
// Like the Go select statement, it blocks until at least one of the cases
// can proceed, makes a uniform pseudo-random choice,
// and then executes that case. It returns the index of the chosen case
// and, if that case was a receive operation, the value received and a
// boolean indicating whether the value corresponds to a send on the channel
// (as opposed to a zero value received because the channel is closed).
func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool) {
// NOTE: Do not trust that caller is not modifying cases data underfoot.
// The range is safe because the caller cannot modify our copy of the len
// and each iteration makes its own copy of the value c.
runcases := make([]runtimeSelect, len(cases))
haveDefault := false
for i, c := range cases {
rc := &runcases[i]
rc.dir = c.Dir
switch c.Dir {
default:
panic("reflect.Select: invalid Dir")

case SelectDefault: // default
if haveDefault {
panic("reflect.Select: multiple default cases")
}
haveDefault = true
if c.Chan.IsValid() {
panic("reflect.Select: default case has Chan value")
}
if c.Send.IsValid() {
panic("reflect.Select: default case has Send value")
}

case SelectSend:
ch := c.Chan
if !ch.IsValid() {
break
}
ch.mustBe(Chan)
ch.mustBeExported()
tt := (*chanType)(unsafe.Pointer(ch.typ))
if ChanDir(tt.dir)&SendDir == 0 {
panic("reflect.Select: SendDir case using recv-only channel")
}
rc.ch = ch.pointer()
rc.typ = &tt.rtype
v := c.Send
if !v.IsValid() {
panic("reflect.Select: SendDir case missing Send value")
}
v.mustBeExported()
v = v.assignTo("reflect.Select", tt.elem, nil)
if v.flag&flagIndir != 0 {
rc.val = v.ptr
} else {
rc.val = unsafe.Pointer(&v.ptr)
}

case SelectRecv:
if c.Send.IsValid() {
panic("reflect.Select: RecvDir case has Send value")
}
ch := c.Chan
if !ch.IsValid() {
break
}
ch.mustBe(Chan)
ch.mustBeExported()
tt := (*chanType)(unsafe.Pointer(ch.typ))
if ChanDir(tt.dir)&RecvDir == 0 {
panic("reflect.Select: RecvDir case using send-only channel")
}
rc.ch = ch.pointer()
rc.typ = &tt.rtype
rc.val = unsafe_New(tt.elem)
}
}

chosen, recvOK = rselect(runcases)
if runcases[chosen].dir == SelectRecv {
tt := (*chanType)(unsafe.Pointer(runcases[chosen].typ))
t := tt.elem
p := runcases[chosen].val
fl := flag(t.Kind())
if ifaceIndir(t) {
recv = Value{t, p, fl | flagIndir}
} else {
recv = Value{t, *(*unsafe.Pointer)(p), fl}
}
}
return chosen, recv, recvOK
}

//基本处的初始化,调用调用rselect

runtime/select.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//rselect具体实现

func reflect_rselect(cases []runtimeSelect) (int, bool) {
//没有case阻塞
if len(cases) == 0 {
block()
}
sel := make([]scase, len(cases))
order := make([]uint16, 2*len(cases))
for i := range cases {
rc := &cases[i]
switch rc.dir {
case selectDefault:
sel[i] = scase{kind: caseDefault}
case selectSend:
sel[i] = scase{kind: caseSend, c: rc.ch, elem: rc.val}
case selectRecv:
sel[i] = scase{kind: caseRecv, c: rc.ch, elem: rc.val}
}
if raceenabled || msanenabled {
selectsetpc(&sel[i])
}
}

return selectgo(&sel[0], &order[0], len(cases))
}

前两步基本上都是初始化之类的,核心实现在selectgo中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
if debugSelect {
print("select: cas0=", cas0, "\n")
}

/*
第一步
1.初始化
2.随机生成一个遍历的轮询顺序 pollOrder 并根据 Channel 地址生成一个用于遍历的锁定顺序 lockOrder
*/
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))

scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:][:ncases:ncases]

// Replace send/receive cases involving nil channels with
// caseNil so logic below can assume non-nil channel.
for i := range scases {
cas := &scases[i]
if cas.c == nil && cas.kind != caseDefault {
*cas = scase{}
}
}

var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
for i := 0; i < ncases; i++ {
scases[i].releasetime = -1
}
}

// The compiler rewrites selects that statically have
// only 0 or 1 cases plus default into simpler constructs.
// The only way we can end up with such small sel.ncase
// values here is for a larger select in which most channels
// have been nilled out. The general code handles those
// cases correctly, and they are rare enough not to bother
// optimizing (and needing to test).

// generate permuted order
//打乱随机
for i := 1; i < ncases; i++ {
j := fastrandn(uint32(i + 1))
pollorder[i] = pollorder[j]
pollorder[j] = uint16(i)
}

// sort the cases by Hchan address to get the locking order.
// simple heap sort, to guarantee n log n time and constant stack footprint.
// 按 hchan 的地址来进行排序,以生成加锁顺序
// 用堆排序来保证 nLog(n) 的时间复杂度
for i := 0; i < ncases; i++ {
// 初始化 lockorder 数组
j := i
// Start with the pollorder to permute cases on the same channel.
c := scases[pollorder[i]].c
for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
k := (j - 1) / 2
lockorder[j] = lockorder[k]
j = k
}
lockorder[j] = pollorder[i]
}
for i := ncases - 1; i >= 0; i-- {
//堆排序
o := lockorder[i]
c := scases[o].c
lockorder[i] = lockorder[0]
j := 0
for {
k := j*2 + 1
if k >= i {
break
}
if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
k++
}
if c.sortkey() < scases[lockorder[k]].c.sortkey() {
lockorder[j] = lockorder[k]
j = k
continue
}
break
}
lockorder[j] = o
}

if debugSelect {
for i := 0; i+1 < ncases; i++ {
if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {
print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")
throw("select: broken sort")
}
}
}

// lock all the channels involved in the select
//对所有的channel枷锁
sellock(scases, lockorder)

var (
gp *g
sg *sudog
c *hchan
k *scase
sglist *sudog
sgnext *sudog
qp unsafe.Pointer
nextp **sudog
)

loop:
// pass 1 - look for something already waiting
var dfli int
var dfl *scase
var casi int
var cas *scase
var recvOK bool
//for遍历,ready的直接goto跳出去
for i := 0; i < ncases; i++ {
casi = int(pollorder[i])
cas = &scases[casi]
c = cas.c

switch cas.kind {
case caseNil:
continue

case caseRecv:
// <- ch
sg = c.sendq.dequeue()
if sg != nil {
goto recv
}
if c.qcount > 0 {
goto bufrecv
}
if c.closed != 0 {
goto rclose
}

case caseSend:
//ch <- 1
if raceenabled {
racereadpc(c.raceaddr(), cas.pc, chansendpc)
}
if c.closed != 0 {
goto sclose
}
sg = c.recvq.dequeue()
if sg != nil {
goto send
}
if c.qcount < c.dataqsiz {
goto bufsend
}

case caseDefault:
dfli = casi
dfl = cas
}
}
//进了 caseDefault 才会走到
if dfl != nil {
selunlock(scases, lockorder)
casi = dfli
cas = dfl
goto retc
}

// pass 2 - enqueue on all chans
//没有任何一个 case 满足,且没有 default
gp = getg()
if gp.waiting != nil {
throw("gp.waiting != nil")
}
nextp = &gp.waiting
//按照加锁的顺序把 gorutine 入每一个 channel 的等待队列
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
if cas.kind == caseNil {
continue
}
c = cas.c
sg := acquireSudog()
sg.g = gp
sg.isSelect = true
// No stack splits between assigning elem and enqueuing
// sg on gp.waiting where copystack can find it.
sg.elem = cas.elem
sg.releasetime = 0
if t0 != 0 {
sg.releasetime = -1
}
sg.c = c
// Construct waiting list in lock order.
*nextp = sg
nextp = &sg.waitlink

//不同的类型进入不同的队列
switch cas.kind {
case caseRecv:
c.recvq.enqueue(sg)

case caseSend:
c.sendq.enqueue(sg)
}
}

// wait for someone to wake us up
//当前 goroutine 进入休眠,等待被唤醒
gp.param = nil
gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)

sellock(scases, lockorder)

gp.selectDone = 0
sg = (*sudog)(gp.param)
gp.param = nil

// pass 3 - dequeue from unsuccessful chans
// otherwise they stack up on quiet channels
// record the successful case, if any.
// We singly-linked up the SudoGs in lock order.
//被唤醒
casi = -1
cas = nil
sglist = gp.waiting
// Clear all elem before unlinking from gp.waiting.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
sg1.isSelect = false
sg1.elem = nil
sg1.c = nil
}
gp.waiting = nil

for _, casei := range lockorder {
k = &scases[casei]
if k.kind == caseNil {
continue
}
if sglist.releasetime > 0 {
k.releasetime = sglist.releasetime
}
if sg == sglist {
// sg has already been dequeued by the G that woke us up.
casi = int(casei)
cas = k
} else {
c = k.c
if k.kind == caseSend {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
sgnext = sglist.waitlink
sglist.waitlink = nil
releaseSudog(sglist)
sglist = sgnext
}
//唤醒的时候,param = nil 没有合适的再次loop
if cas == nil {
// We can wake up with gp.param == nil (so cas == nil)
// when a channel involved in the select has been closed.
// It is easiest to loop and re-run the operation;
// we'll see that it's now closed.
// Maybe some day we can signal the close explicitly,
// but we'd have to distinguish close-on-reader from close-on-writer.
// It's easiest not to duplicate the code and just recheck above.
// We know that something closed, and things never un-close,
// so we won't block again.
goto loop
}

c = cas.c

if debugSelect {
print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " kind=", cas.kind, "\n")
}

if cas.kind == caseRecv {
recvOK = true
}

if raceenabled {
if cas.kind == caseRecv && cas.elem != nil {
raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)
} else if cas.kind == caseSend {
raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
}
}
if msanenabled {
if cas.kind == caseRecv && cas.elem != nil {
msanwrite(cas.elem, c.elemtype.size)
} else if cas.kind == caseSend {
msanread(cas.elem, c.elemtype.size)
}
}

selunlock(scases, lockorder)
goto retc

bufrecv:
// can receive from buffer
if raceenabled {
if cas.elem != nil {
raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)
}
raceacquire(chanbuf(c, c.recvx))
racerelease(chanbuf(c, c.recvx))
}
if msanenabled && cas.elem != nil {
msanwrite(cas.elem, c.elemtype.size)
}
recvOK = true
qp = chanbuf(c, c.recvx)
if cas.elem != nil {
typedmemmove(c.elemtype, cas.elem, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
selunlock(scases, lockorder)
goto retc

bufsend:
// can send to buffer
if raceenabled {
raceacquire(chanbuf(c, c.sendx))
racerelease(chanbuf(c, c.sendx))
raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
}
if msanenabled {
msanread(cas.elem, c.elemtype.size)
}
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
selunlock(scases, lockorder)
goto retc

recv:
// can receive from sleeping sender (sg)
recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
if debugSelect {
print("syncrecv: cas0=", cas0, " c=", c, "\n")
}
recvOK = true
goto retc

rclose:
// read at end of closed channel
//读取关闭的channel
selunlock(scases, lockorder)
recvOK = false
if cas.elem != nil {
typedmemclr(c.elemtype, cas.elem)
}
if raceenabled {
raceacquire(c.raceaddr())
}
goto retc

send:
// can send to a sleeping receiver (sg)
if raceenabled {
raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
}
if msanenabled {
msanread(cas.elem, c.elemtype.size)
}
send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
if debugSelect {
print("syncsend: cas0=", cas0, " c=", c, "\n")
}
goto retc

retc:
if cas.releasetime > 0 {
blockevent(cas.releasetime-t0, 1)
}
return casi, recvOK

sclose:
// send on closed channel
//向关闭的channel发送数据,触发panic
selunlock(scases, lockorder)
panic(plainError("send on closed channel"))
}

总结

  1. 初始化,核心就是把case实例打乱

  2. 锁住所有的channel

  3. 循环,查找准备就绪的channel,四种不同的情况处理 select 中的多个 case

    1. caseNil — 当前 case 不包含任何的 Channel,就直接会被跳过;
    2. caseRecv — 当前 case 会从 Channel 中接收数据;
      • 如果当前 Channel 的 sendq 上有等待的 Goroutine 就会直接跳到 recv 标签所在的代码段,从 Goroutine 中获取最新发送的数据;
      • 如果当前 Channel 的缓冲区不为空就会跳到 ` 标签处从缓冲区中获取数据;
      • 如果当前 Channel 已经被关闭就会跳到 rclose 做一些清除的收尾工作;
    3. caseSend — 当前 case 会向 Channel 发送数据;
      • 如果当前 Channel 已经被关闭就会直接跳到 sclose 代码段;
      • 如果当前 Channel 的 recvq 上有等待的 Goroutine 就会跳到 send 代码段向 Channel 直接发送数据;
  4. caseDefault 如果循环执行到了这种情况就表示前面的所有 case 都没有被执行,所以这里会直接解锁所有的 Channel 并退出 selectgo 函数,这时也就意味着当前 select 结构中的其他收发语句都是非阻塞的。

  1. 阻塞状态,进入下一个循环,加入c.sendq.dequeueSudoG(sglist)/c.recvq.dequeueSudoG(sglist)

  2. sudog 结构体都会被串成链表附着在当前 Goroutine 上,在入队之后会调用 gopark 函数挂起当前的 Goroutine 等待调度器的唤醒。

    gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)

  3. 根据 lockOrder 遍历全部 case 的过程中,我们会先获取 Goroutine 接收到的参数 param,这个参数其实就是被唤醒的 sudog 结构,其他的全部释放掉releaseSudog(sglist)

  4. 跳到不同的状态执行

回顾整个过程: