
《Golang》Goroutine Preemption

Earlier posts covered the Go bootstrap process and goroutine scheduling: g0 creates a new g (known as the main goroutine), hooks runtime.main onto it, and makes it the next g for m0's p to run; when m0 enters the scheduler looking for work, it finds this freshly created g and starts executing runtime.main.

Let's start by digging into runtime.main:

// The main goroutine.
func main() {
	g := getg()

	// Racectx of m0->g0 is used only as the parent of the main goroutine.
	// It must not be used for anything else.
	g.m.g0.racectx = 0

	// Max stack size is 1 GB on 64-bit, 250 MB on 32-bit.
	// Using decimal instead of binary GB and MB because
	// they look nicer in the stack overflow failure message.
	if sys.PtrSize == 8 {
		maxstacksize = 1000000000
	} else {
		maxstacksize = 250000000
	}

	// Allow newproc to start new Ms.
	mainStarted = true // new m's are only created once mainStarted is true (see newproc1)

	if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
		systemstack(func() {
			newm(sysmon, nil) // start a new m whose start function is sysmon; it gets no p
		})
	}

	// Lock the main goroutine onto this, the main OS thread,
	// during initialization. Most programs won't care, but a few
	// do require certain calls to be made by the main thread.
	// Those can arrange for main.main to run in the main thread
	// by calling runtime.LockOSThread during initialization
	// to preserve the lock.
	lockOSThread() // pin the current g to the current m, i.e. m0

	if g.m != &m0 { // runtime.main must be running on m0
		throw("runtime.main not on m0")
	}

	doInit(&runtime_inittask) // must be before defer; runs the init functions of each package
	if nanotime() == 0 {
		throw("nanotime returning zero")
	}

	// Defer unlock so that runtime.Goexit during init does the unlock too.
	needUnlock := true
	defer func() {
		if needUnlock {
			unlockOSThread() // when this function ends, release m0 if still needed so it can run other g's
		}
	}()

	// Record when the world started.
	runtimeInitTime = nanotime()

	gcenable() // start two g's for GC (background sweeping and scavenging)

	main_init_done = make(chan bool)
	if iscgo {
		if _cgo_thread_start == nil {
			throw("_cgo_thread_start missing")
		}
		if GOOS != "windows" {
			if _cgo_setenv == nil {
				throw("_cgo_setenv missing")
			}
			if _cgo_unsetenv == nil {
				throw("_cgo_unsetenv missing")
			}
		}
		if _cgo_notify_runtime_init_done == nil {
			throw("_cgo_notify_runtime_init_done missing")
		}
		// Start the template thread in case we enter Go from
		// a C-created thread and need to create a new thread.
		startTemplateThread()
		cgocall(_cgo_notify_runtime_init_done, nil)
	}

	doInit(&main_inittask)

	close(main_init_done)

	needUnlock = false
	unlockOSThread() // release the lock right away

	if isarchive || islibrary { // built with -buildmode=c-archive or c-shared: just return
		// A program compiled with -buildmode=c-archive or c-shared
		// has a main, but it is not executed.
		return
	}
	fn := main_main // main_main is wired up by the compiler to the main.main entry function we wrote
	fn()            // run the entry function; it may spawn more g's, possibly executed by other m's, so normally the developer has to block here on their own
	if raceenabled {
		racefini()
	}

	// Make racy client program work: if panicking on
	// another goroutine at the same time as main returns,
	// let the other goroutine finish printing the panic trace.
	// Once it does, it will exit. See issues 3934 and 20018.
	if atomic.Load(&runningPanicDefers) != 0 {
		// Running deferred functions should not take long.
		for c := 0; c < 1000; c++ {
			if atomic.Load(&runningPanicDefers) == 0 {
				break
			}
			Gosched()
		}
	}
	if atomic.Load(&panicking) != 0 { // panicking is incremented and later decremented around panic; a nonzero value keeps the process alive (for example, a g started from main is panicking on another m right now)
		gopark(nil, nil, waitReasonPanicWait, traceEvGoStop, 1) // park the current g in waiting state and call the schedule function
	}

	exit(0) // exit the process outright; all other g's die with it whether they are running or not
	for {
		var x *int32
		*x = 0
	}
}

runtime.main creates a new m to run the sysmon function. Let's look at newm next:

// Create a new m. It will start off with a call to fn, or else the scheduler.
// fn needs to be static and not a heap allocated closure.
// May run with m.p==nil, so write barriers are not allowed.
//go:nowritebarrierrec
func newm(fn func(), _p_ *p) {
	mp := allocm(_p_, fn) // allocate the m and store fn (here sysmon) in m.mstartfn; it runs after the m starts, before the m enters the scheduler, but it is not the OS thread's entry function
	mp.nextp.set(_p_)
	mp.sigmask = initSigmask
	if gp := getg(); gp != nil && gp.m != nil && (gp.m.lockedExt != 0 || gp.m.incgo) && GOOS != "plan9" {
		// We're on a locked M or a thread that may have been
		// started by C. The kernel state of this thread may
		// be strange (the user may have locked it for that
		// purpose). We don't want to clone that into another
		// thread. Instead, ask a known-good thread to create
		// the thread for us.
		//
		// This is disabled on Plan 9. See golang.org/issue/22227.
		//
		// TODO: This may be unnecessary on Windows, which
		// doesn't model thread creation off fork.
		lock(&newmHandoff.lock)
		if newmHandoff.haveTemplateThread == 0 {
			throw("on a locked thread with no template thread")
		}
		mp.schedlink = newmHandoff.newm
		newmHandoff.newm.set(mp)
		if newmHandoff.waiting {
			newmHandoff.waiting = false
			notewakeup(&newmHandoff.wake)
		}
		unlock(&newmHandoff.lock)
		return
	}
	newm1(mp)
}

func newm1(mp *m) {
	if iscgo {
		var ts cgothreadstart
		if _cgo_thread_start == nil {
			throw("_cgo_thread_start missing")
		}
		ts.g.set(mp.g0)
		ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0]))
		ts.fn = unsafe.Pointer(funcPC(mstart))
		if msanenabled {
			msanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
		}
		execLock.rlock() // Prevent process clone.
		asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts))
		execLock.runlock()
		return
	}
	execLock.rlock() // Prevent process clone.
	newosproc(mp) // start the OS thread; its entry eventually reaches mstart (via mstart_stub on libc-based platforms)
	execLock.runlock()
}

The new m runs sysmon as soon as it starts (in other words, sysmon starts as part of newm and never waits to be scheduled). Now for the protagonist of this post, the sysmon function.

sysmon runs before its m would enter the scheduler. Since sysmon blocks forever, this m never enters the scheduler at all; it is dedicated to serving sysmon.
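
This is easy to see in mstart1, abridged below from the runtime sources of this era (a sketch, details vary by version): the m's mstartfn is invoked before schedule, so an mstartfn that loops forever keeps its m out of the scheduler for good.

// Abridged from runtime/proc.go; a paraphrase, not the full function.
func mstart1() {
	_g_ := getg()
	// ... stack bounds, signal and thread setup elided ...
	if fn := _g_.m.mstartfn; fn != nil {
		fn() // for the sysmon m this is sysmon(), an infinite loop
	}
	if _g_.m != &m0 {
		acquirep(_g_.m.nextp.ptr()) // bind the p that newm prepared, if any
		_g_.m.nextp = 0
	}
	schedule() // only reached once mstartfn has returned
}

With that established, here is sysmon itself: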


// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
	lock(&sched.lock)
	sched.nmsys++
	checkdead()
	unlock(&sched.lock)

	lasttrace := int64(0)
	idle := 0 // how many cycles in succession we had not woken anybody up
	delay := uint32(0)
	for { // infinite loop
		if idle == 0 { // start with 20us sleep...
			delay = 20
		} else if idle > 50 { // start doubling the sleep after 1ms...
			delay *= 2
		}
		if delay > 10*1000 { // up to 10ms
			delay = 10 * 1000
		}
		usleep(delay) // sleep
		now := nanotime()
		next, _ := timeSleepUntil()
		if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
			lock(&sched.lock)
			if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
				if next > now {
					atomic.Store(&sched.sysmonwait, 1)
					unlock(&sched.lock)
					// Make wake-up period small enough
					// for the sampling to be correct.
					sleep := forcegcperiod / 2
					if next-now < sleep {
						sleep = next - now
					}
					shouldRelax := sleep >= osRelaxMinNS
					if shouldRelax {
						osRelax(true)
					}
					notetsleep(&sched.sysmonnote, sleep)
					if shouldRelax {
						osRelax(false)
					}
					now = nanotime()
					next, _ = timeSleepUntil()
					lock(&sched.lock)
					atomic.Store(&sched.sysmonwait, 0)
					noteclear(&sched.sysmonnote)
				}
				idle = 0
				delay = 20
			}
			unlock(&sched.lock)
		}
		// trigger libc interceptors if needed
		if *cgo_yield != nil {
			asmcgocall(*cgo_yield, nil)
		}
		// poll network if not polled for more than 10ms
		lastpoll := int64(atomic.Load64(&sched.lastpoll))
		if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now { // if the last netpoll happened more than 10ms ago
			atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
			list := netpoll(0) // non-blocking - returns the list of goroutines that became runnable
			if !list.empty() {
				// Need to decrement number of idle locked M's
				// (pretending that one more is running) before injectglist.
				// Otherwise it can lead to the following situation:
				// injectglist grabs all P's but before it starts M's to run the P's,
				// another M returns from syscall, finishes running its G,
				// observes that there is no work to do and no other running M's
				// and reports deadlock.
				incidlelocked(-1)
				injectglist(&list) // mark all these g's runnable and put them on the global run queue to be consumed
				incidlelocked(1)
			}
		}
		if next < now { // the next timer should already have fired: no m is consuming timers
			// There are timers that should have already run,
			// perhaps because there is an unpreemptible P.
			// Try to start an M to run them.
			startm(nil, false) // try to bring up an m
		}
		// retake P's blocked in syscalls
		// and preempt long running G's
		if retake(now) != 0 { // try to preempt the current g of every p that has been running continuously for more than 10ms
			idle = 0
		} else {
			idle++
		}
		// check if we need to force a GC
		if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
			lock(&forcegc.lock)
			forcegc.idle = 0
			var list gList
			list.push(forcegc.g)
			injectglist(&list)
			unlock(&forcegc.lock)
		}
		if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
			lasttrace = now
			schedtrace(debug.scheddetail > 0)
		}
	}
}

func retake(now int64) uint32 {
	n := 0
	// Prevent allp slice changes. This lock will be completely
	// uncontended unless we're already stopping the world.
	lock(&allpLock)
	// We can't use a range loop over allp because we may
	// temporarily drop the allpLock. Hence, we need to re-fetch
	// allp each time around the loop.
	for i := 0; i < len(allp); i++ { // one iteration per p
		_p_ := allp[i]
		if _p_ == nil {
			// This can happen if procresize has grown
			// allp but not yet created new Ps.
			continue
		}
		pd := &_p_.sysmontick
		s := _p_.status
		sysretake := false // whether to retake a p whose g is in a syscall
		if s == _Prunning || s == _Psyscall { // the p is running or in a syscall
			// Preempt G if it's running for too long.
			t := int64(_p_.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				pd.schedwhen = now
			} else if pd.schedwhen+forcePreemptNS <= now { // the p has been running the same g for more than 10ms
				preemptone(_p_) // ask the g running on this p to stop (its preempt flag is set to true; the actual stop happens when morestack calls newstack)
				// In case of syscall, preemptone() doesn't
				// work, because there is no M wired to P.
				sysretake = true
			}
		}
		if s == _Psyscall {
			// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
			t := int64(_p_.syscalltick)
			if !sysretake && int64(pd.syscalltick) != t {
				pd.syscalltick = uint32(t)
				pd.syscallwhen = now
				continue
			}
			// On the one hand we don't want to retake Ps if there is no other work to do,
			// but on the other hand we want to retake them eventually
			// because they can prevent the sysmon thread from deep sleep.
			if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now { // don't retake if the p's run queue is empty, a spinning m or idle p exists, and the syscall has lasted less than 10ms
				continue
			}
			// Drop allpLock so we can take sched.lock.
			unlock(&allpLock)
			// Need to decrement number of idle locked M's
			// (pretending that one more is running) before the CAS.
			// Otherwise the M from which we retake can exit the syscall,
			// increment nmidle and report deadlock.
			incidlelocked(-1)
			if atomic.Cas(&_p_.status, s, _Pidle) { // flip the p to idle
				if trace.enabled {
					traceGoSysBlock(_p_)
					traceProcStop(_p_)
				}
				n++
				_p_.syscalltick++
				handoffp(_p_) // detach the p from its m; if necessary, pick or create another m for this p, which then enters the scheduler
			}
			incidlelocked(1)
			lock(&allpLock)
		}
	}
	unlock(&allpLock)
	return uint32(n)
}

// Tell the goroutine running on processor P to stop.
// This function is purely best-effort. It can incorrectly fail to inform the
// goroutine. It can inform the wrong goroutine. Even if it informs the
// correct goroutine, that goroutine might ignore the request if it is
// simultaneously executing newstack.
// No lock needs to be held.
// Returns true if preemption request was issued.
// The actual preemption will happen at some point in the future
// and will be indicated by the gp->status no longer being
// Grunning.
func preemptone(_p_ *p) bool {
	mp := _p_.m.ptr()
	if mp == nil || mp == getg().m {
		return false
	}
	gp := mp.curg
	if gp == nil || gp == mp.g0 {
		return false
	}

	gp.preempt = true // set the preempt flag: this g is about to be preempted

	// Every call in a go routine checks for stack overflow by
	// comparing the current stack pointer to gp->stackguard0.
	// Setting gp->stackguard0 to StackPreempt folds
	// preemption into the normal stack overflow check.
	gp.stackguard0 = stackPreempt // every checked function call compares the stack pointer against stackguard0 to detect overflow (the compiler emits this check); setting stackguard0 to stackPreempt makes the check fail on the next call, so morestack runs, and morestack shoulders the job of honoring the preemption request

	// Request an async preemption of this P.
	if preemptMSupported && debug.asyncpreemptoff == 0 {
		_p_.preempt = true
		preemptM(mp) // Go 1.14+: also send a signal so the g can be preempted asynchronously, without waiting for a function call
	}

	return true
}

So sysmon really is blocking, which means that for the entire lifetime of a Go program a dedicated m runs sysmon, taking care of netpoll monitoring and goroutine preemption.

As mentioned above, function calls in a g check for stack overflow by comparing the current stack pointer against stackguard0 (the compiler emits this check in each function's prologue). If the check concludes the g's stack is too small, morestack is called, and morestack also shoulders the job of checking for preemption.

In fact, not every function call checks the stack: depending on the function's body, the compiler may mark it NOSPLIT and optimize the check away entirely, which makes small calls cheaper.

For example, the Test function below gets optimized this way and ends up with no stack check (with optimizations on, the slice is dead, so the frame collapses and the function becomes a splitless leaf). Under GOMAXPROCS(1) that means main's loop has no preemption point, so with cooperative preemption alone the panic in the other goroutine never even gets printed:

package main

import (
	"runtime"
)

func main() {
	runtime.GOMAXPROCS(1)
	go func() {
		panic(1)
	}()
	for {
		Test()
	}
}

func Test() {
	b := make([]byte, 200)
	b[0] = 23
}
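
The fix is to keep the buffer live so the optimizer cannot delete the frame. Below is a minimal variant (my own sketch; the idx and sink globals are just one way to defeat dead-code elimination, and //go:noinline keeps the call, and hence its prologue check, inside the loop). Now Test keeps a 200-byte frame, larger than StackSmall, the stack-bound check is emitted, sysmon can preempt main's loop, and the panicking goroutine finally runs and kills the process with its trace. On Go 1.14 and later, run it with GODEBUG=asyncpreemptoff=1 to observe the cooperative mechanism in isolation, since asynchronous preemption would rescue even the original version.

package main

import (
	"runtime"
)

var (
	idx  int  // global index: defeats constant folding
	sink byte // global sink: keeps b live
)

func main() {
	runtime.GOMAXPROCS(1)
	go func() {
		panic(1) // this trace now actually prints
	}()
	for {
		Test()
	}
}

//go:noinline
func Test() {
	b := make([]byte, 200) // the frame survives optimization, so the prologue check stays
	b[idx] = 23
	sink = b[idx]
}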

A quick walk-through of the check itself, using a smaller example:

package main

func main() {
	Test1()
}

func Test1() {
	_ = make([]byte, 200) // frames no larger than StackSmall get a plain SP-vs-stackguard0 compare (and tiny leaf functions skip the check entirely), which optimizes small calls; this 200-byte frame is larger, so the prologue below adjusts SP first
}

Generate the assembly (-N -l disable optimizations and inlining so the prologue stays easy to read; with optimizations on, a dead frame like Test's above may be eliminated, and the check with it):

go tool compile -N -l -S ./bin/main/main.go

MOVQ (TLS), CX loads the current g pointer into CX (the TLS slot holds g). 16(CX) is then g.stackguard0, the third word of the g struct after stack.lo and stack.hi. Because Test1's frame (208 bytes) is larger than StackSmall (128), the prologue first computes the prospective stack pointer with LEAQ -80(SP), AX, where 80 = 208 - 128, and compares it against stackguard0; if the new SP would be below or the same (JLS), the stack is too small (or stackguard0 was set to stackPreempt), and control jumps to the runtime.morestack call:

"".Test1 STEXT size=109 args=0x0 locals=0xd0
0x0000 00000 (./bin/main/main.go:7) TEXT "".Test1(SB), ABIInternal, $208-0
0x0000 00000 (./bin/main/main.go:7) MOVQ (TLS), CX // load the current g pointer into CX
0x0009 00009 (./bin/main/main.go:7) LEAQ -80(SP), AX // prospective SP: 80 = framesize(208) - StackSmall(128)
0x000e 00014 (./bin/main/main.go:7) CMPQ AX, 16(CX) // compare it against g.stackguard0 at offset 16
0x0012 00018 (./bin/main/main.go:7) PCDATA $0, $-2
0x0012 00018 (./bin/main/main.go:7) JLS 102 // below or same: jump to 102, which calls runtime.morestack_noctxt
0x0014 00020 (./bin/main/main.go:7) PCDATA $0, $-1
0x0014 00020 (./bin/main/main.go:7) SUBQ $208, SP
0x001b 00027 (./bin/main/main.go:7) MOVQ BP, 200(SP)
0x0023 00035 (./bin/main/main.go:7) LEAQ 200(SP), BP
0x002b 00043 (./bin/main/main.go:7) PCDATA $0, $-2
0x002b 00043 (./bin/main/main.go:7) PCDATA $1, $-2
0x002b 00043 (./bin/main/main.go:7) FUNCDATA $0, gclocals·33cdeccccebe80329f1fdbee7f5874cb(SB)
0x002b 00043 (./bin/main/main.go:7) FUNCDATA $1, gclocals·33cdeccccebe80329f1fdbee7f5874cb(SB)
0x002b 00043 (./bin/main/main.go:7) FUNCDATA $2, gclocals·cd666f9a7f09fcd2aca7dadbf3522159(SB)
0x002b 00043 (./bin/main/main.go:8) PCDATA $0, $0
0x002b 00043 (./bin/main/main.go:8) PCDATA $1, $0
0x002b 00043 (./bin/main/main.go:8) MOVQ $0, ""..autotmp_0(SP)
0x0033 00051 (./bin/main/main.go:8) PCDATA $0, $1
0x0033 00051 (./bin/main/main.go:8) LEAQ ""..autotmp_0+8(SP), DI
0x0038 00056 (./bin/main/main.go:8) XORPS X0, X0
0x003b 00059 (./bin/main/main.go:8) PCDATA $0, $0
0x003b 00059 (./bin/main/main.go:8) DUFFZERO $247
0x004e 00078 (./bin/main/main.go:8) PCDATA $0, $2
0x004e 00078 (./bin/main/main.go:8) LEAQ ""..autotmp_0(SP), AX
0x0052 00082 (./bin/main/main.go:8) PCDATA $0, $0
0x0052 00082 (./bin/main/main.go:8) TESTB AL, (AX)
0x0054 00084 (./bin/main/main.go:8) JMP 86
0x0056 00086 (./bin/main/main.go:9) PCDATA $0, $-1
0x0056 00086 (./bin/main/main.go:9) PCDATA $1, $-1
0x0056 00086 (./bin/main/main.go:9) MOVQ 200(SP), BP
0x005e 00094 (./bin/main/main.go:9) ADDQ $208, SP
0x0065 00101 (./bin/main/main.go:9) RET
0x0066 00102 (./bin/main/main.go:9) NOP
0x0066 00102 (./bin/main/main.go:7) PCDATA $1, $-1
0x0066 00102 (./bin/main/main.go:7) PCDATA $0, $-2
0x0066 00102 (./bin/main/main.go:7) CALL runtime.morestack_noctxt(SB)
0x006b 00107 (./bin/main/main.go:7) PCDATA $0, $-1
0x006b 00107 (./bin/main/main.go:7) JMP 0

morestack then calls newstack.

Next, a quick look at newstack:

// Called from runtime·morestack when more stack is needed.
// Allocate larger stack and relocate to new stack.
// Stack growth is multiplicative, for constant amortized cost.
//
// g->atomicstatus will be Grunning or Gscanrunning upon entry.
// If the scheduler is trying to stop this g, then it will set preemptStop.
//
// This must be nowritebarrierrec because it can be called as part of
// stack growth from other nowritebarrierrec functions, but the
// compiler doesn't check this.
//
//go:nowritebarrierrec
func newstack() {
	thisg := getg()
	// TODO: double check all gp. shouldn't be getg().
	if thisg.m.morebuf.g.ptr().stackguard0 == stackFork {
		throw("stack growth after fork")
	}
	if thisg.m.morebuf.g.ptr() != thisg.m.curg {
		print("runtime: newstack called from g=", hex(thisg.m.morebuf.g), "\n"+"\tm=", thisg.m, " m->curg=", thisg.m.curg, " m->g0=", thisg.m.g0, " m->gsignal=", thisg.m.gsignal, "\n")
		morebuf := thisg.m.morebuf
		traceback(morebuf.pc, morebuf.sp, morebuf.lr, morebuf.g.ptr())
		throw("runtime: wrong goroutine in newstack")
	}

	gp := thisg.m.curg

	if thisg.m.curg.throwsplit {
		// Update syscallsp, syscallpc in case traceback uses them.
		morebuf := thisg.m.morebuf
		gp.syscallsp = morebuf.sp
		gp.syscallpc = morebuf.pc
		pcname, pcoff := "(unknown)", uintptr(0)
		f := findfunc(gp.sched.pc)
		if f.valid() {
			pcname = funcname(f)
			pcoff = gp.sched.pc - f.entry
		}
		print("runtime: newstack at ", pcname, "+", hex(pcoff),
			" sp=", hex(gp.sched.sp), " stack=[", hex(gp.stack.lo), ", ", hex(gp.stack.hi), "]\n",
			"\tmorebuf={pc:", hex(morebuf.pc), " sp:", hex(morebuf.sp), " lr:", hex(morebuf.lr), "}\n",
			"\tsched={pc:", hex(gp.sched.pc), " sp:", hex(gp.sched.sp), " lr:", hex(gp.sched.lr), " ctxt:", gp.sched.ctxt, "}\n")

		thisg.m.traceback = 2 // Include runtime frames
		traceback(morebuf.pc, morebuf.sp, morebuf.lr, gp)
		throw("runtime: stack split at bad time")
	}

	morebuf := thisg.m.morebuf
	thisg.m.morebuf.pc = 0
	thisg.m.morebuf.lr = 0
	thisg.m.morebuf.sp = 0
	thisg.m.morebuf.g = 0

	// NOTE: stackguard0 may change underfoot, if another thread
	// is about to try to preempt gp. Read it just once and use that same
	// value now and below.
	preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt // does the current g need to be preempted?

	// Be conservative about where we preempt.
	// We are interested in preempting user Go code, not runtime code.
	// If we're holding locks, mallocing, or preemption is disabled, don't
	// preempt.
	// This check is very early in newstack so that even the status change
	// from Grunning to Gwaiting and back doesn't happen in this case.
	// That status change by itself can be viewed as a small preemption,
	// because the GC might change Gwaiting to Gscanwaiting, and then
	// this goroutine has to wait for the GC to finish before continuing.
	// If the GC is in some way dependent on this goroutine (for example,
	// it needs a lock held by the goroutine), that small preemption turns
	// into a real deadlock.
	if preempt {
		if !canPreemptM(thisg.m) { // this m must not be preempted right now (holding locks, mallocing, ...): keep running the current g
			// Let the goroutine keep running for now.
			// gp->preempt is set, so it will be preempted next time.
			gp.stackguard0 = gp.stack.lo + _StackGuard
			gogo(&gp.sched) // never return
		}
	}

	if gp.stack.lo == 0 {
		throw("missing stack in newstack")
	}
	sp := gp.sched.sp
	if sys.ArchFamily == sys.AMD64 || sys.ArchFamily == sys.I386 || sys.ArchFamily == sys.WASM {
		// The call to morestack cost a word.
		sp -= sys.PtrSize
	}
	if stackDebug >= 1 || sp < gp.stack.lo {
		print("runtime: newstack sp=", hex(sp), " stack=[", hex(gp.stack.lo), ", ", hex(gp.stack.hi), "]\n",
			"\tmorebuf={pc:", hex(morebuf.pc), " sp:", hex(morebuf.sp), " lr:", hex(morebuf.lr), "}\n",
			"\tsched={pc:", hex(gp.sched.pc), " sp:", hex(gp.sched.sp), " lr:", hex(gp.sched.lr), " ctxt:", gp.sched.ctxt, "}\n")
	}
	if sp < gp.stack.lo {
		print("runtime: gp=", gp, ", goid=", gp.goid, ", gp->status=", hex(readgstatus(gp)), "\n ")
		print("runtime: split stack overflow: ", hex(sp), " < ", hex(gp.stack.lo), "\n")
		throw("runtime: split stack overflow")
	}

	if preempt {
		if gp == thisg.m.g0 { // g0 can never be preempted
			throw("runtime: preempt g0")
		}
		if thisg.m.p == 0 && thisg.m.locks == 0 {
			throw("runtime: g is running but p is not")
		}

		if gp.preemptShrink {
			// We're at a synchronous safe point now, so
			// do the pending stack shrink.
			gp.preemptShrink = false
			shrinkstack(gp)
		}

		if gp.preemptStop {
			preemptPark(gp) // never returns
		}

		// Act like goroutine called runtime.Gosched.
		gopreempt_m(gp) // never return: flip the g from running to runnable, then call schedule to enter the scheduler
	}

	// Allocate a bigger segment and move the stack.
	oldsize := gp.stack.hi - gp.stack.lo
	newsize := oldsize * 2
	if newsize > maxstacksize {
		print("runtime: goroutine stack exceeds ", maxstacksize, "-byte limit\n")
		print("runtime: sp=", hex(sp), " stack=[", hex(gp.stack.lo), ", ", hex(gp.stack.hi), "]\n")
		throw("stack overflow")
	}

	// The goroutine must be executing in order to call newstack,
	// so it must be Grunning (or Gscanrunning).
	casgstatus(gp, _Grunning, _Gcopystack)

	// The concurrent GC will not scan the stack while we are doing the copy since
	// the gp is in a Gcopystack status.
	copystack(gp, newsize)
	if stackDebug >= 1 {
		print("stack grow done\n")
	}
	casgstatus(gp, _Gcopystack, _Grunning)
	gogo(&gp.sched)
}

The current g flips itself from running to runnable and calls schedule to enter the scheduler; with that, the preemption of the current g is complete.
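
For reference, gopreempt_m is a thin wrapper around goschedImpl, shown abridged below from the runtime sources of this era (a paraphrase; details vary by version). Note that the preempted g goes onto the global run queue, not back onto its p's local queue.

// Abridged from runtime/proc.go.
func goschedImpl(gp *g) {
	status := readgstatus(gp)
	if status&^_Gscan != _Grunning {
		dumpgstatus(gp)
		throw("bad g status")
	}
	casgstatus(gp, _Grunning, _Grunnable) // running -> runnable
	dropg()                               // detach gp from the current m
	lock(&sched.lock)
	globrunqput(gp) // the preempted g waits on the global run queue
	unlock(&sched.lock)
	schedule() // never returns here: pick the next g to run
}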

Having come this far, we can answer the question raised in the previous post:

An m runs a g as soon as it finds one. If that g is a task that never terminates, and the next call into the scheduler happens only in goexit, then as long as the g never terminates, scheduling never happens, and none of the other g's on this m's p ever get a chance to run?

They do get to run: sysmon notices that the p has been executing continuously for more than 10ms, the g is then forcibly preempted at one of its function calls, and the m is yielded to the other g's. The small program below demonstrates this.
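
A self-contained demonstration (my own illustration, not from the post's sources): under GOMAXPROCS(1), the hot loop in main would starve the background goroutine forever if it were never preempted, yet both counters advance. The buffer plus the idx and sink globals only exist to keep spin's frame above StackSmall so the compiler emits the stack-bound check; on Go 1.14+ you can run it with GODEBUG=asyncpreemptoff=1 to exercise exactly the cooperative path described in this post.

package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

var (
	idx  int  // global index: keeps the compiler from proving the buffer dead
	sink byte // global sink: keeps the buffer live
)

//go:noinline
func spin(n *int64) {
	var buf [256]byte // frame > StackSmall, so the prologue keeps its stack-bound check
	buf[idx] = byte(atomic.AddInt64(n, 1))
	sink = buf[idx]
}

func main() {
	runtime.GOMAXPROCS(1)
	var fg, bg int64
	go func() {
		for {
			spin(&bg) // would never run without preemption of the loop below
		}
	}()
	for i := 0; i < 50000000; i++ {
		spin(&fg)
	}
	// Both counters advanced: sysmon preempted the hot loops roughly every 10ms.
	fmt.Println("foreground:", atomic.LoadInt64(&fg), "background:", atomic.LoadInt64(&bg))
}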

Summary

  1. Preemption originates in sysmon: when a p occupies its m for too long, the g running on that m is preempted (the asynchronous and the cooperative mechanisms are both used).
  2. Under cooperative preemption, the actual yield happens in morestack (via newstack).

Next up

How defer is implemented



