Blog

NSQ

消息队列应用场景

程序解耦

用户注册成功会发送注册成功的短信通知。

传统模式下需要将用户信息持久化到数据库后,再调用发送短信接口。当短信平台暂时出现连通性问题,会导致用户注册失败,也就是数据库存储失败,说明注册模块与短信功能高耦合,违反程序设计原则高内聚低耦合。

引用消息队列,将用户注册数据持久化到数据库后,把消息写入消息队列然后返回用户注册成功,短信平台通过从消息队列中获取消息为用户发送通知。

异步处理

  • 被调用者维度(内核):同步和异步
    • 同步:在调用者A发起对被调用者B的调用后,B在未生成结果之前不会向A返回结果。
    • 异步:在调用者A发起对被调用者B的调用后,B直接返回,待生成结果后通过状态、通知或回调调用者A告知结果。
  • 调用者维度(用户):阻塞和非阻塞
    • 阻塞:在调用者A发起对调用者B的调用后,A在未得到B告知的结果之前一直处于等待状态(线程挂起,不能进行其他操作)。
    • 非阻塞:在调用者A发起对调用者B的调用后,A可以进行的其他操作。

日志处理

  • 日志采集客户端:负责日志数据采集和解析,并将日志数据实时/定时写入到消息队列。
  • 消息队列:负责日志数据的接收,存储和转发。
  • 日志处理应用:订阅消息队列,并消费日志数据。
  • 常见应用:ELK。

流量削峰

常见于商城秒杀活动,为解决访问量过大,导致流量暴增,应用挂掉。

常在服务器接收用户数据后,写入消息队列,后台业务根据消息队列中的请求信息继续后续逻辑处理。

消息通信

消息队列一般都内置了高效的通信机制,可用于在消息通讯。比如实现点对点、广播消息队列,或者聊天室等场景。

NSQ简述

NSQ介绍

NSQ是使用Go语言开发的实时分布式消息传递平台。

特点:

  • 支持分布式部署,解决单点故障问题
  • 水平可扩展(没有代理,可以无缝的将更多节点添加到集群中)
  • 基于低延迟推送的消息传递(性能)
  • 支持负载均衡和广播消息路由
  • 在高吞吐量的工作负载下表现出色
  • 消息主要存储与内存(超过内存配置后保存在磁盘上)
  • 消费者可通过nsqlookupd服务查找生产者
  • 传输层安全性(TLS)
  • 任意的数据格式
  • 部署简便
  • 基于TCP协议,支持任何语言开发客户端库
  • 提供HTTP接口,用于统计信息,管理操作功能
  • 提供管理功能nsqadmin
  • 与statsd集成以进行实时检测

NSQ组件

  • nsqd:用于接收、分发、传递消息到客户端的守护程序。常侦听两个TCP端口,4150用于TCP客户端连接,4151用于提供HTTP APl,也可启用HTTPS API。
  • nsqlookupd:用于管理nsqd拓扑信息的守护程序。客户端(消费者)查询nsqlookupd以发现nsqd特定topic的生产者,常侦听两个TCP端口,4160用于nsqd的通信,4161用于提供HTTP API。
  • nsqadmin:提供一个Web UI,用于实时查看群集信息并进行各种任务管理。

NSQ工具

  • nsq_stat:用于获取指定topic,channel统计信息。
  • nsq_tail:用于将指定topic,channel中的消息打印到控制台。
  • nsq_to_file:用于将指定topic,channel中的消息导出到文件。
  • nsq_to_http:用于将指定topic,channel中的消息转发到http服务器。
  • nsq_to_nsq:用于将指定topic,channel中的消息转发到其他nsqd服务。
  • to_nsq:用于将控制台消息发送到指定topic。

NSQ架构

NSQ数据流

  • 生产者客户端连接到nsqd,并将数据消息发送到指定的topic中。
  • nsqd将topic中数据分发到所有关联的channel中。
  • nsqd将channel中的消息推送到订阅channel的某个客户中进行处理。

NSQ部署

部署方式

  • 单机
    • nsqd
  • 集群
    • nsqlookup
    • nsqd
  • 管理平台
    • nsqadmin

单机部署

启动nsqd:

nsqd.exe --data-path=./datas

启动nsqdadmin:

nsqdadmin.exe --nsqd-http-address="localhost:4151"

测试:

  • 生产者发送消息:启动to_nsq从控制台将消息发送到test主题。
    to_nsq.exe --nsqd-tcp-address="localhost:4150" --topic=test
    
  • 消费者订阅消息:启动nsq_tail订阅test主题channel管道并将消息打印到控制台。
    nsqd_tail.exe --nsqd-tcp-address="localhost:4150" --topic test --channel test
    

集群部署

启动nsqlookupd:

nsqlookupd.exe 

启动nsqd:

nsqd.exe --data-path=./datas --lookupd-tcp-address="localhost:4160"

启动nsqdadmin:

nsqdadmin.exe --nsqd-http-address="localhost:4161"

测试:

  • 生产者发送消息:启动to_nsq从控制台将消息发送到test主题
    to_nsq.exe --nsqd-tcp-address="localhost:4150" --topic=test
    
  • 消费者订阅消息:启动nsq _tail订阅test主题channel管道并将消息打印到控制台
    nsqd_tail.exe --lookupd-tcp-address="localhost:4161" --topic test --channel test
    

集成到Go

步骤

  • SDK:github.com/nsqio/go-nsq
  • 生产者
    • 定义连接配置
    • 定义生产者
    • 发布消息
    • 关闭连接
  • 消费者
    • 定义连接配置
    • 定义消费者
    • 定义消息处理器
    • 连接nsq或nsqlookupd
    • 停止连接

生产者

package main

import (
	"fmt"
	"log"

	"github.com/nsqio/go-nsq"
)

func main() {
	topic := "test"
	addr := "localhost:4150"

	config := nsq.NewConfig()                       //定义nsq生产者配置
	productor, err := nsq.NewProducer(addr, config) // 创建生产者
	if err != nil {
		log.Fatal(err)
	}
    // 产生消息
	for i := 0; i < 10; i++ {
		productor.Publish(topic, []byte(fmt.Sprintf("message %d", i))) // 发布消息
	}
	productor.Stop() // 停止连接
}

消费者

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/nsqio/go-nsq"
)

func main() {
	topic := "test"
	channel := "testgo"

	lookupAddr := "localhost:4161"
	// addr := "localhost:4150"

	config := nsq.NewConfig()                                // 定义nsq消费者配置
	consumer, err := nsq.NewConsumer(topic, channel, config) //创建消费者
	if err != nil {
		log.Fatal(err)
	}

	// 定义nsq处理器
	consumer.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
		fmt.Println(string(m.Body))
		return nil
	}))

	err = consumer.ConnectToNSQLookupd(lookupAddr)
	// err = consumer.ConnectToNSQD(addr)
	if err != nil {
		log.Fatal(err)
	}

    // 等待系统信号
	interrupt := make(chan os.Signal)
	signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
	<-interrupt
	consumer.Stop()
}

NSQ对客户端认证

认证方式

  • TLS:nsqd通过命令行参数--tls-cert, --tls-key, --tls-client-auth-policy, -tls-required, -tls-root-ca-file等参数配置协商客户使用tls以增加安全性。
  • 通过webhook提供第三方认证和访问控制:nsqd通过--auth-http-address可配置第三方认证,只对TCP连接有效。

参考链接: