kafaka在ELK日志集群中的应用
目录
appserver -->filebeat --> kafka --> logstash --> elasticsearch --> kibana
一、消息队列 (message queue)
1、消息队列是什么
1.消息队列是进程间通信或同一进程间不同线程的通信方式。
2.消息队列提供了异步通信协议。
3.消息的发送者和接收者不需要同时与消息队列交互,消息会保存在队列中, 直到接收者取回它。
4.每一个贮列中的纪录包含详细说明的数据, 包含发生的时间, 输入设备的种类, 以及特定的输入参数。
5.消息队列中间件是分布式系统中重要的组件。
消息队列主要解决应用耦合、异步处理、流量削锋等问题
当前使用较多的消息队列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等, 而部分数据库如Redis、Mysql以及phxsql也可实现消息队列的功能。
2、消息队列的优点
解耦:在项目启动之初来预测将来项目会碰到什么需求, 是极其困难的消息系统在处理过程中间插入了一个隐含的、基于数据的接口层, 两边的处理过程都要实现这一接口这允许你独立的扩展或修改两边的处理过程, 只要确保它们遵守同样的接口约束
冗余:有些情况下, 处理数据的过程会失败除非数据被持久化, 否则将造成丢失消息队列把数据进行持久化直到它们已经被完全处理, 通过这一方式规避了数据丢失风险许多消息队列在把一个消息从队列中删除之前, 需要你的处理系统明确的指出该消息已经被处理完毕, 从而确保数据被安全的保存直到你使用完毕
扩展性:因为消息队列解耦了你的处理过程, 所以增大消息入队和处理的频率是很容易的, 只要另外增加处理过程即可不需要改变代码、不需要调节参数,扩展就像调大电力按钮一样简单
峰值处理:在访问量剧增的情况下, 应用仍然需要继续发挥作用, 但是这样的突发流量并不常见,如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费使用消息队列能够使关键组件顶住突发的访问压力, 而不会因为突发的超负荷的请求而完全崩溃
可恢复性:系统的一部分组件失效时, 不会影响到整个系统即使一个处理消息的进程挂掉, 加入队列中的消息仍然可以在系统恢复后被处理
顺序保证:在大多使用场景下, 数据处理的顺序都很重要大部分消息队列本来就是排序的, 并且能保证数据会按照特定的顺序来处理如Kafka能保证一个Partition内的消息的有序性
缓冲:在任何重要的系统中, 都会有需要不同的处理时间的元素例如, 加载一张图片比应用过滤器花费更少的时间消息队列通过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽可能的快速该缓冲有助于控制和优化数据流经过系统的速度
异步通信:很多时候, 用户不想也不需要立即处理消息消息队列提供了异步处理机制, 允许用户把一个消息放入队列, 但并不立即处理它想向队列中放入多少消息就放多少, 然后在需要的时候再去处理它们
3、消息队列的模式
1.点对点模式:
角色:消息队列 生产者 消费者
消息发送者生产消息发送到queue中, 然后消息接收者从queue中取出并且消费消息
消息被消费以后, queue中不再有存储, 所以消息接收者不可能消费到已经被消费的消息
特点:
• 每个消息只有一个接收者(Consumer)(即一旦被消费, 消息就不再在消息队列中);
• 发送者和接收者间没有依赖性, 发送者发送消息之后, 不管有没有接收者在运行, 都不会影响 到发送者下次发送消息;
• 接收者在成功接收消息之后需向队列应答成功, 以便消息队列删除当前接收的消息;
2.发布/订阅模式:
角色: 角色主题(Topic) 发布者(Publisher) 订阅者(Subscriber)
特点:
每个消息可以有多个订阅者;
• 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者, 它必须创建一个订阅者之后, 才能消费发布者的消息
• 为了消费消息, 订阅者需要提前订阅该角色主题, 并保持在线运行
3.两者区别:
点对点模式
生产者发送一条消息到queue, 一个queue可以有很多消费者, 但是一个消息只能被一个消费者接受当没有消费者可用时, 这个消息会被保存直到有 一个可用的消费者
发布订阅模式
发布者发送到topic的消息, 只有订阅了topic的订阅者才会收到消息
topic实现了发布和订阅, 当你发布一个消息, 所有订阅这个topic的服务都能得到这个消息, 所以从1到N个订阅者都能得到这个消息的拷贝
4、常用消息队列比较
Kafka常用于分布式架构,对性能要求高的可考虑Kafka
RocketMQ/思路来源于kafka, 改成了主从结构, 在事务性可靠性方面做了优化
广泛来说, 电商、金融等对事务性要求很高的, 可以考虑RabbitMQ和RocketMQ
二、kafka
1、kafka介绍
1.kafka对外使用topic(话题)的概念, 生产者往topic里写消息, 消费者从中读消息。
2.为了做到水平拓展,一个topic实际上是由多个partition(隔扇)组成,当遇到瓶颈时可以通过增加partition的数量来进行横向扩容,单个partition内是保证消息是有序的。
3.每新写入一条消息,kafka就是在对应的文件append(附加)写,所以性能非常高。
Kafka是一个分布式、分区、复制的提交日志服务
kafka对消息保存时根据topic进行归类,发送消息者称为producer(生产商),发送消息者称为consumer(消费者),此外kafka集群又多个kafka实例组成,每个实例(server)称为broker。
无论是kafka集群还是producer和consumer都依赖与zookeeper来保证系统的可用性,zookeeper集群保存一些meat信息。
一个topic可以认为是一类消息,每个topic将被分成多个partition,每个partition在存储层面是append log文件。
任何发布到此partition的消息都会被直接追加到log文件尾部,每条消息在文件中的位置称为offset(偏移量),offset为一个long型数字,它唯一标记一条消息,kafka并没有提供其他额外的索引机制来存储offset,因为在kafka中几乎不允许对消息进行“随机读写”。
2、备份
为了保证分布式可靠性,kafka0.8开始对每个分区的数据进行备份(不同的Broker上),防止其中一个Broker宕机造成分区上的数据不可用。
3、基本概念
1、消费者:(Consumer):从消息队列中请求消息的客户端应用程序
2、生产者:(Producer) :向broker发布消息的应用程序
3、AMQP服务端(broker):用来接收生产者发送的消息并将这些消息路由给服务器中的队列,便于fafka将生产者发送的消息,动态的添加到磁盘并给每一条消息一个偏移量,所以对于kafka一个broker就是一个应用程序的实例
4、主题(Topic):一个主题类似新闻中的体育、娱乐、教育等分类概念,在实际工程中通常一个业务一个主题。
5、分区(Partition):一个Topic中的消息数据按照多个分区组织,分区是kafka消息队列组织的最小单位,一个分区可以看作是一个FIFO( First Input First Output的缩写,先入先出队列)的队列。kafka分区是提高kafka性能的关键所在,当你发现你的集群性能不高时,常用手段就是增加Topic的分区,分区里面的消息是按照从新到老的顺序进行组织,消费者从队列头订阅消息,生产者从队列尾添加消息。
三、部署zookeeper集群
1、zookeeper介绍
zookeeper是一种在分布式系统中被广泛作为:分布式状态管理、分布式协调管理、分布式配置管理和分布式锁服务的集群。
zookeeper的功能十分强大,可以实现诸如分布式应用配置管理、统一命名服务、状态同步服务、集群管理等功能。比如:程序分布式部署在多台服务器上,如果我们要改变程序的配置文件就需要逐台机器的去修改,如果把这些配置全部放到zookeeper上去保存在zookeeper的某个目录节点中,然后所有相关应用程序对这个目录节点进行监听,一旦配置信息发生变化每个应用程序就会收到zookeeper的通知,然后从zookeeper获取新的配置信息应用到系统中。
1、kafka集群依赖于zookeeper进行元数据管理、leader选举、broker注册等功能。确保zookeeper集群的高可用和稳定性可以提高kafka集群的可靠性和可靠性。
2、在kafka集群中,zookeeper用于维护kafka集群的状态信息,如broker的状态、topic/partition信息、以及consumer的位置信息等。如果没有zookeeper集群,kafka集群就无法工作。
3、kafka的增加和减少服务器都会在zookeeper节点上触发相应的事件。
2、zookeeper集群的部署
首先,zookeeper集群的工作是超过半数才能对外提供服务,所以成员数量一般为奇数台。
1.部署Java环境(安装JDK)
2.安装zookeeper(Apache ZooKeeper)
3.修改配置文件
cd 到安装目录中的conf目录下
cp zoo_sample.cfg zoo.cfg
[root@host1 conf]# egrep -v "^$|^#" zoo.cfg
tickTime=2000 #Zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔
initLimit=10 #这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,
#而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。
#当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,
#那么表明这个客户端连接失败。总的时间长度就是 10*2=20 秒
syncLimit=5 #这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,
#总的时间长度就是5*2=10秒
dataDir=/data/zookeeper/data #快照日志的存储路径
dataLogDir=/data/zookeeper/datalog #事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录
#这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多
clientPort=2181 #这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求
server.1=192.168.10.21:2888:3888
server.2=192.168.10.22:2888:3888
server.3=192.168.10.23:2888:3888`
#server.1 这个1是服务器的标识也可以是其他的数字, 表示这个是第几号服务器,用来标识服务器,这个标识要写到数据目录下面myid文件里
#192.168.10.21为IP地址,第一个端口是leader和follower之间的通信端口,默认是2888,服务启动后,只有leader会监听这个端口
#第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认是3888
4、创建数据目录及服务器ID(集群中每台机器的id不一样,每台机器都要配置)
[root@host1 conf]# mkdir /data/zookeeper/data{,log} -p
[root@host1 conf]# echo 1 > /data/zookeeper/data/myid
5、启动集群机器中所有服务并查看
[root@host1 ~]# /usr/local/zookeeper/bin/zkServer.sh start #启动服务
[root@host2 ~]# /usr/local/zookeeper/bin/zkServer.sh status #查看状态
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: leader
zk集群一般只有一个leader,多个follower,主一般是相应客户端的读写请求,而从主同步数据,当主挂掉之后就会从follower里投票选举一个leader出来
[root@host2 ~]# jps #查看运行的进程号
10948 Jps
10431 QuorumPeerMain
3、关于zookeeper集群的说明
1)、myid文件和server.myid 在快照目录下存放的标识本台服务器的文件,他是整个zk集群用来发现彼此的一个重要标识。
2)、zoo.cfg 文件是zookeeper配置文件在conf目录里。
3)、log4j.properties文件是zk的日志输出文件, 在conf目录里. 用java写的程序基本上有个共同点:日志都用log4j来进行管理。
4)、zkEnv.sh和zkServer.sh文件
zkServer.sh 主管理程序文件
zkEnv.sh 是主要配置zookeeper集群启动时配置环境变量的文件
5)、还有一个需要注意: zookeeper不会主动的清除旧的快照和日志文件,这个是操作者的责任。
清理方法:
一. 脚本+计划任务清理
二. 使用bin/zkCleanup.sh这个脚本清理,具体使用方法找官方文档
三. 从3.4.0开始,zookeeper提供了自动清理snapshot和事务日志的功能,通过在zoo.cfg中配置两个参数实现:
autopurge.purgeInterval 清理频率,单位是小时,默认是0,表示不开启自动清理功能
autopurge.snapRetainCount 需要保留的文件数目, 默认是保留3个
四、 kafka集群的搭建
虽然kafka中集成了zookeeper,但还是建议使用独立的zk集群。
1.软件下载及安装
2.创建数据目录
mkdir /data/kafka/kafka-logs
3.修改配置文件
vim /usr/local/kafka/config/server.properties
broker.id=1 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
listeners=PLAINTEXT://192.168.10.21:9092 #监听套接字
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
log.dirs=/data/kafka-logs #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数
#如果配置多个目录,新创建的topic把消息持久化在分区数最少那一个目录中
num.partitions=1 #默认的分区数,一个topic默认1个分区数
num.recovery.threads.per.data.dir=1 #在启动时恢复日志和关闭时刷新日志时每个数据目录的线程的数量,默认1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本数
replica.fetch.max.bytes=5242880 #取消息的最大字节数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间,到目录查看是否有过期的消息如果有,删除
zookeeper.connect=192.168.10.21:2181,192.168.10.22:2181,192.168.10.23:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
4.启动Kafka
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
5.查看启动情况
[root@host1 ~]# jps
10754 QuorumPeerMain
11911 Kafka
12287 Jps
6.创建topic进行验证
[root@host1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.10.21:2181 --replication-factor 2 --partitions 1 --topic qianfeng
Created topic "qianfeng".
在一台服务器上创建一个发布者
[root@host2 ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.10.22:9092 --topic qianfeng
> hello kafka
> ni hao ya
>
在另一台服务器上创建一个订阅者
[root@host3 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.21:9092 --topic qianfeng --from-beginning
...
hello kafka
ni hao ya
如果都能接收到,说明kafka部署成功
-----------------------------------------------------------------------------
[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper 192.168.10.23:2181 --list #查看所有topic
[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.10.23:2181 --topic qianfeng #查看指定topic的详细信息
Topic:qianfeng PartitionCount:1 ReplicationFactor:2 Configs:
Topic: qianfeng Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3
[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 192.168.10.23:2181 --topic qianfeng #删除topic
Topic qianfeng is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
-----------------------------------------------------------------------------
下面我们将logstash的输出改到kafka上面,将数据写入到kafka中
不要过滤, logstash会将message内容写入到队列中
# cat logstash-kafka.conf
input {
file {
type => "sys-log"
path => "/var/log/messages"
start_position => beginning
}
}
output {
kafka {
bootstrap_servers => "192.168.10.21:9092,192.168.10.22:9092,192.168.10.23:9092" #输出到kafka集群
topic_id => "sys-log-messages" #主题名称
compression_type => "snappy" #压缩类型
codec => "json"
}
}
启动logstash
# /usr/local/logstash/bin/logstash -f logstash-kafka.conf
在kafka上查看主题,发现已经有了sys-log-messages,说明写入成功了
[root@host2 ~]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper 192.168.10.22:2181 --list
__consumer_offsets
qianfeng
sys-log-messages
[root@host2 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.10.22:2181 --topic sys-log-messages
Topic:sys-log-messages PartitionCount:1 ReplicationFactor:2 Configs:
Topic: sys-log-messages Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2
接下来我们怎么将kafka中的内容写入到es集群中呢? 还是要利用logstash
# cat kafka-es.conf
input {
kafka {
bootstrap_servers => "192.168.10.21:9092,192.168.10.22:9092,192.168.10.23:9092"
topics => "output-test"
codec => "json"
auto_offset_reset => "earliest"
}
}
output {
elasticsearch {
hosts => ["192.168.10.11:9200","192.168.10.12:9200"]
index => "kafka-%{type}-%{+YYYY.MM.dd}"
}
}
五、eflk数据流
appserver -->filebeat --> kafka --> logstash --> elasticsearch --> kibana
filebeat配置
lebeat.prospectors:
- input_type: log
paths:
- /var/log/nginx/access.log
json.keys_under_root: true
json.add_error_key: true
json.message_key: log
output.kafka:
hosts: [ "192.168.10.21:9092","192.168.10.22:9092","192.168.10.23:9092"]
topic: 'nginx-access-log'
logstash[消费者]配置
/usr/local/logstash/config/input-from-kafka.conf
input {
kafka {
bootstrap_servers => "192.168.10.21:9092,192.168.10.22:9092,192.168.10.23:9092"
topics => "nginx-access-log"
auto_offset_reset => "earliest"
codec => "json"
decorate_events => true
}
}
filter {
grok {
match => {"log" => "%{COMBINEDAPACHELOG} %{QS:x-forwarded-for}"}
remove_field => ["error","beat","offset","auth","ident","log"]
}
}
output{
elasticsearch {
hosts => ["192.168.10.11:9200"]
index => "nginx-access-log-%{+YYYY.MM.dd}"
}
}