0%

《NSQ消息队列》Nsqlookup源码解析

启动过程

启动文件位于 apps/nsqlookupd/main.go,先解析命令行参数,然后启动了Main主业务逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// Init changes the working directory to the directory of the executable
// (taken from os.Args[0]) when the process runs as a Windows service.
// In every other environment it does nothing and returns nil.
func (p *program) Init(env svc.Environment) error {
	if !env.IsWindowsService() {
		return nil
	}
	exeDir := filepath.Dir(os.Args[0])
	return os.Chdir(exeDir)
}

// Start parses the command line (plus the optional TOML config file named by
// the "config" flag), creates the nsqlookupd instance and launches its Main
// loop in a background goroutine. Setup failures are reported through
// logFatal; Start itself always returns nil.
func (p *program) Start() error {
	opts := nsqlookupd.NewOptions()

	flagSet := nsqlookupdFlagSet(opts)
	flagSet.Parse(os.Args[1:])

	// "version" flag: print the version string and exit right away.
	if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
		fmt.Println(version.String("nsqlookupd"))
		os.Exit(0)
	}

	var cfg map[string]interface{}
	configFile := flagSet.Lookup("config").Value.String()
	if configFile != "" {
		_, err := toml.DecodeFile(configFile, &cfg)
		if err != nil {
			logFatal("failed to load config file %s - %s", configFile, err)
		}
	}

	options.Resolve(opts, flagSet, cfg) // resolve the final option values
	daemon, err := nsqlookupd.New(opts) // create the nsqlookupd instance
	if err != nil {
		// The format string needs a verb for err; without one the error is
		// not rendered as part of the message.
		logFatal("failed to instantiate nsqlookupd - %s", err)
	}
	p.nsqlookupd = daemon

	// Run the main business logic in its own goroutine so Start can return.
	go func() {
		if err := p.nsqlookupd.Main(); err != nil {
			p.Stop()
			os.Exit(1)
		}
	}()

	return nil
}

nsqlookup开启了两个server,一个负责接收nsqd的tcp请求,一个负责接收http请求

nsqlookupd/nsqlookupd.go 53行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Main starts the TCP server and the HTTP server through l.waitGroup.Wrap,
// then blocks until the first of them returns. That server's result (nil or
// an error) is returned to the caller; a non-nil result is logged at
// LOG_FATAL first.
func (l *NSQLookupd) Main() error {
	ctx := &Context{l}

	var (
		done     = make(chan error)
		exitOnce sync.Once
	)
	// report hands the first server result to done; later calls do nothing.
	report := func(err error) {
		exitOnce.Do(func() {
			if err != nil {
				l.logf(LOG_FATAL, "%s", err)
			}
			done <- err
		})
	}

	// TCP server.
	l.tcpServer = &tcpServer{ctx: ctx}
	l.waitGroup.Wrap(func() {
		report(protocol.TCPServer(l.tcpListener, l.tcpServer, l.logf))
	})

	// HTTP server.
	httpServer := newHTTPServer(ctx)
	l.waitGroup.Wrap(func() {
		report(http_api.Serve(l.httpListener, httpServer, "HTTP", l.logf))
	})

	// Wait for whichever server exits first.
	return <-done
}

TCP服务

下面是TCP服务器的处理函数

nsqlookupd/tcp.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// Handle serves a single accepted TCP connection. It reads the 4-byte
// protocol magic, selects the matching protocol implementation, records the
// connection in p.conns and runs the protocol's IOLoop until it returns.
// Connections that fail the handshake are closed here.
func (p *tcpServer) Handle(clientConn net.Conn) {
	p.ctx.nsqlookupd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr())

	// The client should initialize itself by sending a 4 byte sequence indicating
	// the version of the protocol that it intends to communicate, this will allow us
	// to gracefully upgrade the protocol away from text/line oriented to whatever...
	buf := make([]byte, 4)
	_, err := io.ReadFull(clientConn, buf) // read the first four bytes
	if err != nil {
		p.ctx.nsqlookupd.logf(LOG_ERROR, "failed to read protocol version - %s", err)
		clientConn.Close()
		return
	}
	protocolMagic := string(buf)

	p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",
		clientConn.RemoteAddr(), protocolMagic)

	var prot protocol.Protocol
	switch protocolMagic {
	// The magic is always 4 bytes long (see buf above), so the literal must
	// be two spaces followed by "V1"; a 3-byte literal could never match.
	case "  V1":
		prot = &LookupProtocolV1{ctx: p.ctx}
	default:
		// Unknown protocol version: tell the client and drop the connection.
		protocol.SendResponse(clientConn, []byte("E_BAD_PROTOCOL"))
		clientConn.Close()
		p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
			clientConn.RemoteAddr(), protocolMagic)
		return
	}

	p.conns.Store(clientConn.RemoteAddr(), clientConn) // remember the live connection

	err = prot.IOLoop(clientConn) // run the receive loop until the client is done
	if err != nil {
		p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
	}

	p.conns.Delete(clientConn.RemoteAddr()) // forget the connection once the loop has ended
}

下面是接收数据的循环

nsqlookupd/lookup_protocol_v1.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// IOLoop reads newline-terminated commands from conn, executes each one via
// p.Exec and writes the response (or the error text) back to the client.
// The loop ends on a read error, a failed write, or an error of type
// *protocol.FatalClientErr. On exit the connection is closed and every
// registration held by this client is removed from the DB. The last error
// seen is returned.
func (p *LookupProtocolV1) IOLoop(conn net.Conn) error {
	var err error
	var line string

	client := NewClientV1(conn)
	reader := bufio.NewReader(client)
	for {
		line, err = reader.ReadString('\n') // read one line
		if err != nil {
			break
		}

		line = strings.TrimSpace(line)     // strip surrounding whitespace
		params := strings.Split(line, " ") // split on spaces; the first element is the command name

		var response []byte
		response, err = p.Exec(client, reader, params) // run the command
		if err != nil {
			ctx := ""
			// Checked assertion: an error that does not implement
			// protocol.ChildErr must not panic this connection's goroutine.
			if childErr, ok := err.(protocol.ChildErr); ok {
				if parentErr := childErr.Parent(); parentErr != nil {
					ctx = " - " + parentErr.Error()
				}
			}
			p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)

			_, sendErr := protocol.SendResponse(client, []byte(err.Error()))
			if sendErr != nil {
				p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
				break
			}

			// errors of type FatalClientErr should forceably close the connection
			if _, ok := err.(*protocol.FatalClientErr); ok {
				break
			}
			continue
		}

		if response != nil {
			_, err = protocol.SendResponse(client, response)
			if err != nil {
				break
			}
		}
	}

	conn.Close() // close the connection
	p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): closing", client)
	// An identified client may still own registrations; drop all of them.
	if client.peerInfo != nil {
		registrations := p.ctx.nsqlookupd.DB.LookupRegistrations(client.peerInfo.id)
		for _, r := range registrations {
			if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id); removed {
				p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER category:%s key:%s subkey:%s",
					client, r.Category, r.Key, r.SubKey)
			}
		}
	}
	return err
}

接收nsqd的tcp请求server,采用自己的应用层协议,请求协议是一个文本协议,回复协议是二进制协议

请求协议格式

我想,如果把文本协议换成二进制协议,性能上可能会有所提升(文本协议的解析会花费一些性能)

目前支持的命令:

命令 参数 消息体
PING
IDENTIFY 生产者基本信息(前四个字节表示消息体长度)
REGISTER topicName、channelName(可选)
UNREGISTER topicName、channelName(可选)
  • PING

    为了客户端探测服务端的活性

    nsqlookupd/lookup_protocol_v1.go

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // PING answers a liveness probe with "OK". For a client that has already
    // identified itself it also logs the time elapsed since the previous
    // update and stores the current time as the new lastUpdate.
    func (p *LookupProtocolV1) PING(client *ClientV1, params []string) ([]byte, error) {
    	// we could get a PING before other commands on the same client connection
    	if info := client.peerInfo; info != nil {
    		previous := time.Unix(0, atomic.LoadInt64(&info.lastUpdate))
    		current := time.Now()
    		p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): pinged (last ping %s)", info.id,
    			current.Sub(previous))
    		atomic.StoreInt64(&info.lastUpdate, current.UnixNano())
    	}
    	return []byte("OK"), nil
    }
  • IDENTIFY

    注册client类别信息(提交自己的peerInfo信息,注册client信息)

    nsqlookupd/lookup_protocol_v1.go

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    // IDENTIFY registers the connecting peer. The command line is followed by
    // a 4-byte big-endian body length and a JSON body describing the producer.
    // On success the peer info is attached to the client, the peer is added to
    // the DB under the "client" category, and a JSON document describing this
    // nsqlookupd is returned. Every failure is returned as a fatal client
    // error, and a client may only IDENTIFY once per connection.
    func (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
    	var err error

    	if client.peerInfo != nil {
    		return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "cannot IDENTIFY again")
    	}

    	var bodyLen int32
    	err = binary.Read(reader, binary.BigEndian, &bodyLen) // 4-byte big-endian length of the body that follows
    	if err != nil {
    		return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body size")
    	}

    	// bodyLen is supplied by the remote peer; a negative value would make
    	// the make() call below panic, so reject it up front.
    	if bodyLen < 0 {
    		return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", "IDENTIFY invalid body size")
    	}

    	body := make([]byte, bodyLen)
    	_, err = io.ReadFull(reader, body) // read the whole body
    	if err != nil {
    		return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body")
    	}

    	// body is a json structure with producer information
    	peerInfo := PeerInfo{id: client.RemoteAddr().String()}
    	err = json.Unmarshal(body, &peerInfo)
    	if err != nil {
    		return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to decode JSON body")
    	}

    	peerInfo.RemoteAddress = client.RemoteAddr().String()

    	// require all fields
    	if peerInfo.BroadcastAddress == "" || peerInfo.TCPPort == 0 || peerInfo.HTTPPort == 0 || peerInfo.Version == "" {
    		return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", "IDENTIFY missing fields")
    	}

    	atomic.StoreInt64(&peerInfo.lastUpdate, time.Now().UnixNano())

    	p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): IDENTIFY Address:%s TCP:%d HTTP:%d Version:%s",
    		client, peerInfo.BroadcastAddress, peerInfo.TCPPort, peerInfo.HTTPPort, peerInfo.Version)

    	client.peerInfo = &peerInfo
    	// Register the peer under the "client" category (empty key and subkey).
    	if p.ctx.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo}) {
    		p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "client", "", "")
    	}

    	// build a response
    	data := make(map[string]interface{})
    	data["tcp_port"] = p.ctx.nsqlookupd.RealTCPAddr().Port
    	data["http_port"] = p.ctx.nsqlookupd.RealHTTPAddr().Port
    	data["version"] = version.Binary
    	hostname, err := os.Hostname()
    	if err != nil {
    		// NOTE(review): this terminates the whole process from inside a
    		// request handler; kept as in the original.
    		log.Fatalf("ERROR: unable to get hostname %s", err)
    	}
    	data["broadcast_address"] = p.ctx.nsqlookupd.opts.BroadcastAddress
    	data["hostname"] = hostname

    	response, err := json.Marshal(data) // JSON describing this nsqlookupd
    	if err != nil {
    		// Marshaling failed: log it and fall back to a plain OK reply.
    		p.ctx.nsqlookupd.logf(LOG_ERROR, "marshaling %v", data)
    		return []byte("OK"), nil
    	}
    	return response, nil
    }

    存储注册信息的结构是一个map,key是一个Registration(包含类别、topic、channel),value又是一个map(key是生产者id,value是生产者结构)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    type (
    	// ProducerMap holds the producers of one registration, keyed by producer id.
    	ProducerMap map[string]*Producer

    	// RegistrationDB stores every registration together with its producers.
    	// The embedded RWMutex is there to guard registrationMap.
    	RegistrationDB struct {
    		sync.RWMutex
    		registrationMap map[Registration]ProducerMap
    	}

    	// Registration is the lookup key of the DB.
    	Registration struct {
    		Category string // kind of entry, e.g. "client", "topic" or "channel"
    		Key      string // topic name (empty for "client" entries)
    		SubKey   string // channel name (empty for "client" and "topic" entries)
    	}
    )
  • REGISTER

    注册生产者

    nsqlookupd/lookup_protocol_v1.go

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    // REGISTER records the identified client as a producer of the given topic
    // and, when a channel name is supplied, of that channel too. It replies
    // "OK"; a client that has not sent IDENTIFY gets a fatal E_INVALID error.
    func (p *LookupProtocolV1) REGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
    	// IDENTIFY has to come first.
    	if client.peerInfo == nil {
    		return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY")
    	}

    	topic, channel, err := getTopicChan("REGISTER", params)
    	if err != nil {
    		return nil, err
    	}

    	// The channel entry (if any) is registered before the topic entry.
    	keys := make([]Registration, 0, 2)
    	if channel != "" {
    		keys = append(keys, Registration{"channel", topic, channel})
    	}
    	keys = append(keys, Registration{"topic", topic, ""})

    	lookupd := p.ctx.nsqlookupd
    	for _, key := range keys {
    		if lookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo}) {
    			lookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s",
    				client, key.Category, key.Key, key.SubKey)
    		}
    	}

    	return []byte("OK"), nil
    }
  • UNREGISTER

    反注册生产者

    nsqlookupd/lookup_protocol_v1.go

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    // UNREGISTER removes the identified client as a producer. With a channel
    // argument only that channel registration is affected; without one, the
    // client is removed from every channel of the topic and from the topic
    // itself. Registrations whose name ends in "#ephemeral" are deleted once
    // no producer is left. It replies "OK".
    func (p *LookupProtocolV1) UNREGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
    	if client.peerInfo == nil {
    		return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY")
    	}

    	topic, channel, err := getTopicChan("UNREGISTER", params)
    	if err != nil {
    		return nil, err
    	}

    	lookupd := p.ctx.nsqlookupd
    	peerID := client.peerInfo.id

    	// Channel given: only this one channel registration is touched.
    	if channel != "" {
    		chanKey := Registration{"channel", topic, channel}
    		removed, left := lookupd.DB.RemoveProducer(chanKey, peerID)
    		if removed {
    			lookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER category:%s key:%s subkey:%s",
    				client, "channel", topic, channel)
    		}
    		// for ephemeral channels, remove the channel as well if it has no producers
    		if left == 0 && strings.HasSuffix(channel, "#ephemeral") {
    			lookupd.DB.RemoveRegistration(chanKey)
    		}
    		return []byte("OK"), nil
    	}

    	// no channel was specified so this is a topic unregistration
    	// remove all of the channel registrations...
    	// normally this shouldn't happen which is why we print a warning message
    	// if anything is actually removed
    	for _, r := range lookupd.DB.FindRegistrations("channel", topic, "*") {
    		if removed, _ := lookupd.DB.RemoveProducer(r, peerID); removed {
    			lookupd.logf(LOG_WARN, "client(%s) unexpected UNREGISTER category:%s key:%s subkey:%s",
    				client, "channel", topic, r.SubKey)
    		}
    	}

    	// Finally drop the client from the topic registration itself.
    	topicKey := Registration{"topic", topic, ""}
    	removed, left := lookupd.DB.RemoveProducer(topicKey, peerID)
    	if removed {
    		lookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER category:%s key:%s subkey:%s",
    			client, "topic", topic, "")
    	}
    	if left == 0 && strings.HasSuffix(topic, "#ephemeral") {
    		lookupd.DB.RemoveRegistration(topicKey)
    	}

    	return []byte("OK"), nil
    }

回复协议格式:

前四个字节表示后面数据的长度

多个命令可以放在同一个请求包里,从而复用同一个tcp连接,回复也会被放在同一个回复包里

HTTP服务

HTTP服务器注册了一些路由,这里只分析几个常用的

下面是注册路由的过程

nsqlookupd/http.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// newHTTPServer builds the HTTP server for nsqlookupd: it configures an
// httprouter with logging handlers for panics, unknown routes and disallowed
// methods, then registers the ping/info, lookup, topic/channel management
// and pprof debug routes.
func newHTTPServer(ctx *Context) *httpServer {
	logf := ctx.nsqlookupd.logf
	logDecorator := http_api.Log(logf)

	router := httprouter.New()
	router.HandleMethodNotAllowed = true
	router.PanicHandler = http_api.LogPanicHandler(logf)
	router.NotFound = http_api.LogNotFoundHandler(logf)
	router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(logf)

	s := &httpServer{ctx: ctx, router: router}

	// v1 registers an API handler wrapped with logging and the V1 decorator.
	v1 := func(method, path string, h func(http.ResponseWriter, *http.Request, httprouter.Params) (interface{}, error)) {
		router.Handle(method, path, http_api.Decorate(h, logDecorator, http_api.V1))
	}

	router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, logDecorator, http_api.PlainText))
	v1("GET", "/info", s.doInfo)

	// v1 negotiate
	v1("GET", "/debug", s.doDebug)
	v1("GET", "/lookup", s.doLookup)
	v1("GET", "/topics", s.doTopics)
	v1("GET", "/channels", s.doChannels)
	v1("GET", "/nodes", s.doNodes)

	// only v1
	v1("POST", "/topic/create", s.doCreateTopic)
	v1("POST", "/topic/delete", s.doDeleteTopic)
	v1("POST", "/channel/create", s.doCreateChannel)
	v1("POST", "/channel/delete", s.doDeleteChannel)
	v1("POST", "/topic/tombstone", s.doTombstoneTopicProducer)

	// debug
	router.HandlerFunc("GET", "/debug/pprof", pprof.Index)
	router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline)
	router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol)
	router.HandlerFunc("POST", "/debug/pprof/symbol", pprof.Symbol)
	router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile)
	for _, profile := range []string{"heap", "goroutine", "block", "threadcreate"} {
		router.Handler("GET", "/debug/pprof/"+profile, pprof.Handler(profile))
	}

	return s
}

GET /topics

获取所有topic

也就是查询 topic 类别下的所有注册信息,并取出其中的 topic 名称

nsqlookupd/http.go

1
2
3
4
5
6
// doTopics handles GET /topics. It looks up every registration in the
// "topic" category and returns their keys under "topics".
func (s *httpServer) doTopics(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
	registrations := s.ctx.nsqlookupd.DB.FindRegistrations("topic", "*", "")
	result := map[string]interface{}{
		"topics": registrations.Keys(),
	}
	return result, nil
}

GET /channels

获取所有channel

也就是查询 channel 类别下指定 topic 的所有注册信息,并取出其中的 channel 名称

nsqlookupd/http.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// doChannels handles GET /channels. It requires a "topic" request argument
// and returns, under "channels", the subkeys of every "channel" registration
// of that topic. A bad request or a missing topic yields a 400 error.
func (s *httpServer) doChannels(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
	params, parseErr := http_api.NewReqParams(req)
	if parseErr != nil {
		return nil, http_api.Err{400, "INVALID_REQUEST"}
	}

	topic, argErr := params.Get("topic")
	if argErr != nil {
		return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
	}

	matches := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topic, "*")
	result := map[string]interface{}{
		"channels": matches.SubKeys(),
	}
	return result, nil
}

POST /channel/create

创建topic以及channel

实际上就是注册 channel类别指定topic指定channel 信息,以及注册 topic类别指定topic 信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// doCreateChannel handles POST /channel/create. It adds a "channel"
// registration for the requested topic/channel pair and then a "topic"
// registration for the topic. Invalid requests or arguments yield a 400
// error; on success it returns no payload.
func (s *httpServer) doCreateChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
	params, parseErr := http_api.NewReqParams(req)
	if parseErr != nil {
		return nil, http_api.Err{400, "INVALID_REQUEST"}
	}

	// Extract the topic and channel arguments.
	topicName, channelName, argErr := http_api.GetTopicChannelArgs(params)
	if argErr != nil {
		return nil, http_api.Err{400, argErr.Error()}
	}

	lookupd := s.ctx.nsqlookupd

	// Channel registration first...
	lookupd.logf(LOG_INFO, "DB: adding channel(%s) in topic(%s)", channelName, topicName)
	lookupd.DB.AddRegistration(Registration{"channel", topicName, channelName})

	// ...then the topic registration.
	lookupd.logf(LOG_INFO, "DB: adding topic(%s)", topicName)
	lookupd.DB.AddRegistration(Registration{"topic", topicName, ""})

	return nil, nil
}

总结

nsqlookup就是负责管理所有topic以及channel以及生产者的模块




微信关注我,及时接收最新技术文章