[Golang] A Deep Dive into Goroutine Scheduling

In the earlier article "A Deep Dive into the Golang Startup Process", we walked through how a Golang program boots and how m0 finally enters the scheduler.

Let's start by analyzing the scheduling function, schedule.

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
_g_ := getg() // get the current g (g0 here, since schedule runs on the scheduling stack)

if _g_.m.locks != 0 {
throw("schedule: holding locks")
}

if _g_.m.lockedg != 0 { // the current m is locked to a g (a g can bind itself to an m via LockOSThread so that no other g runs on that m until it calls UnlockOSThread)
stoplockedm() // hand this m's p off to another m, then sleep until the locked g is runnable again
execute(_g_.m.lockedg.ptr(), false) // Never returns. Run the g that has exclusive use of this m.
}

// We should not schedule away from a g that is executing a cgo call,
// since the cgo call is using the m's g0 stack.
if _g_.m.incgo { // the current g is in a cgo call and must not be scheduled away
throw("schedule: in cgo")
}

top:
pp := _g_.m.p.ptr()
pp.preempt = false

if sched.gcwaiting != 0 {
gcstopm() // stop the current m; blocks until the stop-the-world ends and the world is restarted
goto top
}
if pp.runSafePointFn != 0 {
runSafePointFn()
}

// Sanity check: if we are spinning, the run queue should be empty.
// Check this before calling checkTimers, as that might call
// goready to put a ready goroutine on the local run queue.
if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
throw("schedule: spinning with local work")
}

checkTimers(pp, 0) // check this p's timers and run any that are due

var gp *g
var inheritTime bool

// Normal goroutines will check for need to wakeP in ready,
// but GCworkers and tracereaders will not, so the check must
// be done here instead.
tryWakeP := false
if trace.enabled || trace.shutdown {
gp = traceReader()
if gp != nil {
casgstatus(gp, _Gwaiting, _Grunnable)
traceGoUnpark(gp, 0)
tryWakeP = true
}
}
if gp == nil && gcBlackenEnabled != 0 {
gp = gcController.findRunnableGCWorker(_g_.m.p.ptr()) // find this p's single background mark worker g (nil if none); this is how marking runs concurrently across multiple p's
tryWakeP = tryWakeP || gp != nil
}
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { // every 61st schedule tick, check the global run queue so its g's are not starved by the local queue
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1) // take exactly one g from the global run queue
unlock(&sched.lock)
}
}
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr()) // take a g from the current p: p.runnext is preferred if set (during startup the g for runtime.main is placed there), otherwise pop from the local queue
// We can see gp != nil here even if the M is spinning,
// if checkTimers added a local goroutine via goready.
}
if gp == nil { // reached when the current p has no runnable g either
gp, inheritTime = findrunnable() // blocks until work is available: the m spins or sleeps here until a g can be found
// 1. try the local run queue
// 2. try the global run queue
// 3. poll the network (netpoll)
// 4. steal from other p's
// 5. repeat from 1
}

// This thread is going to run a goroutine and is not spinning anymore,
// so if it was marked as spinning we need to reset it now and potentially
// start a new spinning M.
if _g_.m.spinning {
resetspinning()
}

if sched.disable.user && !schedEnabled(gp) {
// Scheduling of this goroutine is disabled. Put it on
// the list of pending runnable goroutines for when we
// re-enable user scheduling and look again.
lock(&sched.lock)
if schedEnabled(gp) {
// Something re-enabled scheduling while we
// were acquiring the lock.
unlock(&sched.lock)
} else {
sched.disable.runnable.pushBack(gp)
sched.disable.n++
unlock(&sched.lock)
goto top
}
}

// If about to schedule a not-normal goroutine (a GCworker or tracereader),
// wake a P if there is one.
if tryWakeP {
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
wakep()
}
}
if gp.lockedm != 0 {
// Hands off own p to the locked m,
// then blocks waiting for a new p.
startlockedm(gp)
goto top
}

execute(gp, inheritTime) // run the chosen g; when it finishes, schedule is entered again
}

// Schedules gp to run on the current M.
// If inheritTime is true, gp inherits the remaining time in the
// current time slice. Otherwise, it starts a new time slice.
// Never returns.
//
// Write barriers are allowed because this is called immediately after
// acquiring a P in several places.
//
//go:yeswritebarrierrec
func execute(gp *g, inheritTime bool) {
_g_ := getg()

// Assign gp.m before entering _Grunning so running Gs have an
// M.
_g_.m.curg = gp
gp.m = _g_.m
casgstatus(gp, _Grunnable, _Grunning) // flip the g to the running state
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
if !inheritTime {
_g_.m.p.ptr().schedtick++
}

// Check whether the profiler needs to be turned on or off.
hz := sched.profilehz
if _g_.m.profilehz != hz {
setThreadCPUProfiler(hz)
}

if trace.enabled {
// GoSysExit has to happen when we have a P, but before GoStart.
// So we emit it here.
if gp.syscallsp != 0 && gp.sysblocktraced {
traceGoSysExit(gp.sysexitticks)
}
traceGoStart()
}

gogo(&gp.sched) // restore the g's context and jump onto its stack (to gp.sched.pc); when the g returns, the goexit assembly routine runs, which calls goexit1 and re-enters the scheduler
}

An m runs the schedule function in a loop: it finds a g, runs it, and when the g finishes, schedule is called again. Inside it, findrunnable is where an m either spins or goes to sleep.
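The lockedg branch at the top of schedule is what backs runtime.LockOSThread. A minimal runnable illustration (the goroutine body is just for demonstration):

package main

import (
	"fmt"
	"runtime"
)

func main() {
	done := make(chan struct{})
	go func() {
		// Pin this goroutine to its OS thread. From here on, schedule()
		// takes the lockedg path on this m: stoplockedm hands the p away
		// while the g is blocked, and startlockedm hands a p back when
		// the g becomes runnable again.
		runtime.LockOSThread()
		defer runtime.UnlockOSThread()
		fmt.Println("running on a dedicated OS thread")
		close(done)
	}()
	<-done
}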

Now let's look at the findrunnable function.

// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from local or global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()

// The conditions here and in handoffp must agree: if
// findrunnable would return a G to run, handoffp must start
// an M.

top:
_p_ := _g_.m.p.ptr()
if sched.gcwaiting != 0 {
gcstopm()
goto top
}
if _p_.runSafePointFn != 0 {
runSafePointFn()
}

now, pollUntil, _ := checkTimers(_p_, 0)

if fingwait && fingwake {
if gp := wakefing(); gp != nil {
ready(gp, 0, true)
}
}
if *cgo_yield != nil {
asmcgocall(*cgo_yield, nil)
}

// local runq
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}

// global runq
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}

// Poll network.
// This netpoll is only an optimization before we resort to stealing.
// We can safely skip it if there are no waiters or a thread is blocked
// in netpoll already. If there is any kind of logical race with that
// blocked thread (e.g. it has already returned from netpoll, but does
// not set lastpoll yet), this thread will do blocking netpoll below
// anyway.
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
if list := netpoll(0); !list.empty() { // non-blocking
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
}

// Steal work from other P's.
procs := uint32(gomaxprocs)
ranTimer := false
// If number of spinning M's >= number of busy P's, block.
// This is necessary to prevent excessive CPU consumption
// when GOMAXPROCS>>1 but the program parallelism is low.
if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
goto stop
}
if !_g_.m.spinning {
_g_.m.spinning = true // mark ourselves as spinning
atomic.Xadd(&sched.nmspinning, 1)
}
for i := 0; i < 4; i++ { // up to 4 stealing rounds; only the final round also steals runnext
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
stealRunNextG := i > 2 // first look for ready queues with more than 1 g
p2 := allp[enum.position()] // visit the p's in a randomized order
if _p_ == p2 { // skip ourselves
continue
}
if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil { // steal half of p2's runnable g's
return gp, false
}

// Consider stealing timers from p2.
// This call to checkTimers is the only place where
// we hold a lock on a different P's timers.
// Lock contention can be a problem here, so avoid
// grabbing the lock if p2 is running and not marked
// for preemption. If p2 is running and not being
// preempted we assume it will handle its own timers.
if i > 2 && shouldStealTimers(p2) { // in the final round, also run p2's timers when appropriate
tnow, w, ran := checkTimers(p2, now)
now = tnow
if w != 0 && (pollUntil == 0 || w < pollUntil) {
pollUntil = w
}
if ran {
// Running the timers may have
// made an arbitrary number of G's
// ready and added them to this P's
// local run queue. That invalidates
// the assumption of runqsteal
// that it always has room to add
// stolen G's. So check now if there
// is a local G to run.
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
ranTimer = true
}
}
}
}
if ranTimer { // we ran timers stolen from another p
// Running a timer may have made some goroutine ready.
goto top
}

stop:

// We have nothing to do. If we're in the GC mark phase, can
// safely scan and blacken objects, and have work to do, run
// idle-time marking rather than give up the P.
if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) {
_p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
gp := _p_.gcBgMarkWorker.ptr()
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}

delta := int64(-1)
if pollUntil != 0 {
// checkTimers ensures that polluntil > now.
delta = pollUntil - now
}

// wasm only:
// If a callback returned and no other goroutine is awake,
// then pause execution until a callback was triggered.
if beforeIdle(delta) {
// At least one goroutine got woken.
goto top
}

// Before we drop our P, make a snapshot of the allp slice,
// which can change underfoot once we no longer block
// safe-points. We don't need to snapshot the contents because
// everything up to cap(allp) is immutable.
allpSnapshot := allp

// return P and block
lock(&sched.lock)
if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
unlock(&sched.lock)
goto top
}
if sched.runqsize != 0 {
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
return gp, false
}
if releasep() != _p_ { // detach the current p from this m
throw("findrunnable: wrong p")
}
pidleput(_p_)
unlock(&sched.lock)

// Delicate dance: thread transitions from spinning to non-spinning state,
// potentially concurrently with submission of new goroutines. We must
// drop nmspinning first and then check all per-P queues again (with
// #StoreLoad memory barrier in between). If we do it the other way around,
// another thread can submit a goroutine after we've checked all run queues
// but before we drop nmspinning; as the result nobody will unpark a thread
// to run the goroutine.
// If we discover new work below, we need to restore m.spinning as a signal
// for resetspinning to unpark a new worker thread (because there can be more
// than one starving goroutine). However, if after discovering new work
// we also observe no idle Ps, it is OK to just park the current thread:
// the system is fully loaded so no spinning threads are required.
// Also see "Worker thread parking/unparking" comment at the top of the file.
wasSpinning := _g_.m.spinning
if _g_.m.spinning {
_g_.m.spinning = false // stop looking for work: leave the spinning state before going idle
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("findrunnable: negative nmspinning")
}
}

// check all runqueues once again
for _, _p_ := range allpSnapshot { // check every p's run queue one final time
if !runqempty(_p_) { // found a p that still has g's
lock(&sched.lock)
_p_ = pidleget()
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_) // attach an idle p to this m and go back to chase the work we just saw
if wasSpinning {
_g_.m.spinning = true // back to looking for work
atomic.Xadd(&sched.nmspinning, 1)
}
goto top // start the search over
}
break
}
}

// Check for idle-priority GC work again.
if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) {
lock(&sched.lock)
_p_ = pidleget()
if _p_ != nil && _p_.gcBgMarkWorker == 0 {
pidleput(_p_)
_p_ = nil
}
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
if wasSpinning {
_g_.m.spinning = true // back to looking for work
atomic.Xadd(&sched.nmspinning, 1)
}
// Go back to idle GC check.
goto stop
}
}

// poll network
if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
atomic.Store64(&sched.pollUntil, uint64(pollUntil))
if _g_.m.p != 0 {
throw("findrunnable: netpoll with p")
}
if _g_.m.spinning {
throw("findrunnable: netpoll with spinning")
}
if faketime != 0 {
// When using fake time, just poll.
delta = 0
}
list := netpoll(delta) // block until new work is available
atomic.Store64(&sched.pollUntil, 0)
atomic.Store64(&sched.lastpoll, uint64(nanotime()))
if faketime != 0 && list.empty() {
// Using fake time and nothing is ready; stop M.
// When all M's stop, checkdead will call timejump.
stopm()
goto top
}
lock(&sched.lock)
_p_ = pidleget()
unlock(&sched.lock)
if _p_ == nil {
injectglist(&list)
} else {
acquirep(_p_)
if !list.empty() {
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
if wasSpinning {
_g_.m.spinning = true // back to looking for work
atomic.Xadd(&sched.nmspinning, 1)
}
goto top
}
} else if pollUntil != 0 && netpollinited() {
pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
if pollerPollUntil == 0 || pollerPollUntil > pollUntil {
netpollBreak()
}
}
stopm() // park the m (update its state and put it on the idle m list); blocks until woken
goto top // once woken, re-enter the search for a g
}

When a new goroutine is created with the go keyword, the runtime checks whether any m is currently spinning; if not, it wakes an idle m, and if there is none to wake, it creates and starts a new one.
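The wake-up check itself is short. The following is a paraphrase (not verbatim source) of the logic at the end of newproc1 in a Go 1.14-era runtime, where wakep grabs an idle p and unparks or creates an m in the spinning state:

// Paraphrased: after the new g is put on the current p's run queue,
// wake an m if there is an idle p and nobody is spinning, so the
// fresh g does not sit unnoticed in a run queue.
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
	wakep()
}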

g0's stack is the m's own (OS thread) stack, while every other g has its own stack. Switching between g's requires saving and restoring context: mcall saves the current g's context and switches onto g0's stack, and gogo restores a g's context and switches onto that g's stack.
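The saved context lives in each g's gobuf (the gp.sched that gogo receives in the listing above). Abridged from runtime/runtime2.go in a Go 1.14-era tree, with types simplified:

// Abridged: only the fields relevant to context switching are shown.
type gobuf struct {
	sp   uintptr        // stack pointer to restore
	pc   uintptr        // instruction address to resume at
	g    guintptr       // the g this context belongs to
	ctxt unsafe.Pointer // closure context for the function at pc
	ret  uintptr        // return value slot (sys.Uintreg in the source)
	lr   uintptr        // link register (on architectures that have one)
	bp   uintptr        // frame pointer, when enabled
}

gogo loads sp and pc from this struct and jumps; that is essentially all a g-to-g switch is.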

Once an m finds a g, it runs it immediately. Now suppose that g is a task that never terminates. The next call to schedule happens in goexit, so if the g never exits, scheduling would seemingly never happen again. Do all the other g's on this m's p lose their chance to run?

No, they do not. This is where goroutine preemption comes in, which the next article will explain.

The M/P/G relationship

In short: an M is an OS thread, a P holds a local run queue and the other resources needed to execute Go code, and a G is a goroutine; an M must acquire a P before it can run G's.
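The linkage is visible in the runtime's own structs. Abridged from runtime/runtime2.go (Go 1.14-era, nearly all fields omitted):

// Abridged: just the pointers that tie g, m, and p together.
type g struct {
	m     *m    // the m currently running this g (nil when not running)
	sched gobuf // saved context, restored by gogo
}

type m struct {
	g0   *g       // the scheduling g; its stack is the OS thread's stack
	curg *g       // the user g currently running on this m
	p    puintptr // the p attached to this m (0 when it has none)
}

type p struct {
	m       muintptr      // back-link to the attached m
	runq    [256]guintptr // local run queue of runnable g's
	runnext guintptr      // g to run next, bypassing runq
}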

When G switches happen

Here is a preview of the points at which a G switch can occur (not guaranteed to be exhaustive); these are exactly the points where the scheduler is entered:

  1. runtime.Gosched() yields voluntarily and triggers a switch (see the sketch after this list)
  2. sysmon monitors running g's: one that runs continuously for more than 10ms gets preempted (cooperative and asynchronous preemption work side by side)
  3. timers trigger rescheduling too: to guarantee a g runs at a given point in time, the p must be taken away from whatever is running
  4. sync.Mutex can put a g to sleep and wake it later, which also switches g's
  5. the stop-the-world phase of garbage collection causes g switches
  6. when a g enters a system call, its p can be handed off to another m (the p should not sit idle waiting for the m to leave the syscall while it still has runnable g's); after the syscall returns, the m re-enters the scheduler on g0
  7. runtime.Goexit() terminates the calling g and then enters the scheduler
  8. every newly started m enters the scheduler
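Item 1 is the easiest to observe. With a single p, runtime.Gosched puts the current g on the global run queue and re-enters schedule, which hands the p to the other goroutine:

package main

import (
	"fmt"
	"runtime"
)

func main() {
	runtime.GOMAXPROCS(1) // one p, so the yield is observable
	done := make(chan struct{})
	go func() {
		fmt.Println("B: got the p")
		close(done)
	}()
	fmt.Println("A: yielding")
	runtime.Gosched() // voluntarily re-enter schedule(); B runs next
	<-done
	fmt.Println("A: resumed")
}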

Next up

Goroutine preemption



