如何收集一个应用的日志(3)

简介

这篇文章中提到
filebeat在收集日志的时候,我们让其增加一个log_source字段,表示日志是来源于哪个应用的。

在Kafka MQ的另外一端,也有一个logstash。它主要有三个功能:
1.接收日志
2.解析日志
3.不同的应用保存日志到不同的elasticsearch索引里

接下来,讲讲在logstash配置文件里,如何做到这三个功能

接收日志

input {
    kafka {
        bootstrap_servers => "192.168.1.240:9092,192.168.1.212:9092"
        topics => ["app-log-topic"]
        group_id => "oa-log-consumer-group"
        auto_offset_reset => "earliest"
        decorate_events => "false"
    }
}

这里用到了kafka插件,接受两个kafka节点的日志消息(192.168.1.240:9092,192.168.1.212:9092)
队列名是app-log-topic
消费不同app的日志,group_id也要不同。

解析日志

filter {
        json {
                source => "message"
                remove_field => [ "tags","offset","beat" ,"prospector"]
        }
        if ([fields][log_source] != "oa" ) {
                drop {}
        }
        mutate {
                rename => { "[fields][log_source]" => "log_source" }
                remove_field => [ "fields"]
        }
        grok {
                        match => {"message" => "\[%{LOGLEVEL:loglevel}\] \[(?<occur_time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(.\d{3})?)\] %{JAVACLASS:javaclass} \[%{NUMBER:linenum}\] - (?<content>.*)"}
        }
        if ([loglevel] !~ "^[A-Z]+$"){
                        grok{
                                match => {"message" => "\[%{LOGLEVEL:loglevel}\] \[(?<occur_time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(.\d{3})?)\] %{JAVACLASS:javaclass}\.\[%{DATA}\]\.\[%{DATA}\] \[%{NUMBER:linenum}\] - (?<content>.*)"}
                        }
        }
                if ([loglevel] !~ "^[A-Z]+$"){
                        drop {}
                }
                if ([occur_time] =~ "\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3}") {
                        date {
                                match => ["occur_time", "yyyy-MM-dd HH:mm:ss.SSS"]
                                target => "@timestamp"
                        }
                } else if ([occur_time] =~ "\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}") {
                        date {
                                match => ["occur_time", "yyyy-MM-dd HH:mm:ss"]
                                target => "@timestamp"
                        }
                }
}

在接下的日志解析过程中,使用了几个logstash的插件,例如:json,mutate,date,grok
json:把收到的消息文本,格式化成json。同时,去除几个字段
mutate:把log_source字段放到json的根节点里
grok:解析具体的日志内容
date:解析日志里的日期时间,并放到@timestamp字段里

这里有一个关键语句,如下:

if ([fields][log_source] != "oa" ) {
    drop {}
}

表示只处理log_source=oa的日志,其他日志不处理。

保存日志

elasticsearch{
        hosts=>["192.168.1.240:9200","192.168.1.230:9200","192.168.1.213:9200"]
        action=>"index"
        index=>"%{log_source}-index"
   }

表示把解析后的数据保存在elasticsearch里,索引名称由log_source来决定。

一个logstash消费不同应用的日志

logstash有个pipelines.yml配置文件
里面为不同应用设置不同配置文件,如下:

- pipeline.id: oa
   path.config: "/opt/logstash-6.2.2/config/kafka-oa-log-receiver.conf"
 - pipeline.id: im-server
   path.config: "/opt/logstash-6.2.2/config/kafka-im-server-log-receiver.conf"
 - pipeline.id: nginx-access
   path.config: "/opt/logstash-6.2.2/config/kafka-nginx-access.conf"
 - pipeline.id: nginx-error
   path.config: "/opt/logstash-6.2.2/config/kafka-nginx-error.conf"

每个配置文件都是input,filter,output结构

此条目发表在IT分类目录,贴了, , , 标签。将固定链接加入收藏夹。

发表评论

电子邮件地址不会被公开。

*