[Kafka] Before Kafka and ZK "Break Up": What Exactly Does Kafka Store in ZK?

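The book 《深入理解Kafka:核心设计与实践原理》 (Understanding Kafka in Depth: Core Design and Practical Principles) describes Kafka's overall architecture.

Kafka stores metadata such as broker, topic, and config information in ZooKeeper (although with each release Kafka is visibly moving away from ZooKeeper). The layout described here is based on Kafka 2.1.1.

In the core module of the Kafka source code, kafka.zk.ZkData defines the paths and contents of the metadata that Kafka stores in ZK.

Each node is explained in turn below.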

admin

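Holds two kinds of Kafka administrative information:

  • delete_topics: in older versions of Kafka, stores the names of topics that have been marked for deletion (names only, no other data)
  • reassign_partitions: after an admin tool issues a partition reassignment request, the corresponding information is written to /admin/reassign_partitions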

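In versions before Kafka 1.0, delete.topic.enable defaulted to false, so to delete a topic you had to add delete.topic.enable=true to server.properties explicitly. Since Kafka 1.0, the option defaults to true. In versions before 1.0, deleting a topic also recorded its name under the ZooKeeper node /<kafka-root>/admin/delete_topics. With the pre-1.0 default, the topic was only marked for deletion: its message data stayed on disk, and the user had to remove it manually from the data directories. Kafka 1.0 changed this. Once a topic is deleted, its data is deleted with it, and the operation is irreversible. Presumably the developers decided that deleting a topic should not be this cumbersome: either deletion is not allowed, or it is done in one step.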


[root@XXGL-T-TJSYZ-app-025 bin]# 
[root@XXGL-T-TJSYZ-app-025 bin]# ./kafka-topics.sh --zookeeper localhost:2181/kafka211 --delete --topic testkafak1
Topic testkafak1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[root@XXGL-T-TJSYZ-app-025 bin]#

[zk: localhost:2181(CONNECTED) 6] get /kafka211/admin/delete_topics
null
[zk: localhost:2181(CONNECTED) 7] 

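Note that get returns only the data of the delete_topics node itself, which is empty (null); topics pending deletion are recorded as child nodes, which ls would list. In newer versions of Kafka, a topic whose deletion has completed is no longer kept under delete_topics.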

brokers

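/kafka211/brokers holds the Kafka broker information: ids, seqid, topics

(kafka211 is the name of the Kafka cluster used for this demonstration, and also the cluster's root directory in ZK)

/kafka211/brokers/ids

This directory holds the ids of all brokers currently registered in the Kafka cluster. Each child is an ephemeral node, so a broker's entry disappears when its ZK session ends.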


[zk: localhost:2181(CONNECTED) 0] 
[zk: localhost:2181(CONNECTED) 0] ls /kafka211/brokers
[ids, seqid, topics]
[zk: localhost:2181(CONNECTED) 1] 
[zk: localhost:2181(CONNECTED) 1] ls /kafka211/brokers/ids
[1, 2, 3]
[zk: localhost:2181(CONNECTED) 2] get /kafka211/brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://172.24.29.163:9092"],"jmx_port":-1,"host":"172.24.29.163","timestamp":"1605343832663","port":9092,"version":4}
[zk: localhost:2181(CONNECTED) 3] 
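  • listener_security_protocol_map: mapping from each listener name to the security protocol it uses
  • endpoints: the listener addresses the broker exposes
  • jmx_port: JMX port number
  • host: host name or IP address of the broker
  • timestamp: timestamp of the broker's startup, in milliseconds
  • port: the open TCP port
  • version: version number of this registration data format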
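
As a small illustration of how these fields can be consumed, the sketch below parses the registration JSON shown above using only Python's standard library. The helper name parse_broker_registration is made up for this example and is not part of Kafka; the sample string is copied from the get output.

```python
import json
from datetime import datetime, timezone

# Sample copied from `get /kafka211/brokers/ids/1` above.
BROKER_JSON = (
    '{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},'
    '"endpoints":["PLAINTEXT://172.24.29.163:9092"],"jmx_port":-1,'
    '"host":"172.24.29.163","timestamp":"1605343832663","port":9092,"version":4}'
)


def parse_broker_registration(raw):
    """Hypothetical helper: summarize a broker registration znode."""
    data = json.loads(raw)
    protocol_map = data["listener_security_protocol_map"]
    endpoints = []
    for endpoint in data["endpoints"]:
        # Each endpoint looks like LISTENER://host:port.
        listener, _, address = endpoint.partition("://")
        host, _, port = address.rpartition(":")
        protocol = protocol_map.get(listener, listener)
        endpoints.append((listener, protocol, host, int(port)))
    # The timestamp is a string holding milliseconds since the Unix epoch.
    started = datetime.fromtimestamp(int(data["timestamp"]) / 1000, tz=timezone.utc)
    return {"endpoints": endpoints, "started_utc": started, "jmx_port": data["jmx_port"]}


info = parse_broker_registration(BROKER_JSON)
print(info["endpoints"])
print(info["started_utc"].isoformat())
```

Splitting the endpoint by hand rather than with a URL parser is deliberate: listener names such as SASL_SSL contain an underscore, which is not a valid URL scheme character.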

/kafka211/brokers/seqid

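This node helps Kafka generate broker.id automatically. To generate an id, the broker first writes an empty string to /brokers/seqid, reads the version from the returned Stat, and adds that version to the value of reserved.broker.max.id. Writing before reading the Stat guarantees that the returned version is greater than 0, which in turn guarantees that the generated broker.id is greater than reserved.broker.max.id. This is consistent with the rule that manually configured broker.id values fall in the range [0, reserved.broker.max.id].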
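
The arithmetic can be sketched in a few lines of Python. This is only an illustration of the rule described above, not Kafka's code: the function name generated_broker_id is made up for this example, and 1000 is used because it is the default value of reserved.broker.max.id.

```python
RESERVED_BROKER_MAX_ID = 1000  # default value of reserved.broker.max.id


def generated_broker_id(seqid_version, reserved_broker_max_id=RESERVED_BROKER_MAX_ID):
    """Hypothetical helper: broker.id derived from the /brokers/seqid Stat version."""
    if seqid_version < 1:
        # The version is read after a write, so it is expected to be at least 1.
        raise ValueError("seqid version must be >= 1")
    return reserved_broker_max_id + seqid_version


print(generated_broker_id(1))  # 1001
print(generated_broker_id(2))  # 1002
```

Because the version is always at least 1, every generated id lands above the range reserved for manually configured ids.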

/kafka211/brokers/topics

This directory mainly holds the partition assignment, ISR information, and so on for every topic in the Kafka cluster.


[zk: localhost:2181(CONNECTED) 5] 
[zk: localhost:2181(CONNECTED) 5] ls /kafka211/brokers/topics
[__consumer_offsets, _schemas, a, bigkey, canal-1, datasize, example, kafka-test, oa-canal, projects.oa.topics.analyse, slowlog, subject-schema-topic, td_oa.flow_run, td_oa.flow_run_prcs, test, test-delete-records, test-kafka, test-producer_3_1, test-schema-topic, test-t1-template, test-t2-template, test-xcloud, test_producer_12, test_producer_12_1, test_producer_15_1, test_producer_3_1, test_producer_6, test_producer_6_1, test_producer_9_1, test_producer_9_2, test_producer_9_3, testkafak1, testkafka, testkafka1, topic1, topic2, tsl-topic, tsl-topic5, x, xadd.redis-tool.bigkey, xadd.redis-tool.datasize, xadd.redis-tool.slowlog, xcloud-starter-kafka-sample, xcloud-topic1, y]
[zk: localhost:2181(CONNECTED) 6] 
[zk: localhost:2181(CONNECTED) 6] ls /kafka211/brokers/topics/kafka-test
[partitions]
[zk: localhost:2181(CONNECTED) 7] ls /kafka211/brokers/topics/kafka-test/partitions
[0, 1, 10, 11, 12, 13, 14, 15, 2, 3, 4, 5, 6, 7, 8, 9]
[zk: localhost:2181(CONNECTED) 8] ls /kafka211/brokers/topics/kafka-test/partitions/0
[state]
[zk: localhost:2181(CONNECTED) 9] get /kafka211/brokers/topics/kafka-test/partitions/0/state
{"controller_epoch":25,"leader":1,"version":1,"leader_epoch":1,"isr":[1]}
[zk: localhost:2181(CONNECTED) 10] 
[zk: localhost:2181(CONNECTED) 10] get /kafka211/brokers/topics/kafka-test
{"version":1,"partitions":{"12":[1],"8":[3],"4":[2],"15":[1],"11":[3],"9":[1],"13":[2],"5":[3],"10":[2],"6":[1],"1":[2],"14":[3],"0":[1],"2":[3],"7":[2],"3":[1]}}
[zk: localhost:2181(CONNECTED) 11] 
[zk: localhost:2181(CONNECTED) 11] 

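The command get /kafka211/brokers/topics/kafka-test shows the replica assignment of every partition of the topic, that is, the ids of the brokers holding each partition. The leader and ISR of an individual partition are kept in that partition's state node, whose fields are:

  • controller_epoch: the epoch of the controller that last updated this partition's state (the controller epoch grows each time the cluster elects a new controller)
  • leader: the broker id of the current partition leader
  • leader_epoch: the partition leader's epoch, that is, the number of times this partition has elected a new leader
  • isr: the list of broker ids in this partition's ISR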
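
To show how the two nodes fit together, here is a minimal Python sketch that combines the topic node's partitions map with one partition's state node and reports which assigned replicas are missing from the ISR. The function name partition_summary is made up for this example; both JSON strings are copied from the zkCli output above.

```python
import json

# Copied from `get /kafka211/brokers/topics/kafka-test` above.
TOPIC_JSON = (
    '{"version":1,"partitions":{"12":[1],"8":[3],"4":[2],"15":[1],"11":[3],'
    '"9":[1],"13":[2],"5":[3],"10":[2],"6":[1],"1":[2],"14":[3],"0":[1],'
    '"2":[3],"7":[2],"3":[1]}}'
)
# Copied from `get /kafka211/brokers/topics/kafka-test/partitions/0/state` above.
STATE_JSON = '{"controller_epoch":25,"leader":1,"version":1,"leader_epoch":1,"isr":[1]}'


def partition_summary(topic_raw, partition, state_raw):
    """Hypothetical helper: merge the assignment and state of one partition."""
    assignment = json.loads(topic_raw)["partitions"]
    state = json.loads(state_raw)
    replicas = assignment[str(partition)]  # partition ids are JSON object keys, so strings
    return {
        "replicas": replicas,
        "leader": state["leader"],
        "isr": state["isr"],
        # Replicas that are assigned but not currently in sync.
        "out_of_sync": sorted(set(replicas) - set(state["isr"])),
    }


print(partition_summary(TOPIC_JSON, 0, STATE_JSON))
```

For partition 0 in this cluster the only replica is broker 1, which is also the leader and the sole ISR member, so out_of_sync is empty.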

config

/kafka211/config holds the configuration changes made to the cluster's brokers, clients, topics, and users. Settings that have never been modified after the cluster started are not recorded.

[zk: localhost:2181(CONNECTED) 1] 
[zk: localhost:2181(CONNECTED) 1] ls /kafka211/config
[brokers, changes, clients, topics, users]
[zk: localhost:2181(CONNECTED) 2] ls /kafka211/config/brokers
[]
[zk: localhost:2181(CONNECTED) 3] ls /kafka211/config/clients
[]
[zk: localhost:2181(CONNECTED) 4] ls /kafka211/config/topics
[__consumer_offsets, _schemas, a, bigkey, canal-1, datasize, example, kafka-test, oa-canal, projects.oa.topics.analyse, slowlog, subject-schema-topic, td_oa.flow_run, td_oa.flow_run_prcs, test, test-delete-records, test-kafka, test-producer_3_1, test-schema-topic, test-t1-template, test-t2-template, test-xcloud, test_producer_12, test_producer_12_1, test_producer_15_1, test_producer_3_1, test_producer_6, test_producer_6_1, test_producer_9_1, test_producer_9_2, test_producer_9_3, testkafak1, testkafka, testkafka1, topic1, topic2, tsl-topic, tsl-topic5, x, xadd.redis-tool.bigkey, xadd.redis-tool.datasize, xadd.redis-tool.slowlog, xcloud-starter-kafka-sample, xcloud-topic1, y]
[zk: localhost:2181(CONNECTED) 5] 
[zk: localhost:2181(CONNECTED) 5] get /kafka211/config/topics/kafka-test
{"version":1,"config":{}}
[zk: localhost:2181(CONNECTED) 6] 
[zk: localhost:2181(CONNECTED) 6] ls /kafka211/config/users
[]
[zk: localhost:2181(CONNECTED) 7] 

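As shown, this cluster has never had its configuration modified, so everything is empty. The following uses a cluster whose user configuration has been modified: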

[zk: 172.24.29.213:12181(CONNECTED) 16] 
[zk: 172.24.29.213:12181(CONNECTED) 16] ls /kafka_scram/config
[brokers, changes, clients, topics, users]
[zk: 172.24.29.213:12181(CONNECTED) 17] ls /kafka_scram/config/users
[admin, producer, xdf-user]
[zk: 172.24.29.213:12181(CONNECTED) 18] get /kafka_scram/config/users/xdf-user
{"version":1,"config":{"SCRAM-SHA-512":"salt=ejhvZG0zNWFwZ3VwOGJxZXNncHJyZWhlag==,stored_key=qNDg1zDc8VV6g8LRR2HczopgmA42ToAvH5bT+E6BcAPlxD0ClUD+/FIPizNzVbYHBDC+hpH8CkoUn2q2iGMK6w==,server_key=WRQD8hFgFFkvIwDyIPNBxLgn60cs52i9pfr2CFQi5iqC36g/9EhbR1SSMKdk/gksg1sn45IaHv5x8RfGCoNEHA==,iterations=4096","SCRAM-SHA-256":"salt=ampsMHFrM3MxMThmcnJrYXhsY3EyZzN1ZQ==,stored_key=RQgbxhBoavgKmpRTSZj+8hErfMVg/Edw2lyUCpbTnf4=,server_key=NZ4KCQtkbIfAGC2KNsJ8oxr81pGF9AIaxtk2E/whOmM=,iterations=8192"}}
[zk: 172.24.29.213:12181(CONNECTED) 19] 
[zk: 172.24.29.213:12181(CONNECTED) 19] ls /kafka_scram/config/changes
[config_change_0000000007]
[zk: 172.24.29.213:12181(CONNECTED) 20] get /kafka_scram/config/changes/config_change_0000000007
{"version":2,"entity_path":"users/xdf-user"}
[zk: 172.24.29.213:12181(CONNECTED) 21] 
[zk: 172.24.29.213:12181(CONNECTED) 21] 

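The output above shows what is recorded after a Kafka user has been added. After a configuration change, a node is created under /<kafka-root>/config/changes; its entity_path field gives the path, relative to config, where the result of the change is stored in ZK.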
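
The two records above can be decoded with a short Python sketch. Both helper names are made up for this example. The credential parser assumes the comma-separated key=value layout seen in the sample, and the path helper assumes that entity_path is relative to the config node, as the sample suggests.

```python
import json

# SCRAM-SHA-256 value copied from `get /kafka_scram/config/users/xdf-user` above.
SCRAM_VALUE = (
    "salt=ampsMHFrM3MxMThmcnJrYXhsY3EyZzN1ZQ==,"
    "stored_key=RQgbxhBoavgKmpRTSZj+8hErfMVg/Edw2lyUCpbTnf4=,"
    "server_key=NZ4KCQtkbIfAGC2KNsJ8oxr81pGF9AIaxtk2E/whOmM=,"
    "iterations=8192"
)
# Copied from `get /kafka_scram/config/changes/config_change_0000000007` above.
CHANGE_JSON = '{"version":2,"entity_path":"users/xdf-user"}'


def parse_scram_credential(value):
    """Hypothetical helper: split a stored SCRAM credential into its fields."""
    fields = {}
    for item in value.split(","):
        # Split on the first '=' only, because base64 padding also uses '='.
        key, _, val = item.partition("=")
        fields[key] = val
    fields["iterations"] = int(fields["iterations"])
    return fields


def changed_config_path(chroot, notification_raw):
    """Hypothetical helper: resolve a change notification to the config znode it points at."""
    entity_path = json.loads(notification_raw)["entity_path"]
    return f"{chroot}/config/{entity_path}"


print(parse_scram_credential(SCRAM_VALUE)["iterations"])  # 8192
print(changed_config_path("/kafka_scram", CHANGE_JSON))  # /kafka_scram/config/users/xdf-user
```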

cluster

/kafka211/cluster/id holds the unique identifier of the cluster.


[zk: localhost:2181(CONNECTED) 0] 
[zk: localhost:2181(CONNECTED) 0] ls /kafka211/cluster
[id]
[zk: localhost:2181(CONNECTED) 1] get /kafka211/cluster/id
{"version":"1","id":"flT3lyIBRcSZ4B9Z-2a7PQ"}
[zk: localhost:2181(CONNECTED) 2] 

consumers

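Newer versions of Kafka no longer store consumer information under /<kafka-root>/consumers. It is kept instead in an internal Kafka topic named __consumer_offsets, which Kafka creates by itself and which otherwise behaves like an ordinary topic. One of its purposes is to store the offsets committed by consumers. The key of a record in this topic consists of the fields version, group, topic, and partition; the value consists of version, offset, metadata, commit_timestamp, and expire_timestamp.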

controller

/kafka211/controller holds the broker id of the cluster's current controller and the timestamp of the most recent controller change.

[zk: localhost:2181(CONNECTED) 0] 
[zk: localhost:2181(CONNECTED) 0] get /kafka211/controller
{"version":1,"brokerid":3,"timestamp":"1605261910053"}
[zk: localhost:2181(CONNECTED) 1] 
[zk: localhost:2181(CONNECTED) 1] 
  • brokerid: the broker id of the node currently acting as controller.
  • timestamp: the timestamp of the most recent controller change.

controller_epoch

/kafka211/controller_epoch holds the current controller epoch of the Kafka cluster.


[zk: localhost:2181(CONNECTED) 1] 
[zk: localhost:2181(CONNECTED) 1] get /kafka211/controller_epoch
25
[zk: localhost:2181(CONNECTED) 2] 
[zk: localhost:2181(CONNECTED) 2] 

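The value is a number. It is 0 when the first broker in the cluster starts for the first time. After that, whenever the controller changes or fails, a new controller is elected, and each controller change increases controller_epoch by 1.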

isr_change_notification

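The ISR set of a partition is not fixed. When an ISR changes (for example, because a replica times out), the broker leading the affected partition records that partition under /<kafka-root>/isr_change_notification/[isr_change_x], where the controller picks up the change.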

latest_producer_id_block

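The data in /kafka211/latest_producer_id_block supports producer idempotence. Every broker in the cluster starts a component called TransactionCoordinator at startup, which pre-allocates blocks of producer ids and hands out producer ids from them. All brokers share the /latest_producer_id_block node to record the most recently allocated block. For more on how Kafka implements producer idempotence, see: http://matt33.com/2018/10/24/kafka-idempotent/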


[zk: localhost:2181(CONNECTED) 0] 
[zk: localhost:2181(CONNECTED) 0] get /kafka211/latest_producer_id_block
{"version":1,"broker":1,"block_start":"45000","block_end":"45999"}
[zk: localhost:2181(CONNECTED) 1] 
[zk: localhost:2181(CONNECTED) 1] 
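
To make the block bookkeeping concrete, the sketch below computes what the next block record might look like when a broker claims a new block. It rests on two assumptions that the output above suggests but does not prove: a new block starts right after the current block_end, and it has the same size as the current block (1000 ids in this sample). The function name next_producer_id_block is made up for this example.

```python
import json

# Copied from `get /kafka211/latest_producer_id_block` above.
BLOCK_JSON = '{"version":1,"broker":1,"block_start":"45000","block_end":"45999"}'


def next_producer_id_block(current_raw, broker_id):
    """Hypothetical helper: the block a broker would claim after the current one."""
    current = json.loads(current_raw)
    start = int(current["block_start"])
    end = int(current["block_end"])
    size = end - start + 1  # assumption: blocks keep the same size
    new_start = end + 1     # assumption: blocks are contiguous
    return {
        "version": current["version"],
        "broker": broker_id,
        "block_start": str(new_start),
        "block_end": str(new_start + size - 1),
    }


print(next_producer_id_block(BLOCK_JSON, 2))
```

In a real cluster the write would also need to be conditional on the znode version so that two brokers cannot claim the same block; that part is left out of this sketch.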

log_dir_event_notification

todo

kafka-acl

todo

kafka-acl-changes

todo