0%

《NSQ消息队列》Nsqd源码解析1

启动

下面是nsqd启动过程

  1. 解析命令行参数
  2. 加载上次关闭后持久化到磁盘的topic、channel,并且创建它们
  3. 新协程启动TCP服务器
  4. 新协程启动HTTP服务器
  5. 新协程处理DeferredQueue延迟队列以及InFlightQueue超时队列
  6. 新协程处理lookupd相关逻辑

apps/nsqd/main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// Start is the service entry point for nsqd.
//
// It does the following, in order:
//   - parses the command-line flags and the optional TOML config file named by the "config" flag;
//   - resolves both into nsqd options and creates the NSQD instance;
//   - loads the topic/channel metadata persisted by a previous run and immediately re-persists it;
//   - runs NSQD.Main in a background goroutine.
//
// Start returns nil once Main has been launched. Failures before that point
// are reported through logFatal. If Main later returns an error, the program
// is stopped and the process exits with status 1.
func (p *program) Start() error {
	opts := nsqd.NewOptions()

	flagSet := nsqdFlagSet(opts)
	// Parse only returns an error when the flag set is not ExitOnError; in
	// that case a bad command line must not be silently ignored.
	if err := flagSet.Parse(os.Args[1:]); err != nil {
		logFatal("failed to parse command line flags - %s", err)
	}

	rand.Seed(time.Now().UTC().UnixNano())

	// The version flag prints the build version and exits without starting anything.
	if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
		fmt.Println(version.String("nsqd"))
		os.Exit(0)
	}

	var cfg config
	configFile := flagSet.Lookup("config").Value.String()
	if configFile != "" {
		_, err := toml.DecodeFile(configFile, &cfg)
		if err != nil {
			logFatal("failed to load config file %s - %s", configFile, err)
		}
	}
	cfg.Validate()

	// Resolve opts from the parsed flags and the (possibly empty) config file.
	options.Resolve(opts, flagSet, cfg)

	// Named nsqdInst rather than nsqd so the nsqd package identifier is not
	// shadowed for the remainder of the function.
	nsqdInst, err := nsqd.New(opts)
	if err != nil {
		logFatal("failed to instantiate nsqd - %s", err)
	}
	p.nsqd = nsqdInst

	// Restore topics/channels persisted by a previous run, e.g.
	// {"topics":[{"channels":[],"name":"test","paused":false}],"version":"1.2.0"}
	err = p.nsqd.LoadMetadata()
	if err != nil {
		logFatal("failed to load metadata - %s", err)
	}
	err = p.nsqd.PersistMetadata()
	if err != nil {
		logFatal("failed to persist metadata - %s", err)
	}

	// Run the main loop in the background so Start can return to its caller.
	go func() {
		err := p.nsqd.Main()
		if err != nil {
			p.Stop()
			os.Exit(1)
		}
	}()

	return nil
}

nsqd主逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// Main launches every long-running NSQD goroutine under n.waitGroup, then
// blocks until the first of the network servers returns.
//
// The goroutines launched are:
//   - the TCP protocol server;
//   - the HTTP server;
//   - the HTTPS server, only when TLS is configured and an HTTPS address is set;
//   - queueScanLoop;
//   - lookupLoop;
//   - statsdLoop, only when a statsd address is set.
//
// Only the TCP/HTTP/HTTPS servers report back to Main. The first result wins
// (guarded by sync.Once): a non-nil error is logged at LOG_FATAL and then
// returned. Results from servers that finish afterwards are not forwarded.
func (n *NSQD) Main() error {
	ctx := &context{n}

	var (
		firstExit = make(chan error)
		exitOnce  sync.Once
	)
	// reportExit hands the first server result to Main; later calls do not send.
	reportExit := func(err error) {
		exitOnce.Do(func() {
			if err != nil {
				n.logf(LOG_FATAL, "%s", err)
			}
			firstExit <- err
		})
	}
	// serve runs fn in a waitGroup-tracked goroutine and reports its result.
	serve := func(fn func() error) {
		n.waitGroup.Wrap(func() {
			reportExit(fn())
		})
	}

	n.tcpServer.ctx = ctx
	serve(func() error {
		return protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf)
	})

	httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
	serve(func() error {
		return http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)
	})

	if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
		httpsServer := newHTTPServer(ctx, true, true)
		serve(func() error {
			return http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf)
		})
	}

	// Background maintenance loops. Per the original author's notes,
	// queueScanLoop handles deferred and in-flight timeout queues and
	// lookupLoop talks to nsqlookupd; their bodies are not visible here.
	n.waitGroup.Wrap(n.queueScanLoop)
	n.waitGroup.Wrap(n.lookupLoop)
	if n.getOpts().StatsdAddress != "" {
		n.waitGroup.Wrap(n.statsdLoop)
	}

	return <-firstExit
}

TCP服务器

下面是每个连接的消息处理循环,每来一个TCP连接都会执行这个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
// IOLoop runs for the lifetime of one V2 TCP connection.
//
// Setup:
//   - registers a new clientV2 with nsqd;
//   - starts that client's messagePump goroutine;
//   - waits for messagePump to signal that it has started.
//
// It then reads newline-terminated commands, executes each one through
// p.Exec, and writes any non-nil response back as a response frame.
//
// When a command fails, the error is logged and sent to the client as an
// error frame, and the loop continues. The loop ends instead if:
//   - sending that error frame fails;
//   - the error is a *protocol.FatalClientErr;
//   - a read fails (io.EOF counts as a clean close and yields a nil error);
//   - writing a response fails.
//
// On exit:
//   - the connection is closed;
//   - client.ExitChan is closed, which makes messagePump exit;
//   - the client is removed from its subscribed channel (if any) and from nsqd.
func (p *protocolV2) IOLoop(conn net.Conn) error {
	var err error
	var line []byte
	var zeroTime time.Time

	clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
	client := newClientV2(clientID, conn, p.ctx)
	p.ctx.nsqd.AddClient(client.ID, client)

	// synchronize the startup of messagePump in order
	// to guarantee that it gets a chance to initialize
	// goroutine local state derived from client attributes
	// and avoid a potential race with IDENTIFY (where a client
	// could have changed or disabled said attributes)
	messagePumpStartedChan := make(chan bool)
	go p.messagePump(client, messagePumpStartedChan)
	<-messagePumpStartedChan

	for {
		// A read times out after two heartbeat intervals without input;
		// with heartbeats disabled (interval <= 0) the zero time clears the
		// deadline so reads never time out.
		if client.HeartbeatInterval > 0 {
			client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))
		} else {
			client.SetReadDeadline(zeroTime)
		}

		// ReadSlice does not allocate new space for the data each request
		// ie. the returned slice is only valid until the next call to it
		line, err = client.Reader.ReadSlice('\n')
		if err != nil {
			if err == io.EOF {
				err = nil
			} else {
				err = fmt.Errorf("failed to read command - %s", err)
			}
			break
		}

		// trim the '\n'
		line = line[:len(line)-1]
		// optionally trim the '\r'
		if len(line) > 0 && line[len(line)-1] == '\r' {
			line = line[:len(line)-1]
		}
		// Split the command name and its arguments on separatorBytes
		// (defined elsewhere; presumably a single space).
		params := bytes.Split(line, separatorBytes)

		p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)

		var response []byte
		response, err = p.Exec(client, params)
		if err != nil {
			// Append the underlying cause when the error carries one. The
			// assertion is checked: an unchecked err.(protocol.ChildErr)
			// would panic this goroutine for any error type that does not
			// implement ChildErr.
			ctx := ""
			if childErr, ok := err.(protocol.ChildErr); ok {
				if parentErr := childErr.Parent(); parentErr != nil {
					ctx = " - " + parentErr.Error()
				}
			}
			p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)

			sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
			if sendErr != nil {
				p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
				break
			}

			// errors of type FatalClientErr should forceably close the connection
			if _, ok := err.(*protocol.FatalClientErr); ok {
				break
			}
			continue
		}

		if response != nil {
			err = p.Send(client, frameTypeResponse, response)
			if err != nil {
				err = fmt.Errorf("failed to send response - %s", err)
				break
			}
		}
	}

	p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client)
	conn.Close()
	// Closing ExitChan makes messagePump exit.
	close(client.ExitChan)
	if client.Channel != nil {
		client.Channel.RemoveClient(client.ID)
	}

	p.ctx.nsqd.RemoveClient(client.ID)
	return err
}

消息处理协程的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
// messagePump is the per-connection goroutine that pushes messages to a
// consumer. It owns the connection's heartbeat and output-buffer tickers and
// reacts to these events:
//   - SUB, delivered on client.SubEventChan (accepted once);
//   - IDENTIFY, delivered on client.IdentifyEventChan (accepted once);
//   - RDY changes, delivered on client.ReadyStateChan.
//
// While subscribed and ready, it takes messages from the subscribed
// channel's in-memory queue or its backend, marks each one in flight, and
// writes it to the client.
//
// startedChan is closed once the initial per-client state has been captured;
// IOLoop waits on it before processing commands. The pump exits when
// client.ExitChan is closed or when a flush, heartbeat, or message write
// fails. Any such error is logged on the way out.
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
	var err error
	var memoryMsgChan chan *Message
	var backendMsgChan <-chan []byte
	var subChannel *Channel
	// NOTE: `flusherChan` is used to bound message latency for
	// the pathological case of a channel on a low volume topic
	// with >1 clients having >1 RDY counts
	var flusherChan <-chan time.Time
	var sampleRate int32

	subEventChan := client.SubEventChan
	identifyEventChan := client.IdentifyEventChan
	outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)
	heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
	heartbeatChan := heartbeatTicker.C
	msgTimeout := client.MsgTimeout

	// sampledOut reports whether the next message should be skipped for this
	// client under the IDENTIFY sample rate. rand.Int31n(100) is uniform over
	// [0, 99], so skipping when it is >= sampleRate delivers exactly
	// sampleRate percent; comparing with '>' would deliver sampleRate+1
	// percent (and 100% for a rate of 99).
	sampledOut := func() bool {
		return sampleRate > 0 && rand.Int31n(100) >= sampleRate
	}

	// v2 opportunistically buffers data to clients to reduce write system calls
	// we force flush in two cases:
	// 1. when the client is not ready to receive messages
	// 2. we're buffered and the channel has nothing left to send us
	// (ie. we would block in this loop anyway)
	//
	flushed := true

	// signal to the goroutine that started the messagePump
	// that we've started up
	close(startedChan)

	for {
		if subChannel == nil || !client.IsReadyForMessages() {
			// the client is not ready to receive messages...
			// Nil channels are never selected, which pauses delivery.
			memoryMsgChan = nil
			backendMsgChan = nil
			flusherChan = nil
			// force flush
			client.writeLock.Lock()
			err = client.Flush()
			client.writeLock.Unlock()
			if err != nil {
				goto exit
			}
			flushed = true
		} else if flushed {
			// last iteration we flushed...
			// do not select on the flusher ticker channel
			memoryMsgChan = subChannel.memoryMsgChan
			backendMsgChan = subChannel.backend.ReadChan()
			flusherChan = nil
		} else {
			// we're buffered (if there isn't any more data we should flush)...
			// select on the flusher ticker channel, too
			memoryMsgChan = subChannel.memoryMsgChan
			backendMsgChan = subChannel.backend.ReadChan()
			flusherChan = outputBufferTicker.C
		}

		select {
		case <-flusherChan:
			// if this case wins, we're either starved
			// or we won the race between other channels...
			// in either case, force flush
			client.writeLock.Lock()
			err = client.Flush()
			client.writeLock.Unlock()
			if err != nil {
				goto exit
			}
			flushed = true
		case <-client.ReadyStateChan:
			// RDY changed; loop around to re-evaluate readiness.
		case subChannel = <-subEventChan:
			// you can't SUB anymore
			subEventChan = nil
		case identifyData := <-identifyEventChan:
			// you can't IDENTIFY anymore
			identifyEventChan = nil

			outputBufferTicker.Stop()
			if identifyData.OutputBufferTimeout > 0 {
				outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout)
			}

			heartbeatTicker.Stop()
			heartbeatChan = nil
			if identifyData.HeartbeatInterval > 0 {
				heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval)
				heartbeatChan = heartbeatTicker.C
			}

			if identifyData.SampleRate > 0 {
				sampleRate = identifyData.SampleRate
			}

			msgTimeout = identifyData.MsgTimeout
		case <-heartbeatChan:
			err = p.Send(client, frameTypeResponse, heartbeatBytes)
			if err != nil {
				goto exit
			}
		case b := <-backendMsgChan:
			if sampledOut() {
				continue
			}

			// Use a distinct name for the decode error: `msg, err :=` here
			// would shadow the function-level err, so a SendMessage failure
			// below would reach `exit` with the outer err still nil and
			// never be logged.
			msg, decodeErr := decodeMessage(b)
			if decodeErr != nil {
				p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", decodeErr)
				continue
			}
			msg.Attempts++

			// Register the message as in flight before sending it.
			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg)
			if err != nil {
				goto exit
			}
			flushed = false
		case msg := <-memoryMsgChan:
			if sampledOut() {
				continue
			}
			msg.Attempts++

			// Register the message as in flight before sending it.
			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg)
			if err != nil {
				goto exit
			}
			flushed = false
		case <-client.ExitChan:
			// IOLoop closes ExitChan when the connection ends.
			goto exit
		}
	}

exit:
	p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting messagePump", client)
	heartbeatTicker.Stop()
	outputBufferTicker.Stop()
	if err != nil {
		p.ctx.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): [%s] messagePump error - %s", client, err)
	}
}

TCP应用层协议跟lookupd的协议格式一致(有兴趣可以看上一篇文章《NSQ消息队列》nsqlookup源码解析),只是协议版本标识(nsqd是4字节的“  V2”,即两个空格加V2;lookupd是“  V1”)以及命令不一样

下篇文章讲TCP各种命令




微信关注我,及时接收最新技术文章