2种方式将日志同步ES中并且用kibana展示
零、前提
0.1 配置es
解压文件
tar zxvf elasticsearch-6.2.2.tar.gz
由于es不能用root账户启动,所以需要添加一个非root账户
useradd es
修改es文件夹的权限
chown -R es:es elasticsearch-6.2.2
修改配置文件
vi /usr/local/elasticsearch-6.2.2/config/elasticsearch.yml
修改elasticsearch.yml的内容如下
#端口
http.port: 9200
#IP
network.host: 0.0.0.0
#外网IP
network.publish_host: 121.41.1.1
http.cors.enabled: true
http.cors.allow-origin: "*"
#data路径
path.data: /usr/local/elasticsearch-6.2.2/data
#logs路径
path.logs: /usr/local/elasticsearch-6.2.2/logs
创建data和logs文件夹
mkdir data logs
启动es(为了方便观察日志没有后台启动)
./bin/elasticsearch
0.2 logstash 配置
解压文件
tar zxvf logstash-6.2.2.tar.gz
进入config文件夹新建logstash-spring.conf文件并编写内容
touch logstash-spring.conf
vi logstash-spring.conf
logstash-spring.conf内容
input {
# Listen for TCP connections from the application's logback LogstashTcpSocketAppender
tcp {
port => 5000
# One JSON document per line, matching the output of LogstashEncoder
codec => json_lines
}
}
output{
# Index each event into the local Elasticsearch instance
elasticsearch { hosts => ["localhost:9200"] }
# Also pretty-print every event to stdout for debugging
stdout { codec => rubydebug }
}
启动logstash (为了方便观察日志没有后台启动)
./bin/logstash -f config/logstash-spring.conf
0.3 kibana 配置
解压
tar zxvf kibana-6.2.2-linux-x86_64.tar.gz
修改配置文件kibana.yml
server.port: 5601
# To allow connections from remote users, set this parameter to a non-loopback address.
server.host: "121.41.1.1"
# The URL of the Elasticsearch instance to use for all your queries.
elasticsearch.url: "http://121.41.1.1:9200"
启动kibana
./bin/kibana
0.4 kibana界面设置
进入kibana界面,点击LogTrail可以看到我们的日志了。

一、第一种方式采用springboot+logback+logstash+es+kibana
思想:采用logstash客户端将日志推送到服务端,服务端接收并过滤数据,再通过配置同步到es中。
二、第二种方式采用springboot+logback+es+kibana
思想:通过重写logback的Appender代码拿到日志,将日志放入队列中,再用定时任务循环存入es中。之所以需要队列这一层,原因是通过重写方式直接存入es时,bean容器加载不了es。但是可以拿到es的bean和logback的bean,再把es的bean加到logback的bean中,然后启动,也可以开启异步。主要用到AppenderBase和PatternLayoutEncoder。
三、代码展示

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Spring Boot parent supplies managed versions (e.g. spring-data-elasticsearch below has no explicit version) -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.xxxx</groupId>
<artifactId>logtoes</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>logtoes</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- Embedded web server + Spring MVC for the /test demo endpoint -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Logstash encoder: serializes logback events as JSON for the TCP appender (approach 1) -->
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>5.3</version>
</dependency>
<!-- Used by LoggingServiceImpl to pretty-print buffered log entities -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<!-- ElasticsearchRepository support for LogReposiory (approach 2); version managed by the Boot parent -->
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-elasticsearch</artifactId>
</dependency>
<!-- Generates getters/setters/constructor for the Log entity (@Data, @AllArgsConstructor) -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration>
<configuration>
<!-- Spring Boot's logback defaults; this also defines the CONSOLE appender referenced below -->
<include resource="org/springframework/boot/logging/logback/base.xml" />
<!-- Import variables (LOGSTASH_HOST, LOGSTASH_PORT) defined in application.properties -->
<property resource="application.properties"></property>
<!-- Ship log events as JSON over TCP to the logstash `tcp` input -->
<appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>${LOGSTASH_HOST}:${LOGSTASH_PORT}</destination>
<encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder"/>
</appender>
<!-- INFO and above go to both logstash and the console (DEBUG lines are suppressed) -->
<root level="INFO">
<appender-ref ref="LOGSTASH"/>
<appender-ref ref="CONSOLE"/>
</root>
</configuration>
# Embedded web server port
server.port=8080
# Explicitly point Spring Boot at the logback configuration file
logging.config=classpath:logback-spring.xml
# Logstash TCP input address; referenced from logback-spring.xml via <property resource="application.properties">
LOGSTASH_HOST=121.41.1.1
LOGSTASH_PORT=5000
# spring-data-elasticsearch transport-client settings (9300 is the ES transport port, not the 9200 HTTP port)
spring.data.elasticsearch.cluster-name=application-dev
spring.data.elasticsearch.cluster-nodes=121.41.1.1:9300
LogtoesApplication.java
package com.xxxx.logtoes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
@RestController
@EnableScheduling
public class LogtoesApplication {

    /** Class-level SLF4J logger; events flow to every appender bound to the root logger. */
    private static final Logger log = LoggerFactory.getLogger(LogtoesApplication.class);

    /**
     * Demo endpoint that emits one log line per level so the ES pipeline can be observed.
     * NOTE(review): the debug line is filtered out while the root level is INFO.
     */
    @GetMapping("test")
    public void test() {
        log.info("---------info---------");
        log.warn("---------warn---------");
        log.debug("--------debug--------");
        log.error("--------error--------");
    }

    /** Boots the Spring context; scheduled tasks then ship buffered log entries to ES. */
    public static void main(String[] args) {
        SpringApplication.run(LogtoesApplication.class, args);
    }
}
Log.java
package com.xxxx.logtoes;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.springframework.data.elasticsearch.annotations.Document;
import java.io.Serializable;
@Document(indexName = "logs_message", type = "message")
@Data
@AllArgsConstructor
public class Log implements Serializable {
private static final long serialVersionUID = -8865560702631716573L;
// Document id; LoggingServiceImpl fills this with a random UUID string
private String id;
/**
 * Fully-qualified name of the class that emitted the log line
 */
private String loggerName;
/**
 * The logged message text
 */
private String message;
/**
 * Name of the thread that emitted the log line
 */
private String threadName;
/**
 * Epoch-millis timestamp of the log event, stored as a string
 */
private String timeStamp;
/**
 * Log level (INFO/WARN/ERROR/...)
 */
private String level;
}
LoggingService.java
package com.xxxx.logtoes;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.IThrowableProxy;
import ch.qos.logback.classic.spi.LoggerContextVO;
import ch.qos.logback.core.spi.DeferredProcessingAware;
import java.util.Map;
import org.slf4j.Marker;
/**
 * Event contract consumed by the custom appender ({@code LoggingServiceImpl}).
 * NOTE(review): the method set mirrors logback's {@code ILoggingEvent} API, so the
 * appender can be wired to receive logback events — confirm against the logback
 * version in use before changing any signature.
 */
public interface LoggingService extends DeferredProcessingAware {
String getThreadName();
Level getLevel();
// Raw message pattern; use getFormattedMessage() for the text with arguments substituted
String getMessage();
Object[] getArgumentArray();
String getFormattedMessage();
String getLoggerName();
LoggerContextVO getLoggerContextVO();
IThrowableProxy getThrowableProxy();
StackTraceElement[] getCallerData();
boolean hasCallerData();
Marker getMarker();
Map<String, String> getMDCPropertyMap();
/** @deprecated */
Map<String, String> getMdc();
// Event time in epoch millis
long getTimeStamp();
void prepareForDeferredProcessing();
}
LoggingServiceImpl.java
package com.xxxx.logtoes;
import ch.qos.logback.core.AppenderBase;
import com.alibaba.fastjson.JSON;
import java.util.UUID;
/**
 * Custom logback appender: converts each logging event into a {@link Log}
 * entity and buffers it for the scheduled Elasticsearch writer ({@code LogTask}).
 */
public class LoggingServiceImpl extends AppenderBase<LoggingService> {

    /**
     * Buffers one logging event into the shared in-memory queue.
     *
     * @param loggingService the logging event to persist
     */
    @Override
    protected void append(LoggingService loggingService) {
        // Use the formatted message so "{}" placeholders are substituted with their
        // arguments; getMessage() would store the raw pattern and lose the values.
        Log log = new Log(
                UUID.randomUUID().toString(),
                loggingService.getLoggerName(),
                loggingService.getFormattedMessage(),
                loggingService.getThreadName(),
                String.valueOf(loggingService.getTimeStamp()),
                loggingService.getLevel().toString());
        // Hand off to the queue; LogTask drains it into Elasticsearch every 5s.
        LogQueue.tempLogQueue.add(log);
        System.out.println(JSON.toJSONString(log, true));
    }
}
LogQueue.java
package com.xxxx.logtoes;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * Shared unbounded buffer between the logback appender (producer) and the
 * scheduled Elasticsearch writer {@code LogTask} (consumer).
 */
public class LogQueue {

    /** Thread-safe queue of pending log entries; final so the reference cannot be swapped out. */
    public static final BlockingQueue<Log> tempLogQueue = new LinkedBlockingQueue<>();

    /** Utility holder — not meant to be instantiated. */
    private LogQueue() {
    }
}
LogReposiory.java
package com.xxxx.logtoes;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;
/**
 * Spring Data repository persisting {@link Log} documents into Elasticsearch.
 * NOTE(review): the name is misspelled ("Reposiory" -> "Repository"); kept as-is
 * because other classes in this file reference it by this name.
 */
@Repository
public interface LogReposiory extends ElasticsearchRepository<Log, String> {
}
LogTask.java
package com.xxxx.logtoes;
import java.util.ArrayList;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;
/**
 * Scheduled task that drains the shared log queue and bulk-writes the
 * buffered entries into Elasticsearch via {@link LogReposiory}.
 */
@Configuration
public class LogTask {

    private final LogReposiory logReposiory;

    /** Constructor injection (preferred over field @Autowired) — Spring supplies the repository. */
    public LogTask(LogReposiory logReposiory) {
        this.logReposiory = logReposiory;
    }

    /**
     * Runs every 5 seconds measured from each start (fixedRate); note this is
     * NOT "5 seconds after the previous run finishes" — that would be fixedDelay.
     * Drains all currently queued entries in one pass and saves them as a single
     * batch instead of one ES round-trip per entry.
     */
    @Scheduled(fixedRate = 5000)
    public void log() {
        List<Log> batch = new ArrayList<>();
        // drainTo atomically moves everything currently queued into the batch.
        LogQueue.tempLogQueue.drainTo(batch);
        if (!batch.isEmpty()) {
            logReposiory.saveAll(batch);
        }
    }
}