Kafka MirrorMaker迁移

一、迁移流程

1.1 准备 kafka 集群

https://blog.csdn.net/qq_39680564/article/details/127048316

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JuvCm2kb-1664157875503)(img/image-20220721195456924.png)]

1.2 接入 MirrorMaker

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ovognxDV-1664157875504)(img/image-20220721195752971.png)]

1.3 配置 MirrorMaker 信息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IslQBRhQ-1664157875505)(img/image-20220721195544928.png)]

1.4 启动迁移任务

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-11WE8zJI-1664157875505)(img/image-20220721200134456.png)]

1.5 切换生产者、消费者

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Yaw0f0Kh-1664157875505)(img/image-20220721200241579.png)]

二、迁移原理

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-svww9E0N-1664157875506)(img/p98881.png)]

  • MirrorMaker 创建一个通道将源端与目的端联通

  • MirrorMaker 作为消费者,消费源集群;MirrorMaker 作为生产者,将消息推送到目标集群

  • MirrorMaker 可将 topic 中的历史消息,以及后续产生的消息实时同步到目标集群

  • 该工具在 kafka 安装包的 bin 目录 kafka-mirror-maker.sh

  • 部署 MirrorMaker 的机器需要打通源端与目标端的网络

三、数据迁移

3.1 修改 consumer 配置

vim config/consumer.properties
bootstrap.servers=10.11.113.181:9092,10.11.113.182:9092,10.11.113.183:9092
group.id=test-consumer-group
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
auto.offset.reset=earliest
参数说明
bootstrap.servers自建实例的 broker 接入点列表
group.id迁移数据时用到的消费者组 ID,请勿与实例已存在的消费者命名重复冲突
partition.assignment.strategy分区分配的策略
auto.offset.reset消息读取范围,earliest:从最早消息开始

3.2 修改 producer 配置

vim config/producer.properties
bootstrap.servers=10.11.113.184:9092,10.11.113.185:9092,10.11.113.186:9092
compression.type=none
参数说明
bootstrap.servers目标端实例的 broker 接入点列表。
compression.type数据压缩类型

3.3 启动迁移

bin/kafka-mirror-maker.sh \
  --consumer.config config/consumer.properties \
  --producer.config config/producer.properties \
  --whitelist my-test-topic
whitelist: 为 java 正则表达式,迁移匹配正则名称的 Topic。

3.4 查看进度

bin/kafka-consumer-groups.sh \
  --bootstrap-server 10.11.113.181:9092,10.11.113.182:9092,10.11.113.183:9092 \
  --describe \
  --group test-consumer-group

四、迁移后验证

4.1 查看历史消息

目标端查看 topic 列表

bin/kafka-topics.sh --zookeeper 10.11.113.184:2181 --list

查看 topic 中的历史消息

bin/kafka-console-consumer.sh --bootstrap-server 10.11.113.184:9092 --topic my-test-topic --from-beginning

4.2 查看增量消息

源端向该 topic 实时生产消息

$ bin/kafka-console-producer.sh --bootstrap-server 10.11.113.181:9092 --topic my-test-topic
>1
>2
>3
>4
>5
>6
>7
>8
>9

查看目标端的 topic 是否接收

$ bin/kafka-console-consumer.sh --bootstrap-server 10.11.113.184:9092 --topic my-test-topic --from-beginning
1
2
3
4
5
6
7
8
9