The reason I need Filebeat to collect logs from multiple directories is that my environment is Kubernetes: every service runs in a container, and the container logs end up on the node. For the server-side services I collect a single path that covers all of their logs, and the nginx logs live under the same kind of path. When I collected everything with one input, a problem showed up: the server log format and the nginx log format are different, so the server logs were collected completely but the nginx logs were not. That forces me to collect the two paths separately, and logs from two different paths also have to be stored separately. Since I ship everything through Kafka, the two sources have to be told apart in Logstash.
# cat filebeat-kafka.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: filebeat
  namespace: logging
  labels:
    k8s-app: filebeat
data:
  filebeat.yml: |-
    filebeat.inputs:
    # server (Java) logs: a new multiline event starts with a "yyyy-MM-dd HH:mm:ss" timestamp
    - type: log
      paths:
        - /var/log/containers/english*.log
      tags: ["eng_java_logs"]
      multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}\ [0-9]{2}:[0-9]{2}:[0-9]{2}'
      multiline.negate: true
      multiline.match: after
      multiline.timeout: 10s
      encoding: utf-8
      # document_type is no longer supported in recent Filebeat versions, so it is dropped here
      scan_frequency: 20s
      harvester_buffer_size: 16384
      max_bytes: 10485760
      tail_files: true
    # nginx (JSON) logs: a new event starts with "{"
    - type: log
      paths:
        - /var/log/pods/*.log
      tags: ["eng_nginx_logs"]
      multiline.pattern: '^{'
      multiline.negate: true
      multiline.match: after
      multiline.timeout: 10s
      encoding: utf-8
      scan_frequency: 20s
      harvester_buffer_size: 16384
      max_bytes: 10485760
      tail_files: true
.................................
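The elided part of the ConfigMap is where the output section would sit. A minimal sketch of a Kafka output for this setup could look like the following, with the broker address and topic name borrowed from the Logstash input shown further below; treat it as an assumption, not the exact configuration used here:

    # hypothetical Kafka output; broker and topic mirror the Logstash input below
    output.kafka:
      hosts: ["10.16.30.1:9092"]
      topic: "ienglish"
      required_acks: 1
      compression: gzip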
With this, Filebeat collects all the logs under both paths. Note that log data must actually exist under both paths, otherwise nothing gets collected. The collected data is sent to Kafka, and Logstash then filters it and writes the logs from the two paths into different indexes.
input {
  kafka {
    bootstrap_servers => "10.16.30.1:9092"
    client_id => "logstash01"
    topics => ["ienglish"]
    group_id => "logstash"
    decorate_events => true
    # the json codec restores the per-event "tags" set by Filebeat,
    # so the events do not need to be tagged again here
    codec => "json"
  }
}
filter {
  mutate {
    remove_field => ["_id"]
    remove_field => ["_score"]
    remove_field => ["_type"]
    remove_field => ["_index"]
    remove_field => ["host"]
    remove_field => ["agent"]
    remove_field => ["ecs"]
    # "tags" is kept (not removed) because the output section routes on it
    remove_field => ["fields"]
    remove_field => ["@version"]
    #remove_field => ["@timestamp"]
    remove_field => ["stream"]
    remove_field => ["log"]
    remove_field => ["kubernetes"]
    remove_field => ["input"]
  }
  mutate {
    add_field => { "kubernetes.container.name" => "kubernetes.container.name" }
  }
  # only the server (Java) logs are parsed; the nginx logs pass through as-is
  if "eng_java_logs" in [tags] {
    grok {
      # expects lines like "<ISO8601 timestamp> <LEVEL> [<source>] - <body>"
      match => {
        "message" => "%{TIMESTAMP_ISO8601:access_time} %{LOGLEVEL:loglevel} \[%{DATA:exception_info}\] - \<%{MESSAGE:message}\>"
      }
      pattern_definitions => {
        "MESSAGE" => "[\s\S]*"
      }
    }
    date {
      match => [ "access_time","yyyy-MM-dd HH:mm:ss,SSS" ]
    }
    mutate {
      # drop the raw timestamp and the original unparsed copy of the message
      remove_field => ["access_time","[message][0]"]
    }
  }
}
output {
  if "eng_java_logs" in [tags] {
    elasticsearch {
      hosts => ["127.0.0.1:9200"]
      index => "server-%{+YYYY.MM.dd}"
    }
  } else if "eng_nginx_logs" in [tags] {
    elasticsearch {
      hosts => ["127.0.0.1:9200"]
      index => "msg-%{+YYYY.MM.dd}"
    }
  }
}
This guarantees that the two different paths are written to two different indexes. The logs from the second path are not filtered and go straight into ES. Elasticsearch will then hold two sets of index files, one prefixed with msg and one prefixed with server, and you can match content against each index separately.
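To verify the routing, you can list the daily indexes directly against the Elasticsearch address used in the output above; this is just a quick check and assumes ES is reachable at 127.0.0.1:9200:

# list the indexes produced by the two output branches
curl 'http://127.0.0.1:9200/_cat/indices/server-*,msg-*?v'

If both prefixes show up here, each set of logs can be searched on its own index.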
