搭建高可用kafka集群
搭建高可用kafka集群
一、环境准备
- 搭建集群至少需要三台机器,奇数节点
- Kafka的安装需要Java环境,JDK1.8以上
- 本次离线安装包包括 jdk1.8.0_161.tar.gz 、kafka_2.12-3.4.0.tgz
- 假设3台服务器分别为:ip1、ip2、ip3
二、安装jdk环境
- 解压jdk安装文件:
tar -zxvf /app/jdk1.8.0_161.tar.gz
- 设置环境变量:
export JAVA_HOME=/app/jdk1.8.0_161 export PATH=$JAVA_HOME/bin:$PATH
- 添加环境变量至/etc/profile文件:
export JAVA_HOME=/app/jdk1.8.0_161 PATH=$JAVA_HOME/bin:$PATH
- 查看Java版本
java -version
三、zookeeper集群搭建
- 编辑Zookeeper配置文件
#解压 tar -zxvf /app/kafka_2.12-3.4.0.tgz #进入目录 cd /app/kafka_2.12-3.4.0 #创建zookeeper数据目录 mkdir zk_kfk_data #更换名称 mv kafka_2.12-3.4.0 kafka #进入kafka目录 cd kafka #更改zookeeper配置,添加以下配置 vim config/zookeeper.properties
zookeeper配置文件如下(三台机器上的zookeeper.properties文件配置相同):
#Zookeeper数据文件的目录 dataDir=/app/kafka/zk_kfk_data #Zookeeper保存日志文件的目录 dataLogDir=/app/kafka/logs # the port at which the clients will connect clientPort=2181 # disable the per-ip limit on the number of connections since this is a non-production config maxClientCnxns=0 tickTime=2000 initLimit=10 syncLimit=5 # Disable the adminserver by default to avoid port conflicts. # Set the port to something non-conflicting if choosing to enable this admin.enableServer=false # admin.serverPort=8080 server.1=ip1:2888:3888 server.2=ip2:2888:3888 server.3=ip3:2888:3888
zookeeper配置文件描述
三台机器上的zookeeper.properties文件配置相同,data.Dir 为zk的数据目录,server.1、server.2、server.3 为集群信息。 2888端口号是zookeeper服务之间通信的端口 3888端口是zookeeper与其他应用程序通信的端口。 tickTime:CS通信心跳数 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。 tickTime以毫秒为单位。 tickTime:该参数用来定义心跳的间隔时间,zookeeper的客户端和服务端之间也有和web开发里类似的session的概念,而zookeeper里最小的session过期时间就是tickTime的两倍。 initLimit:LF初始通信时限 集群中的follower服务器(F)与leader服务器(L)之间 初始连接 时能容忍的最多心跳数(tickTime的数量) syncLimit:LF同步通信时限 集群中的follower服务器(F)与leader服务器(L)之间 请求和应答 之间能容忍的最多心跳数(tickTime的数量) ZooKeeper配置参数解读 Server.A=B:C:D。 A是一个数字,表示这个是第几号服务器; B是这个服务器的IP地址; C是这个服务器与集群中的Leader服务器交换信息的端口; D是当集群中的Leader服务器故障,需要一个端口来重新进行选举,选出一个新的Leader,而端口D就是用来执行选举时服务器相互通信的端口。 集群模式下配置一个文件myid,这个文件在dataDir目录下,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。
- 构建myid文件
创建myid文件:进入/app/kafka/zk_kfk_data目录,创建myid文件,将三台服务器上的myid文件分别写入1,2,3。myid是zookeeper集群用来发现彼此的标识,必须创建,且不能相同。
echo "1">/app/kafka/zk_kfk_data/myid echo "2">/app/kafka/zk_kfk_data/myid echo "3">/app/kafka/zk_kfk_data/myid
- 配置环境变量
操作命令:
cd /app/kafka/bin/ echo "export PATH=${PATH}:`pwd`" >>/etc/profile source /etc/profile
- 启动zookeeper集群
启动命令:
#创建日志目录 mkdir /app/kafka/logs #启动 nohup /app/kafka/bin/zookeeper-server-start.sh /app/kafka/config/zookeeper.properties >>/app/kafka/logs/zookeeper.log & #查看启动日志 cat /app/kafka/logs/zookeeper.log
停止命令
cd /app/kafka/bin jps kill -9 进程
四、kafka集群搭建
- 编辑配置文件server.properties
#进入目录: cd /app/kafka #创建 kafka 日志数据目录: mkdir kafka-logs #进入配置目录: cd /app/kafka/config #修改 server.properties 配置文件 vim server.properties
kafka配置文件如下(三台机器上的server.properties如下):
- 配置文件一
broker.id=0 advertised.listeners=PLAINTEXT:ip1//:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/app/kafka/kafka-logs num.partitions=5 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=24 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=ip1:2181,ip2:2181,ip3:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0
- 配置文件二
broker.id=1 advertised.listeners=PLAINTEXT:ip2//:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/app/kafka/kafka-logs num.partitions=5 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=24 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=ip1:2181,ip2:2181,ip3:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0
- 配置文件三
broker.id=2 advertised.listeners=PLAINTEXT:ip3//:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/app/kafka/kafka-logs num.partitions=5 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=24 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=ip1:2181,ip2:2181,ip3:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0
kafka配置文件描述
broker.id:三个节点要配置不同的值,分别配置为0,1,2 advertised.listeners:对外监听端口 log.dirs:日志目录 num.partitions:默认分区数量 log.retention.hours:日志保留时间 zookeeper.connect:zookeeper连接地址,多个以逗号隔开
-
启动kafka集群
启动命令:
#启动 nohup /app/kafka/bin/kafka-server-start.sh /app/kafka/config/server.properties >>/app/kafka/logs/kafka.log & #查看日志 less /app/kafka/logs/kafka.log
停止命令:
cd /app/kafka/bin jps kill -9 kafka进程
五、测试kafka集群
1. 创建Topic
方式一:
使用replica-assignment参数手动指定Topic、Partition、Replica与Kafka Broker之间的存储映射关系 #指定分区数为3,副本数为2,0:1,1:2,2:0均为broker.id ./kafka-topics.sh --create --bootstrap-server ip1:9092,ip2:9092,ip3:9092 --topic topic1 --replica-assignment 0:1,1:2,2:0
方式二:
使用partitions和replication-factor参数自动分配存储映射关系 #指定分区数为3,副本数为2 ./kafka-topics.sh --create --bootstrap-server ip1:9092,ip2:9092,ip3:9092 --topic topic2 --partitions 3 --replication-factor 2
方式三:
创建Topic时指定参数 #指定分区数为3,副本数为2,指定cleanup.policy和retention参数 ./kafka-topics.sh --create --bootstrap ip1:9092,ip2:9092,ip3:9092 --topic topic3 --partitions 3 --replication-factor 2 --config cleanup.policy=compact --config retention.ms=500
2.查看Topic
#查看topic列表 ./kafka-topics.sh --bootstrap-server ip1:9092,ip2:9092,ip3:9092 --list
#查看Topic描述 ./kafka-topics.sh --describe --bootstrap-server ip1:9092,ip2:9092,ip3:9092 --topic topic1
3.修改Topic
./kafka-topics.sh --create --bootstrap-server ip1:9092,ip2:9092,ip3:9092 --topic topic --partitions 2 --replication-factor 2
增加分区数:
./kafka-topics.sh --alter --bootstrap-server ip1:9092,ip2:9092,ip3:9092 --topic topic --partitions 3
增加配置:
./kafka-topics.sh --alter --bootstrap-server ip1:9092,ip2:9092,ip3:9092 --topic topic --config flush.messages=1
删除配置:
./kafka-topics.sh --alter --bootstrap-server ip1:9092,ip2:9092,ip3:9092 --topic topic --delete-config flush.messages
4.删除Topic
./kafka-topics.sh --delete --bootstrap-server ip1:9092,ip2:9092,ip3:9092 --topic topic
5.启动生产者
#在任意Kafka节点上启动Producer生产数据 ./kafka-console-producer.sh --broker-list ip1:9092,ip2:9092,ip3:9092 --topic topic
6.启动消费者
消费数据
#在任意Kafka节点上启动Consumer消费数据 ./kafka-console-consumer.sh --bootstrap-server ip1:9092,ip2:9092,ip3:9092 --topic topic
消费多主题
#Kafka自带的脚本kafka-console-consumer.sh 的 topic参数并不支持同时指定多个主题,但该脚本提供了另外一个参数whitelist(白名单),该参数可同时指定多个主题,且支持正则表达式。 ./kafka-console-consumer.sh --bootstrap-server ip1:9092,ip2:9092,ip3:9092 --consumer-property group.id=grouptest --consumer-property consumer.id=old-consumer-cl --whitelist "topic1|topic2"
消息单播
#一条消息只能被某一个消费者消费的模式成为单播。 #要实现消息单播,只要让这些消费者属于同一个消费组即可。 #首先启动一个生产者向topictest主题发送消息 ./kafka-console-consumer.sh --bootstrap-server ip1:9092,ip2:9092,ip3:9092 --topic topictest #其次启动两个消费者 /app/kafka/bin/kafka-console-consumer.sh --bootstrap-server ip1:9092,ip2:9092,ip3:9092 --consumer-property group.id=single-consumer-group --topic topictest
消息多播
#一条消息能够被多个消费者消费的模式称为多播。之所以不称之为广播,是因为一条消息只能被Kafka同一个分组下某一个消费者消费,而不是所有消费者都能消费,所以从严格意义上来讲并不能算是广播模式,当然如果希望实现广播模式只要保证每个消费者均属于不同的消费组。针对Kafka同一条只能被同一个消费组下的某一个消费者消费的特性,要实现多播只要保证这些消费者属于不同的消费组即可。 #再增加一个消费者,该消费者属于multi-consumer-group消费组 ./kafka-console-consumer.sh --bootstrap-server ip1:9092,ip2:9092,ip3:9092 --consumer-property group.id=multi-consumer-group --topic topictest
7.创建Group
#创建消费者组 ./kafka-console-consumer.sh --bootstrap-server ip1:9092,ip2:9092,ip3:9092 --topic test --group kafkatest
8.查看Group
#查看消费者组 ./kafka-consumer-groups.sh --bootstrap-server ip1:9092,ip2:9092,ip3:9092 --list #查看消费者详情 #LogEndOffset下一条将要被加入到日志的消息的位移 #CurrentOffset当前消费的位移 #LAG 消息堆积量 ./kafka-consumer-groups.sh --bootstrap-server ip1:9092,ip2:9092,ip3:9092 --describe --group kafkatest ####移动消费组偏移到某个位置 #最早处 ./kafka-consumer-groups.sh --bootstrap-server ip1:9092,ip2:9092,ip3:9092 --group kafkatest --reset-offsets --all-topics --to-earliest --execute #最新处 ./kafka-consumer-groups.sh --bootstrap-server ip1:9092,ip2:9092,ip3:9092 --group kafkatest --reset-offsets --all-topics --to-latest --execute #某个位置 ./kafka-consumer-groups.sh --bootstrap-server ip1:9092,ip2:9092,ip3:9092 --group kafkatest --reset-offsets --all-topics --to-offset 2000 --execute #调整到某个时间之后得最早位移 ./kafka-consumer-groups.sh --bootstrap-server ip1:9092,ip2:9092,ip3:9092 --group kafkatest --reset-of #向前移动n(10)个消息 ./kafka-consumer-groups.sh --bootstrap-server ip1:9092,ip2:9092,ip3:9092 --group group-for --reset-offsets --topic test --shift-by -10 --execute
六、部署注意事项
1.本地项目连接错误
错误描述:
#服务器部署kafka时,本地项目通过外网连接服务器kafka报错: kafka.common.KafkaException: Socket server failed to bind to xx:9092: Cannot assign requested address
解决:
#在配置文件里添加这两项: listeners=PLAINTEXT://内网IP:9092 advertised.listeners=PLAINTEXT://外网IP:9092
2.zookeeper集群搭建启动服务失败
启动服务时失败,查看log文件(/app/kafka/logs/zookeeper.log)发现问题是:Cannot open channel to 3 at election address 主机名/ip:端口.
具体错误日志如下:
Cannot open channel to 2 at election address /xx.xx.xx.x:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager) java.net.NoRouteToHostException:No route to host (Host unreachablo) - at java,net.PlainSocketImpl.socketConnect(Native Method)_ at java.net.AbstractPlainSocketImp1.doConnect(AbstractPlainSocketImp1.java:358) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.iava.con e . at org.apache.z00keeper.server.quorum.QuorumCnxManager,initiateConnection (quorumCnxManager.java:383) at org.apache.zookeeper.server-quorum.QuorumCn xManagersQuorumConnectionReqThread. run (QuorumCnxManager.java:457) at java,util.concurrent,ThreadPoolExecutor.runworker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
解决办法:
在网上查了之后发现有人说需要修改hosts文件,有人说需要将data/目录下的 zookeeper_server.pid文件删除重启。但是这些方法试过后都不管用,最后发现是防火墙没有关闭的原因。 zookeeper集群中的各个机器之间需要通过配置的端口号进行通信,端口没打开显然不会启动成功。 关闭防火墙方法: sudo service iptables stop 为了防止下次系统重启时防火墙再次被打开,需要关闭自启动,关闭自启动方法:sudo chkconfig iptables off