Kafka 常规知识整理
概述
Apache kafka 是消息中间件的一种,类似的消息系统还有 RabbitMQ、RocketMQ、ActiveMQ 等。
Kafka 最初是由 Linkedin 公司基于 Scala 和 Java 语言开发的分布式消息发布订阅系统,现已捐献给 Apache 软件基金会。kafka 的核心数据结构本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”。总的来讲,kafka 通常具有如下三重角色:
-
消息系统
Kafka 和传统的消息队列类似,支持流量削锋、服务解耦、异步通信等核心功能。
-
流处理平台
Kafka 不仅能够与大多数流式计算框架完美整合,并且自身也提供了一个完整的流式处理库,即 kafka Streaming。kafka Streaming 提供了类似 Flink 中的窗口、聚合、变换、连接等功能。
-
存储系统
通常消息队列会把消息持久化到磁盘,防止消息丢失,保证消息可靠性。Kafka 的消息持久化机制和多副本机制使其能够作为通用数据存储系统来使用。
Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),在业界主要应用于大数据实时处理领域。
异步通信原理
观察者模式
-
观察者模式(Observer),又称为发布-订阅模式(Publish / Subcribe)。
-
定义对象间一种一对多的依赖关系,使得当一个对象改变状态,则所有依赖于它的对象都会收到通知并自动更新。
-
一个对象的状态发生变更,所有依赖对象(观察者对象)都会得到通知。
生产者消费者模式
-
传统模式
-
生产者直接将消息传递给消费者。
-
耦合性高,当生产者或者消费者发生变化,都需要重写业务逻辑。
-
-
生产者消费者模式
通过一个容器来解决生产者和消费者的强耦合性问题,生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯。
-
数据传输流程
-
生产者消费者模式,即 N 个线程进行生产,同时 N 个线程进行消费,两个角色通过内存缓冲区进行通讯。
-
生产者负责向缓冲区写入数据单元。
-
消费者负责从缓冲区读取数据单元。
生产者消费者模式一般遵循先进先出原则。
-
Kafka 体系结构
概念解析
-
Topic:Kafka 将消息分门别类,每一类的消息称之为一个主题(Topic)。
-
Producer:发布消息的对象称之为主题生产者(Kafka topic producer)。
-
Consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)。
-
Broker:已发布的消息保存在一组服务器中,称之为 Kafka 集群。集群中的每一个服务器都是一个代理(Broker)。消费者可以订阅一个或多个主题(topic),并从 Broker 拉数据,从而消费这些已发布的消息。
-
Partition:Topic 的分区,每个 topic 可以有多个分区,其作用是做负载,提高 kafka 的吞吐量,并且同一个 topic 在不同的分区的数据是不重复的。
-
Replication:每一个分区都可以有多个副本,当主分区(Leader)故障的时候会选择一个副本(Follower)上位,成为 Leader。
同一分区的不同副本中保存的消息是相同的。但需要注意的是,在同一时刻,副本之间并非完全一样,因为同步存在延迟。副本之间是一主多从的关系。leader 副本负责处理读写请求, follower 副本只负责与 leader 副本进行消息同步。
在 kafka 中默认副本的最大数量是10个,且副本的数量不能大于 Broker 的数量,follower 和 leader 绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
-
Message:消息主体。
-
Zookeeper:kafka 集群依赖 zookeeper 来保存集群的的元信息和系统的可用性。
-
Consumer Group:
-
Segment:分段,Partition 物理上由多个 Segment 组成。
-
offset:每一个 Partition 都由一系列有序的、不可变的消息组成,这些消息被连续的追加到 Partition 中。Partition 中的每一个消息都有一个连续的序列号叫做 offset,用于 Partition 中唯一标识的这条消息。
主题和日志
Topic 是发布的消息的类别名,一个 topic 可以有零个,一个或多个消费者订阅该主题的消息。对于每个 topic,Kafka 集群都会维护一个分区 log。
每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。分区中的消息都被分了一个序列号,称之为偏移量(offset),在每个分区中此偏移量都是唯一的。
Kafka 集群保持所有的消息,直到它们过期(无论消息是否被消费)。实际上消费者所持有的仅有的元数据就是这个 offset(偏移量),也就是说 offset 由消费者来控制:正常情况当消费者消费消息的时候,偏移量也线性的的增加。但是实际偏移量由消费者控制,消费者可以将偏移量重置为更早的位置,重新读取消息。可以看到这种设计对消费者来说操作自如,一个消费者的操作不会影响其它消费者对此 log 的处理。
Geo-Replication
Geo-Replication 是一种异地数据同步技术,Kafka MirrorMaker 为群集提供 geo-replication
支持。借助 MirrorMaker
,消息可以跨多个数据中心或云区域进行复制。 您可以在 active/passive 场景中用于备份和恢复;或者在 active/passive 方案中将数据置于更接近用户的位置,或数据本地化。
Producers
生产者往某个 Topic 上发布消息。生产者也负责选择发布到 Topic 上的哪一个分区。最简单的方式从分区列表中轮流选择。也可以根据某种算法依照权重选择分区。开发者负责如何选择分区的算法。
Consumers
消费者模型通常可以分为如下两种:
-
队列模式
-
发布-订阅模式
文件存储机制
借助一些 kafka 客户端工具,我们可以看到如下信息:
从图中可以看到,同一 Kafka 主题下存在多个 Partition,且每一个 Partition 都是由一系列有序的、不可变的消息组成,这些消息被连续追加到 Partition 中,并且以 Offset 作为每一条消息的唯一序列号。
Topic 中 Partition 存储分布
-
如果 Kafka 集群中只有一个 Broker,则
/message-folder
将作为数据文件存储的根文件夹。若同一 Topic 指定多个 Partition,则对应 Partition 的存储位置将在 Topic 名称后追加有序序号进行区分,例如:/message-folder/test_partition-0。 -
如果 Kafka 集群中存在多个 Broker,
Partition 文件存储方式
如图所示,每一个 Partition 相当于一个巨型文件被平均分配到多个大小相等的段(segment)。
但每个 Segment File 中的消息数量并不一定相等,这样的特性方便old segment file高速被删除。(默认情况下每一个文件大小为1G)
Segment 文件存储方式
一个 Segment 文件由两个部分组成,分别是 .index
和 .log
,例如:
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
index 和 log 文件以当前 segment 的第一条消息的 offset 命名。其中,index 文件用于存储大量的索引信息,log 文件则存储其对应的消息数据,索引文件中的元数据指向对应数据文件中消息数据的物理偏移地址。
安装与配置
zookeeper 安装
注意:当您看到这篇文章时,也许 Kafka 将不再需要 Zookeeper 的支持,相关消息指出,在即将发布的 Apache Kafka 2.8 版本中,它将不再需要 Zookeeper 的支持。但目前尚不稳定,本文示例使用版本仍需安装 Zookeeper。
您可以选择尝试使用新版 Kafka,或者使用稳定方案——安装 Zookeeper(Zookeeper 安装以及相关知识点,可阅读 Zookeeper 基础教程,本文不再赘述)。
kafka 安装
-
下载安装
你可以前往官网进行下载。
注意:
kafka_x.x.x-src.tgz
是源文件,你应当下载的是kafka_x.x.x.tgz
。tar -zxvf kafka_2.13-3.0.0.tgz cd kafka_2.13-3.0.0/ # 启动kafka ./bin/kafka-server-start.sh ./config/server.properties &
-
创建主题
# 创建一个名为test的topic ./bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092 # 查看已创建的topic信息 ./bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092
-
发送消息
Kafka 提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给 Kafka 集群。
# 向名为test的主题中发送消息 ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
按 Ctrl + C 可退出。
-
消费消息
Kafka 也提供了一个消费消息的命令行工具,可以将存储的信息输出出来。
# 从名为test的主题中消费消息 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
监控管理界面
Kafka 消可视化监控管理工具的实现较多,例如:
-
Kafka Eagle:http://download.kafka-eagle.org
-
Kafka Manager:https://github.com/yahoo/kafka-manager
-
Kafka Monitor:https://github.com/Morningstar/kafka-offset-monitor
这里以 Kafka Eagle 为例,进行演示安装:
cd /opt/module
cp /mnt/hgfs/share/kafka-eagle-bin-2.0.9.tar.gz ./
tar xf kafka-eagle-bin-2.0.9.tar.gz
cd kafka-eagle-bin-2.0.9/
tar xf efak-web-2.0.9-bin.tar.gz
mv efak-web-2.0.9 kafka-eagle
vim kafka-eagle/conf/system-config.properties # 修改zk节点,以及相关的数据库信息
cd ../bin/
chmod +x ke.sh
./ke.sh start # 启动
启动成功后,访问 http://localhost:8048/ke
默认账号:admin,默认密码:12345,得到如下界面:
注:如您仅仅是需要客户端工具,也可以上文截图使用到的 Offset Explorer。
API
Producer API
使用 Kafka Java 客户端 API,需要引用核心依赖包 org.apache.kafka:kafka-clients:3.0.0
(版本视需要而定)。
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host01:9092,host02:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 32);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024 * 1024 * 32);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));// 异步发送
// producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))).get();// 同步发送
}
producer.close();
如发送消息需要获取回调,只需要指定回调方法即可:
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e == null) {
Sysout.out.println("send kafka message success -> " + metadata.offset())
} else {
e.printStackTrace();
}
}
});
参数说明:
-
asks
取值范围为:0、1、all。-
acks = 0,表示 Kafka 客户端发出消息后,不论该消息是否被接收处理,都会被认为发送成功。
-
acks = 1,表示 Kafka 客户端发出消息后,只要 Partition Leader 接收并写入本地磁盘,不论是否同步到其他 Follower,都会被认为发送成功,是默认设置。
-
acks = all,表示 Kafka 客户端发出消息后,Partition Leader 接收到消息,并且完成与其他 Follower 同步,才会被认为发送成功。
-
-
retries
表示请求失败后生产者自动重试次数,但如果启用重试,也会有重复消息的可能性。 -
生产者会缓存每个分区未发送的消息,其缓存的大小是通过
batch.size
配置指定的。消息按批发送,缓存批设置较大的话,就需要更多的内存。默认缓冲可立即发送,即便缓冲空间还没有满,但是,如果你想减少请求的数量,可以设置
linger.ms
大于0。这将指示生产者发送请求之前等待一段时间,希望更多的消息填补到未满的批中。 -
buffer.memory
控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms
设定,之后它将抛出 TimeoutException。
幂等和事务
KafkaProducer 支持两种模式:幂等生产者、事务生产者。
-
幂等生产者
在 Kafka 中,Producer 默认不是幂等性的,即 Kafka 向分区发送数据时,可能会出现同一条消息被发送了多次,导致消息重复的情况。为了实现 Producer 的幂等性,Kafka 引入了 Producer ID(即 PID)和 Sequence Number。
-
PID,每个新的 Producer 在初始化的时候会被分配一个唯一的对用户不可见的 PID。
-
而对于每个 PID,该 Producer 发送消息的每个主题和分区,又都对应一个从 0 开始单调递增的 Sequence Number。
要使用幂等生产者,就必须指定
enable.idempotence
为true
,此时retries
将默认为Integer.MAX_VALUE
,acks 将默认为all
,指定幂等后,Producer 自动升级成幂等生产者,Kafka 会对消息自动去重。props.put("enable.idempotence", true); // 或者 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
-
-
事务生产者
事务生产者能够保证 Kafka 将消息原子性地写入到多个分区中,这批消息要么全部写入成功,要么全部失败。并且,Producer 即便出现进程重启,也依然能够保证它们发送消息的精确一次处理。
要使用事务生产者,就必须设置
transactional.id
,此时幂等性会和幂等所依赖的生产者配置一起自动启用,即使用事务则必须开启幂等。Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host01:9092,host02:9092"); props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id"); Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); producer.initTransactions(); try { producer.beginTransaction(); for (int i = 0; i < 100; i++) { producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i))); } producer.commitTransaction(); } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { producer.close(); } catch (KafkaException e) { producer.abortTransaction(); } producer.close();
事务生产者使用异常来传递错误状态。它不需要为
producer.send()
指定回调,也不需要在返回的Future上调用.get()
:如果任何消息发送或事务性调用在事务过程中遇到不可恢复的错误,则会抛出KafkaException
。
Consumer API
使用 Kafka Java 客户端 API,需要引用核心依赖包 org.apache.kafka:kafka-clients:3.0.0
(版本视需要而定)。
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "host01:9092,host02:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMITCONFIG, "true");// 是否自动提交offset
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");// 每秒提交一次
props.put(ConsumerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ConsumerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMills(100));
records.forEach(record -> {
Sysout.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value =%s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
// kafkaConsumer.commitSync();// 手动同步提交offset
})
}
如果需要进行异步提交 offset,可通过如下方式进行指定:
kafkaConsumer.commitSync(new OffsetCommitCallback() {
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if (e != null) {
Sysout.out.println("kafka offsets commit failed -> " + offsets)
}
}
});
此外,也可以提交自定义 offset:
kafkaConsumer.subscribe(Arrays.asList("my-topic"), new ConsumerRebalanceListener() {
// 在Rebalanced之前执行
@Override
public void onPartitionRevoked(Collection<TopicPartition> partitions) {
for(TopicPartition topicPartition: partitions) {
int partition = topicPartition.partition();
long offset = kafkaConsumer.position(topicPartition);
commitOffset(group, offset, partition, topic);//
}
}
// 在Rebalanced之后执行
@Override
public void onPartitionAssigned(Collection<TopicPartition> partitions) {
// ...
}
});
Streams API
什么是流计算
使用 Kafka Streams API,需要引用核心依赖包 org.apache.kafka:kafka-streams:3.1.0
(版本视需要而定)。
Connect API
Admin API
Kafka 事务
参考
著作権声明
本記事のリンク:https://www.chinmoku.cc/dev/java/advanced/kafka-tutorial/
本博客中的所有内容,包括但不限于文字、图片、音频、视频、图表和其他可视化材料,均受版权法保护。未经本博客所有者书面授权许可,禁止在任何媒体、网站、社交平台或其他渠道上复制、传播、修改、发布、展示或以任何其他方式使用此博客中的任何内容。