## 注意不要换行,filebeat按行解析
[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n
filebeat.prospectors:
- input_type: log
paths:
## app服务名称.log,写死是为了防止发生轮转抓取历史数据
- /usr/local/logs/app-collector.log
## 自定义ES中_type值
document_type: "app-log"
multiline:
pattern: '^\['
negate: true ## 是否匹配到
match: after ## 合并到上一行末尾
max_lines: 2000 ## 最大行数
timeout: 2s ## 如果在规定时间没有新的日志事件就不等待后面的日志
fields:
logbiz: collector
logtopic: app-log-collector ## 按服务划分用作kafka topic
env: dev
- input_type: log
paths:
- /usr/local/error-collector.log
document_type: "error-log"
multiline:
pattern: '^\['
negate: true ## 是否匹配到
match: after ## 合并到上一行末尾
max_lines: 2000 ## 最大行数
timeout: 2s ## 如果在规定时间没有新的日志事件就不等待后面的日志
fields:
logbiz: collector
logtopic: error-log-collector ## 按服务划分用作kafka topic
env: dev
output.kafka:
enabled: true
hosts: [192.168.150.70:9092]
topic: '%{[fields.logtopic]}'
partition.hash:
reachable_only: true
compression: gzip
max_message_bytes: 1000000
required_acks: 1
logging.to_files: true
## multiline插件也可用于其它类似的堆栈式信息,比如linux的内核日志
input {
kafka {
client_id => "0"
## app-log-服务名称
topics_pattern => "app-log-.*"
bootstrap_servers => "192.168.150.70:9092"
codec => json
consumer_threads => 1
decorate_events => true
group_id => "app-log-group"
}
kafka {
client_id => "1"
## error-log-服务名称
topics_pattern => "error-log-.*"
bootstrap_servers => "192.168.150.70:9092"
codec => json
consumer_threads => 1
decorate_events => true
group_id => "error-log-group"
}
}
filter {
ruby {
code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
}
if "app-log" in [fields][logtopic]{
grok {
match => {"message" => "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"}
}
}
if "error-log" in [fields][logtopic]{
grok {
## 表达式
match => {"message" => "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"}
}
}
}
## 测试输出到控制台
output {
stdout {codec => rubydebug}
}
output {
if "app-log" in [fields][logtopic]{
## es插件
elasticsearch {
## es服务地址
hosts => ["192.168.150.40:9200"]
index => "app-log-%{[fields][logbiz]}-%{index_time}"
## 是否嗅探集群ip,通过设置进行es负载均衡发日志消息
sniffing => true
# 模板覆盖
template_overwrite => true
}
}
if "error-log" in [fields][logtopic]{
## es插件
elasticsearch {
## es服务地址
hosts => ["192.168.150.40:9200"]
index => "error-log-%{[fields][logbiz]}-%{index_time}"
## 是否嗅探集群ip,通过设置进行es负载均衡发日志消息
sniffing => true
# 模板覆盖
template_overwrite => true
}
}
}
## 创建一个watcher,比如定义一个trigger 每个5s钟看一下input里的数据
PUT _xpack/watcher/watch/error_log_collector_watcher
{
"trigger": {
"schedule": {
"interval": "5s"
}
},
"input": {
"search": {
"request": {
"indices": ["<error_log_collector-{now+8h/d}>"],
"body": {
"size": 0,
"query": {
"bool": {
"must": [
{
"term": {"level": "ERROR"}
}
],
"filter": {
"range": {
"currentDateTime": {
"gt": "now-30s" , "lt": "now"
}
}
}
}
}
}
}
}
},
"condition": {
"compare": {
"ctx.payload.hits.total": {
"gt": 0
}
}
},
"transform": {
"search": {
"request": {
"indices": ["<error-log-collector-{now+8h/d}>"],
"body": {
"size": 1,
"query": {
"bool": {
"must": [
{
"term": {"level": "ERROR"}
}
],
"filter": {
"range": {
"currentDateTime": {
"gt": "now-30s" , "lt": "now"
}
}
}
}
},
"sort": [
{
"currentDateTime": {
"order": "desc"
}
}
]
}
}
}
},
"actions": {
"test_error": {
"webhook" : {
"method" : "POST",
"url" : "http://192.168.11.31:8001/accurateWatch",
"body" : "{\"title\": \"异常错误告警\", \"applicationName\": \"\", \"level\":\"告警级别P1\", \"body\": \"\", \"executionTime\": \"\"}"
}
}
}
}