两种方式将日志同步到ES中并用kibana展示

零、前提
0.1 配置es
解压文件

tar zxvf elasticsearch-6.2.2.tar.gz

由于es不能用root账户启动,所以需要添加一个非root账户

useradd es

修改es文件夹的权限

chown -R es:es elasticsearch-6.2.2

修改配置文件

vi  /usr/local/elasticsearch-6.2.2/config/elasticsearch.yml

修改elasticsearch.yml的内容如下

# Cluster name; must match spring.data.elasticsearch.cluster-name in the
# application's properties, otherwise the transport client ignores this node
cluster.name: application-dev
# HTTP port
http.port: 9200
# Bind address
network.host: 0.0.0.0
# External (public) IP published to clients
network.publish_host: 121.41.1.1
# Cross-origin access is open to every origin here; restrict it outside of a demo
http.cors.enabled: true
http.cors.allow-origin: "*"
# Data path
path.data:  /usr/local/elasticsearch-6.2.2/data
# Logs path
path.logs:  /usr/local/elasticsearch-6.2.2/logs

创建data和logs文件夹

mkdir data logs

启动es(需先切换到非root账户,例如 su es;为了方便观察日志没有后台启动)

./bin/elasticsearch

0.2 logstash 配置
解压文件

tar zxvf logstash-6.2.2.tar.gz

进入config文件夹新建logstash-spring.conf文件并编写内容

# Create the config as a regular file (mkdir would create a directory, which vi cannot edit as a file)
touch logstash-spring.conf
vi logstash-spring.conf

logstash-spring.conf内容

input {
tcp {
port => 5000
codec => json_lines
}
}
output{
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}

启动logstash (为了方便观察日志没有后台启动)

./bin/logstash -f config/logstash-spring.conf

0.3 kibana 配置
解压

tar zxvf kibana-6.2.2-linux-x86_64.tar.gz

修改配置文件kibana.yml

server.port: 5601
# To allow connections from remote users, set this parameter to a non-loopback address.
server.host: "121.41.1.1"

# The URL of the Elasticsearch instance to use for all your queries.
elasticsearch.url: "http://121.41.1.1:9200"

启动kibana

./bin/kibana

0.4 kibana界面设置
进入kibana界面,点击LogTrail(第三方插件,需要预先安装)可以看到我们的日志了;未安装该插件时,也可以在Discover中创建logstash-*索引模式来查看。
在这里插入图片描述

一、第一种方式采用springboot+logback+logstash+es+kibana

思想:应用作为客户端,通过logback的LogstashTcpSocketAppender将日志以JSON形式经TCP推送到logstash服务端;logstash接收数据(同时可以进行过滤),再通过output配置同步到es中。

二、第二种方式采用springboot+logback+es+kibana

思想:通过重写logback的代码(继承AppenderBase)拿到日志,将日志放入队列中,再用定时任务循环取出并存入es中。之所以需要队列这一层,原因是通过重写方式直接存入es时,bean容器加载不了es的bean。但是也可以分别拿到es的bean和logback的bean,再把es的bean加到logback的bean中,然后启动;也可以开启异步。可参考AppenderBase、PatternLayoutEncoder的相关方法。

三、代码展示
在这里插入图片描述
pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.xxxx</groupId>
    <artifactId>logtoes</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>logtoes</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Logstash encoder -->
        <dependency>
            <groupId>net.logstash.logback</groupId>
            <artifactId>logstash-logback-encoder</artifactId>
            <version>5.3</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-elasticsearch</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.10</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

logback-spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration>
<configuration>
    <include resource="org/springframework/boot/logging/logback/base.xml" />
    <!--引入在application.properties文件中的变量-->
    <property resource="application.properties"></property>
    <!--定义logstash 传输方式 以及地址-->
    <appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>${LOGSTASH_HOST}:${LOGSTASH_PORT}</destination>
        <encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder"/>
    </appender>

    <root level="INFO">
        <appender-ref ref="LOGSTASH"/>
        <appender-ref ref="CONSOLE"/>
    </root>

</configuration>

application.properties

server.port=8080
logging.config=classpath:logback-spring.xml
LOGSTASH_HOST=121.41.1.1
LOGSTASH_PORT=5000
spring.data.elasticsearch.cluster-name=application-dev
spring.data.elasticsearch.cluster-nodes=121.41.1.1:9300

LogtoesApplication.java

package com.xxxx.logtoes;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Spring Boot entry point that doubles as a small REST controller used to
 * produce sample log output. Scheduling is enabled on this class.
 */
@SpringBootApplication
@RestController
@EnableScheduling
public class LogtoesApplication {

    private static final Logger LOG = LoggerFactory.getLogger(LogtoesApplication.class);

    /**
     * Starts the application whose logback logs are shipped to Elasticsearch.
     *
     * @param args command-line arguments handed to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(LogtoesApplication.class, args);
    }

    /**
     * Handles GET requests mapped to {@code test} by writing one message at
     * each of the info, warn, debug and error levels, in that order.
     */
    @GetMapping("test")
    public void test() {
        LOG.info("---------info---------");
        LOG.warn("---------warn---------");
        LOG.debug("--------debug--------");
        LOG.error("--------error--------");
    }
}

Log.java

package com.xxxx.logtoes;

import lombok.AllArgsConstructor;
import lombok.Data;
import org.springframework.data.elasticsearch.annotations.Document;

import java.io.Serializable;

@Document(indexName = "logs_message", type = "message")
@Data
@AllArgsConstructor
public class Log implements Serializable {

    private static final long serialVersionUID = -8865560702631716573L;

    private String id;
    /**
     * 类的全路径名称
     */
    private String loggerName;
    /**
     * 打印的消息
     */
    private String message;
    /**
     * 线程名称
     */
    private String threadName;
    /**
     * 时间戳 日志打印的时间
     */
    private String timeStamp;
    /**
     * 日志打印级别
     */
    private String level;
}

LoggingService.java

package com.xxxx.logtoes;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.IThrowableProxy;
import ch.qos.logback.classic.spi.LoggerContextVO;
import ch.qos.logback.core.spi.DeferredProcessingAware;
import java.util.Map;
import org.slf4j.Marker;

/**
 * Read-only view of one logging event, used as the event type of
 * {@code LoggingServiceImpl}.
 *
 * <p>NOTE(review): the method set looks like a copy of logback's
 * {@code ILoggingEvent}; confirm whether the stock interface can be used
 * instead, since nothing visible here adapts logback events to this type.
 */
public interface LoggingService extends DeferredProcessingAware {

    /** @return the thread name recorded for the event */
    String getThreadName();

    /** @return the level recorded for the event */
    Level getLevel();

    /** @return the event message (presumably the raw, unformatted pattern; verify) */
    String getMessage();

    /** @return the arguments supplied with the message */
    Object[] getArgumentArray();

    /** @return the message (presumably with the arguments substituted; verify) */
    String getFormattedMessage();

    /** @return the logger name recorded for the event */
    String getLoggerName();

    LoggerContextVO getLoggerContextVO();

    IThrowableProxy getThrowableProxy();

    StackTraceElement[] getCallerData();

    boolean hasCallerData();

    Marker getMarker();

    Map<String, String> getMDCPropertyMap();

    /**
     * @deprecated presumably superseded by {@link #getMDCPropertyMap()}; confirm
     *     before migrating callers.
     */
    @Deprecated
    Map<String, String> getMdc();

    /** @return the timestamp recorded for the event */
    long getTimeStamp();

    // Redeclared from DeferredProcessingAware; kept so the contract is visible in one place.
    void prepareForDeferredProcessing();
}

LoggingServiceImpl.java

package com.xxxx.logtoes;

import ch.qos.logback.core.AppenderBase;
import com.alibaba.fastjson.JSON;

import java.util.UUID;

/**
 * 日志服务
 */
public class LoggingServiceImpl extends AppenderBase<LoggingService> {

    @Override
    protected void append(LoggingService loggingService) {
        Log log = new Log(UUID.randomUUID().toString(),loggingService.getLoggerName(), loggingService.getMessage(), loggingService.getThreadName(), String.valueOf(loggingService.getTimeStamp()),loggingService.getLevel().toString());
        //添加到阻塞队列
        LogQueue.tempLogQueue.add(log);
        System.out.println(JSON.toJSONString(log,true));
    }
}

LogQueue.java

package com.xxxx.logtoes;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Holder for the process-wide queue of {@link Log} entries waiting to be
 * persisted.
 */
public class LogQueue {
    /**
     * Unbounded blocking queue shared by every producer and consumer of log
     * entries. Declared {@code final} so the shared instance cannot be
     * swapped out while other threads are still using it.
     */
    public static final BlockingQueue<Log> tempLogQueue = new LinkedBlockingQueue<>();
}

LogReposiory.java

package com.xxxx.logtoes;

import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;

/**
 * Spring Data Elasticsearch repository for {@link Log} documents with a
 * {@code String} id. It declares no query methods of its own, so only the
 * operations inherited from {@code ElasticsearchRepository} are available.
 */
@Repository
public interface LogReposiory extends ElasticsearchRepository<Log, String> {

}

LogTask.java

package com.xxxx.logtoes;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;

/**
 * Scheduled job that moves queued {@link Log} entries from
 * {@link LogQueue#tempLogQueue} into Elasticsearch.
 */
@Configuration
public class LogTask {

    @Autowired
    private LogReposiory logReposiory;

    /**
     * Drains every entry currently waiting in the queue and indexes the whole
     * batch with one {@code saveAll} call instead of one request per entry.
     *
     * <p>The job is triggered at a fixed rate of 5000 ms, i.e. the period is
     * measured between start times, not from the end of the previous run.
     *
     * <p>If indexing fails, the drained entries are put back on the queue so
     * a later run can retry them, and the exception is rethrown so the
     * scheduler still sees the failure. Re-indexing is harmless because every
     * entry carries its own id.
     */
    @Scheduled(fixedRate = 5000)
    public void log() {
        java.util.List<Log> batch = new java.util.ArrayList<>();
        LogQueue.tempLogQueue.drainTo(batch);
        if (batch.isEmpty()) {
            // Nothing queued since the last run; skip the empty bulk request.
            return;
        }
        try {
            logReposiory.saveAll(batch);
        } catch (RuntimeException e) {
            // Keep the entries for the next run rather than losing them.
            LogQueue.tempLogQueue.addAll(batch);
            throw e;
        }
    }
}