NSQ
消息队列应用场景
程序解耦
用户注册成功会发送注册成功的短信通知。
传统模式下需要将用户信息持久化到数据库后,再调用发送短信接口。当短信平台暂时出现连通性问题,会导致用户注册失败,也就是数据库存储失败,说明注册模块与短信功能高耦合,违反程序设计原则高内聚低耦合。
引用消息队列,将用户注册数据持久化到数据库后,把消息写入消息队列然后返回用户注册成功,短信平台通过从消息队列中获取消息为用户发送通知。
异步处理
- 被调用者维度(内核):同步和异步
- 同步:在调用者A发起对被调用者B的调用后,B在未生成结果之前不会向A返回结果。
- 异步:在调用者A发起对被调用者B的调用后,B直接返回,待生成结果后通过状态、通知或回调调用者A告知结果。
- 调用者维度(用户):阻塞和非阻塞
- 阻塞:在调用者A发起对调用者B的调用后,A在未得到B告知的结果之前一直处于等待状态(线程挂起,不能进行其他操作)。
- 非阻塞:在调用者A发起对调用者B的调用后,A可以进行的其他操作。
日志处理
- 日志采集客户端:负责日志数据采集和解析,并将日志数据实时/定时写入到消息队列。
- 消息队列:负责日志数据的接收,存储和转发。
- 日志处理应用:订阅消息队列,并消费日志数据。
- 常见应用:ELK。
流量削峰
常见于商城秒杀活动,为解决访问量过大,导致流量暴增,应用挂掉。
常在服务器接收用户数据后,写入消息队列,后台业务根据消息队列中的请求信息继续后续逻辑处理。
消息通信
消息队列一般都内置了高效的通信机制,可用于在消息通讯。比如实现点对点、广播消息队列,或者聊天室等场景。
NSQ简述
NSQ介绍
NSQ是使用Go语言开发的实时分布式消息传递平台。
特点:
- 支持分布式部署,解决单点故障问题
- 水平可扩展(没有代理,可以无缝的将更多节点添加到集群中)
- 基于低延迟推送的消息传递(性能)
- 支持负载均衡和广播消息路由
- 在高吞吐量的工作负载下表现出色
- 消息主要存储与内存(超过内存配置后保存在磁盘上)
- 消费者可通过nsqlookupd服务查找生产者
- 传输层安全性(TLS)
- 任意的数据格式
- 部署简便
- 基于TCP协议,支持任何语言开发客户端库
- 提供HTTP接口,用于统计信息,管理操作功能
- 提供管理功能nsqadmin
- 与statsd集成以进行实时检测
NSQ组件
- nsqd:用于接收、分发、传递消息到客户端的守护程序。常侦听两个TCP端口,4150用于TCP客户端连接,4151用于提供HTTP APl,也可启用HTTPS API。
- nsqlookupd:用于管理nsqd拓扑信息的守护程序。客户端(消费者)查询nsqlookupd以发现nsqd特定topic的生产者,常侦听两个TCP端口,4160用于nsqd的通信,4161用于提供HTTP API。
- nsqadmin:提供一个Web UI,用于实时查看群集信息并进行各种任务管理。
NSQ工具
- nsq_stat:用于获取指定topic,channel统计信息。
- nsq_tail:用于将指定topic,channel中的消息打印到控制台。
- nsq_to_file:用于将指定topic,channel中的消息导出到文件。
- nsq_to_http:用于将指定topic,channel中的消息转发到http服务器。
- nsq_to_nsq:用于将指定topic,channel中的消息转发到其他nsqd服务。
- to_nsq:用于将控制台消息发送到指定topic。
NSQ架构
NSQ数据流
- 生产者客户端连接到nsqd,并将数据消息发送到指定的topic中。
- nsqd将topic中数据分发到所有关联的channel中。
- nsqd将channel中的消息推送到订阅channel的某个客户中进行处理。
NSQ部署
部署方式
- 单机
- nsqd
- 集群
- nsqlookup
- nsqd
- 管理平台
- nsqadmin
单机部署
启动nsqd:
nsqd.exe --data-path=./datas
启动nsqdadmin:
nsqdadmin.exe --nsqd-http-address="localhost:4151"
测试:
- 生产者发送消息:启动to_nsq从控制台将消息发送到test主题。
to_nsq.exe --nsqd-tcp-address="localhost:4150" --topic=test
- 消费者订阅消息:启动nsq_tail订阅test主题channel管道并将消息打印到控制台。
nsqd_tail.exe --nsqd-tcp-address="localhost:4150" --topic test --channel test
集群部署
启动nsqlookupd:
nsqlookupd.exe
启动nsqd:
nsqd.exe --data-path=./datas --lookupd-tcp-address="localhost:4160"
启动nsqdadmin:
nsqdadmin.exe --nsqd-http-address="localhost:4161"
测试:
- 生产者发送消息:启动to_nsq从控制台将消息发送到test主题
to_nsq.exe --nsqd-tcp-address="localhost:4150" --topic=test
- 消费者订阅消息:启动nsq _tail订阅test主题channel管道并将消息打印到控制台
nsqd_tail.exe --lookupd-tcp-address="localhost:4161" --topic test --channel test
集成到Go
步骤
- SDK:github.com/nsqio/go-nsq
- 生产者
- 定义连接配置
- 定义生产者
- 发布消息
- 关闭连接
- 消费者
- 定义连接配置
- 定义消费者
- 定义消息处理器
- 连接nsq或nsqlookupd
- 停止连接
生产者
package main
import (
"fmt"
"log"
"github.com/nsqio/go-nsq"
)
func main() {
topic := "test"
addr := "localhost:4150"
config := nsq.NewConfig() //定义nsq生产者配置
productor, err := nsq.NewProducer(addr, config) // 创建生产者
if err != nil {
log.Fatal(err)
}
// 产生消息
for i := 0; i < 10; i++ {
productor.Publish(topic, []byte(fmt.Sprintf("message %d", i))) // 发布消息
}
productor.Stop() // 停止连接
}
消费者
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"github.com/nsqio/go-nsq"
)
func main() {
topic := "test"
channel := "testgo"
lookupAddr := "localhost:4161"
// addr := "localhost:4150"
config := nsq.NewConfig() // 定义nsq消费者配置
consumer, err := nsq.NewConsumer(topic, channel, config) //创建消费者
if err != nil {
log.Fatal(err)
}
// 定义nsq处理器
consumer.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
fmt.Println(string(m.Body))
return nil
}))
err = consumer.ConnectToNSQLookupd(lookupAddr)
// err = consumer.ConnectToNSQD(addr)
if err != nil {
log.Fatal(err)
}
// 等待系统信号
interrupt := make(chan os.Signal)
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
<-interrupt
consumer.Stop()
}
NSQ对客户端认证
认证方式
- TLS:nsqd通过命令行参数
--tls-cert, --tls-key, --tls-client-auth-policy, -tls-required, -tls-root-ca-file
等参数配置协商客户使用tls以增加安全性。 - 通过webhook提供第三方认证和访问控制:nsqd通过
--auth-http-address
可配置第三方认证,只对TCP连接有效。
参考链接: