0%

《Golang》Channel 原理

要分析 channel 的原理,首先看看 channel 相关操作会被编译器处理成什么。编译下面的代码:

1
2
3
4
5
6
7
8
package main

func main() { // buffered channel (capacity 2): send, close, then receive
	a := make(chan bool, 2) // compiled to a call to runtime.makechan
	a <- true               // compiled to a call to runtime.chansend1
	close(a)                // compiled to a call to runtime.closechan
	<-a                     // compiled to runtime.chanrecv1; still yields the buffered true although a is closed
}

然后看对应的汇编代码,如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
TEXT main.main(SB) /Users/joy/Work/backend/golang/golang-test/bin/main/main.go
main.go:3 0x1056d70 65488b0c2530000000 MOVQ GS:0x30, CX
main.go:3 0x1056d79 483b6110 CMPQ 0x10(CX), SP
main.go:3 0x1056d7d 7675 JBE 0x1056df4
main.go:3 0x1056d7f 4883ec28 SUBQ $0x28, SP
main.go:3 0x1056d83 48896c2420 MOVQ BP, 0x20(SP)
main.go:3 0x1056d88 488d6c2420 LEAQ 0x20(SP), BP
main.go:4 0x1056d8d 488d056c890000 LEAQ runtime.types+35072(SB), AX
main.go:4 0x1056d94 48890424 MOVQ AX, 0(SP)
main.go:4 0x1056d98 48c744240802000000 MOVQ $0x2, 0x8(SP)
main.go:4 0x1056da1 e8bac8faff CALL runtime.makechan(SB)
main.go:4 0x1056da6 488b442410 MOVQ 0x10(SP), AX
main.go:4 0x1056dab 4889442418 MOVQ AX, 0x18(SP)
main.go:5 0x1056db0 48890424 MOVQ AX, 0(SP)
main.go:5 0x1056db4 488d05f3c60200 LEAQ runtime.egcdata(SB), AX
main.go:5 0x1056dbb 4889442408 MOVQ AX, 0x8(SP)
main.go:5 0x1056dc0 e8cbcafaff CALL runtime.chansend1(SB)
main.go:6 0x1056dc5 488b442418 MOVQ 0x18(SP), AX
main.go:6 0x1056dca 48890424 MOVQ AX, 0(SP)
main.go:6 0x1056dce e85dd3faff CALL runtime.closechan(SB)
main.go:7 0x1056dd3 488b442418 MOVQ 0x18(SP), AX
main.go:7 0x1056dd8 48890424 MOVQ AX, 0(SP)
main.go:7 0x1056ddc 48c744240800000000 MOVQ $0x0, 0x8(SP)
main.go:7 0x1056de5 e8c6d5faff CALL runtime.chanrecv1(SB)
main.go:8 0x1056dea 488b6c2420 MOVQ 0x20(SP), BP
main.go:8 0x1056def 4883c428 ADDQ $0x28, SP
main.go:8 0x1056df3 c3 RET
main.go:3 0x1056df4 e8f79cffff CALL runtime.morestack_noctxt(SB)
main.go:3 0x1056df9 e972ffffff JMP main.main(SB)

可以看到

  • a := make(chan bool, 2) 被翻译成了 runtime.makechan
  • a <- true 被翻译成了 runtime.chansend1
  • close(a) 被翻译成了 runtime.closechan
  • <- a 被翻译成了 runtime.chanrecv1

下面我们依次分析上面的四个方法

开始前先看一下 channel 的结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// hchan is the runtime representation of a channel; makechan returns a
// pointer to one of these.
type hchan struct {
	qcount   uint           // total data in the queue: number of elements currently buffered
	dataqsiz uint           // size of the circular queue: the capacity given to make
	buf      unsafe.Pointer // points to an array of dataqsiz elements (start of the ring buffer)
	elemsize uint16         // size in bytes of one element
	closed   uint32         // non-zero once the channel has been closed
	elemtype *_type         // element type
	sendx    uint           // send index: buffer slot the next sent element is stored in
	recvx    uint           // receive index: buffer slot the next received element is taken from
	recvq    waitq          // list of recv waiters: goroutines blocked receiving on this channel
	sendq    waitq          // list of send waiters: goroutines blocked sending on this channel

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex // runtime mutex (not a sync.Mutex) guarding the fields above
}

接着看 runtime.makechan 做了什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// makechan creates a channel whose type is described by t, with a buffer of
// size elements. A make(chan T, size) expression becomes a call to this
// function. It validates the element type and the requested capacity,
// allocates the hchan header (together with, or separately from, the element
// buffer) and returns a pointer to it.
//
// It throws if the element type is too large or misaligned, and panics with
// "makechan: size out of range" if size is negative or the buffer would be
// too large.
func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	// Element types of 64KB (1<<16 bytes) or more are rejected; elemsize is a uint16.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	// NOTE(review): presumably this guarantees that a buffer placed directly
	// after the header (the ptrdata == 0 case below) is correctly aligned.
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	// mem is the number of bytes the buffer needs (elem.size * size); overflow reports wrap-around.
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	// Reject an overflowing product, a buffer too large to allocate together with the header, or a negative size.
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		// Either size == 0 (unbuffered) or the element type has zero size
		// (e.g. struct{}); only the header is allocated.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		// The buffer starts immediately after the header in the same allocation.
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		// Header and buffer are allocated separately. The buffer is allocated
		// with the element type elem, presumably so the GC can scan the
		// pointers stored in it.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size) // fits in uint16: elem.size < 1<<16 was checked above
	c.elemtype = elem
	c.dataqsiz = uint(size) // capacity; size >= 0 was checked above

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}

然后看看发送数据到 chan 时 runtime.chansend1 发生了什么。chansend1 只是以阻塞模式(block 为 true)调用 chansend,所以下面直接看 chansend:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// chansend sends the value pointed to by ep on channel c.
//
// If block is false, the send is non-blocking: chansend returns false when
// the value cannot be delivered or buffered immediately. If block is true,
// the calling goroutine is parked until a receiver takes the value. It
// returns true once the value has been handed to a receiver or buffered.
// callerpc is only used for race-detector reporting.
//
// NOTE(review): runtime.chansend1 (what a `c <- v` statement compiles to) is
// not shown here; presumably it forwards to chansend with block == true.
// Verify against the runtime source.
//
// Panics with "send on closed channel" if c is closed, including when c is
// closed while this goroutine is blocked in the send.
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	// Nil channel: a non-blocking send fails; a blocking send parks forever
	// (nothing can wake it, hence "unreachable").
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation.
	if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
		return false
	}

	// Start time for block profiling; only recorded when profiling is enabled.
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock) // acquire the channel's runtime mutex

	if c.closed != 0 { // sending on a closed channel panics
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	if sg := c.recvq.dequeue(); sg != nil { // a receiver is already blocked waiting on this channel
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	if c.qcount < c.dataqsiz { // the ring buffer still has a free slot
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx) // buffer slot at the current send index
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		typedmemmove(c.elemtype, qp, ep) // copy the value from ep into the slot
		c.sendx++                        // advance the send index
		if c.sendx == c.dataqsiz {       // wrap around at the end of the circular buffer (this does NOT mean the buffer is full)
			c.sendx = 0
		}
		c.qcount++ // one more buffered element
		unlock(&c.lock)
		return true
	}

	// No waiting receiver and no buffer space: a non-blocking send fails here.
	if !block {
		unlock(&c.lock)
		return false
	}

	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg() // the current goroutine, which is about to block
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		// NOTE(review): presumably -1 asks the waker to record a release time
		// for block profiling (closechan overwrites a non-zero releasetime).
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep // the receiver that completes the send copies the value from here
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg) // join the channel's send wait queue
	// Park the goroutine; chanparkcommit receives the channel lock
	// (NOTE(review): presumably it releases the lock once parked; confirm).
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	// A nil gp.param means no receiver took the value; the only legitimate
	// cause is that the channel was closed (closechan wakes senders with a nil
	// param), so the send panics.
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	return true
}

接着看关闭 channel 时 runtime.closechan 做了什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// closechan implements close(c).
//
// It panics if c is nil or already closed. Otherwise it marks c as closed
// and wakes every goroutine blocked on it:
//   - blocked receivers get the element type's zero value;
//   - blocked senders will panic with "send on closed channel" when they resume.
//
// The goroutines are collected while holding the channel lock and are only
// readied after the lock has been released.
func closechan(c *hchan) {
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
		racerelease(c.raceaddr())
	}

	c.closed = 1 // mark closed; later sends panic and drained receives return the zero value

	var glist gList // goroutines to wake once the lock is dropped

	// release all readers
	for {
		sg := c.recvq.dequeue() // next blocked receiver
		if sg == nil {
			break
		}
		if sg.elem != nil { // the receiver wants the value: zero its destination so it receives the zero value
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil // a nil param tells the woken receiver the channel was closed
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp) // collect now, ready after unlocking
	}

	// release all writers (they will panic)
	for {
		sg := c.sendq.dequeue() // next blocked sender
		if sg == nil {
			break
		}
		sg.elem = nil // drop the reference to the sender's value
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil // a nil param makes the woken sender panic with "send on closed channel"
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	for !glist.empty() { // wake every collected receiver and sender
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

最后看从 channel 接收数据 runtime.chanrecv1 发生了什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// chanrecv1 is the runtime entry point for a plain receive expression `<-c`.
// It performs a blocking receive and stores the element into elem; elem may
// be nil, in which case the value is discarded. Whether the channel was
// closed is not reported (chanrecv2 reports it).
//
// It carries the same //go:nosplit directive as its sibling entry point
// chanrecv2.
//
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}

// chanrecv2 performs a blocking receive on c into elem and reports whether
// an actual element was received. received is false when the channel was
// closed and drained, in which case elem (if non-nil) holds the zero value.
// NOTE(review): presumably this is the entry point for `v, ok := <-c`; confirm.
//
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(c, elem, true)
	return
}

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
//
// Order of checks once the lock is held:
//  1. closed and empty: receive the zero value;
//  2. a sender is blocked: complete the handoff with it;
//  3. buffered elements exist: take the one at recvx;
//  4. otherwise fail (non-blocking) or park until a sender or closechan wakes us.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	// Nil channel: a non-blocking receive fails; a blocking receive parks
	// forever (nothing can wake it, hence "unreachable").
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not ready for receiving, we observe that the
	// channel is not closed. Each of these observations is a single word-sized read
	// (first c.sendq.first or c.qcount, and second c.closed).
	// Because a channel cannot be reopened, the later observation of the channel
	// being not closed implies that it was also not closed at the moment of the
	// first observation. We behave as if we observed the channel at that moment
	// and report that the receive cannot proceed.
	//
	// The order of operations is important here: reversing the operations can lead to
	// incorrect behavior when racing with a close.
	if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
		c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
		atomic.Load(&c.closed) == 0 {
		return
	}

	// Start time for block profiling; only recorded when profiling is enabled.
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 && c.qcount == 0 { // closed and no buffered elements left
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		// ep != nil means the caller wants the value: zero it so the receive
		// yields the element type's zero value. This is why receiving from a
		// closed, drained channel returns the zero value.
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

	if sg := c.sendq.dequeue(); sg != nil { // a sender is blocked, so the buffer (if any) is full
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

	if c.qcount > 0 { // buffered elements are waiting to be received
		// Receive directly from queue
		qp := chanbuf(c, c.recvx) // buffer slot at the current receive index
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp) // copy the element out to the caller
		}
		typedmemclr(c.elemtype, qp) // clear the vacated slot
		c.recvx++                   // advance the receive index
		if c.recvx == c.dataqsiz {  // wrap around at the end of the circular buffer
			c.recvx = 0
		}
		c.qcount-- // one fewer buffered element
		unlock(&c.lock)
		return true, true
	}

	// Nothing to receive right now: a non-blocking receive fails here.
	if !block {
		unlock(&c.lock)
		return false, false
	}

	// no sender available: block on this channel.
	gp := getg() // the current goroutine, which is about to block
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep // destination: a sender writes the value here; closechan zeroes it instead
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg) // join the channel's receive wait queue
	// Park the goroutine; chanparkcommit receives the channel lock
	// (NOTE(review): presumably it releases the lock once parked; confirm).
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	closed := gp.param == nil // closechan wakes receivers with a nil param
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, !closed
}

源码解析完了,我们总结一下:

  1. channel 用于协程之间的数据传输,必然需要保证并发安全,所以 channel 结构体中有一个 mutex 锁(runtime 内部的 mutex,不是 sync.Mutex)。发送、接收以及关闭 channel 时都会加这个锁(非阻塞发送/接收在确定无法立即完成时,会走不加锁的快速失败路径)
  2. channel 的元素类型占用的内存大小必须小于 64KB(elem.size < 1<<16)。编译器会检查这一点,makechan 中也会再检查一次,否则直接 throw
  3. channel 的容量(就是 make 的第二个参数)也有限制:不能为负数,且"元素大小 × 容量"不能溢出,也不能超过 maxAlloc - hchanSize,否则会 panic: makechan: size out of range
  4. channel 整个结构体都是分配在堆中的
  5. channel 关闭后,向其发送数据会 panic。从其接收时,会先正常读完缓冲区中尚未被接收的数据;缓冲区为空后再接收,会立即返回对应类型的零值(v, ok := <-ch 中 ok 为 false)
  6. channel 未关闭时,如果没有正在等待的接收者,且 channel 中堆积的数据量已经等于容量(无缓冲 channel 的容量为 0),再发送会使当前 g 休眠阻塞;如果没有正在等待的发送者,且 channel 中没有堆积数据,从其接收会使当前 g 休眠阻塞
  7. 阻塞的发送者会被接收者唤醒(谁先等待谁就先被唤醒,先进先出队列),阻塞的接收者会被发送者唤醒,channel 关闭时所有阻塞的发送者以及接收者都会被唤醒
  8. 如果 channel 变量为 nil(例如未初始化,或中途被置为 nil),阻塞式的发送和接收都会让 g 永久休眠(非阻塞操作,例如带 default 的 select,则直接返回),关闭则会 panic
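  9. channel 变量本身就是一个指向 hchan 结构体的指针(makechan 返回的是 *hchan),所以赋值或传参时复制的只是这个指针,各个副本操作的是同一个 channel。这一点和 map 一样(map 变量是指向 hmap 的指针),但和 slice 不同(slice 是包含数据指针、长度和容量的结构体值)。由于 qcount 是 hchan 的第一个字段,这个指针指向的位置恰好就是 channel 中已有元素的个数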



微信关注我,及时接收最新技术文章