Skip to main content

消息队列

消息队列是一种应用程序间通信技术,有助于构建通用集成机制,以支持云原生、基于微服务、无服务器和混合云架构。

kafka

KafkaMysql相比,都是需要

  • 先下载安装对应的包
  • 接着可以直接命令行调用
  • 也可以通过 python、java 这样的编程语言来调用
  • 都存在地址接口的概念,localhost:9092 指的是本机地址的 9092 端口
  • 如果你想调用 Docker 里的端口,要提前通过--expose 暴露出这个端口
  • 如果你想调用服务器的 kafka 或者 mysql,得填服务器的 ip+端口

Kafka 的应用场景

  • 异步处理

    • 可以将一些比较耗时的操作放在其他系统中,通过消息队列将需要进行处理的消息进行存储,其他系统可以消费消息队列中的数据
    • 比较常见的:发送短信验证码、发送邮件
  • 系统解耦

    • 原先一个微服务是通过接口(HTTP)调用另一个微服务,这时候耦合很严重,只要接口发生变化就会导致系统不可用
    • 使用消息队列可以将系统进行解耦合,现在第一个微服务可以将消息放入到消息队列中,另一个微服务可以从消息队列中把消息取出来进行处理。进行系统解耦
  • 流量削峰

    • 因为消息队列是低延迟、高可靠、高吞吐的,可以应对大量并发
  • 日志处理

    • 可以使用消息队列作为临时存储,或者一种通信管道

Kafka 消息队列的两种模型

  • 生产者、消费者模型
    • 生产者负责将消息生产到 MQ 中
    • 消费者负责从 MQ 中获取消息
    • 生产者和消费者是解耦的,可能是生产者一个程序、消费者是另外一个程序
  • 消息队列的模式
    • 点对点:一个消费者消费一个消息
    • 发布订阅:多个消费者可以消费一个消息

Kafka 中的重要概念

  • broker
    • Kafka 服务器进程,生产者、消费者都要连接 broker
    • 一个集群由多个 broker 组成,功能实现 Kafka 集群的负载均衡、容错
  • producer:生产者,用于生产数据,可将生产后的消息送入指定的 Topic;
  • consumer:消费者,获取数据,可消费指定的 Topic;
  • topic:一组消息数据的标记符,一个 Kafka 集群中,可以包含多个 topic。一个 topic 可以包含多个分区
    • 是一个逻辑结构,生产、消费消息都需要指定 topic
  • partition:分区,Kafka 集群的分布式就是由分区来实现的。一个 topic 中的消息可以分布在 topic 中的不同 partition 中,为了保证 kafka 的吞吐量,一个 Topic 可以设置多个分区。同一分区只能被一个消费者订阅。
  • replica:副本,实现 Kafkaf 集群的容错,实现 partition 的容错。一个 topic 至少应该包含大于 1 个的副本
  • consumer group:消费者组,一个消费者组中的消费者可以共同消费 topic 中的分区数据。每一个消费者组都一个唯一的名字。配置 group.id 一样的消费者是属于同一个组中,同一个 group 可以有多个消费者,一条消息在一个 group 中,只会被一个消费者获取;
  • offset:偏移量。相对消费者、partition 来说,可以通过 offset 来拉取数据

消费者组

  • 一个消费者组中可以包含多个消费者,共同来消费 topic 中的数据
  • 一个 topic 中如果只有一个分区,那么这个分区只能被某个组中的一个消费者消费
  • 有多少个分区,那么就可以被同一个组内的多少个消费者消费

Kafka 集群搭建

  • Kafka 集群是必须要有 ZooKeeper 的

注意:

  • 每一个 Kafka 的节点都需要修改broker. id(每个节点的标识,不能重复)
  • log.dir 数据存储目录需要配置

Kafka 的生产者/消费者/工具

  • 安装 Kafka 集群,可以测试以下
    • 创建一个 topic 主题(消息都是存放在 topic 中,类似 mysql 建表的过程)
    • 基于 kafka 的内置测试生产者脚本来读取标准输入(键盘输入)的数据,并放入到 topic 中
    • 基于 kafka 的内置测试消费者脚本来消费 topic 中的数据
  • 推荐大家开发的使用 Kafka Tool
    • 浏览 Kafka 集群节点、多少个 topic、多少个分区
    • 创建 topic/删除 topic
    • 浏览 ZooKeeper 中的数据

本地安装与启动(基于 Docker)

1.下载 zookeeper 镜像与 kafka 镜像
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
2.本地启动 zookeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
3. 本地启动 kafka
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=localhost --env KAFKA_ADVERTISED_PORT=9092  wurstmeister/kafka:latest

注意:上述代码,将 kafka 启动在 9092 端口

4. 进入 kafka bash
docker exec -it kafka bash
cd /opt/kafka/bin
5. 创建 Topic,分区为 2,Topic name 为'kafka_demo'
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 2 --topic kafka_demo
6. 查看当前所有 topic
kafka-topics.sh --zookeeper zookeeper:2181 --list
7. 本地安装 kafka-python
pip install kafka-python
Docker 中 kafka 的 topic 增删改查命令汇总
创建Topic:
分区为2,Topic name为'kafka_demo'

kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 2 --topic kafka_demo

增加分区:
通过--alter修改主题的分区数,增加分区。通过命令行工具操作,主题的分区只能增加,不能减少。否则报错。

kafka-topics.sh --zookeeper zookeeper:2181 --alter --topic myTop1 -- partitions 2
删除:
名为kafka_demo的主题

kafka-topics.sh --delete --zookeeper zookeeper:2181 --topic kafka_demo
kafka-topics.sh --zookeeper zookeeper:2181 --create --topic topic_test_01 --partitions 2 --replication-factor 1

kafka-topics.sh --zookeeper zookeeper:2181 --alter --topic topic_test_01 --config max.message.bytes=1048576

kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic topic_test_01

kafka-topics.sh --zookeeper zookeeper:2181 --alter --topic topic_test_01 --config segment.bytes=10485760

kafka-topics.sh --zookeeper zookeeper:2181 --alter --delete-config max.message.bytes --topic topic_test_01
查看
当前所有topic

kafka-topics.sh --zookeeper zookeeper:2181 --list

查看
当前所有topic详情

kafka-topics.sh --zookeeper zookeeper:2181 --describe

Kafka 幂等性

  • 生产者消息重复问题

    • Kafka 生产者生产消息到 partition,如果直接发送消息,kafka 会将消息保存到分区中,但 Kafka 会返回一个 ack 给生产者,表示当前操作是否成功,是否已经保存了这条消息。如果 ack 响应的过程失败了,此时生产者会重试,继续发送没有发送成功的消息,Kafka 又会保存一条一模一样的消息
  • 在 Kafka 中可以开启幂等性

    • 当 Kafka 的生产者生产消息时,会增加一个 pid(生产者的唯一编号)和 sequence number(针对消息的一个递增序列)
    • 发送消息,会连着 pid 和 sequence number 一块发送
    • kafka 接收到消息,会将消息和 pid、sequence number 一并保存下来
    • 如果 ack 响应失败,生产者重试,再次发送消息时,Kafka 会根据 pid、sequence number 是否需要再保存一条消息
    • 判断条件:生产者发送过来的 sequence number 是否小于等于 partition 中消息对应的 sequence

Kafka 中的分区副本机制

生产者的分区写入策略

  • 轮询(按照消息尽量保证每个分区的负载)策略,消息会均匀地分布到每个 partition
    • 写入消息的时候,key 为 null 的时候,默认使用的是轮询策略
  • 随机策略(不使用)
  • 按 key 写入策略,key.hash() % 分区的数量
  • 自定义分区策略(类似于 MapReduce 指定分区)

乱序问题

  • 在 Kafka 中生产者是有写入策略,如果 topic 有多个分区,就会将数据分散在不同的 partition 中存储
  • 当 partition 数量大于 1 的时候,数据(消息)会打散分布在不同的 partition 中
  • 如果只有一个分区,消息是有序的

消费组 Consumer Group Rebalance 机制

  • 再均衡:在某些情况下,消费者组中的消费者消费的分区会产生变化,会导致消费者分配不均匀(例如:有两个消费者消费 3 个,因为某个 partition 崩溃了,还有一个消费者当前没有分区要削峰),Kafka Consumer Group 就会启用 rebalance 机制,重新平衡这个 Consumer Group 内的消费者消费的分区分配。
  • 触发时机
    • 消费者数量发生变化
      • 某个消费者 crash
      • 新增消费者
    • topic 的数量发生变化
      • 某个 topic 被删除
    • partition 的数量发生变化
      • 删除 partition
      • 新增 partition
  • 不良影响
    • 发生 rebalance,所有的 consumer 将不再工作,共同来参与再均衡,直到每个消费者都已经被成功分配所需要消费的分区为止(rebalance 结束)

消费者的分区分配策略

分区分配策略:保障每个消费者尽量能够均衡地消费分区的数据,不能出现某个消费者消费分区的数量特别多,某个消费者消费的分区特别少

  • Range 分配策略(范围分配策略):Kafka 默认的分配策略
    • n:分区的数量 / 消费者数量
    • m:分区的数量 % 消费者数量
    • 前 m 个消费者消费 n+1 个分区
    • 剩余的消费者消费 n 个分区
  • RoundRobin 分配策略(轮询分配策略)
    • 消费者挨个分配消费的分区
  • Striky 粘性分配策略
    • 在没有发生 rebalance 跟轮询分配策略是一致的
    • 发生了 rebalance,轮询分配策略,重新走一遍轮询分配的过程。而粘性会保证跟上一次的尽量一致,只是将新的需要分配的分区,均匀的分配到现有可用的消费者中即可
    • 减少上下文的切换

副本的 ACK 机制

producer 是不断地往 Kafka 中写入数据,写入数据会有一个返回结果,表示是否写入成功。这里对应有一个 ACKs 的配置。

  • acks = 0:生产者只管写入,不管是否写入成功,可能会数据丢失。性能是最好的
  • acks = 1:生产者会等到 leader 分区写入成功后,返回成功,接着发送下一条
  • acks = -1/all:确保消息写入到 leader 分区、还确保消息写入到对应副本都成功后,接着发送下一条,性能是最差的

根据业务情况来选择 ack 机制,是要求性能最高,一部分数据丢失影响不大,可以选择 0/1。如果要求数据一定不能丢失,就得配置为-1/all。

分区中是有 leader 和 follower 的概念,为了确保消费者消费的数据是一致的,只能从分区 leader 去读写消息,follower 做的事情就是同步数据,Backup。

高级 API(High-Level API)、低级 API(Low-Level API)

  • 高级 API 就是直接让 Kafka 帮助管理、处理分配、数据
    • offset 存储在 ZK 中
    • 由 kafka 的 rebalance 来控制消费者分配的分区
    • 开发起来比较简单,无需开发者关注底层细节
    • 无法做到细粒度的控制
  • 低级 API:由编写的程序自己控制逻辑
    • 自己来管理 Offset,可以将 offset 存储在 ZK、MySQL、Redis、HBase、Flink 的状态存储
    • 指定消费者拉取某个分区的数据
    • 可以做到细粒度的控制
    • 原有的 Kafka 的策略会失效,需要我们自己来实现消费机制

Kafka 原理

leader 和 follower

  • Kafka 中的 leader 和 follower 是相对分区有意义,不是相对 broker
  • Kafka 在创建 topic 的时候,会尽量分配分区的 leader 在不同的 broker 中,其实就是负载均衡
  • leader 职责:读写数据
  • follower 职责:同步数据、参与选举(leader crash 之后,会选举一个 follower 重新成为分区的 leader
  • 注意和 ZooKeeper 区分
    • ZK 的 leader 负责读、写,follower 可以读取
    • Kafka 的 leader 负责读写、follower 不能读写数据(确保每个消费者消费的数据是一致的),Kafka 一个 topic 有多个分区 leader,一样可以实现数据操作的负载均衡

AR\ISR\OSR

  • AR 表示一个 topic 下的所有副本
  • ISR:In Sync Replicas,正在同步的副本(可以理解为当前有几个 follower 是存活的)
  • OSR:Out of Sync Replicas,不再同步的副本
  • AR = ISR + OSR

leader 选举

  • Controller:controller 是 kafka 集群的老大,是针对 Broker 的一个角色

    • Controller 是高可用的,是用过 ZK 来进行选举
  • Leader:是针对 partition 的一个角色

    • Leader 是通过 ISR 来进行快速选举
  • 如果 Kafka 是基于 ZK 来进行选举,ZK 的压力可能会比较大。例如:某个节点崩溃,这个节点上不仅仅只有一个 leader,是有不少的 leader 需要选举。通过 ISR 快速进行选举。

  • leader 的负载均衡

    • 如果某个 broker crash 之后,就可能会导致 partition 的 leader 分布不均匀,就是一个 broker 上存在一个 topic 下不同 partition 的 leader
    • 通过以下指令,可以将 leader 分配到优先的 leader 对应的 broker,确保 leader 是均匀分配的
  bin/kafka-leader-election.sh --bootstrap-server node1.itcast.cn:9092 --topic test --partition=2 --election-type preferred

Kafka 读写流程

  • 写流程
    • 通过 ZooKeeper 找 partition 对应的 leader,leader 是负责写的
    • producer 开始写入数据
    • ISR 里面的 follower 开始同步数据,并返回给 leader ACK
    • 返回给 producer ACK
  • 读流程
    • 通过 ZooKeeper 找 partition 对应的 leader,leader 是负责读的
    • 通过 ZooKeeper 找到消费者对应的 offset
    • 然后开始从 offset 往后顺序拉取数据
    • 提交 offset(自动提交——每隔多少秒提交一次 offset、手动提交——放入到事务中提交)

Kafka 的物理存储

  • Kafka 的数据组织结构
    • topic
    • partition
    • segment
      • .log 数据文件
      • .index(稀疏索引)
      • .timeindex(根据时间做的索引)
  • 深入了解读数据的流程
    • 消费者的 offset 是一个针对 partition 全局 offset
    • 可以根据这个 offset 找到 segment 段
    • 接着需要将全局的 offset 转换成 segment 的局部 offset
    • 根据局部的 offset,就可以从(.index 稀疏索引)找到对应的数据位置
    • 开始顺序读取

消息传递的语义性

Flink 里面有对应的每种不同机制的保证,提供 Exactly-Once 保障(二阶段事务提交方式)

  • At-most once:最多一次(只管把数据消费到,不管有没有成功,可能会有数据丢失)
  • At-least once:最少一次(有可能会出现重复消费)
  • Exactly-Once:仅有一次(事务性性的保障,保证消息有且仅被处理一次)

Kafka 的消息不丢失

  • broker 消息不丢失:因为有副本 relicas 的存在,会不断地从 leader 中同步副本,所以,一个 broker crash,不会导致数据丢失,除非是只有一个副本。
  • 生产者消息不丢失:ACK 机制(配置成 ALL/-1)、配置 0 或者 1 有可能会存在丢失
  • 消费者消费不丢失:重点控制 offset
    • At-least once:一种数据可能会重复消费
    • Exactly-Once:仅被一次消费

数据积压

  • 数据积压指的是消费者因为有一些外部的 IO、一些比较耗时的操作(Full GC——Stop the world),就会造成消息在 partition 中一直存在得不到消费,就会产生数据积压
  • 在企业中,我们要有监控系统,如果出现这种情况,需要尽快处理。虽然后续的 Spark Streaming/Flink 可以实现背压机制,但是数据累积太多一定对实时系统它的实时性是有说影响的

数据清理&配额限速

  • 数据清理
    • Log Deletion(日志删除):如果消息达到一定的条件(时间、日志大小、offset 大小),Kafka 就会自动将日志设置为待删除(segment 端的后缀名会以 .delete 结尾),日志管理程序会定期清理这些日志
      • 默认是 7 天过期
    • Log Compaction(日志合并)
      • 如果在一些 key-value 数据中,一个 key 可以对应多个不同版本的 value
      • 经过日志合并,就会只保留最新的一个版本
  • 配额限速
    • 可以限制 Producer、Consumer 的速率
    • 防止 Kafka 的速度过快,占用整个服务器(broker)的所有 IO 资源