[Golang] Signal-based asynchronous preemption of goroutines

The question

Before getting into signal-based asynchronous preemption, let me pose a question. Consider the following code:

package main

import (
	"runtime"
)

func main() {
	runtime.GOMAXPROCS(1)
	go func() {
		panic(1)
	}()
	a := 1
	for {
		a++
		// fmt.Println(11)
	}
}

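What does the program do when built with a Go version earlier than v1.14?

What does it do with Go v1.14 or later?

What does it do with Go v1.14 or later when run as GODEBUG=asyncpreemptoff=1 go run ./bin/main/ ?

What does it do if the a++ line is replaced by the line below it (with fmt imported and the then-unused variable a removed so that it compiles)?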

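A brief introduction to asynchronous preemption

First, a recap of the preemption mechanism covered earlier.

In runtime.main, sysmon is started on its own dedicated thread. One of its jobs is to track how long each p has been running the same goroutine without a break.

Once it finds a goroutine that has held its p for more than 10ms, it triggers cooperative (passive) preemption by marking the currently running g as preemptible.

Then, at the next function call whose prologue contains a stack check, the check fails and morestack runs. morestack notices that the g has been marked for preemption, so the g switches itself back to runnable and enters the scheduler. At that point it has been preempted.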

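Now back to the question above.

The program first limits itself to a single p, so every g is scheduled on that one p.

The main goroutine runs a for loop that contains no function calls at all.

The other goroutine panics immediately. If it ever gets to run, the whole process exits at once.

With a Go version earlier than v1.14 there is no asynchronous preemption. sysmon does notice that the p has been busy for more than 10ms and requests cooperative preemption, but the g inside the for loop never makes a function call, so morestack is never triggered and the g can never be preempted. The program hangs.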
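Why a loop without calls escapes cooperative preemption becomes clearer with a toy model of the check that runs at the start of most functions. The g struct and prologue function below are hypothetical simplifications, and the sentinel value is illustrative; the point is only that the preemption request is noticed inside the stack check, and a loop that never calls a function never runs that check.

```go
package main

import "fmt"

// stackPreempt is a sentinel larger than any real stack pointer.
// The concrete value here is illustrative.
const stackPreempt = ^uintptr(0) - 1313

// g is a hypothetical, stripped-down goroutine descriptor.
type g struct {
	stackguard0 uintptr
}

// prologue models the check performed when a function is entered.
// sp is the current stack pointer (stacks grow downward).
func prologue(gp *g, sp uintptr) string {
	if sp > gp.stackguard0 {
		return "run" // enough stack and no request pending: run the body
	}
	// Slow path, morestack in the real runtime.
	if gp.stackguard0 == stackPreempt {
		return "preempt" // the guard was poisoned to request preemption
	}
	return "grow" // a genuine stack overflow: grow the stack
}

func main() {
	gp := &g{stackguard0: 0x1000}
	fmt.Println(prologue(gp, 0x2000))
	gp.stackguard0 = stackPreempt
	fmt.Println(prologue(gp, 0x2000))
}
```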

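Early operating systems had a similar problem. In Windows 3.1, for example, the system got a chance to check whether the CPU should go to another program only when the running program called functions such as Yield, GetMessage, or PeekMessage; if so, the current program was suspended and control passed to another one. A program busy with a CPU-intensive task that never made such a call kept the CPU to itself, and the whole system appeared frozen.

With Go v1.14 or later, asynchronous preemption is available, so the g in the for loop is preempted normally and the program exits with the panic.

With Go v1.14 or later and GODEBUG=asyncpreemptoff=1, asynchronous preemption is switched off, so the behavior is the same as before v1.14: the program hangs.

If the a++ line is replaced by the line below it, the loop makes a function call on every iteration. The stack check in that call's prologue gives the runtime a cooperative preemption point (and if the underlying write system call blocks for long enough, the runtime can also hand the p over to another m), so the panicking goroutine gets to run and the program exits with the panic.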

How asynchronous preemption works

First, look at how the signal handler is registered.

mstart1 is the function every m runs when it starts.

func mstart1() {
_g_ := getg()

if _g_ != _g_.m.g0 { // must be running on g0
throw("bad runtime·mstart")
}

// Record the caller for use as the top of stack in mcall and
// for terminating the thread.
// We're never coming back to mstart1 after we call schedule,
// so other calls can reuse the current frame.
save(getcallerpc(), getcallersp()) // save pc and sp into g0
asminit() // architecture-specific assembly initialization
minit() // per-m initialization

// Install signal handlers; after minit so that minit can
// prepare the thread to be able to handle the signals.
if _g_.m == &m0 { // only m0 installs the signal handlers; the handlers themselves are process-wide
mstartm0() // install the signal handlers from m0
}

if fn := _g_.m.mstartfn; fn != nil {
fn()
}

if _g_.m != &m0 { // not m0
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}
schedule() // enter the scheduler; this call never returns
}

// mstartm0 implements part of mstart1 that only runs on the m0.
//
// Write barriers are allowed here because we know the GC can't be
// running yet, so they'll be no-ops.
//
//go:yeswritebarrierrec
func mstartm0() {
// Create an extra M for callbacks on threads not created by Go.
// An extra M is also needed on Windows for callbacks created by
// syscall.NewCallback. See issue #6751 for details.
if (iscgo || GOOS == "windows") && !cgoHasExtraM {
cgoHasExtraM = true
newextram()
}
initsig(false) // initialize signal handling
}

// Initialize signals.
// Called by libpreinit so runtime may not be initialized.
//go:nosplit
//go:nowritebarrierrec
func initsig(preinit bool) {
if !preinit {
// It's now OK for signal handlers to run.
signalsOK = true
}

// For c-archive/c-shared this is called by libpreinit with
// preinit == true.
if (isarchive || islibrary) && !preinit {
return
}

for i := uint32(0); i < _NSIG; i++ { // walk every signal number below _NSIG
t := &sigtable[i]
if t.flags == 0 || t.flags&_SigDefault != 0 {
continue
}

// We don't need to use atomic operations here because
// there shouldn't be any other goroutines running yet.
fwdSig[i] = getsig(i)

if !sigInstallGoHandler(i) {
// Even if we are not installing a signal handler,
// set SA_ONSTACK if necessary.
if fwdSig[i] != _SIG_DFL && fwdSig[i] != _SIG_IGN {
setsigstack(i)
} else if fwdSig[i] == _SIG_IGN {
sigInitIgnored(i)
}
continue
}

handlingSig[i] = 1
setsig(i, funcPC(sighandler)) // install the signal handler
}
}

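As shown above, sighandler is the signal handler, installed once by the main thread (m0). Signal handlers are process-wide, so whichever thread a signal is delivered to is interrupted by the kernel, no matter which g it is running, and executes the handler. Here is that function: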


func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
_g_ := getg()
c := &sigctxt{info, ctxt}

if sig == _SIGPROF {
sigprof(c.sigpc(), c.sigsp(), c.siglr(), gp, _g_.m)
return
}

if sig == _SIGTRAP && testSigtrap != nil && testSigtrap(info, (*sigctxt)(noescape(unsafe.Pointer(c))), gp) {
return
}

if sig == _SIGUSR1 && testSigusr1 != nil && testSigusr1(gp) {
return
}

if sig == sigPreempt { // preemption signal
// Might be a preemption signal.
doSigPreempt(gp, c) // handle the preemption request
// Even if this was definitely a preemption signal, it
// may have been coalesced with another signal, so we
// still let it through to the application.
}

flags := int32(_SigThrow)
if sig < uint32(len(sigtable)) {
flags = sigtable[sig].flags
}
if flags&_SigPanic != 0 && gp.throwsplit {
// We can't safely sigpanic because it may grow the
// stack. Abort in the signal handler instead.
flags = (flags &^ _SigPanic) | _SigThrow
}
if isAbortPC(c.sigpc()) {
// On many architectures, the abort function just
// causes a memory fault. Don't turn that into a panic.
flags = _SigThrow
}
if c.sigcode() != _SI_USER && flags&_SigPanic != 0 {
// The signal is going to cause a panic.
// Arrange the stack so that it looks like the point
// where the signal occurred made a call to the
// function sigpanic. Then set the PC to sigpanic.

// Have to pass arguments out of band since
// augmenting the stack frame would break
// the unwinding code.
gp.sig = sig
gp.sigcode0 = uintptr(c.sigcode())
gp.sigcode1 = uintptr(c.fault())
gp.sigpc = c.sigpc()

c.preparePanic(sig, gp)
return
}

if c.sigcode() == _SI_USER || flags&_SigNotify != 0 {
if sigsend(sig) {
return
}
}

if c.sigcode() == _SI_USER && signal_ignored(sig) {
return
}

if flags&_SigKill != 0 {
dieFromSignal(sig)
}

if flags&_SigThrow == 0 {
return
}

_g_.m.throwing = 1
_g_.m.caughtsig.set(gp)

if crashing == 0 {
startpanic_m()
}

if sig < uint32(len(sigtable)) {
print(sigtable[sig].name, "\n")
} else {
print("Signal ", sig, "\n")
}

print("PC=", hex(c.sigpc()), " m=", _g_.m.id, " sigcode=", c.sigcode(), "\n")
if _g_.m.lockedg != 0 && _g_.m.ncgo > 0 && gp == _g_.m.g0 {
print("signal arrived during cgo execution\n")
gp = _g_.m.lockedg.ptr()
}
print("\n")

level, _, docrash := gotraceback()
if level > 0 {
goroutineheader(gp)
tracebacktrap(c.sigpc(), c.sigsp(), c.siglr(), gp)
if crashing > 0 && gp != _g_.m.curg && _g_.m.curg != nil && readgstatus(_g_.m.curg)&^_Gscan == _Grunning {
// tracebackothers on original m skipped this one; trace it now.
goroutineheader(_g_.m.curg)
traceback(^uintptr(0), ^uintptr(0), 0, _g_.m.curg)
} else if crashing == 0 {
tracebackothers(gp)
print("\n")
}
dumpregs(c)
}

if docrash {
crashing++
if crashing < mcount()-int32(extraMCount) {
// There are other m's that need to dump their stacks.
// Relay SIGQUIT to the next m by sending it to the current process.
// All m's that have already received SIGQUIT have signal masks blocking
// receipt of any signals, so the SIGQUIT will go to an m that hasn't seen it yet.
// When the last m receives the SIGQUIT, it will fall through to the call to
// crash below. Just in case the relaying gets botched, each m involved in
// the relay sleeps for 5 seconds and then does the crash/exit itself.
// In expected operation, the last m has received the SIGQUIT and run
// crash/exit and the process is gone, all long before any of the
// 5-second sleeps have finished.
print("\n-----\n\n")
raiseproc(_SIGQUIT)
usleep(5 * 1000 * 1000)
}
crash()
}

printDebugLog()

exit(2)
}

// doSigPreempt handles a preemption signal on gp.
func doSigPreempt(gp *g, ctxt *sigctxt) {
// Check if this G wants to be preempted and is safe to
// preempt.
if wantAsyncPreempt(gp) && isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()) { // the g wants to be preempted and is at an async safe point
// Inject a call to asyncPreempt.
ctxt.pushCall(funcPC(asyncPreempt)) // make the interrupted g appear to have called asyncPreempt at the point where it was interrupted
}

// Acknowledge the preemption.
atomic.Xadd(&gp.m.preemptGen, 1)
atomic.Store(&gp.m.signalPending, 0)
}
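The pushCall step in doSigPreempt can be illustrated with a toy model of the saved register state. The context and memory types below are hypothetical stand-ins, and the sketch assumes an amd64-style convention in which a call pushes the return address onto the stack. It shows that "injecting a call" amounts to storing the interrupted PC as a return address and pointing the PC at the target.

```go
package main

import "fmt"

// context is a hypothetical stand-in for the register state the kernel
// saved when the signal arrived.
type context struct {
	pc, sp uintptr
}

// memory models the stack as a map from address to machine word.
type memory map[uintptr]uintptr

const ptrSize = 8

// pushCall rewrites the saved context so that, once the handler returns,
// the thread behaves as if the interrupted instruction had called target.
func pushCall(c *context, mem memory, target uintptr) {
	c.sp -= ptrSize
	mem[c.sp] = c.pc // return address: where the thread was interrupted
	c.pc = target    // resume in the injected function
}

// ret models the injected function returning to its "caller".
func ret(c *context, mem memory) {
	c.pc = mem[c.sp]
	c.sp += ptrSize
}

func main() {
	c := &context{pc: 0x401000, sp: 0x7000}
	mem := memory{}
	pushCall(c, mem, 0x45a000)
	fmt.Printf("after pushCall: pc=%#x sp=%#x\n", c.pc, c.sp)
	ret(c, mem)
	fmt.Printf("after return:   pc=%#x sp=%#x\n", c.pc, c.sp)
}
```

After ret, the model is back at the interrupted PC with the original stack pointer, which is why the preempted goroutine can later continue as if nothing had happened.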

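When a thread receives the signal, the kernel first saves the thread's context and then runs the handler; when the handler returns, the kernel restores the context and the thread carries on. Injecting asyncPreempt onto the stack of the interrupted g really just means modifying the saved IP and SP registers, so that the thread runs the injected function as soon as it resumes and only afterwards continues where it left off. Next, the asyncPreempt function: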

// asyncPreempt saves all user registers and calls asyncPreempt2.
//
// When stack scanning encounters an asyncPreempt frame, it scans that
// frame and its parent frame conservatively.
//
// asyncPreempt is implemented in assembly.
func asyncPreempt() // implemented in assembly: spills all registers to the stack frame, calls asyncPreempt2, then restores the registers from the frame

//go:nosplit
func asyncPreempt2() {
gp := getg()
gp.asyncSafePoint = true
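if gp.preemptStop { // a full stop was requested rather than a plain reschedule
mcall(preemptPark) // park the g in the preempted state instead of requeueing it
} else {
mcall(gopreempt_m) // save the g's context, switch to the g0 stack, and run gopreempt_m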
}
gp.asyncSafePoint = false
}

func gopreempt_m(gp *g) {
if trace.enabled {
traceGoPreempt()
}
goschedImpl(gp) // perform the preemption
}

Next, the function that actually performs the preemption, goschedImpl:

func goschedImpl(gp *g) {
status := readgstatus(gp)
if status&^_Gscan != _Grunning {
dumpgstatus(gp)
throw("bad g status")
}
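casgstatus(gp, _Grunning, _Grunnable) // change the g's status from running to runnable
dropg() // clear m.curg, detaching the g from the m
lock(&sched.lock)
globrunqput(gp) // put the g at the tail of the global run queue
unlock(&sched.lock)

schedule() // enter the scheduler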
}
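The effect of goschedImpl on the example program can be modeled with a single queue. This is a deliberately simplified sketch: the sched type is hypothetical, and it ignores the per-p local run queues that the real scheduler also consults. It shows the preempted g going to the tail of the queue and the next runnable g taking over.

```go
package main

import "fmt"

// sched is a hypothetical scheduler with one run queue (head at index 0)
// and one running goroutine, identified here by name.
type sched struct {
	runq []string
	curg string
}

// preempt requeues the running goroutine at the tail and switches to the
// goroutine at the head of the queue.
func (s *sched) preempt() {
	s.runq = append(s.runq, s.curg) // like globrunqput: tail of the queue
	s.curg = s.runq[0]              // like schedule: pick the next runnable g
	s.runq = s.runq[1:]
}

func main() {
	s := &sched{curg: "main loop", runq: []string{"panicking goroutine"}}
	s.preempt()
	fmt.Println("running:", s.curg, "queued:", s.runq)
}
```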

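At this point we know that the signal handler responds to the sigPreempt signal by carrying out the preemption.

Note: although only the main thread installs the handler, the handler is shared by every thread in the process. The preemption signal is sent to the specific thread whose g is to be preempted; that thread is interrupted and runs the handler while the other threads are unaffected. Signal-based asynchronous preemption can therefore preempt a g running on any thread, not just on the main thread.

Before signal-based asynchronous preemption existed, once a g in a Go program running on a single core entered a CPU-bound infinite loop, no other g ever got a chance to run. Signal-based asynchronous preemption solves that problem.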

So where is the preemption signal sent from?

From sysmon again, which was covered in the previous article.

func preemptone(_p_ *p) bool {
mp := _p_.m.ptr()
if mp == nil || mp == getg().m {
return false
}
gp := mp.curg
if gp == nil || gp == mp.g0 {
return false
}

gp.preempt = true // mark the g as about to be preempted

// Every call in a go routine checks for stack overflow by
// comparing the current stack pointer to gp->stackguard0.
// Setting gp->stackguard0 to StackPreempt folds
// preemption into the normal stack overflow check.
gp.stackguard0 = stackPreempt // compiler-inserted function prologues compare the stack pointer with stackguard0 to detect stack overflow; stackPreempt makes that check fail, so the next such call enters morestack, which also checks for preemption and performs it

// Request an async preemption of this P.
if preemptMSupported && debug.asyncpreemptoff == 0 {
_p_.preempt = true
preemptM(mp) // send the asynchronous preemption signal
}

return true
}

func preemptM(mp *m) {
if !pushCallSupported {
// This architecture doesn't support ctxt.pushCall
// yet, so doSigPreempt won't work.
return
}
if GOOS == "darwin" && (GOARCH == "arm" || GOARCH == "arm64") && !iscgo {
// On darwin, we use libc calls, and cgo is required on ARM and ARM64
// so we have TLS set up to save/restore G during C calls. If cgo is
// absent, we cannot save/restore G in TLS, and if a signal is
// received during C execution we cannot get the G. Therefore don't
// send signals.
// This can only happen in the go_bootstrap program (otherwise cgo is
// required).
return
}
if atomic.Cas(&mp.signalPending, 0, 1) { // no signal is pending for this m yet: mark one as pending
// If multiple threads are preempting the same M, it may send many
// signals to the same M such that it hardly make progress, causing
// live-lock problem. Apparently this could happen on darwin. See
// issue #37741.
// Only send a signal if there isn't already one pending.
signalM(mp, sigPreempt) // send the signal to the thread behind mp; sigPreempt is _SIGURG, used here for something other than its original meaning
}
}

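Wrapping up

To summarize goroutine preemption: as of Go 1.14 there are two mechanisms:

  1. Cooperative (passive) preemption through morestack
  2. Signal-based asynchronous (active) preemption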

Coming next

How defer is implemented



