深入浅出goroutine

深入浅出goroutine

并发和并行理解

并发是同一时间应对(dealing with)多件事情的能力。
并行是同一时间动手(doing)做多件事情的能力。

栗子:打印0-9的数字

1
2
3
4
5
6
7
8
9
10
11
package main

import "fmt"

func main() {
for i := 0; i < 10; i++ {
func() {
fmt.Println(i)
}()
}
}

使用goroutine,增加一个go关键字

1
2
3
4
5
6
7
8
9
10
11
package main

import "fmt"

func main() {
for i := 0; i < 10; i++ {
go func() {
fmt.Println(i)
}()
}
}
  1. Go 程序在运行时也总会有一个主 goroutine,这个主goroutine会在程序运行的时候启动,main函数

  2. 语法 go func(),函数做为goroutine入口函数

  3. goroutine函数执行的时间,顺序默认是不可控的,实际上是放入goroutine队列中

  4. 低成本,不需要操作系统参与

  5. 异步并发

go 主程序程序执行完goroutine没有获得运行的机会,程序就结束了,解决办法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"fmt"
)

func main() {
num := 10
sign := make(chan struct{}, num) //初始化Channel,并且可以设置容量:

for i := 0; i < num; i++ {
go func(i int) {
fmt.Println(i)
sign <- struct{}{} //发送struct到sign中
}(i)
}

// 办法1。
//time.Sleep(time.Millisecond * 500)

// 办法2。
for j := 0; j < num; j++ {
<-sign //从sign中接收数据,这个表达式会一直被block,直到有数据可以接收。
}
}

更好的解决办法,sync.WaitGroup,Add(), Done(), Wait()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package main

import (
"fmt"
"sync"
)

func main() {
num := 10
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < num; i++ {
go func(i int) {
fmt.Println(i)
wg.Done()
}(i)
}
// 等待执行结束
wg.Wait()
}

理解线程模型(线程与内核调度实体(KSE,Kernel Scheduling Entity)之间的对应关系)

用户态和内核态

  1. 简单理解:当一个进程在执行用户自己的代码时处于用户运行态(用户态);当一个进程因为系统调用陷入内核代码中执行时处于内核运行态(内核态),简单点理解就是控制计算机的硬件资源。

  2. 权限不同,x86架构的cpu一共有四个级别,0-3级,0级特权级最高,3级特权级最低,用户态为3,内核为0,访问的地址空间也有限制

  3. 切换:1.系统调用, 2.异常 3.外围设备的中断
  4. 内核线程是一种只运行在内核地址空间的线程,共享同一份内核页表,32位系统3-4g虚拟地址空间,所以也共享同一份内核页表。这也是为什么叫内核线程,而不叫内核进程的原因

用户级线程模型

  1. 用户线程与内核线程KSE是多对一(N : 1)的映射模型
  2. 操作系统只管理用户进程,对线程无感知,内核的所有调度都是基于用户进程,线程的创建、销毁以及多线程之间的协调等操作都是由用户自己的线程库来负责
  3. 一个进程中所有创建的线程都只和同一个KSE在运行时动态绑定

  4. 大部分的协程库都属于这种方式(比如python的gevent),libco(阻塞让出,hook系统函数)

  5. 问题:并不能做到真正意义上的并发,假设在某个用户进程上的某个用户线程因为一个阻塞调用(比如I/O阻塞)而被CPU给中断(抢占式调度)了,那么该进程内的所有线程都被阻塞(因为单个用户进程内的线程自调度是没有CPU时钟中断的,从而没有轮转调度),整个进程被挂起。即便是多CPU的机器,也无济于事,因为在用户级线程模型下,一个CPU关联运行的是整个用户进程,进程内的子线程绑定到CPU执行是由用户进程调度的,内部线程对CPU是不可见的,此时可以理解为CPU的调度单位是用户进程。

    解决办法:很多的协程库会把自己一些阻塞的操作重新封装为完全的非阻塞形式,然后在以前要阻塞的点上,主动让出自己,并通过某种方式通知或唤醒其他待执行的用户线程在该KSE上运行,从而避免了内核调度器由于KSE阻塞而做上下文切换,这样整个进程也不会被阻塞了

内核级线程模型

  1. 用户线程与内核线程KSE是一对一(1 : 1)的映射模型,也就是每一个用户线程绑定一个实际的内核线程,而线程的调度则完全交付给操作系统内核去做,应用程序对线程的创建、终止以及同步都基于内核提供的系统调用来完成
  2. 大部分编程语言的线程库(比如Java的java.lang.Thread、C++11的std::thread等等)都是对操作系统的线程(内核级线程)的一层封装,创建出来的每个线程与一个独立的KSE静态绑定,因此其调度完全由操作系统内核调度器去做。
  3. 优点:,直接借助操作系统内核的线程以及调度器,所以CPU可以快速切换调度线程,于是多个线程可以同时运行,因此相较于用户级线程模型它真正做到了并行处理
  4. 缺点:由于直接借助了操作系统内核来创建、销毁和以及多个线程之间的上下文切换和调度,对性能影响很大

两级线程模型(也称混合型线程模型)

  1. 用户线程与内核KSE是多对多(N : M)的映射模型,并非绑定
  2. 一个进程可以与多个内核线程KSE关联,于是进程内的多个线程可以动态绑定不同的KSE
  3. 某个KSE因为其绑定的线程的阻塞操作被内核调度出CPU时,其关联的进程中其余用户线程可以重新与其他KSE绑定运行
  4. 用户调度器实现用户线程到KSE的调度,内核调度器实现KSE到CPU上的调度
  5. Go语言中的runtime调度器就是采用的这种实现方案,实现了Goroutine与KSE之间的动态关联

go语言的goroutine实现

G-P-M 模型

  1. G :goroutine合集

  2. P:processor,线程上下文,虚拟运算环境资源,P的数量决定了系统内最大可并行的G的数量(前提:物理CPU核数 >= P的数量),P的数量由用户设置的GOMAXPROCS决定,但是不论GOMAXPROCS设置为多大,P的数量最大为256,P提供了相关的执行环境(Context),如内存分配状态(mcache),任务队列(G)等,更多的是为了方便在不同的M切换。

  3. M:Machine,OS线程抽象,代表着真正执行计算的资源,在绑定有效的P后,进入schedule循环,M的数量是不定的,由Go Runtime调整,为了防止创建过多OS线程导致系统调度不过来,目前默认最大限制为10000个。

  • 每个P维护一个G的本地队列;

  • 当一个G被创建出来,或者变为可执行状态时,就把他放到P的可执行队列中;

  • 当一个G在M里执行结束后,P会从队列中把该G取出;如果此时P的队列为空,即没有其他G可以执行, M就随机选择另外一个P,从其可执行的G队列中取走一半。

正常流程:
  1. 通过go关键字创建一个新的goroutine

  2. 被放入一个全局的p队列

  3. 被放入一个本地的p队列

  4. m和p映射绑定,为了运行g

  5. 运行g

  6. m查看本地的p队列里面还有g没有

  7. p会去全局的p队列中查找

  8. work-stealing调度算法

  9. 去其他的p(随机)中偷窃g运行

阻塞的情况下,两种状态,系统阻塞,用户阻塞
系统调用阻塞
  1. 运行的G阻塞的时候(可以需要IO),
  2. G会阻塞在_Gsyscall状态,M也处于 block on syscall 状态
  3. 执行该G的M会与P解绑,而P则尝试与其它idle的M绑定。
  4. 执行其它G
  5. 没有其它idle的M,但P的Local队列中仍然有G需要执行,则创建一个新的M
用户调用阻塞
  1. 对应的G会被放置到某个wait队列,
  2. 该G的状态由_Gruning变为_Gwaitting
  3. M会跳过该G尝试获取并执行下一个G
  4. 此时没有runnable的G供M运行,那么M将解绑P,并进入sleep状态
  5. 当阻塞的G被另一端的G2唤醒时
  6. G被标记为runnable,尝试加入G2所在P的runnext

1
ps:在Go 1.0发布的时候,它的调度器其实G-M模型,也就是没有P的,调度过程全由G和M完成,全局锁的问题,带来性能很低。,后续版本裁引入p,印证一句哈,所有的计算机问题都可以通过一个中间层来解决。

G-P-M模型的定义放在src/runtime/runtime2.go里面,调度过程则放在了src/runtime/proc.go

优化:协程池

结构

  1. pool 是一个通用的协程池,支持不同类型的任务,亦即每一个任务绑定一个函数提交到池中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
pool.go
//协程池数据结构
type Pool struct {
//Pool的容量,也就是开启worker数量的上限,每一个worker绑定一个goroutine
capacity int32
//当前正在执行任务的worker数量
running int32
// worker的过期时长
expiryDuration time.Duration
//存放空闲worker
workers []*Worker
//当关闭该Pool支持通知所有worker退出运行以防goroutine泄露
release int32

// lock for synchronous operation.
lock sync.Mutex

// cond for waiting to get a idle worker.
cond *sync.Cond

// 确保Pool关闭操作只会执行一次。
once sync.Once

//worker临时对象池,在复用worker时减少新对象的创建并加速worker从pool中的获取速度
workerCache sync.Pool

//pool引发panic时的执行函数
PanicHandler func(interface{})
}

//pool初始化
func NewPool(size int) (*Pool, error) {
return NewTimingPool(size, DEFAULT_CLEAN_INTERVAL_TIME)
}

func NewTimingPool(size, expiry int) (*Pool, error) {
//检查参数
if size <= 0 {
return nil, ErrInvalidPoolSize
}
//清理时间间隔,DEFAULT_CLEAN_INTERVAL_TIME = 1
if expiry <= 0 {
return nil, ErrInvalidPoolExpiry
}

p := &Pool{
capacity: int32(size),
expiryDuration: time.Duration(expiry) * time.Second,
}

p.cond = sync.NewCond(&p.lock)

// 单独开始协程,启动定期清理过期worker任务
go p.periodicallyPurge()
return p, nil
}



// 提交任务到pool中
func (p *Pool) Submit(task func()) error {
//关闭pool时候将release值设置为1
if CLOSED == atomic.LoadInt32(&p.release) {
return ErrPoolClosed
}
//获取一个可用的worker,并加入
p.retrieveWorker().task <- task
return nil
}


// 返回可用执行task的work
func (p *Pool) retrieveWorker() *Worker {
var w *Worker

p.lock.Lock()
idleWorkers := p.workers
n := len(idleWorkers) - 1
if n >= 0 {
//有空闲worker,从队列尾部取出一个使用
w = idleWorkers[n]
idleWorkers[n] = nil
p.workers = idleWorkers[:n]
p.lock.Unlock()
} else if p.Running() < p.Cap() { //判断是否达到上限没有
p.lock.Unlock()
// 当前pool中无空闲worker,且pool数量未达到上线
// pool会先从临时对象池中寻找是否有已完成任务的worker,
// 若临时对象池中不存在,则重新创建一个worker并将其启动
if cacheWorker := p.workerCache.Get(); cacheWorker != nil {
w = cacheWorker.(*Worker)
} else {
w = &Worker{
pool: p,
task: make(chan func(), workerChanCap),
}
}
w.run()
} else {
for {
//阻塞等待直到有空闲worker,防止无节制地创建goroutine,大量创建g,导致系统无法复用g
p.cond.Wait()
l := len(p.workers) - 1
if l < 0 {
continue
}
w = p.workers[l]
p.workers[l] = nil
p.workers = p.workers[:l]
break
}
p.lock.Unlock()
}
return w
}


//worker 执行完任务后放回Pool
//使得其余正在阻塞的任务可以获取worker
func (p *Pool) revertWorker(worker *Worker) bool {
if CLOSED == atomic.LoadInt32(&p.release) {
return false
}
worker.recycleTime = time.Now()
p.lock.Lock()
p.workers = append(p.workers, worker)
//释放worker回pool
p.cond.Signal()
p.lock.Unlock()
return true
}

//过期清理
func (p *Pool) periodicallyPurge() {
//心跳定时器1秒
heartbeat := time.NewTicker(p.expiryDuration)
defer heartbeat.Stop()

for range heartbeat.C {
if CLOSED == atomic.LoadInt32(&p.release) {
break
}
currentTime := time.Now()
p.lock.Lock()
idleWorkers := p.workers
n := -1
//遍历可用worker时前面的往往里当前时间越久
for i, w := range idleWorkers {
if currentTime.Sub(w.recycleTime) <= p.expiryDuration {
//计算当前时间减去该worker的最后运行时间之差是否符合过期时长,实为过期,
break
}
n = i
w.task <- nil
idleWorkers[i] = nil
}
if n > -1 {
if n >= len(idleWorkers)-1 {
// 全部过期
p.workers = idleWorkers[:0]
} else {
// 部分过期
p.workers = idleWorkers[n+1:]
}
}
p.lock.Unlock()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
work.go

type Worker struct {
pool *Pool
task chan func()
recycleTime time.Time
}

//执行任务
func (w *Worker) run() {
w.pool.incRunning()
go func() {
defer func() {
if p := recover(); p != nil {
//处理各种各种问题引发panic
w.pool.decRunning()
w.pool.workerCache.Put(w)
if w.pool.PanicHandler != nil {
w.pool.PanicHandler(p)
} else {
log.Printf("worker exits from a panic: %v", p)
}
}
}()

for f := range w.task {
if nil == f {
//任务队列中的函数全部被执行完后,
//pool中正在执行的worker数 -1,
//将worker 放回对象池
w.pool.decRunning()
w.pool.workerCache.Put(w)
return
}
f()
//回收复用,执行完后放回pool
//使得其余正在阻塞的任务可以获取worker
if ok := w.pool.revertWorker(w); !ok {
break
}
}
}()
}

流程

效果:

协程数量=100000000 NoPool AntsPool(AntsSize = 2000000) AntsPool(AntsSize = int32(size))
go 1.8.2 600s(ran too long) 149.56s(12125 MB) 149.56s(9450 MB)
go1.12.5 33.07s(6183 MB) 62.70s(1866 MB) 59.46s(1564 MB)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//测试代码
func TestAntsPool(t *testing.T) {
defer ants.Release()
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
ants.Submit(func() {
demoFunc()
wg.Done()
})
}
wg.Wait()
}

func TestNoPool(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
demoFunc()
wg.Done()
}()
}
}

总结:

  1. go语言的协程太强了,对比libco之类的协程库来说,单机千万级的并发

  2. go语言新版本协程优化性能提升明显,数量级上的提升

  3. 协程池对go语言来说带来的好处就是内存的可控,对于新版本来说并不会秒杀原生并发的goroutine。

GOMAXPROCS

  1. go 中 runtime 包的一个函数。
  2. 它设置了 P 的最多的个数,1.5之后默认为runtime.GOMAXPROCS(runtime.NumCPU()),最大256。

  3. GOMAXPROCS可以控制并发或者是并行,并行必须cpu大于1

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    package main

    import (
    "fmt"
    "runtime"
    "sync"
    )

    func trace(start, end int8) {
    for i := 0; i <= 3; i++ {
    for i := start; i <= end; i++ {
    fmt.Printf("%c ", i)
    //time.Sleep(time.Second)
    }
    }
    }

    func main() {
    runtime.GOMAXPROCS(1) //限制只有一个逻辑处理器
    var wg sync.WaitGroup //用于等待所有协程都完成
    wg.Add(2)

    go func() {
    defer wg.Done() //程序退出的时候执行
    trace('a', 'z')
    }()

    go func() {
    defer wg.Done() //程序退出的时候执行
    trace('A', 'Z')
    }()
    wg.Wait() //等待所有协程的完成
    }

竞争状态处理方式

  • 使用原子函数操作

    atomic

  • 使用互斥锁锁住临界区

    mutex.Lock()/mutex.Unlock()

  • 使用通道chan

    不要通过共享内存来通信,而应该通过通信来共享内存。

  1. channel 提供了一种通信机制

  2. channel 本身还需关联了一个类型,也就是 channel 可以发送数据的类型

  3. channel创建,读写

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    ch := make(chan int)
    close(ch)
    /*
    channel有两种,带缓冲的和不带缓冲的,默认不带缓冲的channel中,每一个发送者与接收者都会阻塞当前线程,
    带缓冲的如果超过容量大小,也会导致阻塞,导致死锁。
    */

    //往channel写
    ch <- x

    //从channel读
    x <- ch
  4. channel可进行3种操作:读,写,关闭,组合3种channel状态可以组合出9钟情况

    ps:有1个特殊场景:当nil的通道在select的某个case中时,这个case会阻塞,但不会造成死锁

    | 操作 | nil的channel | 正常channel | 已关闭channel |
    | ——— | ———— | ———– | ————- |
    | <- ch | 阻塞 | 成功或阻塞 | 读到零值 |
    | ch <- | 阻塞 | 成功或阻塞 | panic |
    | close(ch) | panic | 成功 | panic |

  5. struct{}类型channel

    使用channel传递信号

    channel struct{}是可以带缓冲

  6. select

    监听channel数据流动,select默认是阻塞的,只有当监听的channel中有发送或接收可以进行时才会运行,当多个channel都准备好的时候,select是随机的选择一个执行的

    处理timeout超时

常见问题:

compile: version “go1.9” does not match go tool version “go1.9.1”
参考:https://stackoverflow.com/questions/46693653/compile-version-go1-9-does-not-match-go-tool-version-go1-9-1

解决办法

open -e .bash_profile

export GOROOT=/usr/local/opt/go/libexec

source ~/.bash_profile

go语言版本切换

安装GVM

bash < <(curl -s -S -L https://raw.githubusercontent.com/moovweb/gvm/master/binscripts/gvm-installer)

1
2
3
安装指定版本gvm install go1.4.1 -B  
查看所有版本gvm list
切换版本 gvm use go1.9.7