Kafka MirrorMaker迁移
一、迁移流程
1.1 准备 kafka 集群
https://blog.csdn.net/qq_39680564/article/details/127048316
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JuvCm2kb-1664157875503)(img/image-20220721195456924.png)]](https://images2.imgbox.com/9f/b7/g8w8dvR5_o.png)
1.2 接入 MirrorMaker
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ovognxDV-1664157875504)(img/image-20220721195752971.png)]](https://images2.imgbox.com/67/a7/5axzDf38_o.png)
1.3 配置 MirrorMaker 信息
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IslQBRhQ-1664157875505)(img/image-20220721195544928.png)]](https://images2.imgbox.com/9a/4e/9uun1M3P_o.png)
1.4 启动迁移任务
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-11WE8zJI-1664157875505)(img/image-20220721200134456.png)]](https://images2.imgbox.com/05/5a/eZXxFbRH_o.png)
1.5 切换生产者、消费者
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Yaw0f0Kh-1664157875505)(img/image-20220721200241579.png)]](https://images2.imgbox.com/9e/81/qkbatu0L_o.png)
二、迁移原理
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-svww9E0N-1664157875506)(img/p98881.png)]](https://images2.imgbox.com/23/19/AgZ46GBm_o.png)
-
MirrorMaker 创建一个通道将源端与目的端联通
-
MirrorMaker 作为消费者,消费源集群;MirrorMaker 作为生产者,将消息推送到目标集群
-
MirrorMaker 可将 topic 中的历史消息,以及后续产生的消息实时同步到目标集群
-
该工具在 kafka 安装包的 bin 目录
kafka-mirror-maker.sh -
部署 MirrorMaker 的机器需要打通源端与目标端的网络
三、数据迁移
3.1 修改 consumer 配置
vim config/consumer.properties
bootstrap.servers=10.11.113.181:9092,10.11.113.182:9092,10.11.113.183:9092
group.id=test-consumer-group
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
auto.offset.reset=earliest
| 参数 | 说明 |
|---|---|
| bootstrap.servers | 自建实例的 broker 接入点列表 |
| group.id | 迁移数据时用到的消费者组 ID,请勿与实例已存在的消费者命名重复冲突 |
| partition.assignment.strategy | 分区分配的策略 |
| auto.offset.reset | 消息读取范围,earliest:从最早消息开始 |
3.2 修改 producer 配置
vim config/producer.properties
bootstrap.servers=10.11.113.184:9092,10.11.113.185:9092,10.11.113.186:9092
compression.type=none
| 参数 | 说明 |
|---|---|
| bootstrap.servers | 目标端实例的 broker 接入点列表。 |
| compression.type | 数据压缩类型 |
3.3 启动迁移
bin/kafka-mirror-maker.sh \
--consumer.config config/consumer.properties \
--producer.config config/producer.properties \
--whitelist my-test-topic
whitelist: 为 java 正则表达式,迁移匹配正则名称的 Topic。
3.4 查看进度
bin/kafka-consumer-groups.sh \
--bootstrap-server 10.11.113.181:9092,10.11.113.182:9092,10.11.113.183:9092 \
--describe \
--group test-consumer-group
四、迁移后验证
4.1 查看历史消息
目标端查看 topic 列表
bin/kafka-topics.sh --zookeeper 10.11.113.184:2181 --list
查看 topic 中的历史消息
bin/kafka-console-consumer.sh --bootstrap-server 10.11.113.184:9092 --topic my-test-topic --from-beginning
4.2 查看增量消息
源端向该 topic 实时生产消息
$ bin/kafka-console-producer.sh --bootstrap-server 10.11.113.181:9092 --topic my-test-topic
>1
>2
>3
>4
>5
>6
>7
>8
>9
查看目标端的 topic 是否接收
$ bin/kafka-console-consumer.sh --bootstrap-server 10.11.113.184:9092 --topic my-test-topic --from-beginning
1
2
3
4
5
6
7
8
9