1 Kafka概述
1.1 简介
Kafka(官网地址:Apache Kafka)是一个由Scala和Java语言开发的,经典高吞吐量的分布式消息发布和订阅系统,也是大数据技术领域中用作数据交换的核心组件之一。以高吞吐,低延迟,高伸缩,高可靠性,高并发,且社区活跃度高等特性,从而备受广大技术组织的喜爱
1.2 消息队列
Kafka软件最初的设计就是专门用于数据传输的消息系统,类似功能的软件有RabbitMQ、ActiveMQ、RocketMQ等。这些软件名称中的MQ是英文单词Message Queue的简称,也就是所谓的消息队列的意思。这些软件的核心功能是传输数据,而Java中如果想要实现数据传输功能,那么这个软件一般需要遵循Java消息服务技术规范JMS(Java Message Service)。前面提到的ActiveMQ软件就完全遵循了JMS技术规范,而RabbitMQ是遵循了类似JMS规范并兼容JMS规范的跨平台的AMQP(Advanced Message Queuing Protocol)规范。除了上面描述的JMS,AMQP外,还有一种用于物联网小型设备之间传输消息的MQTT通讯协议。
Kafka拥有作为一个消息系统应该具备的功能,但是却有着独特的设计。可以这样说,Kafka借鉴了JMS规范的思想,但是却并没有完全遵循JMS规范。这也恰恰是软件名称为Kafka,而不是KafkaMQ的原因。
JMS是Java平台的消息中间件通用规范,定义了主要用于消息中间件的标准接口。JMS定义的就是系统和系统之间传输消息的接口。
为了实现系统和系统之间的数据传输,JMS规范中定义很多用于通信的组件:
JMS Provider:JMS消息提供者。其实就是实现JMS接口和规范的消息中间件,也就是我们提供消息服务的软件系统,比如RabbitMQ、ActiveMQ、Kafka。
JMS Message:JMS消息。这里的消息指的就是数据。一般采用Java数据模型进行封装,其中包含消息头,消息属性和消息主体内容。
JMS Producer:JMS消息生产者。所谓的生产者,就是生产数据的客户端应用程序,这些应用通过JMS接口发送JMS消息。
JMS Consumer:JMS消息消费者。所谓的消费者,就是从消息提供者(JMS Provider)中获取数据的客户端应用程序,这些应用通过JMS接口接收JMS消息。
JMS支持两种消息发送和接收模型:一种是P2P(Peer-to-Peer)点对点模型,另外一种是发布/订阅(Publish/Subscribe)模型。
P2P模型:P2P模型是基于队列的,消息生产者将数据发送到消息队列中,消息消费者从消息队列中接收消息。因为队列的存在,消息的异步传输成为可能。P2P模型的规定就是每一个消息数据,只有一个消费者,当发送者发送消息以后,不管接收者有没有运行都不影响消息发布到队列中。接收者在成功接收消息后会向发送者发送接收成功的消息
发布 / 订阅模型:所谓得发布订阅模型就是事先将传输的数据进行分类,我们管这个数据的分类称之为主题(Topic)。也就是说,生产者发送消息时,会根据主题进行发送。
1.3 消息中间件对比
2 Kafka快速入门
2.1 单机环境安装
2.1.1 Docker环境搭建
Docker一键启动:
docker run -d --name kafka \
--network dev \
-v infrastructure_kafka:/bitnami/kafka \
-p 9092:9092 \
-e ALLOW_PLAINTEXT_LISTENER=yes \
-e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.18.100:9092 \
registry.cn-hangzhou.aliyuncs.com/adrainty/kafka:3.6.1
2.1.2 安装包搭建
下载软件安装包:kafka_2.12-3.6.1.tgz,下载地址:https://kafka.apache.org/downloads
这里的3.6.1,是Kafka软件的版本。截至到2023年12月24日,Kafka最新版本为3.6.1。
2.12是对应的Scala开发语言版本。Scala2.12和Java8是兼容的,所以可以直接使用。
当前版本Kafka软件内部依然依赖ZooKeeper进行多节点协调调度,所以启动Kafka软件之前,需要先启动ZooKeeper软件。这里不解释了,可以直接看我的zookeeper文章
进入Kafka解压缩文件夹的config目录,修改server.properties配置文件
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
# 客户端访问Kafka服务器时,默认连接的服务为本机的端口9092,如果想要改变,可以修改如下配置
# 此处我们不做任何改变,默认即可
#advertised.listeners=PLAINTEXT://your.host.name:9092
# A comma separated list of directories under which to store log files
# 配置Kafka数据的存放位置,如果文件目录不存在,会自动生成。
log.dirs=/var/log/kafka
执行
./kafka-server-start.sh ../config/server.properties
2.2 Kafka快速上手
2.2.1 消息主题
在消息发布/订阅(Publish/Subscribe)模型中,为了可以让消费者对感兴趣的消息进行消费,而不是对所有的数据进行消费,包括那些不感兴趣的消息,所以定义了主题(Topic)的概念,也就是说将不同的消息进行分类,分成不同的主题(Topic),然后消息生产者在生成消息时,就会向指定的主题(Topic)中发送。而消息消费者也可以订阅自己感兴趣的主题(Topic)并从中获取消息。
2.2.1.1 创建主题
进入bin目录,执行:
root@41fd5a83f8d7:/opt/bitnami/kafka/bin$ sh kafka-topics.sh --bootstrap-server localhost:9092 --topic test --create
bootstrap-server:连接kafka的地址,kafka默认端口是9092
topic:要创建的主题名称
create:操作,指的是[create new]
2.2.1.2 查询主题
查询所有的主题:
root@41fd5a83f8d7:/opt/bitnami/kafka/bin$ sh kafka-topics.sh --bootstrap-server localhost:9092 --list
查询某个主题的详细信息:
root@41fd5a83f8d7:/opt/bitnami/kafka/bin$ sh kafka-topics.sh --bootstrap-server localhost:9092 --topic test --describe
Topic: test TopicId: X8PKjcjUTFmOk8ghe7yetQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: test Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
2.2.1.3 修改主题
root@41fd5a83f8d7:/opt/bitnami/kafka/bin$ sh kafka-topics.sh --bootstrap-server localhost:9092 --topic test --alter --partitions 2
root@41fd5a83f8d7:/opt/bitnami/kafka/bin$ sh kafka-topics.sh --bootstrap-server localhost:9092 --topic test --describe
Topic: test TopicId: X8PKjcjUTFmOk8ghe7yetQ PartitionCount: 2 ReplicationFactor: 1 Configs:
Topic: test Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: test Partition: 1 Leader: 1001 Replicas: 1001 Isr: 1001
2.2.1.4 删除主题
root@41fd5a83f8d7:/opt/bitnami/kafka/bin$ sh kafka-topics.sh --bootstrap-server localhost:9092 --topic test --delete
windows系统中由于权限或进程锁定的问题,删除topic会导致kafka服务节点异常关闭。
2.2.2 生产数据
2.2.2.1 控制台操作
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
>hello kafka
生产数据使用kafka-console-producer.sh这个脚本
--topic指定生产到哪个主题
脚本执行后就可以生产数据了(有一个箭头>),每一行代表你的数据,回车之后才能真正将数据发送到Kafka服务器
2.2.2.2 Java API
项目中引入依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1</version>
</dependency>
代码如下
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.100:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 1. 创建kafka生产者对象
try (KafkaProducer<String, String> producer = new KafkaProducer<>(configs)) {
for (int i = 0; i < 10; i++) {
// 2. 创建数据
// param1 topic 主题名称
// param2 key 数据的key
// param3 value 数据的值
ProducerRecord<String, String> records = new ProducerRecord<>("test", "key" + i, "value" + i);
// 3. 通过生产者对象将数据发送到kafka
producer.send(records);
}
}
}
2.2.3 消费数据
2.2.3.1 控制台操作
root@41fd5a83f8d7:/opt/bitnami/kafka/bin$ sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
hello kafka
消费数据使用kafka-console-consumer.sh这个脚本
--topic指定消费哪个主题的消息
2.2.3.2 Java API
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.100:9092");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "adrainty");
// 1. 创建kafka消费者对象
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs)) {
// 2. 订阅主题
consumer.subscribe(List.of("test"));
// 3. 从kafka拉取数据
while (true) {
ConsumerRecords<String, String> dataRecords = consumer.poll(Duration.ofMillis(100));
dataRecords.forEach(data -> {
log.info("topic: {}, partition: {}, offset: {}, key: {}, value: {}",
data.topic(), data.partition(), data.offset(), data.key(), data.value());
});
}
}
}
2.2.4 Springboot集成
添加依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1</version>
</dependency>
application.yml配置文件:
spring:
kafka:
bootstrap-servers: 192.168.18.100:9092
producer:
acks: all
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
retries: 0
consumer:
group-id: test#消费者组
#消费方式: 在有提交记录的时候,earliest与latest是一样的,从提交记录的下一条开始消费
# earliest:无提交记录,从头开始消费
#latest:无提交记录,从最新的消息的下一条开始消费
auto-offset-reset: earliest
enable-auto-commit: true #是否自动提交偏移量offset
auto-commit-interval: 1s #前提是 enable-auto-commit=true。自动提交的频率
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 2
properties:
#如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}
session.timeout.ms: 120000
#最大消费时间。此决定了获取消息后提交偏移量的最大时间,超过设定的时间(默认5分钟),服务端也会认为该消费者失效。踢出并再平衡
max.poll.interval.ms: 300000
#配置控制客户端等待请求响应的最长时间。
#如果在超时之前没有收到响应,客户端将在必要时重新发送请求,
#或者如果重试次数用尽,则请求失败。
request.timeout.ms: 60000
#订阅或分配主题时,允许自动创建主题。0.11之前,必须设置false
allow.auto.create.topics: true
#poll方法向协调器发送心跳的频率,为session.timeout.ms的三分之一
heartbeat.interval.ms: 40000
#每个分区里返回的记录最多不超max.partitions.fetch.bytes 指定的字节
#0.10.1版本后 如果 fetch 的第一个非空分区中的第一条消息大于这个限制
#仍然会返回该消息,以确保消费者可以进行
#max.partition.fetch.bytes=1048576 #1M
listener:
#当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效
#manual_immediate:需要手动调用Acknowledgment.acknowledge()后立即提交
#ack-mode: manual_immediate
missing-topics-fatal: true #如果至少有一个topic不存在,true启动失败。false忽略
#type: single #单条消费?批量消费? #批量消费需要配合 consumer.max-poll-records
type: batch
concurrency: 2 #配置多少,就为为每个消费者实例创建多少个线程。多出分区的线程空闲
template:
default-topic: "test"
server:
port: 9999
Springboot中会注入kafkaTemplate,直接注入即可
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
生产者:
public void produce(Object obj) {
try {
String obj2String = JSONUtil.toJsonStr(obj);
kafkaTemplate.send(SpringBootKafkaConfig.TOPIC_TEST, obj2String);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
消费者:
@KafkaListener(topics = SpringBootKafkaConfig.TOPIC_TEST, groupId = SpringBootKafkaConfig.GROUP_ID)
public void topic_test(List<String> messages, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
for (String message : messages) {
final JSONObject entries = JSONUtil.parseObj(message);
System.out.println(SpringBootKafkaConfig.GROUP_ID + " 消费了: Topic:" + topic + ",Message:" + entries.getStr("data"));
//ack.acknowledge();
}
}
2.3 Kafka集群部署
2.3.1 zookeeper集群部署
略
2.3.2 启动kafka
需要修改kafka的配置文件
#broker的全局唯一编号,每个服务节点不能重复,只能是数字。
broker.id=1
#broker对外暴露的IP和端口 (每个节点单独配置)
advertised.listeners=PLAINTEXT://192.168.18.100:9092
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/bitnami/kafka
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个topic创建时的副本数,默认时1个副本
offsets.topic.replication.factor=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个segment文件的大小,默认最大1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认5分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
zookeeper.connect=192.168.18.100:2181,192.168.18.101:2181,192.168.18.102:2181/kafka
修改broker.id和advertised.listeners启动就行
停止Kafka集群时,一定要等Kafka所有节点进程全部停止后再停止ZooKeeper集群。因为Zookeeper集群当中记录着Kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。
2.3.3 Kafka-Eagle监控
Kafka-Eagle框架可以监控Kafka集群的整体运行情况,在生产环境中经常使用。Kafka-Eagle的安装依赖于MySQL,MySQL主要用来存储可视化展示的数据
Eagle配置文件
######################################
efak.zk.cluster.alias=cluster1
cluster1.zk.list=kafka1:2181,kafka2:2181,kafka3:2181/kafka
....
# 配置mysql连接
efak.driver=com.mysql.jdbc.Driver
efak.url=jdbc:mysql://mysql8:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=123456
启动eagle
bin/ke.sh start
进入页面登陆查看http://hostname:8048/
账号为admin,密码为123456
3 Kafka进阶
3.1 kafka相关概念
3.1.1 Broker
使用Kafka前,我们都会启动Kafka服务进程,这里的Kafka服务进程我们一般会称之为Kafka Broker或Kafka Server。因为Kafka是分布式消息系统,所以在实际的生产环境中,是需要多个服务进程形成集群提供消息服务的。所以每一个服务节点都是一个broker,而且在Kafka集群中,为了区分不同的服务节点,每一个broker都应该有一个不重复的全局ID,称之为broker.id,这个ID可以在kafka软件的配置文件server.properties中进行配置
# The id of the broker. This must be set to a unique integer for each broker
broker.id=0
3.1.2 Controller
Kafka是分布式消息传输系统,所以存在多个Broker服务节点,但是它的软件架构采用的是分布式系统中比较常见的主从(Master - Slave)架构,也就是说需要从多个Broker中找到一个用于管理整个Kafka集群的Master节点,这个节点,我们就称之为Controller。它是Apache Kafka的核心组件非常重要。它的主要作用是在Apache Zookeeper的帮助下管理和协调控制整个Kafka集群。
如果在运行过程中,Controller节点出现了故障,那么Kafka会依托于ZooKeeper软件选举其他的节点作为新的Controller,让Kafka集群实现高可用。
Kafka集群中Controller的基本功能:
Broker管理:监听 /brokers/ids节点相关的变化
Broker数量增加或减少的变化
Broker对应的数据变化
Topic管理
新增:监听 /brokers/topics节点相关的变化
修改:监听 /brokers/topics节点相关的变化
删除:监听 /admin/delete_topics节点相关的变化
Partation管理:
监听 /admin/reassign_partitions节点相关的变化
监听 /isr_change_notification节点相关的变化
监听 /preferred_replica_election节点相关的变化
数据服务
启动分区状态机和副本状态机
3.1.3 Log
Kafka最开始的应用场景就是日志场景或MQ场景,更多的扮演着一个日志传输和存储系统,这是Kafka立家之本。所以Kafka接收到的消息数据最终都是存储在log日志文件中的,底层存储数据的文件的扩展名就是log。
主题创建后,会创建对应的分区数据Log日志。并打开文件连接通道,随时准备写入数据。
3.1.4 Topic
Kafka是分布式消息传输系统,采用的数据传输方式为发布,订阅模式,也就是说由消息的生产者发布消息,消费者订阅消息后获取数据。为了对消费者订阅的消息进行区分,所以对消息在逻辑上进行了分类,这个分类我们称之为主题:Topic。消息的生产者必须将消息数据发送到某一个主题,而消费者必须从某一个主题中获取消息,并且消费者可以同时消费一个或多个主题的数据。Kafka集群中可以存放多个主题的消息数据。
为了防止主题的名称和监控指标的名称产生冲突,官方推荐主题的名称中不要同时包含下划线和点。
3.1.5 Partition
Kafka消息传输采用发布、订阅模式,所以消息生产者必须将数据发送到一个主题,假如发送给这个主题的数据非常多,那么主题所在broker节点的负载和吞吐量就会受到极大的考验,甚至有可能因为热点问题引起broker节点故障,导致服务不可用。一个好的方案就是将一个主题从物理上分成几块,然后将不同的数据块均匀地分配到不同的broker节点上,这样就可以缓解单节点的负载问题。这个主题的分块我们称之为:分区partition。默认情况下,topic主题创建时分区数量为1,也就是一块分区,可以指定参数--partitions改变。Kafka的分区解决了单一主题topic线性扩展的问题,也解决了负载均衡的问题。
topic主题的每个分区都会用一个编号进行标记,一般是从0开始的连续整数数字。Partition分区是物理上的概念,也就意味着会以数据文件的方式真实存在。每个topic包含一个或多个partition,每个partition都是一个有序的队列。partition中每条消息都会分配一个有序的ID,称之为偏移量:Offset
3.1.6 Replication
分布式系统出现错误是比较常见的,只要保证集群内部依然存在可用的服务节点即可,当然效率会有所降低,不过只要能保证系统可用就可以了。咱们Kafka的topic也存在类似的问题,也就是说,如果一个topic划分了多个分区partition,那么这些分区就会均匀地分布在不同的broker节点上,一旦某一个broker节点出现了问题,那么在这个节点上的分区就会出现问题,那么Topic的数据就不完整了。所以一般情况下,为了防止出现数据丢失的情况,我们会给分区数据设定多个备份,这里的备份,我们称之为:副本Replication。
Kafka支持多副本,使得主题topic可以做到更多容错性,牺牲性能与空间去换取更高的可靠性。
假设我们有一份文件,一般情况下,我们对副本的理解应该是有一个正式的完整文件,然后这个文件的备份,我们称之为副本。但是在Kafka中,不是这样的,所有的文件都称之为副本,只不过会选择其中的一个文件作为主文件,称之为:Leader(主导)副本,其他的文件作为备份文件,称之为:Follower(追随)副本。在Kafka中,这里的文件就是分区,每一个分区都可以存在1个或多个副本,只有Leader副本才能进行数据的读写,Follower副本只做备份使用。
3.2 Controller选举
Controller,是Apache Kafka的核心组件。它的主要作用是在Apache Zookeeper的帮助下管理和协调控制整个Kafka集群。
集群中的任意一台Broker都能充当Controller的角色,但是,在整个集群运行过程中,只能有一个Broker成为Controller。也就是说,每个正常运行的Kafka集群,在任何时刻都有且只有一个Controller。
最先在Zookeeper上创建临时节点/controller成功的Broker就是Controller。Controller重度依赖Zookeeper,依赖zookeepr保存元数据,依赖zookeeper进行服务发现。Controller大量使用Watch功能实现对集群的协调管理。如果此时,作为Controller的Broker节点宕掉了。那么zookeeper的临时节点/controller就会因为会话超时而自动删除。而监控这个节点的Broker就会收到通知而向ZooKeeper发出创建/controller节点的申请,一旦创建成功,那么创建成功的Broker节点就成为了新的Controller。
有一种特殊的情况,就是Controller节点并没有宕掉,而是因为网络的抖动,不稳定,导致和ZooKeeper之间的会话超时,那么此时,整个Kafka集群就会认为之前的Controller已经下线(退出)从而选举出新的Controller,而之前的Controller的网络又恢复了,以为自己还是Controller了,继续管理整个集群,那么此时,整个Kafka集群就有两个controller进行管理,那么其他的broker就懵了,不知道听谁的了,这种情况,我们称之为脑裂现象,为了解决这个问题,Kafka通过一个任期(epoch:纪元)的概念来解决,也就是说,每一个Broker当选Controller时,会告诉当前Broker是第几任Controller,一旦重新选举时,这个任期会自动增1,那么不同任期的Controller的epoch值是不同的,那么旧的controller一旦发现集群中有新任controller的时候,那么它就会完成退出操作(清空缓存,中断和broker的连接,并重新加载最新的缓存),让自己重新变成一个普通的Broker。
3.3 Broker的组件
Kafka Broker中有很多的服务对象,用于实现内部管理和外部通信操作。
3.3.1 LogManager
每一个Broker在启动时都会创建数据管理器(LogManager),用于接收到消息后,完成后续的数据创建,查询,清理等处理。
3.3.2 ZKClient
用于和Zookeeper的交互,Kafka底层对其进行了封装
3.3.3 NetworkClient
每一个Broker在启动时会创建Broker之间的通道管理器对象(BrokerToControllerChannelManager),用于管理Broker和Controller之间的通信。
3.3.4 SocketServer
每一个Broker在启动时会创建自己的网络通信对象(SocketServer),用于和其他Broker之间的进行通信,其中包含了Java用于NIO通信的Channel、Selector对象
3.3.5 ReplicaManager
每一个Broker在启动时都会创建副本管理器(ReplicaManager),用于对主题的副本进行处理。
3.4 创建主题
创建主题Topic的方式有很多种:命令行,工具,客户端API,自动创建。在server.properties文件中配置参数auto.create.topics.enable=true(默认是true)时,如果访问的主题不存在,那么Kafka就会自动创建主题
3.4.1 Java API创建
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.18.100");
// 创建Admin对象
try (Admin admin = Admin.create(configs)) {
// 创建Topic
// String name 主题名称, int numPartitions 分区数量, short replicationFactor 主题分区副本因子(数量)
NewTopic topic1 = new NewTopic("test1", 1, (short) 1);
NewTopic topic2 = new NewTopic("test2", 1, (short) 1);
admin.createTopics(Arrays.asList(topic1, topic2));
}
}
3.4.2 主题分区副本分配策略
在创建主题时,如果使用了replica-assignment参数,那么就按照指定的方案来进行分区副本的创建;如果没有指定replica-assignment参数,那么就按照Kafka内部逻辑来分配,内部逻辑按照机架信息分为两种策略:【未指定机架信息】和【指定机架信息】。
未指定机架信息:
分区起始索引设置0
轮询所有分区,计算每一个分区的所有副本位置:
副本起始索引 = (分区编号 + 随机值) % BrokerID列表长度。
其他副本索引 = (第一个副本索引 + (1 +(副本分配间隔 + Max(0, i - 2))% (BrokerID列表长度 - 1))) % BrokerID列表长度
创建主题时,还可以自定义分区副本分配策略
// 指定分区副本策略
Map<Integer, List<Integer>> replicasAssignments = new HashMap<>();
// 0号分区,第一个副本放在id等于3的结点中,第二个副本放在id为1的结点中
replicasAssignments.put(0, Arrays.asList(3, 1));
// 1号分区,第一个副本放在id等于2的结点中,第二个副本放在id为3的结点中
replicasAssignments.put(1, Arrays.asList(2, 3));
// 2号分区,第一个副本放在id等于2的结点中,第二个副本放在id为1的结点中
replicasAssignments.put(2, Arrays.asList(2, 1));
NewTopic topic3 = new NewTopic("test3", replicasAssignments);
3.4.3 创建主题的流程
通过命令行提交指令,指令中会包含操作类型(--create)、topic的名称(--topic)、主题分区数量(--partitions)、主题分区副本数量(--replication-facotr)、副本分配策略(--replica-assignment)等参数。
指令会提交到客户端进行处理,客户端获取指令后,会首先对指令参数进行校验。
操作类型取值:create、list、alter、describe、delete,只能存在一个。
分区数量为大于1的整数。
主题是否已经存在
分区副本数量大于1且小于Short.MaxValue,一般取值小于等于Broker数量。
将参数封装主题对象(NewTopic)。
创建通信对象,设定请求标记(CREATE_TOPICS),查找Controller,通过通信对象向Controller发起创建主题的网络请求。
通过ZK客户端在ZK端创建节点
3.5 生产消息
3.5.1 Java API生产消息
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.18.100:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 1. 创建kafka生产者对象
try (KafkaProducer<String, String> producer = new KafkaProducer<>(configs)) {
for (int i = 0; i < 10; i++) {
// 2. 创建数据
// param1 topic 主题名称
// param2 key 数据的key
// param3 value 数据的值
ProducerRecord<String, String> records = new ProducerRecord<>("test", "key" + i, "value" + i);
// 3. 通过生产者对象将数据发送到kafka
producer.send(records);
}
}
}
3.5.2 生产消息的流程
在kafka中传递的数据我们称之为消息(message)或记录(record),所以Kafka发送数据前,需要将待发送的数据封装为指定的数据模型(ProducerRecord)
如果配置拦截器栈(interceptor.classes),那么将数据进行拦截处理。某一个拦截器出现异常并不会影响后续的拦截器处理。
因为发送的数据为KV数据,所以需要根据配置信息中的序列化对象对数据中Key和Value分别进行序列化处理。
计算数据所发送的分区位置
进行数据校验,例如数据大小是否大于缓存区的大小(默认32M)
将数据追加到数据收集器中。数据收集器(RecordAccumulator):用于收集,转换我们产生的数据,类似于生产者消费者模式下的缓冲区。为了优化数据的传输,Kafka并不是生产一条数据就向Broker发送一条数据,而是通过合并单条消息,进行批量(批次)发送,提高吞吐量,减少带宽消耗。
数据发送器(Sender):线程对象,用于从收集器对象中获取数据,向服务节点发送。类似于生产者消费者模式下的消费者。因为是线程对象,所以启动后会不断轮询获取数据收集器中已经关闭的批次数据。对批次进行整合后再发送到Broker节点中
3.5.3 拦截器栈
如果配置拦截器栈(interceptor.classes),那么将数据进行拦截处理。某一个拦截器出现异常并不会影响后续的拦截器处理。
具体源码在org.apache.kafka.clients.producer.KafkaProducer
中
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
可以看到onSend方法拦截器处理假如出现问题,是不会影响数据的发送的
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
ProducerRecord<K, V> interceptRecord = record;
for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
try {
interceptRecord = interceptor.onSend(interceptRecord);
} catch (Exception e) {
// do not propagate interceptor exception, log and continue calling other interceptors
// be careful not to throw exception from here
if (record != null)
log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
else
log.warn("Error executing interceptor onSend callback", e);
}
}
return interceptRecord;
}
拦截器的核心是实现org.apache.kafka.clients.producer.ProducerInterceptor
接口,例如:
@Slf4j
public class KafkaProducerInterceptor implements ProducerInterceptor<String, String> {
/**
* 发送数据会调用该接口
*/
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> data) {
log.info("拦截器拦截到数据:{}", data);
return new ProducerRecord<>(data.topic(), data.key(), data.value() + data.value());
}
/**
* 发送数据成功后服务器返回的响应
*/
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception != null) {
log.error("发送数据失败:{}", exception.getMessage());
} else {
log.info("发送数据成功:{}", metadata);
}
}
/**
* 生产者对象关闭时的回调
*/
@Override
public void close() {
log.info("生产者关闭");
}
/**
* 创建生产者的时候的配置
*/
@Override
public void configure(Map<String, ?> configs) {
log.info("生产者配置:{}", configs);
}
}
在生产者配置中可以配置:
configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, KafkaProducerInterceptor.class.getName());
3.5.4 分区计算逻辑
生产者对象需要把数据放到指定分区,分区的计算需要使用元数据缓存(MetadataCache),元数据缓存包含了集群的相关信息
源码:org.apache.kafka.clients.MetadataCache
public class MetadataCache {
private final String clusterId;
private final Map<Integer, Node> nodes;
private final Set<String> unauthorizedTopics;
private final Set<String> invalidTopics;
private final Set<String> internalTopics;
private final Node controller;
// 分区的详细信息
private final Map<TopicPartition, PartitionMetadata> metadataByPartition;
private final Map<String, Uuid> topicIds;
......
}
分区具体是怎么计算呢,源码在KafkaProducer
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
if (record.partition() != null)
// 如果指定分区编号,就使用指定的分区
// 这里分区不作校验,如果分区不存在会卡在send方法
return record.partition();
// 如果自定义了分区器,就使用自定义分区器的逻辑
if (partitioner != null) {
int customPartition = partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
if (customPartition < 0) {
throw new IllegalArgumentException("...");
}
return customPartition;
}
if (serializedKey != null && !partitionerIgnoreKeys) {
// 使用MurmurHash算法进行hash处理
return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
} else {
return RecordMetadata.UNKNOWN_PARTITION;
}
}
在数据收集器(RecordAccmulartor)中,如果配置的是未知分区,就会根据当前主题的负载情况动态计算分区
if (partition == RecordMetadata.UNKNOWN_PARTITION) {
partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);
effectivePartition = partitionInfo.partition();
} else {
partitionInfo = null;
effectivePartition = partition;
}
自定义分区器可以实现org.apache.kafka.clients.producer.Partitioner
接口
public class KafkaPartition implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return 0;
}
// 其他实现方法省略
......
}
然后在producer配置
configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaPartition.class.getName());
3.5.5 数据收集器
数据收集器(RecordAccumulator):用于收集,转换我们产生的数据,类似于生产者消费者模式下的缓冲区。为了优化数据的传输,Kafka并不是生产一条数据就向Broker发送一条数据,而是通过合并单条消息,进行批量(批次)发送,提高吞吐量,减少带宽消耗。
默认情况下,一个发送批次的数据容量为16K,这个可以通过参数batch.size进行改善。
批次是和分区进行绑定的。也就是说发往同一个分区的数据会进行合并,形成一个批次。
如果当前批次能容纳数据,那么直接将数据追加到批次中即可,如果不能容纳数据,那么会产生新的批次放入到当前分区的批次队列中,这个队列使用的是Java的双端队列Deque。旧的批次关闭不再接收新的数据,等待发送
3.5.6 数据发送器
数据发送器(Sender):线程对象,用于从收集器对象中获取数据,向服务节点发送。类似于生产者消费者模式下的消费者。因为是线程对象,所以启动后会不断轮询获取数据收集器中已经关闭的批次数据。对批次进行整合后再发送到Broker节点中
因为数据真正发送的地方是Broker节点,不是分区。所以需要将从数据收集器中收集到的批次数据按照可用Broker节点重新组合成List集合。
将组合后的<节点,List<批次>>的数据封装成客户端请求(请求键为:Produce)发送到网络客户端对象的缓冲区,由网络客户端对象通过网络发送给Broker节点。
Broker节点获取客户端请求,并根据请求键进行后续的数据处理:向分区中增加数据。
3.5.7 消息可靠性
Kafka发送数据时,可以同时传递回调对象(Callback)用于对数据的发送结果进行对应处理,具体代码实现采用匿名类或Lambda表达式都可以。
producer.send(records, (metadata, exception) -> {
if (exception == null) {
log.info("发送成功: {}", metadata);
} else {
log.error("发送失败: {}", exception.getMessage());
}
});
Kafka提供了3种应答处理,可以通过配置对象进行配置
configs.put(ProducerConfig.ACKS_CONFIG, "all");
ACK = 0
这种应答方式,数据已经走网络给Kafka发送了,但这其实并不能保证Kafka能正确地接收到数据,在传输过程中如果网络出现了问题,那么数据就丢失了。也就是说这种应答确认的方式,数据的可靠性是无法保证的。不过相反,因为无需等待Kafka服务节点的确认,通信效率倒是比较高的,也就是系统吞吐量会非常高。
ACK = -1或者all(默认)
这种应答方式,数据同时存储到了分区Leader副本和follower副本中,那么数据已经非常安全了,可靠性也是最高的。此时,如果Leader副本出现了故障,那么follower副本能够开始起作用,因为数据已经存储了,所以数据不会丢失。
ACK = 1
这种应答方式,数据已经存储到了分区Leader副本中,那么数据相对来讲就比较安全了,也就是可靠性比较高。之所以说相对来讲比较安全,就是因为现在只有一个节点存储了数据,而数据并没有来得及进行备份到follower副本,那么一旦当前存储数据的broker节点出现了故障,数据也依然会丢失。
3.5.8 消息重试
由于网络或服务节点的故障,Kafka在传输数据时,可能会导致数据丢失,所以我们才会设置ACK应答机制,尽可能提高数据的可靠性。但其实在某些场景中,数据的丢失并不是真正地丢失,而是“虚假丢失”,比如咱们将ACK应答设置为1,也就是说一旦Leader副本将数据写入文件后,Kafka就可以对请求进行响应了。
此时,如果假设由于网络故障的原因,Kafka并没有成功将ACK应答信息发送给Producer,那么此时对于Producer来讲,以为kafka没有收到数据,所以就会一直等待响应,一旦超过某个时间阈值,就会发生超时错误,也就是说在Kafka Producer眼里,数据已经丢了。所以在这种情况下,kafka Producer会尝试对超时的请求数据进行重试(retry)操作。通过重试操作尝试将数据再次发送给Kafka。
重试可能会导致:数据重复和数据乱序
为了解决Kafka传输数据时,所产生的数据重复和乱序问题,Kafka引入了幂等性操作,所谓的幂等性,就是Producer同样的一条数据,无论向Kafka发送多少次,kafka都只会存储一条。注意,这里的同样的一条数据,指的不是内容一致的数据,而是指的不断重试的数据。
默认幂等性是不起作用的,所以如果想要使用幂等性操作,只需要在生产者对象的配置中开启幂等性配置即可
kafka是如何实现数据的幂等性操作呢,我们这里简单说一下流程:
开启幂等性后,为了保证数据不会重复,那么就需要给每一个请求批次的数据增加唯一性标识,kafka中,这个标识采用的是连续的序列号数字sequencenum,但是不同的生产者Producer可能序列号是一样的,所以仅仅靠seqnum还无法唯一标记数据,所以还需要同时对生产者进行区分,所以Kafka采用申请生产者ID(producerid)的方式对生产者进行区分。这样,在发送数据前,我们就需要提前申请producerid以及序列号sequencenum
Broker中会给每一个分区记录生产者的生产状态:采用队列的方式缓存最近的5个批次数据。队列中的数据按照seqnum进行升序排列。这里的数字5是经过压力测试,均衡空间效率和时间效率所得到的值,所以为固定值,无法配置且不能修改。
如果Borker当前新的请求批次数据在缓存的5个旧的批次中存在相同的,如果有相同的,那么说明有重复,当前批次数据不做任何处理。
如果Broker当前的请求批次数据在缓存中没有相同的,那么判断当前新的请求批次的序列号是否为缓存的最后一个批次的序列号加1,如果是,说明是连续的,顺序没乱。那么继续,如果不是,那么说明数据已经乱了,发生异常。
Broker根据异常返回响应,通知Producer进行重试。Producer重试前,需要在缓冲区中将数据重新排序,保证正确的顺序后。再进行重试即可。
如果请求批次不重复,且有序,那么更新缓冲区中的批次数据。将当前的批次放置再队列的结尾,将队列的第一个移除,保证队列中缓冲的数据最多5个。
幂等性的producer仅做到单分区上的幂等性,即单分区消息有序不重复,多分区无法保证幂等性。
只能保持生产者单个会话的幂等性,无法实现跨会话的幂等性,也就是说如果一个producer挂掉再重启,那么重启前和重启后的producer对象会被当成两个独立的生产者,从而获取两个不同的独立的生产者ID,导致broker端无法获取之前的状态信息,所以无法实现跨会话的幂等。要想解决这个问题,可以采用后续的事务功能。
3.5.9 数据事务
对于幂等性的缺陷,kafka可以采用事务的方式解决跨会话的幂等性。基本的原理就是通过事务功能管理生产者ID,保证事务开启后,生产者对象总能获取一致的生产者ID。
为了实现事务,Kafka引入了事务协调器(TransactionCoodinator)负责事务的处理,所有的事务逻辑包括分派PID等都是由TransactionCoodinator负责实施的。TransactionCoodinator 会将事务状态持久化到该主题中。事务基本的实现思路就是通过配置的事务ID,将生产者ID进行绑定,然后存储在Kafka专门管理事务的内部主题 __transaction_state中,而内部主题的操作是由事务协调器(TransactionCoodinator)对象完成的,这个协调器对象有点类似于咱们数据发送时的那个副本Leader。其实这种设计是很巧妙的,因为kafka将事务ID和生产者ID看成了消息数据,然后将数据发送到一个内部主题中。
使用如下:
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.18.100:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 自定义拦截器
configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, KafkaProducerInterceptor.class.getName());
// 自定义分区器
configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaPartition.class.getName());
// 幂等性操作
configs.put(ProducerConfig.ACKS_CONFIG, "-1");
configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
configs.put(ProducerConfig.RETRIES_CONFIG, 5);
configs.put(ProducerConfig.BATCH_SIZE_CONFIG, 5);
configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
// 事务ID, 事务是基于幂等性的操作
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-id");
// 创建kafka生产者对象
try (KafkaProducer<String, String> producer = new KafkaProducer<>(configs)) {
// 1. 初始化事务
producer.initTransactions();
try {
// 2. 开启事务
producer.beginTransaction();
for (int i = 0; i < 10; i++) {
// 创建数据
ProducerRecord<String, String> records = new ProducerRecord<>("test", "key" + i, "value" + i);
// 通过生产者对象将数据发送到kafka
producer.send(records, (metadata, exception) -> {
if (exception == null) {
log.info("发送成功: {}", metadata);
} else {
log.error("发送失败: {}", exception.getMessage());
}
});
}
// 3. 提交事务
producer.commitTransaction();
} catch (Exception e) {
// 4. 如果发送失败, 回滚事务
producer.abortTransaction();
}
}
}
事务底层处理如下图
3.6 存储消息
数据已经由生产者Producer发送给Kafka集群,当Kafka接收到数据后,会将数据写入本地文件中。常用的数据存储配置:
3.6.1 存储文件格式
3.6.1.1 数据日志文件
Kafka系统早期设计的目的就是日志数据的采集和传输,所以数据是使用log文件进行保存的。我们所说的数据文件就是以.log作为扩展名的日志文件。文件名长度为20位长度的数字字符串,数字含义为当前日志文件的第一批数据的基础偏移量,也就是文件中保存的第一条数据偏移量。字符串数字位数不够的,前面补0。
文件里面大部分是乱码,具体怎么查看文件内容呢,可以使用kafka的工具
sh kafka-run-class.sh kafka.tools.DumpLogSegments --files={日志文件路径} --print-data-log
内容如下:
Dumping /bitnami/kafka/data/test-0/00000000000000000000.log
Log starting offset: 0
baseOffset: 0 lastOffset: 9 count: 10 baseSequence: 0 lastSequence: 9 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1719682760544 size: 291 magic: 2 compresscodec: none crc: 941939228 isvalid: true
| offset: 0 CreateTime: 1719682760525 keySize: 4 valueSize: 12 sequence: 0 headerKeys: [] key: key0 payload: value0
| offset: 1 CreateTime: 1719682760542 keySize: 4 valueSize: 12 sequence: 1 headerKeys: [] key: key1 payload: value1
| offset: 2 CreateTime: 1719682760542 keySize: 4 valueSize: 12 sequence: 2 headerKeys: [] key: key2 payload: value2
| offset: 3 CreateTime: 1719682760543 keySize: 4 valueSize: 12 sequence: 3 headerKeys: [] key: key3 payload: value3
| offset: 4 CreateTime: 1719682760543 keySize: 4 valueSize: 12 sequence: 4 headerKeys: [] key: key4 payload: value4
| offset: 5 CreateTime: 1719682760543 keySize: 4 valueSize: 12 sequence: 5 headerKeys: [] key: key5 payload: value5
| offset: 6 CreateTime: 1719682760543 keySize: 4 valueSize: 12 sequence: 6 headerKeys: [] key: key6 payload: value6
| offset: 7 CreateTime: 1719682760544 keySize: 4 valueSize: 12 sequence: 7 headerKeys: [] key: key7 payload: value7
| offset: 8 CreateTime: 1719682760544 keySize: 4 valueSize: 12 sequence: 8 headerKeys: [] key: key8 payload: value8
| offset: 9 CreateTime: 1719682760544 keySize: 4 valueSize: 12 sequence: 9 headerKeys: [] key: key9 payload: value9
3.6.1.2 数据索引文件
Kafka的基础设置中,数据日志文件到达1G才会滚动生产新的文件。那么从1G文件中想要快速获取我们想要的数据,效率还是比较低的。通过前面的介绍,如果我们能知道数据在文件中的位置(position),那么定位数据就会快很多,问题在于我们如何才能在知道这个位置呢。
Kafka在存储数据时,都会保存数据的偏移量信息,而偏移量是从0开始计算的。简单理解就是数据的保存顺序。比如第一条保存的数据,那么偏移量就是0,第二条保存的数据偏移量就是1,但是这个偏移量只是告诉我们数据的保存顺序,却无法定位数据,不过需要注意的是,每条数据的大小是可以确定的。既然可以确定,那么数据存放在文件的位置起始也就是确定了,所以Kafka在保存数据时,其实是可以同时保存位置的,那么我们在访问数据时,只要通过偏移量其实就可以快速定位日志文件的数据了。
索引文件中保存的就是逻辑偏移量和物理偏移量位置的关系。
Kafka底层实现时,采用的是虚拟内存映射技术mmap,将内存和文件进行双向映射,操作内存数据就等同于操作文件,所以效率是非常高的,但是因为是基于内存的操作,所以并不稳定,容易丢数据,因此Kafka的索引文件中的索引信息是不连续的,而且为了效率,kafka默认情况下,4kb的日志数据才会记录一次索引,但是这个是可以进行配置修改的,参数为log.index.interval.bytes,默认值为4096。所以我们有的时候会将kafka的不连续索引数据称之为稀疏索引。
3.6.1.3 数据时间索引文件
某些场景中,我们不想根据顺序(偏移量)获取Kafka的数据,而是想根据时间来获取的数据。这个时候,可没有对应的偏移量来定位数据,那么查找的效率就非常低了,因为kafka还提供了时间索引文件。
这个时间索引文件起始就是将时间戳和偏移量对应起来了,那么此时通过时间戳就可以找到偏移量,再通过偏移量找到定位信息,再通过定位信息就找到数据了
3.6.2 数据存储的组件
KafkaApis : Kafka应用接口组件,当Kafka Producer向Kafka Broker发送数据请求后,Kafka Broker接收请求,会使用Apis组件进行请求类型的判断,然后选择相应的方法进行处理。
ReplicaManager : 副本管理器组件,用于提供主题副本的相关功能,在数据的存储前进行ACK校验和事务检查,并提供数据请求的响应处理
Partition : 分区对象,主要包含分区状态变换的监控,分区上下线的处理等功能,在数据存储是主要用于对分区副本数量的相关校验,并提供追加数据的功能
UnifiedLog : 同一日志管理组件,用于管理数据日志文件的新增,删除等功能,并提供数据日志文件偏移量的相关处理。
LocalLog : 本地日志组件,管理整个分区副本的数据日志文件。假设当前主题分区中有3个日志文件,那么3个文件都会在组件中进行管理和操作。
LogSegment : 文件段组件,对应具体的某一个数据日志文件,假设当前主题分区中有3个日志文件,那么3个文件每一个都会对应一个LogSegment组件,并打开文件的数据管道FileChannel。数据存储时,就是采用组件中的FileChannel实现日志数据的追加
LogConfig: 日志配置对象,常用的数据存储配置
3.6.3 数据同步
Kafka中,分区的某个副本会被指定为 Leader,负责响应客户端的读写请求。分区中的其他副本自动成为 Follower,主动拉取(同步)Leader 副本中的数据,写入自己本地日志,确保所有副本上的数据是一致的。
Kafka的设计目标是:高吞吐、高并发、高性能。为了做到以上三点,它必须设计成分布式的,多台机器可以同时提供读写,并且需要为数据的存储做冗余备份。
我们来看这样的一个场景:一个分区有3个副本,一个Leader和两个Follower。Leader副本作为数据的读写副本,所以生产者的数据都会发送给leader副本,而两个follower副本会周期性地同步leader副本的数据,但是因为网络,资源等因素的制约,同步数据的过程是有一定延迟的,所以3个副本之间的数据可能是不同的。具体如下图所示:
此时,假设leader副本因为意外原因宕掉了,那么Kafka为了提高分区可用性,此时会选择2个follower副本中的一个作为Leader对外提供数据服务。此时我们就会发现,对于消费者而言,之前leader副本能访问的数据是D,但是重新选择leader副本后,能访问的数据就变成了C,这样消费者就会认为数据丢失了,也就是所谓的数据不一致了。
为了提升数据的一致性,Kafka引入了高水位(HW :High Watermark)机制,Kafka在不同的副本之间维护了一个水位线的机制(其实也是一个偏移量的概念),消费者只能读取到水位线以下的的数据。这就是所谓的木桶理论:木桶中容纳水的高度,只能是水桶中最短的那块木板的高度。这里将整个分区看成一个木桶,其中的数据看成水,而每一个副本就是木桶上的一块木板,那么这个分区(木桶)可以被消费者消费的数据(容纳的水)其实就是数据最少的那个副本的最后数据位置(木板高度)。
也就是说,消费者一开始在消费Leader的时候,虽然Leader副本中已经有a、b、c、d 4条数据,但是由于高水位线的限制,所以也只能消费到a、b这两条数据。
HW高水位线会随着follower的数据同步操作,而不断上涨,也就是说,follower同步的数据越多,那么水位线也就越高,那么消费者能访问的数据也就越多。
首先,初始状态下,Leader和Follower都没有数据,所以和偏移量相关的值都是初始值0,而由于Leader需要管理follower,所以也包含着follower的相关偏移量(LEO)数据。
生产者向Leader发送两条数据,Leader收到数据后,会更新自身的偏移量信息。
接下来,Follower开始同步Leader的数据,同步数据时,会将自身的LEO值作为参数传递给Leader。此时,Leader会将数据传递给Follower,且同时Leader会根据所有副本的LEO值更新HW。
HW = Math.max[HW, min(LeaderLEO,F1-LEO,F2-LEO)]=0
由于两个Follower的数据拉取速率不一致,所以Follower-1抓取了2条数据,而Follower-2抓取了1条数据。Follower再收到数据后,会将数据写入文件,并更新自身的偏移量信息。
接下来Leader收到了生产者的数据C,那么此时会根据相同的方式更新自身的偏移量信息
follower接着向Leader发送Fetch请求,同样会将最新的LEO作为参数传递给Leader。Leader收到请求后,会更新自身的偏移量信息。此时,Leader会将数据发送给Follower,同时也会将HW一起发送。Follower收到数据后,会将数据写入文件,并更新自身偏移量信息
Offset: Kafka的每个分区的数据都是有序的,所谓的数据偏移量,指的就是Kafka在保存数据时,用于快速定位数据的标识,类似于Java中数组的索引,从0开始。
LSO: 起始偏移量(Log Start Offset),每个分区副本都有起始偏移量,用于表示副本数据的起始偏移位置,初始值为0。
LEO: 日志末端位移(Log End Offset),表示下一条待写入消息的offset,每个分区副本都会记录自己的LEO。对于Follower副本而言,它能读取到Leader副本 LEO 值以下的所有消息。
HW: 高水位值(High Watermark),定义了消息可见性,标识了一个特定的消息偏移量(offset),消费者只能拉取到这个水位offset之前的消息,同时这个偏移量还可以帮助Kafka完成副本数据同步操作。
3.6.4 日志清理和压缩
Kafka软件的目的本质是用于传输数据,而不是存储数据,但是为了均衡生产数据速率和消费者的消费速率,所以可以将数据保存到日志文件中进行存储。默认的数据日志保存时间为7天,可以通过调整如下参数修改保存时间:
log.retention.hours:小时(默认:7天,最低优先级)
log.retention.minutes,分钟
log.retention.ms,毫秒(最高优先级)
log.retention.check.interval.ms,负责设置检查周期,默认5分钟。
日志一旦超过了设置的时间,Kafka中提供了两种日志清理策略:delete和compact。
log.cleanup.policy = delete
delete: 将过期数据删除
基于时间:默认打开。以segment中所有记录中的最大时间戳作为该文件时间戳。
基于大小:默认关闭。超过设置的所有日志总大小,删除最早的segment。
compact:日志压缩,基本思路就是将相同key的数据,只保留最后一个。因为数据会丢失,所以这种策略只适用保存数据最新状态的特殊场景。
3.7 消费消息
3.7.1 偏移量
偏移量offset是消费者消费数据的一个非常重要的属性。默认情况下,消费者如果不指定消费主题数据的偏移量,那么消费者启动消费时,无论当前主题之前存储了多少历史数据,消费者只能从连接成功后当前主题最新的数据偏移位置读取,而无法读取之前的任何数据,如果想要获取之前的数据,就需要设定配置参数或指定数据偏移量。
在消费者的配置中,我们可以增加偏移量相关参数auto.offset.reset,用于配置从哪里开始获取主题数据
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
earliest:对于同一个消费者组,从头开始消费。就是说如果这个topic有历史消息存在,现在新启动了一个消费者组,且auto.offset.reset=earliest,那将会从头开始消费(未提交偏移量的场合)。
latest:对于同一个消费者组,消费者只能消费到连接topic后,新产生的数据(未提交偏移量的场合)。
none:生产环境不使用
除了从最开始的偏移量或最后的偏移量读取数据以外,Kafka还支持从指定的偏移量的位置开始消费数据。
AtomicBoolean flag = new AtomicBoolean(true);
do {
ConsumerRecords<String, String> dataRecords = consumer.poll(Duration.ofMillis(100));
dataRecords.forEach(data -> log.info("{}", data.value()));
// 获取集群信息
Set<TopicPartition> assignment = consumer.assignment();
if (assignment != null && !assignment.isEmpty()) {
assignment.forEach(topicPartition -> {
if ("test".equals(topicPartition.topic())) {
// 指定分区的偏移量
consumer.seek(topicPartition, 2);
flag.set(false);
}
});
}
} while (flag.get());
由于消费者在消费消息的时候可能会由于各种原因而断开消费,当重新启动消费者时我们需要让它接着上次消费的位置offset继续消费,因此消费者需要实时的记录自己以及消费的位置。
每个consumer会定期将自己消费分区的offset提交给kafka内部topic:__consumer_offsets
,提交过去的时候,key是consumerGroupId+topic+分区号
,value就是当前offset的值,kafka会定期清理topic里的消息,最后就保留最新的那条数据。提交的时间间隔可以通过auto.commit.interval.ms
来控制
也可以手动保存偏移量
配置:
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
使用:
consumer.commitSync(); // 同步提交
consumer.commitAsync(); // 异步提交
3.7.2 消费者事务
无论偏移量使用自动提交还是,手动提交,特殊场景中数据都有可能会出现重复消费。
对于单独的Consumer来讲,事务保证会比较弱,尤其是无法保证提交的信息被精确消费,主要原因就是消费者可以通过偏移量访问信息,而不同的数据文件生命周期不同,同一事务的信息可能会因为重启导致被删除的情况。所以一般情况下,想要完成kafka消费者端的事务处理,需要将数据消费过程和偏移量提交过程进行原子性绑定,也就是说数据处理完了,必须要保证偏移量正确提交,才可以做下一步的操作,如果偏移量提交失败,那么数据就恢复成处理之前的效果。
对于生产者事务而言,消费者消费的数据也会受到限制。默认情况下,消费者只能消费到生产者提交的数据,也就是未提交完成的数据,消费者是看不到的。如果想要消费到未提交的数据,需要更高消费事务隔离级别
// 隔离级别:已提交读,读取已经提交事务成功的数据(默认)
// configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
// 隔离级别:未提交读,读取已经提交事务成功和未提交事务成功的数据
configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted")
3.7.3 消费者组
Kafka的主题如果就一个分区的话,那么在硬件配置相同的情况下,消费者Consumer消费主题数据的方式没有什么太大的差别。消费者可以根据自身的消费能力主动拉取Kafka的数据,但是毕竟自身的消费能力有限,如果主题分区的数据过多,那么消费的时间就会很长。对于kafka来讲,数据就需要长时间的进行存储,那么对Kafka集群资源的压力就非常大。如果希望提高消费者的消费能力,并且减少kafka集群的存储资源压力。所以有必要对消费者进行横向伸缩,从而提高消息消费速率。
不过这么做有一个问题,就是每一个消费者是独立,那么一个消费者就不能消费主题中的全部数据,简单来讲,就是对于某一个消费者个体来讲,主题中的部分数据是没有消费到的,也就会认为数据丢了,这个该如何解决呢?那如果我们将这多个消费者当成一个整体,是不是就可以了呢?这就是所谓的消费者组 Consumer Group。在kafka中,每个消费者都对应一个消费组,消费者可以是一个线程,一个进程,一个服务实例,如果kafka想要消费消息,那么需要指定消费那个topic的消息以及自己的消费组id(groupId)。
同一个消费者组的消费者都订阅同一个主题,所以消费者组中的多个消费者可以共同消费一个主题中的所有数据。
为了避免数据被重复消费,所以主题一个分区的数据只能被组中的一个消费者消费,也就是说不能两个消费者同时消费一个分区的数据。但是反过来,一个消费者是可以消费多个分区数据的。
消费者组中的消费者数量最好不要超出主题分区的数据,就会导致多出的消费者是无法消费数据的,造成了资源的浪费。
消费者想要拉取数据,首先必须要加入到一个组中,成为消费组中的一员,同样道理,如果消费者出现了问题,也应该从消费者组中剥离。而这种加入组和退出组的处理,都应该由专门的管理组件进行处理,这个组件在kafka中,我们称之为消费者组调度器(协调)(Group Coordinator)。
Group Coordinator是Broker上的一个组件,用于管理和调度消费者组的成员、状态、分区分配、偏移量等信息。每个Broker都有一个Group Coordinator对象,负责管理多个消费者组,但每个消费者组只有一个Group Coordinator
消费者中的每个消费者到底消费哪一个主题分区,这个分配策略其实是由消费者的Leader决定的,这个Leader我们称之为群主。群主是多个消费者中,第一个加入组中的消费者,其他消费者我们称之为Follower,称呼上有点类似与分区的Leader和Follower。
当消费者加入群组的时候,会发送一个JoinGroup请求。群主负责给每一个消费者分配分区。每个消费者只知道自己的分配信息,只有群主知道群组内所有消费者的分配信息。
指定分配策略的基本流程:
第一个消费者设定group.id为test,向当前负载最小的节点发送请求查找消费调度器
找到消费调度器后,消费者向调度器节点发出JOIN_GROUP请求,加入消费者组。
当前消费者当选为群主后,根据消费者配置中分配策略设计分区分配方案,并将分配好的方案告知调度器
此时第二个消费者设定group.id为test,申请加入消费者组
加入成功后,kafka将消费者组状态切换到准备rebalance,关闭和消费者的所有链接,等待它们重新加入。客户端重新申请加入,kafka从消费者组中挑选一个作为leader,其它的作为follower。
Leader会按照分配策略对分区进行重分配,并将方案发送给调度器,由调度器通知所有的成员新的分配方案。组成员会按照新的方案重新消费数据
Kafka提供的分区分配策略常用的有4个:
RoundRobinAssignor轮询分配策略):每个消费者组中的消费者都会含有一个自动生产的UUID作为memberid。轮询策略中会将每个消费者按照memberid进行排序,所有member消费的主题分区根据主题名称进行排序。将主题分区轮询分配给对应的订阅用户,注意未订阅当前轮询主题的消费者会跳过。
RangeAssignor(范围分配策略):按照每个topic的partition数计算出每个消费者应该分配的分区数量,然后分配,分配的原则就是一个主题的分区尽可能的平均分,如果不能平均分,那就按顺序向前补齐即可。Range分配策略针对单个Topic的情况下显得比较均衡,但是假如Topic多的话, member排序靠前的可能会比member排序靠后的负载多很多。还有就是如果新增或移除消费者成员,那么会导致每个消费者都需要去建立新的分区节点的连接,更新本地的分区缓存,效率比较低。
StickyAssignor(粘性分区):在第一次分配后,每个组成员都保留分配给自己的分区信息。如果有消费者加入或退出,那么在进行分区再分配时(一般情况下,消费者退出45s后,才会进行再分配,因为需要考虑可能又恢复的情况),尽可能保证消费者原有的分区不变,重新对加入或退出消费者的分区进行分配。
CooperativeStickyAssignor:前面的三种分配策略再进行重分配时使用的是EAGER协议,会让当前的所有消费者放弃当前分区,关闭连接,资源清理,重新加入组和等待分配策略。明显效率是比较低的,所以从Kafka2.4版本开始,在粘性分配策略的基础上,优化了重分配的过程,使用的是COOPERATIVE协议,特点就是在整个再分配的过程中,粘性分区分配策略分配的会更加均匀和高效一些,COOPERATIVE协议将一次全局重平衡,改成每次小规模重平衡,直至最终收敛平衡的过程。
3.7.4 消费数据的流程
服务端获取到用户拉取数据的请求。Kafka消费客户端会向Broker发送拉取数据的请求FetchRequest,服务端Broker获取到请求后根据请求标记FETCH交给应用处理接口KafkaApis进行处理。
通过副本管理器拉取数据。副本管理器需要确定当前拉取数据的分区,然后进行数据的读取操作
判定首选副本。2.4版本前,数据读写的分区都是Leader分区,从2.4版本后,kafka支持Follower副本进行读取。主要原因就是跨机房或者说跨数据中心的场景,为了节约流量资源,可以从当前机房或数据中心的副本中获取数据。这个副本称之未首选副本。
拉取分区数据。Kafka的底层读取数据是采用日志段LogSegment对象进行操作的
零拷贝。为了提高数据读取效率,Kafka的底层采用nio提供的FileChannel零拷贝技术,直接从操作系统内核中进行数据传输,提高数据拉取的效率。
4 Kafka优化
4.1 资源配置
4.1.1 操作系统
Kafka的网络客户端底层使用Java NIO的Selector方式,而Selector在Linux的实现是epoll,在Windows上实现机制为select。因此Kafka部署在Linux会有更高效的I/O性能。
数据在磁盘和网络之间进行传输时候,在Linux上可以享受到零拷贝机制带来的快捷和便利高效,而Windows在一定程度上会使用零拷贝操作。
所以建议Kafka部署在Linux操作系统上。
4.1.2 磁盘选择
Kafka 存储方式为顺序读写,机械硬盘的最大劣势在于随机读写慢。所以使用机械硬盘并不会造成性能低下。所以磁盘选用普通机械硬盘即可,Kafka自身已经有冗余机制,而且通过分区的设计,实现了负载均衡的功能。不做磁盘组raid阵列也是可以的。
4.1.3 网络带宽
服务器台数 = 2 × (生产者峰值生产速率 × 副本数 ÷ 100) + 1
带宽情况最容易成为 kafka 的瓶颈。
如果机房为千兆带宽,我们需要在一小时内处理1TB的数据,需要多少台kafka 服务器?
由于带宽为千兆网,1000Mbps=1Gbps,则每秒钟每个服务器能收到的数据量为 1Gb=1000Mb
假设 Kafka 占用整个服务器网络的70%(其他 30%为别的服务预留),则Kafka可以使用到700Mb 的带宽,但是如果从常规角度考虑,我们不能总让Kafka顶满带宽峰值,所以需要预留出2/3甚至3/4的资源,也就是说,Kafka单台服务器使用带宽实际应为 700Mb/3=240Mb
1 小时需要处理1TB数据,1TB=102410248Mb=8000000Mb,则一秒钟处理数据量为:8000000Mb/3600s=2330Mb 数据。
需要的服务器台数为:2330Mb/240Mb≈10 台。
考虑到消息的副本数如果为 2,则需要20台服务器,副本如果为3,则需要30台服务器。
4.1.4 内存配置
Kafka运行过程中设计到的内存主要为JVM的堆内存和操作系统的页缓存,每个Broker节点的堆内存建议10-15G内存,而数据文件(默认为1G)的25%在内存就可以了。综合上述,Kafka在大数据场景下能够流畅稳定运行至少需要11G,建议安装Kafka的服务器节点的内存至少大于等于16G。
4.1.5 CPU选择
在生产环境中,建议CPU核数最少为16核,建议32核以上,方可保证大数据环境中的Kafka集群正常处理与运行。
4.2 集群容错
4.2.1 副本分配策略
Kafka采用分区机制对数据进行管理和存储,每个Topic可以有多个分区,每个分区可以有多个副本。应根据业务需求合理配置副本,一般建议设置至少2个副本以保证高可用性。
4.2.2 故障转移方案
当Kafka集群中的某个Broker节点发生故障时,其负责的分区副本将会被重新分配到其他存活的Broker节点上,并且会自动选择一个备份分区作为新的主分区来处理消息的读写请求。
4.2.3 数据备份与恢复
Kafka采用基于日志文件的存储方式,每个Broker节点上都有副本数据的本地备份。在数据备份方面,可以通过配置Kafka的数据保留策略和数据分区调整策略来保证数据的持久性和安全性;在数据恢复方面,可以通过查找备份数据并进行相应的分区副本替换来恢复数据。
4.3 参数配置优化
4.4 数据压缩和批量发送
通过压缩和批量发送可以优化Kafka的性能表现。Kafka支持多种数据压缩算法,包括Gzip、Snappy、LZ4和zstd。在不同场景下,需要选择合适的压缩算法,以确保性能最优。zstd有着最高得压缩比,而LZ4算法,在吞吐量上表现得非常高效。对于Kafka而言,在吞吐量上比较:lz4 > snappy>zstd>gzip。而在压缩比上:zstd>lz4>gzip>snappy
Kafka支持两种批处理方式:异步批处理和同步批处理。在不同场景下,需要选择合适的批处理方式,进行性能优化。同时需要合理设置批处理参数,如batch.size、linger.ms等。