gpm源码阅读
基本数据结构
1 | // stack 描述的是 Go 的执行栈,下界和上界分别为 [lo, hi] |
当 g 遇到阻塞,或需要等待的场景时,会被打包成 sudog 这样一个结构。一个 g 可能被打包为多个 sudog 分别挂在不同的等待队列上:
1 | // sudog 代表在等待列表里的 g,比如向 channel 发送/接收内容时 |
线程在 runtime 中的结构,对应一个 pthread,pthread 也会对应唯一的内核线程(task_struct):
1 | type m struct { |
抽象数据结构,可以认为是 processor 的抽象,代表了任务执行时的上下文,m 必须获得 p 才能执行:
1 | type p struct { |
全局调度器,全局只有一个 schedt 类型的实例:
1 | type schedt struct { |
gpm流程
p初始化
程序启动后调用
graph TD runtime.schedinit --> runtime.procresize
1 |
|
手动调用 runtime.GOMAXPROCS也会procresize重置p
graph TD runtime.GOMAXPROCS --> runtime.startTheWorld runtime.startTheWorld --> runtime.startTheWorldWithSema runtime.startTheWorldWithSema --> procresize
g 初始化
1 | go func() { |
实际上会被翻译成 runtime.newproc
,初始化g,丢如g的等待队列中
graph TD runtime.newproc --> runtime.newproc1
1 | func newproc(siz int32, fn *funcval) { |
graph TD newproc1 --> newg newg[gfget] --> nil{is nil?} nil -->|no|E[init stack / gostartcallfn ] nil -->|yes|C[malg] C --> D[set g status=> idle->dead] D --> k[allgadd] k --> E E --> G[set g status=> dead-> runnable] G --> runqput
结果是调用 runqput 将 g 放进了执行队列
最关键的代码
newg.sched.pc = funcPC(goexit) + sys.PCQuantum
解决了goroutine运行结束后如何运行下一个goroutine的问题。
gostartcall 函数buf.pc 中的 goexit 的函数地址放到了 goroutine 的栈顶,保证函数执行完之后能执行runtime.goexit。
runqput(_p_, newg, true)
graph TD runqput --> full{is the local runnable queue full?} full -->|no|E[put local queue ] full -->|yes|C[runqputslow] C --> K[lock/globrunqputbatch/unlock] E --> next{ next == true or false ?} next --> |true| F[puts g in the _p_.runnext slot] next --> |false| j[adds g to the tail of the runnable queue]
操作全局 sched 时,需要获取全局 sched.lock 锁,全局锁争抢的开销较大,所以才称之为 slow
m工作机制
在 runtime 中有三种线程,一种是主线程,一种是用来跑 sysmon 的线程,一种是普通的用户线程。
主线程在 runtime 由对应的全局变量: runtime.m0
来表示。
用户线程就是普通的线程了,和 p 绑定,执行 g 中的任务。
sysmon线程是一种特殊的内核线程,负责监控调度。
runtime.main执行流程(runtime/proc.go/func main())
graph TD runtime.main --> A[m0 G0] A --> D[init max stack size] D --> B[systemstack execute -> newm -> sysmon] B --> runtime.lockOsThread runtime.lockOsThread --> runtime.init runtime.init --> runtime.gcenable runtime.gcenable --> startTemplateThread startTemplateThread --> main.init main.init --> main.main
m0和g0绑定
1
2
3// m0: 系统主线程
// g0:主goroutine
// m0、g0是比较特殊的 仅用于main goroutine的父goroutine初始化maxstacksize,
1 | //执行栈最大限制:1GB on 64-bit,250MB on 32-bit |
初始化sysmon线程,创建一个新的m来跑,不需要绑定g执行,与整个调度协同脱离。
1
2
3
4
5if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
systemstack(func() {
newm(sysmon, nil)
})
}锁定lockOSThread,开始runtime初始化,开启gc
startTemplateThread webaseembly环境下不启动,比较慢,大多数程序必须要,辅助线程,解决线程异常问题
各种init初始化
sysmon线程
源码
1 | // Always runs without a P, so write barriers are not allowed. |
核心功能:
- 检查checkdead ,检查goroutine锁死,启动时检查一次。
- 处理netpoll返回,injectglist将协程的状态设置为ready状态
- 强制gc
- 收回因为 syscall 而长时间阻塞的 p,同时抢占那些执行时间过长的 g,retake(now)
- 如果 堆 内存闲置超过 5min,那么释放掉,mheap_.scavenge(int32(nscavenge), uint64(now), uint64(scavengelimit))
流程图
graph TD sysmon --> lock lock --> checkdead checkdead --> unlock unlock --> for K[scavenge heap once in a while] --> |every 10ms| for for --> usleep usleep --> netpoll netpoll --> injectglist injectglist --> retake retake --> forceGC forceGC --> K
普通线程
graph TD newm --> newm1 newm1 --> newosproc newosproc --> clone
1 |
|
1 | func newm1(mp *m) { |
工作流程
空闲的 m 会被丢进全局调度器的 midle 队列中,在需要 m 的时候,会先从这里取,如果获取不到newm申请一个
1 | func mget() *m { |
和newm相关的调用
graph TD main --> |sysmon|newm startTheWorld --> startTheWorldWithSema gcMarkTermination --> startTheWorldWithSema gcStart--> startTheWorldWithSema startTheWorldWithSema --> |helpgc|newm startTheWorldWithSema --> |run p|newm startm --> mget mget --> |if no free m|newm startTemplateThread --> |templateThread|newm LockOsThread --> startTemplateThread main --> |iscgo|startTemplateThread handoffp --> startm wakep --> startm injectglist --> startm
sched.midle 中没有空闲的 m 了,就去创建,两个特殊的sysmon,templateThread
核心流程
- schedule
graph TD schedule --> A[schedtick%61 == 0] A --> |yes|globrunqget A --> |no|runqget globrunqget --> C[gp == nil] C --> |no|execute C --> |yes|runqget runqget --> B[gp == nil] B --> |no|execute B --> |yes|findrunnable findrunnable --> execute
cgo的g不能被schedule走,cgo实用的m的g0栈
1
2
3
4
5// We should not schedule away from a g that is executing a cgo call,
// since the cgo call is using the m's g0 stack.
if _g_.m.incgo {
throw("schedule: in cgo")
}
2 . 为了保证调度的公平性,每个工作线程每进行61次调度就需要优先从全局运行队列中获取goroutine出来运行,
因为如果只调度本地运行队列中的goroutine,则全局运行队列中的goroutine有可能得不到运行
1 | if gp == nil { |
状态处理
1
2
3
4//偷窃状态的goroutine会进入spinning状态,重置状态,才能让m执行goroutine
if _g_.m.spinning {
resetspinning()
}
- 执行execute,具体 gogo(&gp.sched),汇编完成嗯,执行go func()中func(),把 g 对象的 gobuf 里的内容搬到寄存器里。然后从
gobuf.pc
寄存器存储的指令位置开始继续向后执行
Goexit
每个goroutine栈底都会有runtime.goexit(),它其实就是在创建G的时候,被设置进去的
1 | newg.sched.pc = funcPC(goexit) + sys.PCQuantum |
1 | // Finishes execution of the current goroutine. |
- G的状态变为_GDead,如果是系统G则更新全局计数器。
- 重置G身上一系列的属性变量。
- 解除M和G的互相引用关系。
- 放置在本地P或全局的free goroutine队列。
- 调度,寻找下一个可运行的goroutine。
wakep
graph TD wakep --> startm startm --> mget mget --> |no|newm mget --> |yes|notewakeup
- wakep的作用是添加一个P来执行goroutinue
在有G变为runnable的时候,调用startm
- startm(nil, true),传入的_p_ = nil,要获取一个idle P,如果获取不到,直接返回,
- 调用
mget
获得一个已经睡眠m,获取到了则调用notewakeup来唤醒m(因为m在mput的时候已经睡眠了) - 如果获取失败,就调用newm创建
gopark
阻塞,用于协程的切换,系统调用,channel读写条件不满足,抢占式调度时间片结束。
主要做两件事:
解除当前goroutine的m的绑定关系,将当前goroutine状态机切换为等待状态;
调用一次schedule()函数,在局部调度器P发起一轮新的调度
核心函数mcall(park_m),park_m是一个函数指针
- 切换当前线程的堆栈从g的堆栈切换到g0的堆栈;
- 并在g0的堆栈上执行新的函数park_m,park_m中执行schedule,调度器会重新调度选择一个goroutine去运行
- 保存当前协程的信息( PC/SP存储到g->sched),当后续对当前协程调用goready函数时候能够恢复现场;
goready
唤醒某一个goroutine,该协程转换到runnable的状态,并将其放入P的local queue,等待调度,有延时。
1 | func goready(gp *g, traceskip int) { |
findrunnable(抛开gc状态)
graph TD runqget --> A[gp == nil] A --> |no|return A --> |yes|globrunqget globrunqget --> B[gp == nil] B --> |no| return B --> |yes| C[netpollinited && lastpoll != 0] C --> |yes|netpoll netpoll --> K[gp == nil] K --> |no|return K --> |yes|runqsteal C --> |no|runqsteal runqsteal --> D[gp == nil] D --> |no|return D --> |yes|E[globrunqget] E --> F[gp == nil] F --> |no| return F --> |yes| G[check all p's runq] G --> H[runq is empty] H --> |no|runqget H --> |yes|I[netpoll] I --> J[gp == nil] J --> |no| return J --> |yes| stopm stopm --> runqget
1 |
|
调用runqget()从当前P的队列中取G(和schedule()中的调用相同),获取到就return
获取不到就全局队列中获取,获取到了就return
尝试从poll中获取,获取得到也return
获取不到就尝试从其他的p中获取,找数量超过1个的,找得到就返回,runqsteal
如果处于垃圾回收标记阶段,就进行垃圾回收的标记工作;
- 再次调用globrunqget()从全局队列中取可执行的G,获取的到就返回 return
- 再次检查所有的runqueues,如果有返回到最开始的top
- 没有就做gc方面的工作,然次从poll获取,获取的到就return,获取不到就然后调用stopm
- stopm的核心是调用mput把m结构体对象放入sched的midle空闲队列,然后通过notesleep(&m.park)函数让自己进入睡眠状态。
- 唤醒后在再次跳转到top
handoffp
p和m解除绑定状态,把 p 放回全局的 pidle 队列中
大概有5中情况调用handoffp
- 线程退出mexit
- 遍历p的时候,p的状态syscall
- m已经被某个g锁定,先停止当前m(stoplockedm),等待g可运行时,再执行g
- entersyscallblock,锁相关的时候导致的阻塞回调用到,直接p和m解除绑定状态
- retake抢占式调度,会解绑
graph TD mexit --> A[is m0?] A --> |yes|B[handoffp] A --> |no| C[iterate allm] C --> |m found|handoffp C --> |m not found| throw forEachP --> |p status == syscall| handoffp stoplockedm --> handoffp entersyscallblock --> entersyscallblock_handoff entersyscallblock_handoff --> handoffp retake --> |p status == syscall| handoffp
g状态
1 | const ( |
G的状态迁移
graph LR start{newg} --> Gidle Gidle --> |oneNewExtraM|Gdead Gidle --> |newproc1|Gdead Gdead --> |newproc1|Grunnable Gdead --> |needm|Gsyscall Gscanrunning --> |scang|Grunning Grunnable --> |execute|Grunning Gany --> |casgcopystack|Gcopystack Gcopystack --> |todotodo|Grunning Gsyscall --> |dropm|Gdead Gsyscall --> |exitsyscall0|Grunnable Gsyscall --> |exitsyscall|Grunning Grunning --> |goschedImpl|Grunnable Grunning --> |goexit0|Gdead Grunning --> |newstack|Gcopystack Grunning --> |reentersyscall|Gsyscall Grunning --> |entersyscallblock|Gsyscall Grunning --> |markroot|Gwaiting Grunning --> |gcAssistAlloc1|Gwaiting Grunning --> |park_m|Gwaiting Grunning --> |gcMarkTermination|Gwaiting Grunning --> |gcBgMarkWorker|Gwaiting Grunning --> |newstack|Gwaiting Gwaiting --> |gcMarkTermination|Grunning Gwaiting --> |gcBgMarkWorker|Grunning Gwaiting --> |markroot|Grunning Gwaiting --> |gcAssistAlloc1|Grunning Gwaiting --> |newstack|Grunning Gwaiting --> |findRunnableGCWorker|Grunnable Gwaiting --> |ready|Grunnable Gwaiting --> |findrunnable|Grunnable Gwaiting --> |injectglist|Grunnable Gwaiting --> |schedule|Grunnable Gwaiting --> |park_m|Grunnable Gwaiting --> |procresize|Grunnable Gwaiting --> |checkdead|Grunnable
P状态
1 | const ( |
P状态迁移
graph LR Pidle --> |acquirep1|Prunning Psyscall --> |retake|Pidle Psyscall --> |entersyscall_gcwait|Pgcstop Psyscall --> |exitsyscallfast|Prunning Pany --> |gcstopm|Pgcstop Pany --> |forEachP|Pidle Pany --> |releasep|Pidle Pany --> |handoffp|Pgcstop Pany --> |procresize release current p use allp 0|Pidle Pany --> |procresize when init|Pgcstop Pany --> |procresize when free old p| Pdead Pany --> |procresize after resize use current p|Prunning Pany --> |reentersyscall|Psyscall Pany --> |stopTheWorldWithSema|Pgcstop