Reading the gpm source code

Basic data structures

// stack describes a Go execution stack; lo and hi are its lower and upper bounds.
// From a traditional memory-layout point of view, Go stacks are actually
// allocated in what C would call the heap region, which is why a goroutine
// stack can grow past the `ulimit -s` stack size (up to 1GB).
type stack struct {
    lo uintptr
    hi uintptr
}

// gobuf holds the execution context of a g.
type gobuf struct {
    sp   uintptr        // sp register
    pc   uintptr        // pc register
    g    guintptr       // the g this buffer belongs to
    ctxt unsafe.Pointer // apparently used to assist the GC
    ret  sys.Uintreg
    lr   uintptr // link register, only used on arm; not our concern here
    bp   uintptr // only present with GOEXPERIMENT=framepointer
}

type g struct {
    // Simple structure: lo and hi describe the lower and upper
    // memory addresses of the stack.
    stack stack
    // The function stack-growth prologue compares the sp register
    // against stackguard0; if sp is smaller (the stack grows toward
    // lower addresses), stack copying and scheduling are triggered.
    // Normally stackguard0 = stack.lo + StackGuard, but when a
    // reschedule is needed it is set to StackPreempt to trigger
    // preemption.
    stackguard0 uintptr
    // stackguard1 is the value compared against in the C stack-growth
    // prologue. On the g0 and gsignal stacks it is stack.lo+StackGuard;
    // on other stacks it is ~0 (0 bitwise-negated) to force a
    // morestack call (and crash).
    stackguard1 uintptr

    _panic         *_panic
    _defer         *_defer
    m              *m      // the m currently bound to this g
    sched          gobuf   // saved execution context of the goroutine
    syscallsp      uintptr // if status==Gsyscall, syscallsp = sched.sp to use during gc
    syscallpc      uintptr // if status==Gsyscall, syscallpc = sched.pc to use during gc
    stktopsp       uintptr // expected sp at top of stack, to check in traceback
    param          unsafe.Pointer // parameter passed in on wakeup
    atomicstatus   uint32
    stackLock      uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
    goid           int64  // goroutine id
    waitsince      int64  // approx time when the g became blocked
    waitreason     string // if status==Gwaiting
    schedlink      guintptr
    preempt        bool // preemption flag; when true, stackguard0 == stackpreempt
    throwsplit     bool // must not split stack
    raceignore     int8 // ignore race detection events
    sysblocktraced bool // StartTrace has emitted EvGoInSyscall about this goroutine
    sysexitticks   int64 // cputicks when the syscall returned (for tracing)
    traceseq       uint64 // trace event sequencer
    tracelastp     puintptr // last P emitted an event for this goroutine
    lockedm        muintptr // if LockOSThread was called, the g is bound to this m
    sig            uint32
    writebuf       []byte
    sigcode0       uintptr
    sigcode1       uintptr
    sigpc          uintptr
    gopc           uintptr // pc of the go statement that created this goroutine
    startpc        uintptr // pc of the goroutine function
    racectx        uintptr
    waiting        *sudog // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
    cgoCtxt        []uintptr // cgo traceback context
    labels         unsafe.Pointer // profiler labels
    timer          *timer // cached timer for time.Sleep
    selectDone     uint32 // whether this g is participating in a select and someone has already won the race
}

When a g blocks, or has to wait for something, it is wrapped into a sudog structure. One g may be wrapped into several sudogs hanging on different wait queues:

// sudog represents a g on a wait list, e.g. when sending/receiving
// on a channel.
// sudog is needed because the g <-> synchronization object relation
// is many-to-many. A g can be on many wait lists, so one g may be
// wrapped into many sudogs; and many g's may be waiting on the same
// synchronization object, so one object may have many sudogs.
// sudogs are allocated from a special pool; use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {

    // The following fields are protected by the hchan.lock of the
    // channel this sudog is blocking on. shrinkstack depends on
    // this for sudogs involved in channel ops.
    g *g

    // isSelect indicates that the g is participating in a select,
    // so g.selectDone must be CAS'd to win the wake-up race.
    isSelect bool
    next     *sudog
    prev     *sudog
    elem     unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above) are only
    // accessed while holding a semaRoot lock.
    acquiretime int64
    releasetime int64
    ticket      uint32
    parent      *sudog // semaRoot binary tree
    waitlink    *sudog // g.waiting list or semaRoot
    waittail    *sudog // semaRoot
    c           *hchan // channel
}
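As a concrete user-level illustration (plain Go, nothing runtime-internal): a receive on an empty channel is exactly the case where a g gets parked behind a sudog on the channel's recvq.

package main

func main() {
    ch := make(chan int) // unbuffered
    done := make(chan struct{})

    go func() {
        // This receive blocks: the runtime wraps the g in a sudog,
        // queues it on ch's recvq, and parks the g (Gwaiting).
        <-ch
        close(done)
    }()

    // The send finds the waiting sudog and readies its g (goready).
    ch <- 1
    <-done
}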

m is the runtime representation of a thread. It corresponds to one pthread, and a pthread in turn corresponds to exactly one kernel thread (task_struct):

type m struct {
    g0      *g     // the goroutine used to run scheduling code
    morebuf gobuf  // gobuf arg to morestack
    divmod  uint32 // div/mod denominator for arm - known to liblink

    // Fields not known to debuggers.
    procid        uint64       // for debuggers, but offset not hard-coded
    gsignal       *g           // signal-handling g
    goSigStack    gsignalStack // Go-allocated signal handling stack
    sigmask       sigset       // storage for saved signal mask
    tls           [6]uintptr   // thread-local storage (for x86 extern register)
    mstartfn      func()
    curg          *g       // the user goroutine currently running
    caughtsig     guintptr // goroutine running during fatal signal
    p             puintptr // attached p for executing go code (nil if not executing go code)
    nextp         puintptr
    id            int64
    mallocing     int32
    throwing      int32
    preemptoff    string // if not equal to "", keep curg running on this m
    locks         int32
    softfloat     int32
    dying         int32
    profilehz     int32
    helpgc        int32
    spinning      bool // m is out of work and is actively looking for work
    blocked       bool // m is blocked on a note
    inwb          bool // m is executing a write barrier
    newSigstack   bool // minit on C thread called sigaltstack
    printlock     int8
    incgo         bool   // m is executing a cgo call
    freeWait      uint32 // if == 0, safe to free g0 and delete m (atomic)
    fastrand      [2]uint32
    needextram    bool
    traceback     uint8
    ncgocall      uint64      // total number of cgo calls
    ncgo          int32       // number of cgo calls currently in progress
    cgoCallersUse uint32      // if non-zero, cgoCallers in use temporarily
    cgoCallers    *cgoCallers // cgo traceback if crashing in cgo call
    park          note
    alllink       *m // on allm
    schedlink     muintptr
    mcache        *mcache
    lockedg       guintptr
    createstack   [32]uintptr // stack that created this thread.
    freglo        [16]uint32  // d[i] lsb and f[i]
    freghi        [16]uint32  // d[i] msb and f[i+16]
    fflag         uint32      // floating point compare flags
    lockedExt     uint32      // tracking for external LockOSThread
    lockedInt     uint32      // tracking for internal lockOSThread
    nextwaitm     muintptr       // next m waiting for lock
    waitunlockf   unsafe.Pointer // todo go func(*g, unsafe.pointer) bool
    waitlock      unsafe.Pointer
    waittraceev   byte
    waittraceskip int
    startingtrace bool
    syscalltick   uint32
    thread        uintptr // thread handle
    freelink      *m      // on sched.freem

    // these are here because they are too large to be on the stack
    // of low-level NOSPLIT functions.
    libcall   libcall
    libcallpc uintptr // for cpu profiler
    libcallsp uintptr
    libcallg  guintptr
    syscall   libcall // stores syscall parameters on windows

    mOS
}

p is an abstract data structure: think of it as a processor. It represents the context needed to execute tasks; an m must acquire a p before it can run Go code:

type p struct {
    lock mutex

    id          int32
    status      uint32 // one of pidle/prunning/...
    link        puintptr
    schedtick   uint32     // incremented on every call to schedule
    syscalltick uint32     // incremented on every system call
    sysmontick  sysmontick // last tick observed by sysmon
    m           muintptr   // back-link to associated m (nil if the p is idle)
    mcache      *mcache
    racectx     uintptr

    deferpool    [5][]*_defer // pool of available defer structs of different sizes (see panic.go)
    deferpoolbuf [5][32]*_defer

    // Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
    goidcache    uint64
    goidcacheend uint64

    // Queue of runnable goroutines. Accessed without a lock.
    runqhead uint32
    runqtail uint32
    runq     [256]guintptr
    // runnext, if non-nil, is a runnable G that was made ready by the
    // current G and has higher priority than the G's in runq.
    // If there is time remaining in the current G's time slice, this
    // G should run next and will inherit the remaining time.
    // If a set of goroutines is locked in a
    // communicate-and-wait pattern, this schedules that set as a
    // unit and eliminates the (potentially large) scheduling
    // latency that otherwise arises from adding the ready'd
    // goroutines to the end of the run queue.
    runnext guintptr

    // Available G's (status == Gdead)
    gfree    *g
    gfreecnt int32

    sudogcache []*sudog
    sudogbuf   [128]*sudog

    tracebuf traceBufPtr

    // traceSweep indicates the sweep events should be traced.
    // This is used to defer the sweep start event until a span
    // has actually been swept.
    traceSweep bool
    // traceSwept and traceReclaimed track the number of bytes
    // swept and reclaimed by sweeping in the current sweep loop.
    traceSwept, traceReclaimed uintptr

    palloc persistentAlloc // per-P to avoid mutex

    // Per-P GC state
    gcAssistTime         int64 // Nanoseconds in assistAlloc
    gcFractionalMarkTime int64 // Nanoseconds in fractional mark worker
    gcBgMarkWorker       guintptr
    gcMarkWorkerMode     gcMarkWorkerMode

    // Start time of the current mark worker, in nanoseconds.
    gcMarkWorkerStartTime int64

    // gcw is this P's GC work buffer cache. The work buffer is
    // filled by write barriers, drained by mutator assists, and
    // disposed on certain GC state transitions.
    gcw gcWork

    // wbBuf is this P's GC write barrier buffer.
    //
    // TODO: Consider caching this in the running G.
    wbBuf wbBuf

    runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point

    pad [sys.CacheLineSize]byte
}

The global scheduler; there is exactly one instance of schedt in the whole process:

type schedt struct {
    // The next two fields must be accessed atomically. They are kept
    // at the top of the struct so that they stay aligned on 32-bit
    // systems.
    goidgen  uint64
    lastpoll uint64

    lock mutex

    // When increasing nmidle, nmidlelocked, nmsys, or nmfreed,
    // remember to call checkdead.

    midle        muintptr // idle m's waiting for work
    nmidle       int32    // number of idle m's waiting for work
    nmidlelocked int32    // number of locked m's waiting for work
    mnext        int64    // number of m's created so far; also the ID of the next m to create
    maxmcount    int32    // maximum number of m's allowed
    nmsys        int32    // number of system m's not counted for deadlock
    nmfreed      int64    // cumulative number of freed m's

    ngsys uint32 // number of system goroutines; updated atomically

    pidle  puintptr // idle p's
    npidle uint32
    nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.

    // Global runnable queue.
    runqhead guintptr
    runqtail guintptr
    runqsize int32

    // Global cache of dead G's.
    gflock       mutex
    gfreeStack   *g
    gfreeNoStack *g
    ngfree       int32

    // Central cache of sudog structs.
    sudoglock  mutex
    sudogcache *sudog

    // Central pool of available defer structs of different sizes.
    deferlock mutex
    deferpool [5]*_defer

    // freem is the list of m's waiting to be freed after their
    // m.exited flag was set. The list is linked through m.freelink.
    freem *m

    gcwaiting  uint32 // gc is waiting to run
    stopwait   int32
    stopnote   note
    sysmonwait uint32
    sysmonnote note

    // safepointFn should be called on each P at the next GC
    // safepoint if p.runSafePointFn is set.
    safePointFn   func(*p)
    safePointWait int32
    safePointNote note

    profilehz int32 // cpu profiling rate

    procresizetime int64 // nanotime() of the last change to gomaxprocs
    totaltime      int64 // gomaxprocs dt up to procresizetime
}

The gpm flow

p initialization

Called during program startup:

graph TD
runtime.schedinit -->  runtime.procresize

schedinit
// The bootstrap sequence is: call osinit, call schedinit, make and
// queue the new G, call runtime·mstart. The new G calls runtime·main.
func schedinit() {
    // raceinit must be the first call to race detector.
    // In particular, it must be done before mallocinit below calls racemapshadow.
    _g_ := getg()
    if raceenabled {
        _g_.racectx, raceprocctx0 = raceinit()
    }
    // Upper limit on the number of m's.
    sched.maxmcount = 10000

    // Various initializations.
    tracebackinit()
    moduledataverify()
    stackinit()
    mallocinit()
    mcommoninit(_g_.m) // initialize the current m
    cpuinit()          // must run before alginit
    alginit()          // maps must not be used before this call
    modulesinit()      // provides activeModules
    typelinksinit()    // uses maps, activeModules
    itabsinit()        // uses activeModules
    // ...
    sched.lastpoll = uint64(nanotime())
    // Default the number of P's to the number of CPUs.
    procs := ncpu
    if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
        procs = n
    }

    // Initialize the P's.
    if procresize(procs) != nil {
        throw("unknown runnable goroutine during bootstrap")
    }
    // ...
}



// procresize resizes the set of P's.
func procresize(nprocs int32) *p {
    old := gomaxprocs
    if old < 0 || nprocs <= 0 {
        throw("procresize: invalid arg")
    }
    if trace.enabled {
        traceGomaxprocs(nprocs)
    }

    // update statistics
    now := nanotime()
    if sched.procresizetime != 0 {
        sched.totaltime += int64(old) * (now - sched.procresizetime)
    }
    sched.procresizetime = now

    // Grow allp if necessary, i.e. when it currently holds fewer
    // than nprocs entries.
    if nprocs > int32(len(allp)) {
        // Synchronize with retake, which could be running
        // concurrently since it doesn't run on a P.
        lock(&allpLock)
        if nprocs <= int32(cap(allp)) {
            allp = allp[:nprocs]
        } else {
            nallp := make([]*p, nprocs)
            // Copy everything up to allp's cap so we
            // never lose old allocated Ps.
            copy(nallp, allp[:cap(allp)])
            allp = nallp
        }
        unlock(&allpLock)
    }

    // Initialize the new P's.
    for i := int32(0); i < nprocs; i++ {
        pp := allp[i]
        if pp == nil {
            pp = new(p)
            pp.id = i
            pp.status = _Pgcstop
            pp.sudogcache = pp.sudogbuf[:0]
            for i := range pp.deferpool {
                pp.deferpool[i] = pp.deferpoolbuf[i][:0]
            }
            pp.wbBuf.reset()
            atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
        }
        if pp.mcache == nil {
            if old == 0 && i == 0 {
                if getg().m.mcache == nil {
                    throw("missing mcache?")
                }
                pp.mcache = getg().m.mcache // bootstrap
            } else {
                pp.mcache = allocmcache()
            }
        }
        if raceenabled && pp.racectx == 0 {
            if old == 0 && i == 0 {
                pp.racectx = raceprocctx0
                raceprocctx0 = 0 // bootstrap
            } else {
                pp.racectx = raceproccreate()
            }
        }
    }

    // free unused P's
    for i := nprocs; i < old; i++ {
        p := allp[i]
        if trace.enabled && p == getg().m.p.ptr() {
            // moving to p[0], pretend that we were descheduled
            // and then scheduled again to keep the trace sane.
            traceGoSched()
            traceProcStop(p)
        }
        // move all runnable goroutines to the global queue
        for p.runqhead != p.runqtail {
            // pop from tail of local queue
            p.runqtail--
            gp := p.runq[p.runqtail%uint32(len(p.runq))].ptr()
            // push onto head of global queue
            globrunqputhead(gp)
        }
        if p.runnext != 0 {
            globrunqputhead(p.runnext.ptr())
            p.runnext = 0
        }
        // if there's a background worker, make it runnable and put
        // it on the global queue so it can clean itself up
        if gp := p.gcBgMarkWorker.ptr(); gp != nil {
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {
                traceGoUnpark(gp, 0)
            }
            globrunqput(gp)
            // This assignment doesn't race because the
            // world is stopped.
            p.gcBgMarkWorker.set(nil)
        }
        // Flush p's write barrier buffer.
        if gcphase != _GCoff {
            wbBufFlush1(p)
            p.gcw.dispose()
        }
        for i := range p.sudogbuf {
            p.sudogbuf[i] = nil
        }
        p.sudogcache = p.sudogbuf[:0]
        for i := range p.deferpool {
            for j := range p.deferpoolbuf[i] {
                p.deferpoolbuf[i][j] = nil
            }
            p.deferpool[i] = p.deferpoolbuf[i][:0]
        }
        freemcache(p.mcache)
        p.mcache = nil
        gfpurge(p)
        traceProcFree(p)
        if raceenabled {
            raceprocdestroy(p.racectx)
            p.racectx = 0
        }
        p.gcAssistTime = 0
        p.status = _Pdead
        // can't free P itself because it can be referenced by an M in syscall
    }

    // Trim allp.
    if int32(len(allp)) != nprocs {
        lock(&allpLock)
        allp = allp[:nprocs]
        unlock(&allpLock)
    }

    _g_ := getg()
    if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
        // continue to use the current P
        _g_.m.p.ptr().status = _Prunning
        _g_.m.p.ptr().mcache.prepareForSweep()
    } else {
        // release the current P and acquire allp[0]
        if _g_.m.p != 0 {
            _g_.m.p.ptr().m = 0
        }
        _g_.m.p = 0
        _g_.m.mcache = nil
        p := allp[0]
        p.m = 0
        p.status = _Pidle
        acquirep(p)
        if trace.enabled {
            traceGoStart()
        }
    }
    var runnablePs *p
    for i := nprocs - 1; i >= 0; i-- {
        p := allp[i]
        if _g_.m.p.ptr() == p {
            continue
        }
        // Mark the p idle.
        p.status = _Pidle
        if runqempty(p) {
            // Chain the idle p's into the global scheduler's pidle list.
            pidleput(p)
        } else {
            p.m.set(mget())
            p.link.set(runnablePs)
            runnablePs = p
        }
    }
    stealOrder.reset(uint32(nprocs))
    var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
    atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs)) // publish nprocs as gomaxprocs
    return runnablePs
}

Manually calling runtime.GOMAXPROCS also goes through procresize to reset the p's:

graph TD
runtime.GOMAXPROCS --> runtime.startTheWorld 
runtime.startTheWorld  --> runtime.startTheWorldWithSema
runtime.startTheWorldWithSema --> procresize
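A minimal user-level example of that entry point (GOMAXPROCS(0) only queries the current value; a positive argument stops the world and runs procresize, as in the graph above):

package main

import (
    "fmt"
    "runtime"
)

func main() {
    // Passing 0 queries the current value without changing it.
    prev := runtime.GOMAXPROCS(0)
    fmt.Println("GOMAXPROCS was", prev)

    // A positive value goes through stopTheWorld -> procresize ->
    // startTheWorldWithSema before returning the old value.
    runtime.GOMAXPROCS(prev * 2)
    fmt.Println("GOMAXPROCS is now", runtime.GOMAXPROCS(0))
}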

g initialization

go func() {
    // do .....
}()

This is actually compiled into a call to runtime.newproc, which initializes a g and drops it into a run queue:

graph TD
runtime.newproc --> runtime.newproc1
func newproc(siz int32, fn *funcval) {
    argp := add(unsafe.Pointer(&fn), sys.PtrSize) // address of the first argument
    gp := getg()                                  // pointer to the current g
    pc := getcallerpc()
    systemstack(func() {
        newproc1(fn, (*uint8)(argp), siz, gp, pc)
    })
}

// getcallerpc returns the address f will return to (the return pc).
// getcallersp returns the pointer to the top of f's stack frame
// (compare libco, where sp is the pointer to the bottom of the stack).
// func f(arg1, arg2, arg3 int) {
//     pc := getcallerpc()
//     sp := getcallersp()
// }
graph TD
newproc1 --> newg
newg[gfget] --> nil{is nil?}
nil -->|no|E[init stack / gostartcallfn ]
nil -->|yes|C[malg]
C --> D[set g status=> idle->dead]
D --> k[allgadd]
k --> E
E --> G[set g status=> dead-> runnable]
G --> runqput

The net effect is a call to runqput, which puts the g onto a run queue.

The most important line:

newg.sched.pc = funcPC(goexit) + sys.PCQuantum

This is what makes it possible to run the next goroutine after the current one finishes.

gostartcall takes the goexit address held in buf.pc and pushes it onto the top of the new goroutine's stack, guaranteeing that when the goroutine's function returns, it "returns" into runtime.goexit.
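For reference, here is gostartcall as it looks on amd64 (runtime/sys_x86.go in roughly this Go version; other architectures differ in the details):

// adjust Gobuf as if it executed a call to fn with context ctxt
// and then did an immediate gosave.
func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) {
    sp := buf.sp
    if sys.RegSize > sys.PtrSize {
        sp -= sys.PtrSize
        *(*uintptr)(unsafe.Pointer(sp)) = 0
    }
    sp -= sys.PtrSize
    // buf.pc still holds funcPC(goexit)+PCQuantum here; pushing it onto
    // the stack makes it the fake return address of fn...
    *(*uintptr)(unsafe.Pointer(sp)) = buf.pc
    buf.sp = sp
    // ...and fn becomes the pc that gogo will jump to, so when fn
    // returns it "returns" into runtime.goexit.
    buf.pc = uintptr(fn)
    buf.ctxt = ctxt
}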

runqput(_p_, newg, true)

graph TD
runqput --> full{is the local runnable queue full?}
full -->|no|E[put local queue ]
full -->|yes|C[runqputslow]
C --> K[lock/globrunqputbatch/unlock]
E --> next{ next == true or false ?}
next --> |true| F[puts g in the _p_.runnext slot]
next --> |false| j[adds g to the tail of the runnable queue]

Touching the global sched requires taking the global sched.lock; contention on a global lock is expensive, which is why that path is called "slow".
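For reference, runqput itself is compact; its body in proc.go of roughly this Go version:

// runqput tries to put g on the local runnable queue.
// If next is false, runqput adds g to the tail of the runnable queue.
// If next is true, runqput puts g in the _p_.runnext slot.
// If the run queue is full, runnext puts g on the global queue.
// Executed only by the owner P.
func runqput(_p_ *p, gp *g, next bool) {
    if randomizeScheduler && next && fastrand()%2 == 0 {
        next = false
    }

    if next {
    retryNext:
        oldnext := _p_.runnext
        if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
            goto retryNext
        }
        if oldnext == 0 {
            return
        }
        // Kick the old runnext out to the regular run queue.
        gp = oldnext.ptr()
    }

retry:
    h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers
    t := _p_.runqtail
    if t-h < uint32(len(_p_.runq)) {
        _p_.runq[t%uint32(len(_p_.runq))].set(gp)
        atomic.Store(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
        return
    }
    // Local queue full: runqputslow moves half of it plus gp to the
    // global queue under sched.lock.
    if runqputslow(_p_, gp, h, t) {
        return
    }
    // the queue is not full, now the put above must succeed
    goto retry
}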

How m works

There are three kinds of threads in the runtime: the main thread, the thread that runs sysmon, and ordinary user threads.

The main thread is represented in the runtime by the global variable runtime.m0.

User threads are just ordinary threads: each binds to a p and executes the tasks carried by g's.

The sysmon thread is a special thread responsible for monitoring the scheduler.

Execution flow of runtime.main (runtime/proc.go, func main()):

graph TD
runtime.main --> A[m0 G0]
A --> D[init max stack size]
D --> B[systemstack execute -> newm -> sysmon]
B --> runtime.lockOsThread
runtime.lockOsThread --> runtime.init
runtime.init --> runtime.gcenable
runtime.gcenable --> startTemplateThread
startTemplateThread --> main.init
main.init --> main.main
  1. Bind m0 and g0 together.

    // m0: the main thread of the process
    // g0: m0's scheduling goroutine
    // m0 and g0 are special; g0 serves only as the "parent" goroutine of the main goroutine
  2. Initialize maxstacksize.

// Max execution-stack size: 1GB on 64-bit, 250MB on 32-bit.
if sys.PtrSize == 8 { // 64-bit systems
    maxstacksize = 1000000000
} else { // 32-bit systems
    maxstacksize = 250000000
}
  3. Start the sysmon thread: a fresh m is created to run it, with no g bound for execution, detached from the normal scheduling cooperation.

    if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
        systemstack(func() {
            newm(sysmon, nil)
        })
    }
  4. Lock the main thread with lockOSThread, run the runtime initialization (runtime.init), and enable GC (gcenable).

  5. startTemplateThread (not started in the webassembly environment): a helper "template" thread, needed by most programs, used to create new threads safely when the current thread is in an unusual state.

  6. Run the remaining init functions: main.init, then main.main.

The sysmon thread

Source
// Always runs without a P, so write barriers are not allowed.
// (Without a bound p, write barriers must not run; this is tied to
// how the GC works.)
//go:nowritebarrierrec
func sysmon() {
    lock(&sched.lock)
    sched.nmsys++
    checkdead()
    unlock(&sched.lock)

    // If a heap span goes unused for 5 minutes after a garbage collection,
    // we hand it back to the operating system.
    scavengelimit := int64(5 * 60 * 1e9)

    if debug.scavenge > 0 {
        // Scavenge-a-lot for testing.
        forcegcperiod = 10 * 1e6
        scavengelimit = 20 * 1e6
    }

    lastscavenge := nanotime()
    nscavenge := 0

    lasttrace := int64(0)
    idle := 0 // how many cycles in succession we had not wokeup somebody
    delay := uint32(0)
    for {
        if idle == 0 { // start with a 20us sleep...
            delay = 20
        } else if idle > 50 { // start doubling the sleep after 1ms...
            delay *= 2
        }
        if delay > 10*1000 { // up to 10ms at most
            delay = 10 * 1000
        }
        usleep(delay)
        if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
            lock(&sched.lock)
            if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
                atomic.Store(&sched.sysmonwait, 1)
                unlock(&sched.lock)
                // Make wake-up period small enough
                // for the sampling to be correct.
                maxsleep := forcegcperiod / 2
                if scavengelimit < forcegcperiod {
                    maxsleep = scavengelimit / 2
                }
                shouldRelax := true
                if osRelaxMinNS > 0 {
                    next := timeSleepUntil()
                    now := nanotime()
                    if next-now < osRelaxMinNS {
                        shouldRelax = false
                    }
                }
                if shouldRelax {
                    osRelax(true)
                }
                notetsleep(&sched.sysmonnote, maxsleep)
                if shouldRelax {
                    osRelax(false)
                }
                lock(&sched.lock)
                atomic.Store(&sched.sysmonwait, 0)
                noteclear(&sched.sysmonnote)
                idle = 0
                delay = 20
            }
            unlock(&sched.lock)
        }
        // trigger libc interceptors if needed
        if *cgo_yield != nil {
            asmcgocall(*cgo_yield, nil)
        }
        // If the network hasn't been polled for 10ms, netpoll once.
        lastpoll := int64(atomic.Load64(&sched.lastpoll))
        now := nanotime()
        if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
            atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
            // netpoll calls epollWait, which returns the fds that are
            // ready to read/write; netpoll maps them back to the
            // goroutines associated with those fds.
            list := netpoll(false) // non-blocking - returns list of goroutines
            if !list.empty() {
                // Need to decrement number of idle locked M's
                // (pretending that one more is running) before injectglist.
                // Otherwise it can lead to the following situation:
                // injectglist grabs all P's but before it starts M's to run the P's,
                // another M returns from syscall, finishes running its G,
                // observes that there is no work to do and no other running M's
                // and reports deadlock.
                incidlelocked(-1)
                // Mark the goroutines behind the ready fds runnable.
                injectglist(&list)
                incidlelocked(1)
            }
        }
        // retake P's blocked in syscalls
        // and preempt long running G's
        if retake(now) != 0 {
            idle = 0
        } else {
            idle++
        }
        // Check whether a GC needs to be forced (once every two minutes).
        if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
            lock(&forcegc.lock)
            forcegc.idle = 0
            var list gList
            list.push(forcegc.g)
            injectglist(&list)
            unlock(&forcegc.lock)
        }
        // Scavenge memory back to the OS.
        if lastscavenge+scavengelimit/2 < now {
            mheap_.scavenge(int32(nscavenge), uint64(now), uint64(scavengelimit))
            lastscavenge = now
            nscavenge++
        }
        if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
            lasttrace = now
            schedtrace(debug.scheddetail > 0)
        }
    }
}
Core responsibilities:
  1. checkdead: check for deadlocked goroutines, once at startup.
  2. Handle what netpoll returns: injectglist flips the readied goroutines to runnable (a sketch of injectglist follows).
  3. Force a GC when one is overdue.
  4. Retake p's that have been blocked in syscalls for too long, and preempt g's that have been running for too long: retake(now) (a sketch of retake follows the flowchart).
  5. If heap memory has been idle for more than 5 minutes, release it back to the OS: mheap_.scavenge(int32(nscavenge), uint64(now), uint64(scavengelimit)).
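A sketch of injectglist, which turns a whole list of woken g's runnable and dumps them on the global run queue (tracing elided; this uses the gList type that also appears in the sysmon listing above):

// injectglist adds each runnable G on the list to some run queue,
// and clears glist. sched.lock must not be held.
func injectglist(glist *gList) {
    if glist.empty() {
        return
    }
    lock(&sched.lock)
    var n int
    for n = 0; !glist.empty(); n++ {
        gp := glist.pop()
        casgstatus(gp, _Gwaiting, _Grunnable) // the "set to ready" step
        globrunqput(gp)
    }
    unlock(&sched.lock)
    // Wake idle P's so the new work actually gets picked up.
    for ; n != 0 && sched.npidle != 0; n-- {
        startm(nil, false)
    }
    *glist = gList{}
}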
Flowchart
graph TD
sysmon --> lock
lock --> checkdead
checkdead --> unlock
unlock --> for
K[scavenge heap once in a while]  --> |every 10ms| for 
for --> usleep
usleep --> netpoll
netpoll --> injectglist 
injectglist --> retake
retake --> forceGC
forceGC --> K
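retake, from point 4 above, is short enough to read in full; a lightly trimmed version (trace calls elided; forcePreemptNS is 10ms in proc.go):

func retake(now int64) uint32 {
    n := 0
    lock(&allpLock)
    for i := 0; i < len(allp); i++ {
        _p_ := allp[i]
        if _p_ == nil {
            continue
        }
        pd := &_p_.sysmontick
        s := _p_.status
        if s == _Psyscall {
            // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
            t := int64(_p_.syscalltick)
            if int64(pd.syscalltick) != t {
                pd.syscalltick = uint32(t)
                pd.syscallwhen = now
                continue
            }
            // Don't retake if there is no other work to do and the
            // syscall is recent, but do retake eventually.
            if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
                continue
            }
            // Drop allpLock so we can take sched.lock.
            unlock(&allpLock)
            incidlelocked(-1)
            if atomic.Cas(&_p_.status, s, _Pidle) {
                n++
                _p_.syscalltick++
                handoffp(_p_) // give the p a new m
            }
            incidlelocked(1)
            lock(&allpLock)
        } else if s == _Prunning {
            // Preempt G if it's running for too long.
            t := int64(_p_.schedtick)
            if int64(pd.schedtick) != t {
                pd.schedtick = uint32(t)
                pd.schedwhen = now
                continue
            }
            if pd.schedwhen+forcePreemptNS <= now {
                preemptone(_p_) // sets gp.preempt and stackguard0 = stackPreempt
            }
        }
    }
    unlock(&allpLock)
    return uint32(n)
}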

Ordinary threads

graph TD
newm --> newm1
newm1 --> newosproc
newosproc --> clone

// newm initializes an m. The m will run fn at startup, or the
// schedule function if fn is nil.
// fn must be static and not a heap-allocated closure.
// May run with m.p == nil, so write barriers are not allowed.
func newm(fn func(), _p_ *p) {
    mp := allocm(_p_, fn)
    // The p passed in is stored in m.nextp; when the m runs schedule,
    // it takes nextp out and performs the real binding.
    mp.nextp.set(_p_)
    // See golang.org/issue/22227.
    newm1(mp)
}
func newm1(mp *m) {
    execLock.rlock() // Prevent process clone.
    newosproc(mp)    // creates the OS thread (clone(2) on Linux)
    execLock.runlock()
}
Workflow

Idle m's are pushed onto the global scheduler's midle queue. When an m is needed, the runtime first tries to take one from there; only if that fails does newm create a new one.

// mget takes an m from the global midle list.
// The caller must hold sched.lock.
func mget() *m {
    mp := sched.midle.ptr()
    if mp != nil {
        sched.midle = mp.schedlink
        sched.nmidle--
    }
    return mp
}
Call sites related to newm:
graph TD
main --> |sysmon|newm
startTheWorld --> startTheWorldWithSema
gcMarkTermination --> startTheWorldWithSema
gcStart--> startTheWorldWithSema
startTheWorldWithSema --> |helpgc|newm
startTheWorldWithSema --> |run p|newm
startm --> mget
mget --> |if no free m|newm
startTemplateThread --> |templateThread|newm
LockOsThread --> startTemplateThread
main --> |iscgo|startTemplateThread
handoffp --> startm
wakep --> startm
injectglist --> startm

When sched.midle has no idle m left, a new one is created. Two special cases get their own m's: sysmon and templateThread.

Core flow
  1. schedule
graph TD
schedule --> A[schedtick%61 == 0]
A --> |yes|globrunqget
A --> |no|runqget
globrunqget --> C[gp == nil]
C --> |no|execute
C --> |yes|runqget
runqget --> B[gp == nil]
B --> |no|execute
B --> |yes|findrunnable
findrunnable --> execute
  1. A g executing a cgo call must not be scheduled away, because the cgo call is using the m's g0 stack:

    // We should not schedule away from a g that is executing a cgo call,
    // since the cgo call is using the m's g0 stack.
    if _g_.m.incgo {
        throw("schedule: in cgo")
    }

  2. To keep scheduling fair, every 61st scheduling round on a worker thread first takes a goroutine from the global run queue: if only the local run queue were ever consulted, goroutines in the global run queue might never get a chance to run.

if gp == nil {
    // Check the global runnable queue once in a while to ensure fairness.
    // Otherwise two goroutines can completely occupy the local runqueue
    // by constantly respawning each other.
    if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
        lock(&sched.lock)
        gp = globrunqget(_g_.m.p.ptr(), 1)
        unlock(&sched.lock)
    }
}
if gp == nil {
    // Take a goroutine from the local run queue of the p bound to this m.
    gp, inheritTime = runqget(_g_.m.p.ptr())
    if gp != nil && _g_.m.spinning {
        throw("schedule: spinning with local work")
    }
}
if gp == nil {
    // Neither the local nor the global run queue had a runnable
    // goroutine: findrunnable steals from other workers' run queues.
    // If even that fails, the worker thread goes to sleep, and
    // findrunnable does not return until there is work to do.
    gp, inheritTime = findrunnable() // blocks until work is available
}
  3. Spinning-state handling:

    // An m that was out stealing is in spinning state; reset it,
    // otherwise the m must not run a goroutine.
    if _g_.m.spinning {
        resetspinning()
    }
  4. Run execute; concretely gogo(&gp.sched), implemented in assembly: it loads the contents of the g's gobuf back into the registers and resumes execution at the instruction saved in gobuf.pc, i.e. it runs the func() from go func(). A trimmed sketch of execute follows.
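A trimmed sketch of execute (proc.go; trace and helpgc details elided):

// execute runs gp on the current m: it flips gp to _Grunning, wires it
// to the m, restores the preemption watermark, and jumps into gp's
// saved context. gogo never returns here.
func execute(gp *g, inheritTime bool) {
    _g_ := getg()

    casgstatus(gp, _Grunnable, _Grunning)
    gp.waitsince = 0
    gp.preempt = false
    gp.stackguard0 = gp.stack.lo + _StackGuard
    if !inheritTime {
        _g_.m.p.ptr().schedtick++
    }
    _g_.m.curg = gp
    gp.m = _g_.m

    gogo(&gp.sched) // assembly: load the gobuf into registers, jump to gobuf.pc
}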
Goexit

runtime.goexit() sits at the bottom of every goroutine's stack; it is planted there when the G is created:

newg.sched.pc = funcPC(goexit) + sys.PCQuantum
// Finishes execution of the current goroutine.
func goexit1() {
    if raceenabled {
        racegoend()
    }
    if trace.enabled {
        traceGoEnd()
    }
    mcall(goexit0)
}


// goexit continuation on g0.
func goexit0(gp *g) {
    _g_ := getg() // this is g0

    // Mark the finished goroutine dead.
    casgstatus(gp, _Grunning, _Gdead)
    // If it is a system goroutine, decrement the global counter.
    if isSystemGoroutine(gp, false) {
        atomic.Xadd(&sched.ngsys, -1)
    }
    // Reset its state.
    gp.m = nil
    locked := gp.lockedm != 0
    gp.lockedm = 0
    _g_.m.lockedg = 0
    gp.paniconfault = false
    gp._defer = nil // should be true already but just in case.
    gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
    gp.writebuf = nil
    gp.waitreason = 0
    gp.param = nil
    gp.labels = nil
    gp.timer = nil

    if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {
        // Flush assist credit to the global pool. This gives
        // better information to pacing if the application is
        // rapidly creating an exiting goroutines.
        scanCredit := int64(gcController.assistWorkPerByte * float64(gp.gcAssistBytes))
        atomic.Xaddint64(&gcController.bgScanCredit, scanCredit)
        gp.gcAssistBytes = 0
    }

    // Note that gp's stack scan is now "valid" because it has no
    // stack.
    gp.gcscanvalid = true
    // The m and the running g reference each other; the g is done,
    // so unwire them.
    dropg()

    if GOARCH == "wasm" { // no threads yet on wasm
        gfput(_g_.m.p.ptr(), gp)
        schedule() // never returns
    }

    if _g_.m.lockedInt != 0 {
        print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")
        throw("internal lockOSThread error")
    }
    // Put the finished goroutine on the P's local free-goroutine list
    // (at most 64 entries; the overflow moves to the global scheduler).
    gfput(_g_.m.p.ptr(), gp)
    if locked {
        // The goroutine may have locked this thread because
        // it put it in an unusual kernel state. Kill it
        // rather than returning it to the thread pool.

        // Return to mstart, which will release the P and exit
        // the thread.
        if GOOS != "plan9" { // See golang.org/issue/22227.
            gogo(&_g_.m.g0.sched)
        } else {
            // Clear lockedExt on plan9 since we may end up re-using
            // this thread.
            _g_.m.lockedExt = 0
        }
    }
    // Schedule: find the next runnable goroutine and keep going.
    schedule()
}
  1. Flip the G's status to _Gdead; if it is a system G, update the global counter.
  2. Reset the G's various fields.
  3. Break the mutual references between the M and the G.
  4. Put the G on the local P's (or the global) free goroutine list.
  5. Schedule: find the next runnable goroutine and run it.
wakep
graph TD
wakep --> startm 
startm --> mget 
mget --> |no|newm
mget --> |yes|notewakeup
  1. wakep's job is to bring in another P to execute goroutines.

It calls startm whenever some G becomes runnable (a trimmed sketch of startm follows):

  1. startm(nil, true): with _p_ == nil it tries to grab an idle P, and returns immediately if there is none.
  2. It calls mget to fetch an already-sleeping m; if one is found, notewakeup wakes it up (the m went to sleep back in mput).
  3. If mget comes up empty, newm creates a new m.
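A trimmed sketch of startm covering exactly those three steps (trace details elided):

// Schedules some M to run the p (creates an M if necessary).
// If p == nil, tries to get an idle P; if there is none, it just returns.
func startm(_p_ *p, spinning bool) {
    lock(&sched.lock)
    if _p_ == nil {
        _p_ = pidleget()
        if _p_ == nil {
            unlock(&sched.lock)
            if spinning {
                // The caller incremented nmspinning, but there are no
                // idle Ps, so just undo the increment and give up.
                if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
                    throw("startm: negative nmspinning")
                }
            }
            return
        }
    }
    mp := mget()
    unlock(&sched.lock)
    if mp == nil {
        // No sleeping m available: create one.
        var fn func()
        if spinning {
            // The caller incremented nmspinning, so set m.spinning in the new M.
            fn = mspinning
        }
        newm(fn, _p_)
        return
    }
    mp.spinning = spinning
    mp.nextp.set(_p_)
    notewakeup(&mp.park) // wake the m that went to sleep in stopm/mput
}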
gopark

Blocking: used to switch goroutines away, e.g. when a system call blocks, a channel read/write cannot proceed, or the time slice under preemptive scheduling is up.

It does two main things:

  1. Unbinds the current goroutine from its m and switches the goroutine's state machine to waiting;

  2. Calls schedule() once, starting a new round of scheduling on the local P.

The core call is mcall(park_m), where park_m is a function pointer (a sketch of park_m follows the list). mcall:

  1. switches the current thread's stack from the g's stack to the g0 stack;
  2. runs park_m on the g0 stack; park_m calls schedule, and the scheduler picks another goroutine to run;
  3. saves the current goroutine's context (PC/SP into g->sched), so that a later goready on this goroutine can restore the scene.
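A sketch of park_m as it runs on the g0 stack (tracing elided):

// park_m continues the gopark on g0.
func park_m(gp *g) {
    _g_ := getg() // g0

    casgstatus(gp, _Grunning, _Gwaiting)
    dropg() // unwire the m and gp

    // If gopark supplied an unlock function, run it now; if it reports
    // false, the g must not be parked after all, so run it again
    // immediately.
    if _g_.m.waitunlockf != nil {
        fn := *(*func(*g, unsafe.Pointer) bool)(unsafe.Pointer(&_g_.m.waitunlockf))
        ok := fn(gp, _g_.m.waitlock)
        _g_.m.waitlock = nil
        _g_.m.waitunlockf = nil
        if !ok {
            casgstatus(gp, _Gwaiting, _Grunnable)
            execute(gp, true) // Schedule it back, never returns.
        }
    }
    schedule()
}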
goready

Wakes a goroutine: flips it to the runnable state and puts it in the P's local queue to wait for scheduling, so there is some delay before it actually runs.

func goready(gp *g, traceskip int) {
    // Switch to the g0 stack.
    systemstack(func() {
        ready(gp, traceskip, true)
    })
}


// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
    if trace.enabled {
        traceGoUnpark(gp, traceskip)
    }

    status := readgstatus(gp)

    // Mark runnable.
    _g_ := getg()
    _g_.m.locks++ // disable preemption because it can be holding p in a local var
    if status&^_Gscan != _Gwaiting {
        dumpgstatus(gp)
        throw("bad g->status in ready")
    }

    // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
    // Set gp to runnable, then add it to the P's local run queue.
    casgstatus(gp, _Gwaiting, _Grunnable)
    runqput(_g_.m.p.ptr(), gp, next)
    if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
        wakep()
    }
    _g_.m.locks--
    if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack
        _g_.stackguard0 = stackPreempt
    }
}
findrunnable (setting the GC states aside)
graph TD
runqget --> A[gp == nil]
A --> |no|return
A --> |yes|globrunqget
globrunqget --> B[gp == nil]
B --> |no| return
B --> |yes| C[netpollinited && lastpoll != 0]
C --> |yes|netpoll
netpoll --> K[gp == nil]
K --> |no|return
K --> |yes|runqsteal
C --> |no|runqsteal
runqsteal --> D[gp == nil]
D --> |no|return
D --> |yes|E[globrunqget]
E --> F[gp == nil]
F --> |no| return
F --> |yes| G[check all p's runq]
G --> H[runq is empty]
H --> |no|runqget
H --> |yes|I[netpoll]
I --> J[gp == nil]
J --> |no| return
J --> |yes| stopm
stopm --> runqget

// findrunnable finds a runnable goroutine to execute.
// It tries to steal g's from other P's, take g's from the global
// queue, and poll the network.
func findrunnable() (gp *g, inheritTime bool) {
    _g_ := getg()

    // The conditions here and in handoffp must agree: if
    // findrunnable would return a G to run, handoffp must start
    // an M.

top:
    _p_ := _g_.m.p.ptr()
    if sched.gcwaiting != 0 {
        gcstopm()
        goto top
    }
    if _p_.runSafePointFn != 0 {
        runSafePointFn()
    }
    if fingwait && fingwake {
        if gp := wakefing(); gp != nil {
            ready(gp, 0, true)
        }
    }
    if *cgo_yield != nil {
        asmcgocall(*cgo_yield, nil)
    }

    // Try the local run queue.
    if gp, inheritTime := runqget(_p_); gp != nil {
        return gp, inheritTime
    }

    // Try the global run queue.
    if sched.runqsize != 0 {
        lock(&sched.lock)
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        if gp != nil {
            return gp, false
        }
    }

    // Poll network.
    // This netpoll is only an optimization before we resort to stealing.
    // We can safely skip it if there are no waiters or a thread is blocked
    // in netpoll already. If there is any kind of logical race with that
    // blocked thread (e.g. it has already returned from netpoll, but does
    // not set lastpoll yet), this thread will do blocking netpoll below
    // anyway.
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
        if list := netpoll(false); !list.empty() { // non-blocking
            gp := list.pop()
            injectglist(&list)
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {
                traceGoUnpark(gp, 0)
            }
            return gp, false
        }
    }

    // Steal work from other P's.
    procs := uint32(gomaxprocs)
    if atomic.Load(&sched.npidle) == procs-1 {
        // Either GOMAXPROCS=1 or everybody, except for us, is idle already.
        // New work can appear from returning syscall/cgocall, network or timers.
        // Neither of that submits to local run queues, so no point in stealing.
        goto stop
    }
    // If number of spinning M's >= number of busy P's, block.
    // This is necessary to prevent excessive CPU consumption
    // when GOMAXPROCS>>1 but the program parallelism is low.
    if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
        goto stop
    }
    if !_g_.m.spinning {
        _g_.m.spinning = true
        atomic.Xadd(&sched.nmspinning, 1)
    }
    // Make up to 4 rounds of stealing attempts against other P's.
    for i := 0; i < 4; i++ {
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            if sched.gcwaiting != 0 {
                goto top
            }
            stealRunNextG := i > 2 // first look for ready queues with more than 1 g
            // This is where the actual steal from a victim P happens.
            if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
                return gp, false
            }
        }
    }

stop:

    // We have nothing to do. If we're in the GC mark phase, can
    // safely scan and blacken objects, and have work to do, run
    // idle-time marking rather than give up the P.
    if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) {
        _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
        gp := _p_.gcBgMarkWorker.ptr()
        casgstatus(gp, _Gwaiting, _Grunnable)
        if trace.enabled {
            traceGoUnpark(gp, 0)
        }
        return gp, false
    }

    // wasm only:
    // If a callback returned and no other goroutine is awake,
    // then pause execution until a callback was triggered.
    if beforeIdle() {
        // At least one goroutine got woken.
        goto top
    }

    // Before we drop our P, make a snapshot of the allp slice,
    // which can change underfoot once we no longer block
    // safe-points. We don't need to snapshot the contents because
    // everything up to cap(allp) is immutable.
    allpSnapshot := allp

    // return P and block
    lock(&sched.lock)
    if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
        unlock(&sched.lock)
        goto top
    }
    if sched.runqsize != 0 {
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        return gp, false
    }
    if releasep() != _p_ {
        throw("findrunnable: wrong p")
    }
    pidleput(_p_)
    unlock(&sched.lock)

    // Delicate dance: thread transitions from spinning to non-spinning state,
    // potentially concurrently with submission of new goroutines. We must
    // drop nmspinning first and then check all per-P queues again (with
    // #StoreLoad memory barrier in between). If we do it the other way around,
    // another thread can submit a goroutine after we've checked all run queues
    // but before we drop nmspinning; as the result nobody will unpark a thread
    // to run the goroutine.
    // If we discover new work below, we need to restore m.spinning as a signal
    // for resetspinning to unpark a new worker thread (because there can be more
    // than one starving goroutine). However, if after discovering new work
    // we also observe no idle Ps, it is OK to just park the current thread:
    // the system is fully loaded so no spinning threads are required.
    // Also see "Worker thread parking/unparking" comment at the top of the file.
    wasSpinning := _g_.m.spinning
    if _g_.m.spinning {
        _g_.m.spinning = false
        if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
            throw("findrunnable: negative nmspinning")
        }
    }

    // check all runqueues once again
    for _, _p_ := range allpSnapshot {
        if !runqempty(_p_) {
            lock(&sched.lock)
            _p_ = pidleget()
            unlock(&sched.lock)
            if _p_ != nil {
                acquirep(_p_)
                if wasSpinning {
                    _g_.m.spinning = true
                    atomic.Xadd(&sched.nmspinning, 1)
                }
                goto top
            }
            break
        }
    }

    // Check for idle-priority GC work again.
    if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) {
        lock(&sched.lock)
        _p_ = pidleget()
        if _p_ != nil && _p_.gcBgMarkWorker == 0 {
            pidleput(_p_)
            _p_ = nil
        }
        unlock(&sched.lock)
        if _p_ != nil {
            acquirep(_p_)
            if wasSpinning {
                _g_.m.spinning = true
                atomic.Xadd(&sched.nmspinning, 1)
            }
            // Go back to idle GC check.
            goto stop
        }
    }

    // poll network
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
        if _g_.m.p != 0 {
            throw("findrunnable: netpoll with p")
        }
        if _g_.m.spinning {
            throw("findrunnable: netpoll with spinning")
        }
        list := netpoll(true) // block until new work is available
        atomic.Store64(&sched.lastpoll, uint64(nanotime()))
        if !list.empty() {
            lock(&sched.lock)
            _p_ = pidleget()
            unlock(&sched.lock)
            if _p_ != nil {
                acquirep(_p_)
                gp := list.pop()
                injectglist(&list)
                casgstatus(gp, _Gwaiting, _Grunnable)
                if trace.enabled {
                    traceGoUnpark(gp, 0)
                }
                return gp, false
            }
            injectglist(&list)
        }
    }
    stopm()
    goto top
}
  1. Call runqget() to take a G from the current P's queue (the same call as in schedule()); return it if found.
  2. Otherwise try the global queue; return if found.
  3. Try netpoll; return if that yields a G.
  4. Otherwise try to steal from other P's via runqsteal, preferring queues holding more than one G; return if found.
  5. If we are in the GC mark phase, do idle-time GC mark work instead.
  6. Call globrunqget() once more on the global queue; return if found.
  7. Check all runqueues once again; if any has work, jump back to top.
  8. Otherwise do the GC idle-work check, then netpoll one more time; return a G if one shows up, else call stopm.
  9. The core of stopm is mput, which puts the m on sched's midle queue, followed by notesleep(&m.park), which puts the thread to sleep (see the sketch below).
  10. Once woken, jump back to top and start over.
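A sketch of stopm (the helpgc branch and retry label elided):

// Stops execution of the current m until new work is available.
func stopm() {
    _g_ := getg()

    if _g_.m.locks != 0 {
        throw("stopm holding locks")
    }
    if _g_.m.p != 0 {
        throw("stopm holding p")
    }
    if _g_.m.spinning {
        throw("stopm spinning")
    }

    lock(&sched.lock)
    mput(_g_.m) // push this m onto sched.midle
    unlock(&sched.lock)
    notesleep(&_g_.m.park) // sleep until startm calls notewakeup
    noteclear(&_g_.m.park)
    // startm stored the p to use in m.nextp; bind to it now.
    acquirep(_g_.m.nextp.ptr())
    _g_.m.nextp = 0
}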
handoffp

Dissociates a p from its m and puts the p back on the global pidle queue.

handoffp is called in roughly five situations (a trimmed sketch of handoffp follows the graph):

  1. A thread exits: mexit.
  2. While iterating over the p's, one is found in syscall status.
  3. An m is locked to some g: the current m stops (stoplockedm) and waits until the g is runnable before executing it.
  4. entersyscallblock, reached when a lock-related call is about to block, directly dissociates the p from the m.
  5. retake, the preemptive scheduling pass, also unbinds.
graph TD

mexit --> A[is m0?]
A --> |yes|B[handoffp]
A --> |no| C[iterate allm]
C --> |m found|handoffp
C --> |m not found| throw

forEachP --> |p status == syscall| handoffp

stoplockedm --> handoffp

entersyscallblock --> entersyscallblock_handoff
entersyscallblock_handoff --> handoffp

retake --> |p status == syscall| handoffp
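A trimmed sketch of handoffp (the GC safe-point and trace details elided):

// Hands off P from syscall or locked M.
// Always runs without a P, so write barriers are not allowed.
//go:nowritebarrierrec
func handoffp(_p_ *p) {
    // handoffp must start an M in any situation where
    // findrunnable would return a G to run on _p_.

    // if it has local work, start it straight away
    if !runqempty(_p_) || sched.runqsize != 0 {
        startm(_p_, false)
        return
    }
    // no local work, check that there are no spinning/idle M's,
    // otherwise our help is not required
    if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 &&
        atomic.Cas(&sched.nmspinning, 0, 1) {
        startm(_p_, true)
        return
    }
    lock(&sched.lock)
    if sched.gcwaiting != 0 {
        _p_.status = _Pgcstop
        sched.stopwait--
        if sched.stopwait == 0 {
            notewakeup(&sched.stopnote)
        }
        unlock(&sched.lock)
        return
    }
    if sched.runqsize != 0 {
        unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    // The p really is idle: put it on the global pidle list.
    pidleput(_p_)
    unlock(&sched.lock)
}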
g states
const (
    // G status
    _Gidle = iota // just allocated, not yet initialized

    _Grunnable // on a run queue; not running; no stack of its own in use; waiting for an m to bind and run it

    _Grunning // running, owns its stack

    _Gsyscall // a running goroutine entered a system call; when the syscall finishes, the status goes back to Grunnable

    _Gwaiting // a running goroutine made a blocking call: a send on a full channel, a receive from an empty channel, an I/O operation, a timer/Ticker, etc.; when the blocking call completes, the status goes from Gwaiting back to Grunnable

    _Gmoribund_unused // 5, unused

    _Gdead // no longer in use; on a free list; has no stack

    _Genqueue_unused // 7

    _Gcopystack // the stack is being grown or shrunk

    // GC scan states
    _Gscan         = 0x1000
    _Gscanrunnable = _Gscan + _Grunnable // 0x1001
    _Gscanrunning  = _Gscan + _Grunning  // 0x1002
    _Gscansyscall  = _Gscan + _Gsyscall  // 0x1003
    _Gscanwaiting  = _Gscan + _Gwaiting  // 0x1004
)
G state transitions
graph LR
start{newg} --> Gidle
Gidle --> |oneNewExtraM|Gdead
Gidle --> |newproc1|Gdead

Gdead --> |newproc1|Grunnable
Gdead --> |needm|Gsyscall

Gscanrunning --> |scang|Grunning

Grunnable --> |execute|Grunning

Gany --> |casgcopystack|Gcopystack

Gcopystack --> |todotodo|Grunning

Gsyscall --> |dropm|Gdead
Gsyscall --> |exitsyscall0|Grunnable
Gsyscall --> |exitsyscall|Grunning

Grunning --> |goschedImpl|Grunnable
Grunning --> |goexit0|Gdead
Grunning --> |newstack|Gcopystack
Grunning --> |reentersyscall|Gsyscall
Grunning --> |entersyscallblock|Gsyscall
Grunning --> |markroot|Gwaiting
Grunning --> |gcAssistAlloc1|Gwaiting
Grunning --> |park_m|Gwaiting
Grunning --> |gcMarkTermination|Gwaiting
Grunning --> |gcBgMarkWorker|Gwaiting
Grunning --> |newstack|Gwaiting

Gwaiting --> |gcMarkTermination|Grunning
Gwaiting --> |gcBgMarkWorker|Grunning
Gwaiting --> |markroot|Grunning
Gwaiting --> |gcAssistAlloc1|Grunning
Gwaiting --> |newstack|Grunning
Gwaiting --> |findRunnableGCWorker|Grunnable
Gwaiting --> |ready|Grunnable
Gwaiting --> |findrunnable|Grunnable
Gwaiting --> |injectglist|Grunnable
Gwaiting --> |schedule|Grunnable
Gwaiting --> |park_m|Grunnable
Gwaiting --> |procresize|Grunnable
Gwaiting --> |checkdead|Grunnable
P states
const (
    // P status
    _Pidle = iota
    _Prunning // Only this P is allowed to change from _Prunning.
    _Psyscall // waiting: retake watches p's stuck in this state; if one has waited too long, the m still blocked in the syscall is abandoned and handoffp(p) finds the p a new life
    _Pgcstop
    _Pdead
)
P state transitions
graph LR

Pidle --> |acquirep1|Prunning

Psyscall --> |retake|Pidle
Psyscall --> |entersyscall_gcwait|Pgcstop
Psyscall --> |exitsyscallfast|Prunning

Pany --> |gcstopm|Pgcstop
Pany --> |forEachP|Pidle
Pany --> |releasep|Pidle
Pany --> |handoffp|Pgcstop
Pany --> |procresize release current p use allp 0|Pidle
Pany --> |procresize when init|Pgcstop
Pany --> |procresize when free old p| Pdead
Pany --> |procresize after resize use current p|Prunning
Pany --> |reentersyscall|Psyscall
Pany --> |stopTheWorldWithSema|Pgcstop