ELK

Data flow

Application logs → Filebeat → Kafka → Logstash → Elasticsearch → Kibana (queries and Watcher alerts).

Application log format

## Note: do not emit line breaks inside a record; Filebeat parses the file line by line
[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n
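This pattern appears to be a Log4j2 PatternLayout (the length=5 option on %level is Log4j2 syntax). A minimal appender sketch is shown below; the MDC keys hostName/ip/applicationName must be populated by the application itself, %tid assumes a custom converter for a traceable thread id, and the appender name and rollover policy are illustrative assumptions rather than part of the original setup:

<RollingFile name="appCollectorAppender"
             fileName="/usr/local/logs/app-collector.log"
             filePattern="/usr/local/logs/app-collector.log.%d{yyyy-MM-dd}">
  <PatternLayout pattern="[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n"/>
  <Policies>
    <TimeBasedTriggeringPolicy/>
  </Policies>
</RollingFile>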

filebeat

filebeat.prospectors:			## filebeat 5.x syntax; 6.3+ renames this to filebeat.inputs
- input_type: log
  paths:
    ## app-<service-name>.log; the path is fixed so that rotated (historical) files are not re-read
    - /usr/local/logs/app-collector.log
  ## custom _type value in ES
  document_type: "app-log"
  multiline:
    pattern: '^\['			## a line starting with "[" begins a new event
    negate: true			## lines NOT matching the pattern are continuations
    match: after			## append continuation lines to the end of the previous line
    max_lines: 2000			## maximum number of lines merged into one event
    timeout: 2s				## flush the event if no new line arrives within this time
  fields:
    logbiz: collector
    logtopic: app-log-collector		## per service; used as the Kafka topic
    env: dev
- input_type: log
  paths:
    - /usr/local/logs/error-collector.log
  document_type: "error-log"
  multiline:
    pattern: '^\['
    negate: true
    match: after
    max_lines: 2000
    timeout: 2s
  fields:
    logbiz: collector
    logtopic: error-log-collector	## per service; used as the Kafka topic
    env: dev
output.kafka:
  enabled: true
  hosts: ["192.168.150.70:9092"]
  topic: '%{[fields.logtopic]}'
  partition.hash:
    reachable_only: true
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1
logging.to_files: true
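With the config in place, Filebeat can be started in the foreground to verify shipping end to end (-e writes Filebeat's own log to stderr, -c selects the config file; the filename here is assumed):

filebeat -e -c filebeat.yml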

kafka
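
The two topics Filebeat publishes to can be created up front with Kafka's bundled CLI (optional if the broker has auto.create.topics.enable=true). A sketch, assuming Kafka 2.2+; older releases take --zookeeper <zk-host:2181> instead of --bootstrap-server:

kafka-topics.sh --create --bootstrap-server 192.168.150.70:9092 --partitions 1 --replication-factor 1 --topic app-log-collector
kafka-topics.sh --create --bootstrap-server 192.168.150.70:9092 --partitions 1 --replication-factor 1 --topic error-log-collector

## sanity check: watch events arrive as Filebeat ships them
kafka-console-consumer.sh --bootstrap-server 192.168.150.70:9092 --topic app-log-collector --from-beginning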

logstash

## The multiline handling also works for other stack-trace-style output, e.g. Linux kernel logs
input {
    kafka {
        client_id => "0"
        ## app-log-<service-name>
        topics_pattern => "app-log-.*"
        bootstrap_servers => "192.168.150.70:9092"
        codec => json
        consumer_threads => 1
        decorate_events => true
        group_id => "app-log-group"
    }

    kafka {
        client_id  => "1"
        ## error-log-<service-name>
        topics_pattern => "error-log-.*"
        bootstrap_servers => "192.168.150.70:9092"
        codec => json
        consumer_threads => 1
        decorate_events => true
        group_id => "error-log-group"
    }
}

filter {
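    ## stamp each event with index_time (yyyy.MM.dd, local time); used below as the ES index suffix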
    ruby {
        code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
    }

    if "app-log" in [fields][logtopic]{
        grok {
            match => {"message" => "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"}
        }
    }

    if "error-log" in [fields][logtopic]{
        grok {
            ## grok expression: parses the bracketed fields laid out by the log pattern
            match => {"message" => "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"}
        }
    }
}


## for testing: print parsed events to the console (remove in production)
output {
    stdout {codec => rubydebug}
}

output {
    if "app-log" in [fields][logtopic]{
        ## elasticsearch output plugin
        elasticsearch {
            ## ES address
            hosts => ["192.168.150.40:9200"]
            index => "app-log-%{[fields][logbiz]}-%{index_time}"
            ## sniff the cluster's node IPs so requests are load-balanced across ES nodes
            sniffing => true
            # overwrite the index template if one already exists
            template_overwrite => true
        }
    }

    if "error-log" in [fields][logtopic]{
        ## elasticsearch output plugin
        elasticsearch {
            ## ES address
            hosts => ["192.168.150.40:9200"]
            index => "error-log-%{[fields][logbiz]}-%{index_time}"
            ## sniff the cluster's node IPs so requests are load-balanced across ES nodes
            sniffing => true
            # overwrite the index template if one already exists
            template_overwrite => true
        }
    }
}
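
Assuming the pipeline above is saved as logstash-script.conf (an illustrative filename), validate the config and then run it:

bin/logstash -f logstash-script.conf -t		## -t (--config.test_and_exit) parses the config and exits
bin/logstash -f logstash-script.conf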

kibana

## Create a watcher: the trigger defined below re-runs the input search every 5s
PUT _xpack/watcher/watch/error_log_collector_watcher
{
  "trigger": {
    "schedule": {
      "interval": "5s"
    }
  },
  "input": {
    "search": {
      "request": {
        "indices": ["<error_log_collector-{now+8h/d}>"],
        "body": {
          "size": 0,
          "query": {
            "bool": {
              "must": [
                  {
                    "term": {"level": "ERROR"}
                  }
              ],
              "filter": {
                "range": {
                    "currentDateTime": {
                    "gt": "now-30s" , "lt": "now"
                  }
                }
              } 
            }
          }
        }
      }
    }
  },

  "condition": {
    "compare": {
      "ctx.payload.hits.total": {
        "gt": 0
      }
    }
  },
 
  "transform": {
    "search": {
      "request": {
        "indices": ["<error-log-collector-{now+8h/d}>"],
        "body": {
          "size": 1,
          "query": {
            "bool": {
              "must": [
                  {
                    "term": {"level": "ERROR"}
                  }
              ],
              "filter": {
                "range": {
                    "currentDateTime": {
                    "gt": "now-30s" , "lt": "now"
                  }
                }
              } 
            }
          },
          "sort": [
            {
                "currentDateTime": {
                    "order": "desc"
                }
            }
          ]
        }
      }
    }
  },
  "actions": {
    "test_error": {
      "webhook" : {
        "method" : "POST",
        "url" : "http://192.168.11.31:8001/accurateWatch",
        "body" : "{\"title\": \"异常错误告警\", \"applicationName\": \"\", \"level\":\"告警级别P1\", \"body\": \"\", \"executionTime\": \"\"}"
      }
    }
 }
}
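
The watcher can be exercised immediately, without waiting for the 5s trigger, via the execute API:

POST _xpack/watcher/watch/error_log_collector_watcher/_execute

The service behind /accurateWatch is not shown in this post; below is a minimal sketch of such a receiver, assuming a Spring Boot app listening on port 8001 (framework, class name, and handling logic are all assumptions):

import java.util.Map;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class WatchAlertController {

    // Receives the JSON body sent by the watcher's webhook action:
    // {"title": ..., "applicationName": ..., "level": ..., "body": ..., "executionTime": ...}
    @PostMapping("/accurateWatch")
    public ResponseEntity<String> onAlert(@RequestBody Map<String, String> alert) {
        // A real service would fan the alert out to mail / SMS / IM here.
        System.out.println("error-log alert received: " + alert);
        return ResponseEntity.ok("ok");
    }
}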
