0%

《Golang》Golang锁原理

首先我们都知道,当两个g对同一个变量读写时一定要加锁,为什么?

先看看一个错误案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package main

import "fmt"

func main() {
var a int64 = 0
finish := make(chan bool, 0)
go func() {
for i := 0; i < 10000; i++ {
a++
}
finish <- true
}()
for i := 0; i < 10000; i++ {
a++
}
<- finish
fmt.Println(a)
}

多运行几次,你会发现很多时候输出并不是20000,而是<=20000(思考一下为什么不会>20000)

带着这个疑问,我们先学习其他的内容,学习完后,这个疑问自然就解决了

我们先看看一个常用的函数atomic.AddInt32

atomic.AddInt32

它的声明位于sync/atomic/doc.go

1
func AddInt32(addr *int32, delta int32) (new int32)

很明显,它对应一个汇编函数,找到它,发现在src/sync/atomic/asm.s

1
2
TEXT ·AddInt32(SB),NOSPLIT,$0
JMP runtime∕internal∕atomic·Xadd(SB)

调用了runtime∕internal∕atomic包下的Xadd函数,找到在文件src/runtime/internal/atomic/asm_amd64.s

1
2
3
4
5
6
7
8
9
10
11
12
13
// uint32 Xadd(uint32 volatile *val, int32 delta)
// Atomically:
// *val += delta;
// return *val;
TEXT runtime∕internal∕atomic·Xadd(SB), NOSPLIT, $0-20
MOVQ ptr+0(FP), BX // 第一个参数放到BX,是个指针
MOVL delta+8(FP), AX // 第二个参数放到AX
MOVL AX, CX // 第二个参数又放到CX
LOCK // 锁总线,来自其他处理器或总线代理的总线控制请求将被阻塞,下一行指令执行完自动释放锁(CPU特性)
XADDL AX, 0(BX) // 交换并相加。指针中的值与AX的值交换,然后两者相加,结果放到指针指向的值中
ADDL CX, AX // 指针中原来的值(AX)加上第二个参数(CX),结果放入AX
MOVL AX, ret+16(FP) // 返回AX
RET

在多核CPU中,每个核都有自己的一套寄存器。当两个线程并行运行在两个核上,两个线程对同一个变量读写

如果不加总线锁,他们可能同时执行到同一个汇编指令,比如XADDL AX, 0(BX),那么这里就会发生并发问题

这句汇编命令分解成几步

  1. BX寄存器中是一个指向虚拟内存的地址,第一步是从内存取出这个地址上的值
  2. 上一步取出的值加上AX寄存器中的值并写入BX寄存器指向的内存处
  3. 上一步取出的值放入AX寄存器

上面3步原本在单个核中是原子性的(因为所有汇编指令的执行都是原子性的),但是多个核之间使用各自的寄存器,最后的结果就会有影响

但因为加上了LOCK指令,导致XADDL AX, 0(BX)在处理器之间具有排他性,导致上面3步在多个核之间也不会互相干扰

如果没有LOCK指令的话,当两个处理器同时执行了第一步,他们同时取到了旧值,然后同时做加法,最后两个处理器得到的结果都不正确

举个例子:核1读出数据a(初始值0),核2读出数据a,核2对a加1然后写入内存,核1对a加1也写入内存,本来内存中的值期望结果是2,但是结果成了1

atomic.AddInt64 会干两件事情:

  1. 改变变量的内存中的值
  2. 通知其他核缓存失效(类似于写磁盘的 flush 动作,保证其他核不会读到缓存中的旧值,也就是可见性,Java 中的 volatile 关键字就是保证了可见性)

上面的例子中a++这个操作其实并不是原子性的,对于cpu来讲,存在两个步骤:

  1. 从内存中取出a的值
  2. 加上1然后写回内存
1
2
3
4
5
6
package main

func main() {
a := 1
a++
}

看看上面Go代码的汇编代码就知道了

1
2
3
4
5
6
7
8
main.go:3             0x1056d70               4883ec10                SUBQ $0x10, SP          
main.go:3 0x1056d74 48896c2408 MOVQ BP, 0x8(SP)
main.go:3 0x1056d79 488d6c2408 LEAQ 0x8(SP), BP
main.go:4 0x1056d7e 48c7042401000000 MOVQ $0x1, 0(SP)
main.go:5 0x1056d86 48c7042402000000 MOVQ $0x2, 0(SP) // 这行指令有两个步骤。1是从内存取出0(SP)的值 2是将0x2写入0(SP)内存中
main.go:6 0x1056d8e 488b6c2408 MOVQ 0x8(SP), BP
main.go:6 0x1056d93 4883c410 ADDQ $0x10, SP
main.go:6 0x1056d97 c3 RET

要保证线程安全的话,必须将这两步骤锁定,使其具有原子性。所以Golang中提供了atomic包进行原子操作,使用LOCK总线索保证上面两步的原子性

到这里,一开始的疑问其实已经有了答案(还没有答案的,多看看吧),也可以解释为什么上面的错误案例输出结果永远是<=20000,而不会大于20000了

类似的,atomic包里面其他函数AddInt64、CompareAndSwapInt32等也是这个原理

这里可能有人有疑问,难道单核就不会发生锁的问题了吗?答案是的,看下面例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"fmt"
"runtime"
"sync"
)


func main() {
runtime.GOMAXPROCS(1)
var lockB sync.Mutex
b := 1
go func() {
lockB.Lock()
b++
lockB.Unlock()
}()

//lockB.RLock()
fmt.Println(b)
//lockB.RUnlock()
}

你觉得上面有竞态问题吗,答案是没有,因为runtime.GOMAXPROCS(1)这一句限制了所有g只会跑在一个p上,也就是只会跑在一个线程上,那么所有g都只会运行在一个核上,那么自然不会有并发问题

使用go run -race运行代码会发现并没有data race

然而,atomic包中函数实现的只是一个单一操作的加锁,那我要锁住一段代码怎么办呢?

下面我们研究一下sync包

sync包

同样先看一个案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import (
"fmt"
"sync"
"time"
)

func main() {
var lock sync.Mutex

a := make(chan bool, 0)
go func() {
lock.Lock()
a <- true
for {
fmt.Println(1)
time.Sleep(time.Second)
}
lock.Unlock()
}()
<- a
for {
lock.Lock()
fmt.Println(2)
lock.Unlock()
}
}

猜猜上面的行为是什么

答案是:一直打印1,不会打印2出来

因为新启动的协程占用了锁一直不释放,导致主协程一直抢不到锁而被休眠

要分析其中原理,首先来看Lock函数做了什么。下面贴了标有注释的源代码

src/sync/mutex.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
type Mutex struct {
state int32 // 前29位表示阻塞等待锁的个数,第30位表示该Mutex是否处理饥饿模式,第31位表示是否有m已被唤醒,最后一位表示该Mutex是否已被锁定
sema uint32 // 指向sema根
}

func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { // 原子操作:比较&m.state是否等于0,等于的话&m.state设置为mutexLocked,不等于的话返回false。所以同一个mutex必然只有一个走进if里面
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return // 直接返回
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow() // 其他没抢到锁的g会执行这里
}

func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // 如果锁已被抢走且非mutexStarving状态且g可以自旋
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { // 设置为已有g被唤醒
awoke = true
}
runtime_doSpin() // cpu自旋一会儿(就是睡眠)
iter++
old = m.state
continue // 上面这个自旋过程不会超过4次(runtime_canSpin中有限制),等一会儿为的是在这个期间锁被释放后,本g能够立马拿到锁,不用进入无谓的阻塞等待
}
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
if old&mutexStarving == 0 { // 如果非饥饿模式,则mutex变为已上锁
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 { // 如果锁住了或者处于饥饿模式
new += 1 << mutexWaiterShift // waiter+1
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 { // 如果有阻塞中的g饥饿了且mutex锁住了,则mutex切换饥饿模式
new |= mutexStarving
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken // 按位清空。mutexWoken位为0的话,置为0,否则不变。这里其实多余,mutexWoken位肯定是1,是0的话上面就throw了
}
if atomic.CompareAndSwapInt32(&m.state, old, new) { // 变更mutex到新属性
if old&(mutexLocked|mutexStarving) == 0 { // 变更前没有锁且没处于饥饿模式,就退出(大部分情况下,自旋期间锁就已被释放,这里就会退出,本g就直接抢到锁了,不用进入等待)
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue. 执行到这里就表示自旋过程没有抢到锁
queueLifo := waitStartTime != 0 // 表示本g之前有没有阻塞过
if waitStartTime == 0 {
waitStartTime = runtime_nanotime() // 记录开始等待的时间
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1) // g休眠,当被唤醒时继续执行。这里queueLifo为true的话,g会立马被调度
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs // 是否饥饿
old = m.state
if old&mutexStarving != 0 { // 如果处于饥饿模式
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift) // waiter-1
if !starving || old>>mutexWaiterShift == 1 { // 如果自己是最后一个waiter,那么清除 mutexStarving 标记
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving // 退出饥饿模式
}
atomic.AddInt32(&m.state, delta)
break // 抢锁成功,退出
}
awoke = true // 如果不是饥饿模式,被唤醒后还得继续去抢锁,有可能还是抢不到
iter = 0
} else {
old = m.state
}
}

if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}

再看看Unlock函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}

// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked) // locked标记位置为0
if new != 0 { // 如果结果是0,说明其他标记都是0,不需要做其他操作了
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}

func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 { // 如果Mutex没有处于饥饿模式(有g阻塞时间超过1ms了,mutex就会切换到饥饿模式)
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else { // 饥饿模式下
// Starving mode: handoff mutex ownership to the next waiter, and yield
// our time slice so that the next waiter can start to run immediately.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
runtime_Semrelease(&m.sema, true, 1) // 释放锁,且让拿到锁的处于饥饿状态的g立马在当前m下继续执行
}
}

总结起来以下几点:

  1. 使用 atomic.CompareAndSwapInt32 原子操作使不同m下的g之间不会产生并发问题
  2. 没有抢到锁的g在进入休眠之前会先自旋,就是执行4个轮回,每个轮回让CPU暂停一会儿。自旋期间如果持有锁的g释放了锁(其实这种锁住时间短的情况相当多见),那么这个g就会立马得到锁,而不会进入休眠了,避免不必要的调度开销。如果自旋期间锁并没有被释放,那么g将会被休眠阻塞。
  3. g进入休眠其实就是g从运行队列移除,状态被置为waiting,然后m进入调度。而g被唤醒,则是由释放锁的g来唤醒,原理就是把等待队列中的头部的g(休眠队列是先进先出,跟g运行队列一样)设置为当前p的下一个运行的g,这样的话当前m进行调度后必然回去运行那个g,那个g就可以接着抢锁。
  4. 因为存在上述的自旋抢锁,所以可能会导致进入休眠等待的g一直无法得到锁(一唤醒然后抢锁发现锁又被自旋的g抢走了,然后又进入休眠),所以引入了饥饿模式,一旦有等待队列中的g的等待时间超过阈值,mutex就会进入饥饿模式,饥饿模式中,自旋就会失效,其他任何g都不能通过自旋得到锁,而休眠等待的g被唤醒后会被立马调度到唤醒它的m中执行,解决了一直抢不到锁的问题。

上面代码中有一个g睡眠的函数runtime_SemacquireMutex,还有一个唤醒g的runtime_Semrelease函数,都在src/runtime/sema.go

由于篇幅原因,这里就不贴代码了,做简单的解释

这个文件中实现的是针对g的信号量控制,类似于linux中针对线程的的Semaphore信号量(linux中的锁机制之一),通过发送或释放信号量(一把锁)控制g的休眠与唤醒

信号量有一堆许可证,代表最多可以授予别人多少次许可

sema.acquire(2)就是请求2张许可证,成功的话g就会得到执行机会。如果信号量持有的许可证发放完了,则acquire将不成功,只能等待

而release函数就是归还许可证,如果有某g release了,则其他g又可以acquire到许可证,然后接着执行了,也就是被唤醒了

其原理主要就是一个等待队列,先进去的,先被唤醒

在文件src/runtime/lock_sema.go中,则通过系统调用使用的是真正的Semaphore信号量,是针对m线程的信号量控制,其原理跟上面对g控制的sema机制是一样的

RWMutex

上面的Mutex只是实现了写锁,纯粹的互斥锁,一个g Lock了,另一个g只能等,不管Lock的时候做了什么。

想想上面讲的多核时的并发问题,如果两个核仅仅是同时对变量读,那么不可能有并发问题

如果还是一股脑的加上写锁,很明显会影响效率,看下面代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
"fmt"
"sync"
)


func main() {
var lockB sync.Mutex
b := 1
go func() {
lockB.Lock()
fmt.Println(b)
lockB.Unlock()
}()

lockB.Lock()
fmt.Println(b)
lockB.Unlock()
}

都是读,却使用的是写锁,导致g不能利用多核并行执行,如果这种代码多了,势必影响整体效率。所以RWMutex出来了

RWMutex提供了两种锁,读锁以及写锁。RWMutex.Lock是加写锁,RWMutex.RLock则是加读锁,上面代码变成下面代码才是正道

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
"fmt"
"sync"
)


func main() {
var lockB sync.RWMutex
b := 1
go func() {
lockB.RLock()
fmt.Println(b)
lockB.RUnlock()
}()

lockB.RLock()
fmt.Println(b)
lockB.RUnlock()
}

这样的话两个g可以在多核上并行执行,效率能得到提升,这里有人可能又有疑问

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main

import (
"fmt"
)


func main() {
b := 1
go func() {
fmt.Println(b)
}()

fmt.Println(b)
}

我使用下面代码不是更好吗,既然都是读,本来就不会有并发问题,那就没必要加锁了呀

在上面案例中,确实是这样的,没必要加锁,但是如果是下面的案例呢

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
"fmt"
"sync"
)


func main() {
var lockB sync.RWMutex
b := 1
go func() {
lockB.RLock()
fmt.Println(b)
lockB.RUnlock()
}()

go func() {
lockB.Lock()
b++
lockB.Unlock()
}()

lockB.RLock()
fmt.Println(b)
lockB.RUnlock()
}

RWMutex读锁的必要性就体现出来了

这里我又想提一个问题:如果我在RLock之后干了写的事情,会发生什么?

答案是视情况而定,但最好别这么做

下面代码是没有问题的。因为总共只有两个地方上锁,一个上读锁一个上写锁,读锁和写锁本来就是互斥的,这两个g就不可能并行,自然就没有并发问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
"fmt"
"sync"
)


func main() {
var lockB sync.RWMutex
b := 1
go func() {
lockB.Lock()
b++
lockB.Unlock()
}()

lockB.RLock()
b++
fmt.Println(b)
lockB.RUnlock()
}

但是下面的代码就有竞态问题。两个地方都是加的读锁,两个g会并行执行,一个读锁里面出现了写操作,当然就有并发问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
"fmt"
"sync"
)


func main() {
var lockB sync.RWMutex
b := 1
go func() {
lockB.RLock()
fmt.Println(b)
lockB.RUnlock()
}()

lockB.RLock()
b++
fmt.Println(b)
lockB.RUnlock()
}

如果获取写锁的g已经休眠的情况下,其他的g再获取读锁则会进入休眠。因为我先获取写锁的,凭什么让你获取到读锁然后执行

看下面例子,所有g会休眠导致死锁panic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main
import (
"fmt"
"sync"
"time"
)

func main() {
var mu sync.RWMutex
var count int
go func() {
mu.RLock()
defer mu.RUnlock()
time.Sleep(2 * time.Second)
mu.RLock() // 获取读锁进入休眠
defer mu.RUnlock()
}()
time.Sleep(time.Second)
mu.Lock() // 获取写锁进入休眠
defer mu.Unlock()
count++
fmt.Println(count)
}

为什么写锁被占用了之后,读锁就不让获取了呢,甚至在 atomic 包中也是这样?

下面的两段代码都会发生问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main

import (
"fmt"
"sync/atomic"
)

func main() {
var i int64 = 1
go func() {
atomic.AddInt64(&i, 1)
}()
fmt.Println(i)
//fmt.Println(atomic.LoadInt64(&i)) // 这样就不会有问题
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package main

import (
"fmt"
"sync"
)

func main() {
var i int64 = 1
var lock sync.Mutex
go func() {
lock.Lock()
i++
lock.Unlock()
}()
fmt.Println(i)
}

简单解释就是:我还没写完,你就给读去了,你读到的可能是错误的只有一半的数据( i++ 这种没有问题,不会影响结果,但是如果写一个很大的 struct 变量呢)

WaitGroup

WaitGroup需要注意的一点是:Wait完成到结束期间不允许再Add。看下面例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package main
import (
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
time.Sleep(time.Millisecond)

wg.Done()
//time.Sleep(1000 * time.Millisecond) // 这里注释打开就不会报错
wg.Add(1)
}()
wg.Wait() // 这里会报错 WaitGroup is reused before previous Wait has returned
}

惊群问题

惊群问题是指多个进程同时阻塞等待同一个事件的发生,当这个事件发生时,所有的线程都被唤醒,而只有一个线程能得到正确的数据,其他线程得到错误后又进入阻塞监听

惊群问题会导致资源的浪费

典型的案例是在epoll中,大量的线程使用epollwait向操作系统获取发生的事件,如果是阻塞等待,那么就会发生惊群问题

解决办法就是在epollwait前加上互斥锁,防止多个线程同时epollwait

Golang运行时中没有这个问题或者说无需担心这个问题,原因有二:

  1. Golang中线程本来就不会很多
  2. Golang中epollwait是异步等待

易错题

1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package main
import (
"fmt"
"sync"
)
type MyMutex struct {
count int
sync.Mutex
}
func main() {
var mu MyMutex
mu.Lock()
var mu2 = mu // 这里是值拷贝,Mutex的状态同样会被拷贝,此时是Lock的状态
mu.count++
mu.Unlock()
mu2.Lock() // 这里再次Lock,造成死锁
mu2.count++
mu2.Unlock()
fmt.Println(mu.count, mu2.count)
}



微信关注我,及时接收最新技术文章