kafka知识点整理
Kafka 是什么
Apache Kafka是一个开源消息系统,由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目
Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性
Kafka 核心组件
-
Producer:消息生产者,产生的消息将会被发送到某个topic
-
Consumer:消息消费者,消费的消息内容来自某个topic
-
Topic:消息根据topic进行归类,topic其本质是一个目录,即将同一主题消息归类到同一个目录
-
Broker:每一个kafka实例(或者说每台kafka服务器节点)就是一个broker,一个broker可以有多个topic
Zookeeper:zookeeper集群不属于kafka内的组件,但kafka依赖zookeeper集群保存meta信息,所以在此做声明其重要性。
Kafka 整体架构以及解析
Kafka数据处理步骤
- 1、Producer产生消息,发送到Broker中
- 2、Leader状态的Broker接收消息,写入到相应topic中
- 3、Leader状态的Broker接收完毕以后,传给Follow状态的Broker作为副本备份
- 4、Consumer消费Broker中的消息
Kafka名词解释和工作方式
-
Producer
消息生产者,就是向kafka broker发消息的客户端。
-
Consumer
消息消费者,向kafka broker取消息的客户端
-
Topic
可以理解为一个队列。
-
Consumer Group (CG)
这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。
-
Broker
一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
-
Partition
为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。
Consumer与topic关系
kafka只支持Topic
-
每个group中可以有多个consumer,每个consumer属于一个consumer group;通常情况下,一个group中会包含多个consumer,这样不仅可以提高topic中消息的并发消费能力,而且还能提高"故障容错"性,如果group中的某个consumer失效那么其消费的partitions将会有其他consumer自动接管。
-
对于Topic中的一条特定的消息,只会被订阅此Topic的每个group中的其中一个consumer消费,此消息不会发送给一个group的多个consumer;那么一个group中所有的consumer将会交错的消费整个Topic,每个group中consumer消息消费互相独立,我们可以认为一个group是一个"订阅"者。
-
在kafka中,一个partition中的消息只会被group中的一个consumer消费(同一时刻);
一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以同时消费多个partitions中的消息。 -
kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息。
kafka只能保证一个partition中的消息被某个consumer消费时是顺序的;事实上,从Topic角度来说,当有多个partitions时,消息仍不是全局有序的。
Kafka消息的分发
-
Producer客户端负责消息的分发
-
kafka集群中的任何一个broker都可以向producer提供metadata信息,这些metadata中包含"集群中存活的servers列表"、"partitions leader列表"等信息;
-
当producer获取到metadata信息之后, producer将会和Topic下所有partition leader保持socket连接;
-
消息由producer直接通过socket发送到broker,中间不会经过任何"路由层"。事实上,消息被路由到哪个partition上由producer客户端决定,比如可以采用"random""key-hash""轮询"等。
如果一个topic中有多个partitions,那么在producer端实现"消息均衡分发"是必要的。
-
在producer端的配置文件中,开发者可以指定partition路由的方式。
-
Producer消息发送的应答机制
设置发送数据是否需要服务端的反馈,有三个值0,1,-1
0: producer不会等待broker发送ack
1: 当leader接收到消息之后发送ack
-1: 当所有的follower都同步消息成功后发送ack
request.required.acks=0
Consumer的负载均衡
当一个group中,有consumer加入或者离开时,会触发partitions均衡.均衡的最终目的,是提升topic的并发消费能力,步骤如下:
-
1、假如topic1,具有如下partitions: P0,P1,P2,P3
-
2、加入group A 中,有如下consumer: C0,C1
-
3、首先根据partition索引号对partitions排序: P0,P1,P2,P3
-
4、根据consumer.id排序: C0,C1
-
5、计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)
-
6、然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]
kafka的分区策略有哪些?
分区分配给消费者的分区策略
范围分区:每个消费者会负责一个连续范围内的分区,范围的长度由分区总数除上消息者组中的节点数得到。默认分区策略。计算时,只是针对某个topic进行范围分区。
轮询策略:有非常优秀的负载均衡表现,它总是能分区能最大限度的分配到消费者上。计算时,是针对所有的topic中的分区,进行轮询。
粘性策略:在保证分区均匀的同时,尽量保证与上一次分配的结结果不要有太多的变化。
消费分配给哪个topic分区的策略?
轮询策略:有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。默认策略。
kafka的压缩策略
Producer 端压缩、Broker 端保持、Consumer 端解压缩。
最佳实践:首先来说压缩。何时启用压缩是比较合适的时机呢?你现在已经知道 Producer 端完成的压缩,那么启用压缩的一个条件就是 Producer 程序运行机器上的 CPU 资源要很充足。如果 Producer 运行机器本身 CPU 已经消耗殆尽了,那么启用消息压缩无疑是雪上加霜,只会适得其反。除了 CPU 资源充足这一条件,如果你的环境中带宽资源有限,那么我也建议你开启压缩。事实上我见过的很多 Kafka 生产环境都遭遇过带宽被打满的情况。这年头,带宽可是比 CPU 和内存还要珍贵的稀缺资源,毕竟万兆网络还不是普通公司的标配,因此千兆网络中 Kafka 集群带宽资源耗尽这件事情就特别容易出现。如果你的客户端机器 CPU 资源有很多富余,我强烈建议你开启 zstd 压缩,这样能极大地节省网络资源消耗。其次说说解压缩。其实也没什么可说的。一旦启用压缩,解压缩是不可避免的事情。这里只想强调一点:我们对不可抗拒的解压缩无能为力,但至少能规避掉那些意料之外的解压缩。就像我前面说的,因为要兼容老版本而引入的解压缩操作就属于这类。有条件的话尽量保证不要出现消息格式转换的情况。
kafka怎样保证消息不丢失?
一句话概括,Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。这句话里面有两个核心要素,我们一一来看。第一个核心要素是“已提交的消息”。什么是已提交的消息?当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。那为什么是若干个 Broker 呢?这取决于你对“已提交”的定义。你可以选择只要有一个 Broker 成功保存该消息就算是已提交,也可以是令所有 Broker 都成功保存该消息才算是已提交。不论哪种情况,Kafka 只对已提交的消息做持久化保证这件事情是不变的。第二个核心要素就是“有限度的持久化保证”,也就是说 Kafka 不可能保证在任何情况下都做到不丢失消息。举个极端点的例子,如果地球都不存在了,Kafka 还能保存任何消息吗?显然不能!倘若这种情况下你依然还想要 Kafka 不丢消息,那么只能在别的星球部署 Kafka Broker 服务器了。
最佳实践:
不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。确保消息消费完成再提交。
Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。
kafka的幂等性和事务控制?
幂等性:在 0.11 之后,指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”, ture),或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。enable.idempotence 被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。Kafka 自动帮你做消息的重复去重。底层具体的原理很简单,就是经典的用空间去换时间的优化思路,即在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉。它只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。
事务控制:事务型 Producer 能够保证将消息原子性地写入到多个分区中。
置事务型 Producer 的方法也很简单,满足两个要求即可:和幂等性 Producer 一样,开启 enable.idempotence = true。设置 Producer 端参数 transactional. id。最好为其设置一个有意义的名字。
代码如下:
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e)
{
producer.abortTransaction();
}
目前kafka是支持read commited(读已提交) 和read uncommited(读未提交)这两个隔离级别。
kafka的版本
深度解读:Kafka放弃ZooKeeper,消息系统兴起二次革命_架构_Tina_InfoQ精选文章kafka的版本号与版本演进_51CTO博客_kafka 版本
我们现在使用的是1.1.1版本,从0.8版本开始,消息的偏移量offset由原来的保存在zookeeper上改为现在的保存在kafka上。
kafka的版本号与版本演进_51CTO博客_kafka 版本
zookeeper在kafka中提供的最要能力有哪些?
基础能力:信息记录,包括记录producer,broker,topic,consumer的节点信息
核心能力:消息发送至partiton负载均衡,partition分配至consumer的负载均衡,rebalance能力,记录消费的offset,leader故障后重新选举leader的机制,
Kafka 如何保证消息的消费顺序?
kafka只保证同一个partiton内分区的顺序性,所以如果你想要实现消息顺序消费的话,发送消息时请带上key值,使用key的hash值对分区数进行求余(hash(key) % numPartitions)。这里可能会存在一个问题,如果某个key的消息过多,某个分区的消息数会非常多。
Kafka对于消费失败的消息会如何处理?
首先处理消息的是kafka客户端。因为kafka提供的是数据拉取,以及记录最高消费位点的能力。所以如果不想被处理异常的消息阻塞消费速度的话,消息失败的消息也应该要递交offset的(就算这个offset不提交,下一个提交的offset也会覆盖掉它,因为offset服务端只记录最高的offset)。所以 如果想处理失败的消息。需要自己编写失败队列进行处理。像spring的kafkaListener,有提供
ErrorHandler接口作为扩展点去处理“处理失败的消息”。