Spring Boot 整合Logstash

2023/8/4 23:52:17

本文主要是介绍Spring Boot 整合Logstash,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

前言

    ELK是目前主流的日志收集、存储和可视化的工具。

  • ElasticSearch:用于存储日志信息;

  • Logstash:用于收集、处理和转发日志信息;

  • Kibana:提供可视化的Web界面。

   

 这里主要实现的是通过logback和logstash完成日志的收集功能。


1、添加依赖包

<!-- JSON conversion (fastjson is used to serialize the log payload) -->
<!-- NOTE(review): no <version> is given here; presumably it is managed by a parent POM or
     <dependencyManagement> section - confirm, otherwise add an explicit version. -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
</dependency>

<!-- Remote HTTP API calls (Feign client used to talk to Logstash) -->
<dependency>
    <groupId>io.github.openfeign</groupId>
    <artifactId>feign-core</artifactId>
    <version>11.9.1</version>
</dependency>
<dependency>
    <groupId>io.github.openfeign</groupId>
    <artifactId>feign-gson</artifactId>
    <version>11.9.1</version>
</dependency>

<!-- Logstash integration for logback -->
<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>7.2</version>
</dependency>


2、创建logback-spring.xml文件

在main\resources路径下创建logback-spring.xml文件,用于配置logback。

因为我的logstash需要Authorization验证,所以就自己实现了HttpAppender和HttpClient。小伙伴们需要将下面配置中内层<appender>(HttpAppender)和<httpClient>的class属性改为自己项目中对应类的全限定名。

    

<?xml version="1.0" encoding="UTF-8"?>
<!-- Logback configuration: console output plus asynchronous forwarding to Logstash over HTTP. -->
<configuration>
    <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>

    <springProperty scope="context" name="projectName" source="tec.project-name"
                    defaultValue="tec"/>
    <springProperty scope="context" name="stage" source="stage" defaultValue="local"/>

    <appender name="LOCAL-CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <!-- Use 'yyyy' (calendar year); 'YYYY' is the week-based year and prints the
                 wrong year for dates around New Year. -->
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %5level %logger{0} [%t] [%mdc] - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- Asynchronous wrapper supplied by logstash-logback-encoder; it hands events to the
         nested appender below. -->
    <appender name="LOGSTASH"
              class="net.logstash.logback.appender.LoggingEventAsyncDisruptorAppender">
        <ringBufferSize>8192</ringBufferSize>
        <!-- Point this class at the HttpAppender in your own project -->
        <appender class="com.unicorn.tec.logstash.HttpAppender">
            <!-- Point this class at the HttpClient in your own project -->
            <httpClient class="com.unicorn.tec.logstash.HttpClient">
                <!-- Logstash address and port -->
                <destination>http://47.**.**.**:****</destination>
                <!-- Username and password used for the Authorization header -->
                <username>username</username>
                <password>password</password>
                <topic>my-topic</topic>
                <app>my-app</app>
                <!-- Minimum level of events that are forwarded -->
                <level>ERROR</level>
            </httpClient>
            <encoder class="net.logstash.logback.encoder.LogstashEncoder">
                <fieldNames>
                    <timestamp>timestamp</timestamp>
                </fieldNames>
            </encoder>
        </appender>
    </appender>

    <root level="ERROR">
        <appender-ref ref="LOGSTASH"/>
        <appender-ref ref="LOCAL-CONSOLE"/>
    </root>

</configuration>


3、实现HttpAppender和HttpClient

在实现HttpAppender和HttpClient之前,我们需要先声明logstash的远程调用接口,即创建ILogstashService文件

package com.unicorn.tec.logstash;

import feign.Headers;
import feign.RequestLine;
import feign.Response;
import org.springframework.web.bind.annotation.RequestBody;

public interface ILogstashService
{
    @Headers("Content-Type: application/json")
    @RequestLine("PUT /")
    Response addLog(@RequestBody String body);
}


HttpClient代码:

package com.unicorn.tec.logstash;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.spi.ContextAwareBase;
import ch.qos.logback.core.spi.LifeCycle;
import ch.qos.logback.core.status.Status;
import com.alibaba.fastjson.JSON;
import feign.Feign;
import feign.Logger;
import feign.Response;
import feign.auth.BasicAuthRequestInterceptor;
import lombok.var;
import org.springframework.beans.factory.annotation.Value;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.*;

public class HttpClient extends ContextAwareBase implements LifeCycle
{
    private String destination;
    private String username;
    private String password;
    private String topic;
    private String app;
    private String level;
    @Value("${spring.profiles.active}")
    private String env;
    private ILogstashService logstash;
    private volatile boolean isStarted = false;
    @Override
    public synchronized void start(){
        if(isStarted){
            return;
        }

        if (destination == null || destination.isEmpty()) {
            addError("No encoder was configured. Use <destination> URL");
        }

        if (hasAnyErrors()) {
            return;
        }

        logstash = Feign.builder()
                .decoder((response, type) -> response)
                .logLevel(Logger.Level.NONE)
                .requestInterceptor(new BasicAuthRequestInterceptor(username, password))
                .target(ILogstashService.class, destination);

        isStarted = true;
        addInfo("HttpClient started: '" + destination + "'");
    }

    public void addLog(ILoggingEvent event)
    {

        Level minLevel = Level.toLevel(level);
        if(!event.getLevel().isGreaterOrEqual(minLevel)){
            return;
        }

        //自定义自己想要的内容
        Map<String,Object> request = new HashMap<>();
        request.put("Topic", topic);
        request.put("Message",event.getMessage());
        request.put("Level",toLevelString(event.getLevel()));
        request.put("Logger",event.getLoggerName());
        request.put("Time",event.getTimeStamp());
        request.put("App", app);
        request.put("Env","dev");
        request.put("Source","");
        request.put("Scope","");

        var dic = event.getMDCPropertyMap();
        if(dic.containsKey("url")){
            request.put("Url",dic.get("url"));
        }
        if(dic.containsKey("parameters")){
            request.put("Parameters",dic.get("parameters"));
        }
        if(dic.containsKey("cookie")){
            request.put("Session",dic.get("cookie"));
        }

        var throwable = event.getThrowableProxy();
        if(throwable != null){
            request.put("Exception", throwable.getMessage());

            StringBuffer stackTraceString = new StringBuffer();
            stackTraceString.append(throwable.getClassName())
                            .append(": ")
                            .append(throwable.getMessage());

            for (var element : throwable.getStackTraceElementProxyArray()){
                stackTraceString.append("\n ")
                        .append(element.getSTEAsString());
            }
            request.put("StackTrace",stackTraceString);
        }

        String json = JSON.toJSONString(request);

        sendLogstash(json);
    }

    private String toLevelString(Level level)
    {
        if(level == Level.INFO)
        {
            return "Information";
        }else if(level == Level.DEBUG)
        {
            return "Debug";
        } else if (level == Level.ERROR)
        {
            return "Error";
        } else if (level == Level.WARN)
        {
            return "Warning";
        }

        return "";
    }

    private void sendLogstash(String json){

        if (!isStarted()) {
            return;
        }

        try {

            verifyResponse(logstash.addLog(json));

        } catch (Exception ex) {
            addWarn("Can't execute PUT request. URL: '" + destination + "'", ex);
        }
    }

    @Override
        public synchronized void stop() {
            if (isStarted) {
                isStarted = false;
            }
        }

        @Override
        public boolean isStarted() {
            return isStarted;
        }
    
    @Override
    public String toString() {
        return destination == null ? "HttpClient" : "HttpClient --> " + destination;
    }

    public void setDestination(String destination) {
        this.destination = destination;
    }

    public void setUsername(String username){
        this.username = username;
    }

    public void setPassword(String password){
        this.password = password;
    }

    public void setTopic(String topic){
        this.topic = topic;
    }

    public void setApp(String app){
        this.app = app;
    }

    public void setLevel(String level){
        this.level = level;
    }

    public List<Status> getStatusList() {
        return getStatusManager().getCopyOfStatusList();
    }

    private void verifyResponse(Response response) throws IOException {
        if (response.status() != HttpURLConnection.HTTP_OK) {
            String msg = "" +
                    "ResponseCode: " + response.status() + "; " +
                    "Reason: " + response.reason() + "; " +
                    "URL: " + response.request().url();
            throw new IOException(msg);
        }
    }

    private boolean hasAnyErrors() {
        return getStatusList().stream().anyMatch(x -> x.getLevel() > 1);
    }
}


HttpAppender代码:

package com.unicorn.tec.logstash;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import net.logstash.logback.encoder.LogstashEncoder;
import org.springframework.stereotype.Component;

@Component
public class HttpAppender extends AppenderBase<ILoggingEvent>
{
    private LogstashEncoder encoder;
    private HttpClient httpClient;

    @Override
    public synchronized void start(){
        if(isStarted()){
            return;
        }

        verifyConfigurationParameters();
        addInfo("HttpAppender started");

        super.start();
    }

    @Override
    public void append(ILoggingEvent event) {
        if (!isStarted()) {
            return;
        }

        httpClient.addLog(event);

    }

    public void setEncoder(LogstashEncoder encoder) {
        this.encoder = encoder;
    }

    public void setHttpClient(HttpClient httpClient) {
        this.httpClient = httpClient;
    }

    private void verifyConfigurationParameters(){
        if(encoder == null){
            addError("No encoder was configured. Use <encoder> to specify the class name");
        }

        if(httpClient == null){
            addError("No destination was configured. Use <httpClient> to specify HttpClient");
        }
    }
}


至此,项目中所有error级别的log都会被推送到logstash中。但如果想要记录程序运行时的异常的话,则需要通过Spring框架截获异常并记录到logback中。


4、截获异常并记录

为了方便定位问题,我将request的一些信息也记录在日志里了。

@ControllerAdvice
@Slf4j
public class TecExceptionHandler extends ResponseEntityExceptionHandler {
    @ExceptionHandler(value = {Exception.class})
    protected Object handleExceptionInternal(Exceptionex, WebRequest request) {

        MDC.put("url", ((ServletWebRequest) request).getRequest().getRequestURI());
        MDC.put("parameters",JSON.toJSONString(request.getParameterMap()));
        MDC.put("cookie",request.getHeader("cookie"));

        log.error(ex.getMessage(), ex);

        MDC.clear();
        
        return .....
    }
}


5、总结

    核心思路就是通过监听logback的ILoggingEvent事件,然后封装成自己想要的内容,发送到自己的logstash中。logback本身不会主动捕获程序抛出的exception,只有显式调用log.error(msg, ex)等方法时才会记录异常信息,所以需要通过@ControllerAdvice截获exception并记录到log中(可能是自己没找到更合适的方法,希望大神纠正)。


参考:

idealo/ logstash-logback-http




这篇关于Spring Boot 整合Logstash的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程