Blog

Kafka

消息队列

为什么使用消息队列

使⽤同步的通信⽅式来解决多个服务之间的通信:

同步的通信方式会存在性能和稳定性的问题。

使用异步的通信方式:

针对于同步的通信方式来说,异步的方式,可以让上游快速成功,极大提高了系统的吞吐量。而且在分布式系统中,通过下游多个服务的分布式事务的保障,也能保障业务执行之后的最终一致性。

消息队列解决具体的是通信问题

介绍

Message Queue(MQ),消息队列中间件。很多人都说:MQ通过将消息的发送和接收分 离来实现应用程序的异步和解偶,这个给人的直觉是:MQ是异步的,用来解耦的。但是这个只是MQ的效果而不是目的。

MQ真正的目的是为了通讯,屏蔽底层复杂的通讯协议,定义了一套应用层的、更加简单的通讯协议。一个分布式系统中两个模块之间通讯要么是HTTP,要么是自己开发的(rpc) TCP,但是这两种协议其实都是原始的协议。

HTTP协议很难实现两端通讯(模块A可以调用B,B也可以主动调用A),如果要做到这个两端都要背上WebServer,而且还不支持⻓连接(HTTP 2.0 的库根本找不到)。

TCP就更加原始了,粘包、心跳、私有的协议。MQ所要做的就是在这些协议之上构建一个简单的“协议”——生产者/消费者模型。

MQ带给我的“协议”不是具体的通讯协议,而是更高层次 通讯模型。它定义了两个对象——发送数据的叫生产者;接收数据的叫消费者,提供一个SDK让我们可以定义自己的生产者和消费者实现消息通讯而无视底层通讯协议。

消息队列的分类

产品

目前消息队列的中间件选型有很多种:

  • rabbitMQ:内部的功能性是非常强的
  • rocketMQ:阿里内部一个大神,根据kafka的内部执行原理,手写的一个消息队列中间件。性能是与Kafka相比肩,除此之外,在功能上封装了更多的功能。
  • kafka:全球消息处理性能最快的一款MQ
  • zeroMQ

有Broker的MQ

通常有一台服务器作为Broker,所有的消息都通过它中转。生产者把消息发送给它,就结束自己的任务了,Broker则把消息主动推送给消费者(或者消费者主动轮询)。

重Topic

kafka、JMS(ActiveMQ)就属于这个类型,生产者会发送 key和数据到Broker,由roker比较key之后决定给哪个消费者。

这种模式是我们最常⻅的模式,是我们对MQ最多的印象。在这种模式下一个topic往往是一个比较大的概念,甚至一个系统中就可能只有一个topic,topic某种意义上就是queue。

轻Topic

这种的代表是RabbitMQ(或者说是AMQP)。生产者发送key和数据,消费者定义订阅的队列,Broker收到数据之后会通过一定的逻辑计算出key对应的队列,然后把数据交给队列。

这种模式下解耦了key和queue,在这种架构中queue是非常轻量级的(在RabbitMQ中它的上限取决于你的内存),消费者关心的只是自己的queue;生产者不必关心数据最终给谁只要指定key就行了,中间的那层映射在AMQP中叫exchange(交换机)。

无Broker的MQ

在生产者和消费者之间没有使用broker,例如zeroMQ,直接使用socket进行通信。

Kafka基本使用

介绍

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的 (replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

使用场景

  • 日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式 开放给各种consumer,例如hadoop、Hbase、Solr等。
  • 消息系统:解耦和生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网⻚、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过 订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

搭建

配置/opt/homebrew/etc/kafka/server.properties

#broker.id属性在kafka集群中必须要是唯一 
broker.id=0 
#kafka部署的机器ip和提供服务的端口号 
listeners=PLAINTEXT://:9092
#kafka的消息存储文件 
log.dirs=/opt/homebrew/var/lib/kafka-logs
#kafka连接zookeeper的地址 
zookeeper.connect=localhost:2181

启动:

brew services start kafka

基本概念

名称 解释
Broker 消息中间件处理节点,一个Kafka节点就是一个Broker,一个或者多个 Broker可以组成一个Kafka集群
Topic Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic
Producer 消息生产者,向Broker发送消息的客户端
Consumer 消息消费者,从Broker读取消息的客户端

创建Topic

创建主题:

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

查看所有主题:

kafka-topics --list --bootstrap-server localhost:9092

发送消息

kafka自带了一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一个行会被当做成一个独立的消息。使用kafka的发送消息的客户端,指定发送到的kafka服务器地址和topic。

kafka-console-producer --broker-list localhost:9092 --topic test

消费消息

对于consumer,kafka同样也携带了一个命令行客户端,会将获取到内容在命令中进行输出,默认是消费最新的消息。使用kafka的消费者消息的客户端,从指定kafka服务器的指定topic中消费消息。

  • 方式一:从当前主题中的最后一条消息的offset(偏移量位置)+1 开始消费
    kafka-console-consumer --bootstrap-server localhost:9092 --topic test
    
  • 方式二:从当前主题中的第一条消息 开始消费
    kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
    

注意:

  • 消息会被存储
  • 消息是顺序存储
  • 消息是有偏移量的
  • 消费时可以指明偏移量进行消费

消息的细节

  • 生产者将消息发送给broker,broker会将消息保存在本地的日志文件中。
  • 消息的保存是有序的,通过offset偏移量来描述消息的有序性。
  • 消费者消费消息时也是通过offset来描述当前要消费的那条消息的位置。

单播消息

在一个kafka的topic中,启动两个消费者,一个生产者,问:生产者发送消息,这条消息是否同时会被两个消费者消费?

如果多个消费者在同一个消费组,那么只有一个消费者可以收到订阅的topic中的消息。换言之,同一个消费组中只能有一个消费者收到一个topic中的消息。

kafka-console-consumer --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic test

多播消息

不同的消费组订阅同一个topic,那么不同的消费组中只有一个消费者能收到消息。实际上也是多个消费组中的多个消费者收到了同一个消息。

kafka-console-consumer --bootstrap-server localhost:9092 --consumer-property group.id=testGroup1 --topic test

kafka-console-consumer --bootstrap-server localhost:9092 --consumer-property group.id=testGroup2 --topic test

查看消费组的详细信息

kafka-console-consumer --bootstrap-server localhost:9092 --describe --group testGroup
  • Current-offset:最后被消费的消息的偏移量
  • Log-end-offset:消息总量(最后一条消息的偏移量)
  • Lag:积压了多少条消息

主题和分区

主题Topic

主题-topic在kafka中是一个逻辑的概念,kafka通过topic将消息进行分类。不同的topic会被 订阅该topic的消费者消费。

但是有一个问题,如果说这个topic中的消息非常多,多到需要几T来存,因为消息是会被保存到log日志文件中的。为了解决这个文件过大的问题,kafka提出了Partition分区的概念。

Partition分区

通过Partition将一个topic中的消息分区来存储。这样的好处有多个:

  • 分区存储,可以解决统一存储文件过大的问题
  • 提供了读写的吞吐量,读和写可以同时在多个分区中进行

创建多分区的主题:

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test1

消息日志文件中保存的内容

  • 00000.log:这个文件中保存的就是消息
  • __consumer_offsets-49:kafka内部自己创建了__consumer_offsets主题包含了50个分区。这个主题用来存放消费者消费某个主题的偏移量。因为每个消费者都会自己维护着消费的主题的偏移量,也就是说每个消费者会把消费的主题的偏移量自主上报给kafka中的默认主题:consumer_offsets。因此kafka为了提升这个主题的并发性,默认设置了50个分区。
  • 提交到哪个分区:通过hash函数:hash(consumerGroupId) % __consumer_offsets 主题的分区数 提交到该主题中的内容是:key是consumerGroupId+topic+分区号,value就是当前offset的值文件中保存的消息,默认保存7天。七天到后消息会被删除。

集群操作

搭建集群(三个broker)

  1. 创建三个server.properties文件,修改id、port和日志文件。
  2. 通过命令来启动三台broker

副本

在创建主题时,除了指明了主题的分区数以外,还指明了副本数。

副本是为了为主题中的分区创建多个备份,多个副本在kafka集群的多个broker中,会有一个副本作为leader,其他是follower。

  • replicas:当前副本存在的broker节点
  • leader:kafka的写和读的操作,都发生在leader上。leader负责把数据同步给follower。当leader挂了,经过主从选举,从多个follower中选举产生一个新的leader。
  • follower:leader处理所有针对这个partition的读写请求,而follower被动复制leader,不提供读写(主要是为了保证多副本数据与消费的一致性),如果leader所在的broker挂掉,那么就会进行新leader的选举。
  • isr:可以同步和已同步的节点会被存入到isr集合中。如果isr中的节点性能较差,会被踢出isr集合。

集群中有多个broker,创建主题时可以指明主题有多个分区(把消息拆分到不同的分区中存储),可以为分区创建多个副本,不同的副本存放在不同的broker里。

集群消费

向集群发送消息:

./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --from-beginning --consumer-property group.id=testGroup1 --topic my-replicated-topic

从集群中消费消息:

./kafka-console-producer.sh --broker-list 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --topic my- replicated-topic

指定消费组来消费消息:

./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --from-beginning --consumer-property group.id=testGroup1 --topic my-replicated-topic

总结

  • 一个partition只能被一个消费组中的一个消费者消费,目的是为了保证消费的顺序性,但是多个partion的多个消费者消费的总的顺序性是得不到保证的。
  • partition的数量决定了消费组中消费者的数量,建议同一个消费组中消费者的数量不要超过partition的数量,否则多的消费者消费不到消息
  • 如果消费者挂了,那么会触发rebalance机制(后面介绍),会让其他消费者来消费该分区。

集群中的controller、rebalance、HW

controller

集群中谁来充当controller:每个broker启动时会向zk创建一个临时序号节点,获得的序号最小的那个broker将会作为集群中的controller。

负责:

  • 当集群中有一个副本的leader挂掉,需要在集群中选举出一个新的leader,选举的规则是从isr集合中最左边获得。
  • 当集群中有broker新增或减少,controller会同步信息给其他broker。
  • 当集群中有分区新增或减少,controller会同步信息给其他broker。

rebalance机制

  • 前提:消费组中的消费者没有指明分区来消费。
  • 触发条件:当消费组中的消费者和分区的关系发生变化的时候。
  • 分区分配的策略:在rebalance之前,分配分区的三种策略
    • range:根据公式计算得到每个消费消费哪几个分区。
    • round-robin:轮询分配。
    • sticky:粘合策略,如果需要rebalance,会在之前已分配的基础上调整,不会改变之前的分配情况。如果这个策略没有开,那么就要进行全部的重新分配。建议开启。

HW和LEO

  • HW(High Watermark)俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息。
  • LEO (Log End Offset),标识当前日志文件中下一条待写入的消息的offset。

HW是已完成同步的位置。消息在写入broker时,且每个broker完成这条消息的同步后,hw 才会变化。在这之前消费者是消费不到这条消息的。在同步完成之后,HW更新之后,消费者 才能消费到这条消息,这样的目的是防止消息的丢失。

Kafka中的优化问题

如何防止消息丢失

  • 生产者:
    1. 使用同步发送
    2. 把ack设成1或者all,并且设置同步的分区数>=2
  • 消费者:把自动提交改成手动提交

如何防止重复消费

在防止消息丢失的方案中,如果生产者发送完消息后,因为网络抖动,没有收到ack,但实际上broker已经收到了。 此时生产者会进行重试,于是broker就会收到多条相同的消息,而造成消费者的重复消费。

解决:

  • 生产者关闭重试:会造成丢消息(不建议)
  • 消费者解决非幂等性消费问题:
    • 幂等性:多次访问的结果是一样的。对于rest的请求(get(幂等)、post(非幂等)、put(幂等)、delete(幂等))
    • 解决方案:
      • 在数据库中创建联合主键,防止相同的主键 创建出多条记录。
      • 使用分布式锁,以业务id为锁。保证只有一条记录能够创建成功。

如何做到消息的顺序消费

  • 生产者:保证消息按顺序消费,且消息不丢失:使用同步的发送,ack设置成非0的值。
  • 消费者:主题只能设置一个分区,消费组中只能有一个消费者。

Kafka的顺序消费使用场景不多,因为牺牲掉了性能,但是有比如RocketMQ在这一块有专⻔的功能已设计好。

如何解决消息积压问题

消息积压问题的出现

消息的消费者的消费速度远赶不上生产者的生产消息的速度,导致kafka中有大量的数据没有被消费。随着没有被消费的数据堆积越多,消费者寻址的性能会越来越差,最后导致整个kafka对外提供的服务的性能很差,从而造成其他服务也访问速度变慢,造成服务雪崩。

消息积压的解决方案

  • 在这个消费者中,使用多线程,充分利用机器的性能进行消费消息。
  • 通过业务的架构设计,提升业务层面消费的性能。
  • 创建多个消费组,多个消费者,部署到其他机器上,一起消费,提高消费者的消费速度。
  • 创建一个消费者,该消费者在kafka另建一个主题,配上多个分区,多个分区再配上多个消费者。该消费者将poll下来的消息,不进行消费,直接转发到新建的主题上。此时,新的主题的多个分区的多个消费者就开始一起消费了。(不常用)

实现延时队列的效果

应用场景

订单创建后,超过30分钟没有支付,则需要取消订单,这种场景可以通过延时队列来实现。

具体方案

  1. Kafka中创建创建相应的多个topic。
  • topic_5s
  • topic_1m
  • topic_30m
  1. 消息发送者发送消息到相应的topic,并带上消息的发送时间。
  2. 消费者订阅相应的topic,消费时轮询整个topic中的消息。
  • 如果消息的发送时间和消费的当前时间超过阈值:去数据库中修改订单状态为已取消
  • 如果没有超过阈值:记录当前消息的offset,并不再继续消费之后的消息。等待1分钟后,再次向kafka拉取该offset及之后的消息,继续进行判断,以此反复。

Kafka-eagle监控平台

  • 下载安装包:http://download.kafka-eagle.org/
  • 分配一台虚拟机
  • 虚拟机中安装jdk
  • 解压缩kafka-eagle的压缩包
  • 给kafka-eagle配置环境变量
  • 需要修改kafka-eagle内部的配置文件:vim system-config.properties修改里面的zk的地址和MySQL的地址
  • 进入到bin中,通过命令来启动
    ./ke.sh start
    


参考链接: