使用elkf做日志分析,把收集到的日志通过kafka进行同步,然后使用logstash进行写入到ElasticSearch中,但是在filebeat收集的时候是真对目录进行收集的,所以无法在filebeat进行筛选采集,那就需要在logstash中筛选了,下面就说一下如何在logstash中进行筛选。我的采集是kubernetes中所有的应用程序的日志,这里说的是java程序的日志筛选,nginx的下个章节在说。看一下我不过滤的日志内容:
@timestamp Jul 1, 2020 @ 20:24:08.754 t @version 1 t _id 1lJVCnMBDJGsKWLZmoA5 t _index english-2020.07.01 # _score - t _type _doc t agent.ephemeral_id 818efb25-67bb-44bb-9426-37608ef89ef8 t agent.hostname iZ2ze268ldc0zhn2tu6gmeZ t agent.id e085cb09-6499-43bf-a253-badddb62bbf1 t agent.type filebeat t agent.version 7.4.2 t ecs.version 1.1.0 t host.architecture x86_64 host.containerized true t host.hostname iZ2ze268ldc0zhn2tu6gmeZ t host.name iZ2ze268ldc0zhn2tu6gmeZ t host.os.codename Core t host.os.family redhat t host.os.kernel 3.10.0-957.21.3.el7.x86_64 t host.os.name CentOS Linux t host.os.platform centos t host.os.version 7 (Core) t input.type container t kubernetes.container.image ************************ t kubernetes.container.name #####项目名称 t kubernetes.labels.app ************************ t kubernetes.labels.pod-template-hash 6d997f74df t kubernetes.labels.release ************************ t kubernetes.namespace ************************ t kubernetes.node.name ************************ t kubernetes.pod.name ************************ t kubernetes.pod.uid ************************ t kubernetes.replicaset.name ************************ t log.file.path ************************8ce9233.log # log.offset 67,740,583 t message 2020-07-01 20:24:08.754 INFO [************************,,,] 9 --- [essageThread_10] com.tope365.common.core.lock.RedisLock : unLock---->result:解锁成功,requestId:3d7a79f5b6034c6e92d7737d997fe1f1 t stream stdout
这里主要的内容有时间戳,字段@timestamp,项目名字,字段kubernetes.container.name,日志内容,字段message,下面针对需求的字段进行筛选,我这里是把不需要的删除掉。看我的logstash的配置文件。
input { kafka { bootstrap_servers => "10.16.30.1:9092" client_id => "logstash01" topics => ["test"] group_id => "logstash" decorate_events => true codec => "json" } } filter { mutate{ #删除的字段 remove_field => ["_id"] remove_field => ["_score"] remove_field => ["_type"] remove_field => ["_index"] remove_field => ["host"] remove_field => ["agent"] remove_field => ["ecs"] remove_field => ["tags"] remove_field => ["fields"] remove_field => ["@version"] remove_field => ["stream"] remove_field => ["log"] remove_field => ["kubernetes"] remove_field => ["input"] } mutate{ #添加一个删除的字段 add_field => { "kubernetes.container.name" => "kubernetes.container.name" } } } output { elasticsearch { hosts => ["http://10.16.30.2:9200"] index => "test-%{+YYYY.MM.dd}" } }
因为kubernetes字段中包含了项目名称,如果一个一个的删除,删除不了,所以这里删除了kubernetes中的所有字段,然后把项目名称的字段在添加上,这样就能保留项目字段的内容了。运行一下看看ElasticSearch中写了那些字段,我这里使用kibana查看一下。
@timestamp Jul 1, 2020 @ 20:28:28.628 t _id kv1ZCnMBBxDuFpkkmt-8 t _index test-2020.07.01 # _score - t _type _doc t kubernetes.container.name 项目名称 t message 2020-07-01 20:28:28.628 INFO [项目名称,0683fc07499fba1b,0683fc07499fba1b,false] 6 --- [XNIO-1 task-114] c.i.data.hbase.admin.config.LogAspect : Response:[Result(code=0, message=success, data=null, requestId=null, isSuccess=null)]
采集的内容和过滤的一样,这里有几个没有过滤,分别是字段_id,_index,_score, _type 没有过滤,不过滤的原因是过滤不掉,这些是系统自带的,后期如果有好的方法,在继续补充。
总结一下:这里删除字段的时候必须删除一级字段,例如:字段kubernetes.container.name,如果我删除字段kubernetes.container.name,在logstash中就不能实现,只能删除kubernetes,会把kubernetes下的所有字段删除。
2021年9月2日 13:53 板凳
现在有个场景,有很多字段,不知道哪些字段要移除,但是我知道哪些字段是要保留的 ,这种场景该怎么处理呢
2021年9月6日 14:34 1层
@哈喽 最简单的方式就是把输出的字段根据自己的需求从新拼接一个新的格式,新格式里只有保留的字段,其余的都删除。
2021年7月2日 17:23 沙发
老哥,拿不到值呀,求救
2021年7月2日 17:36 1层
@邢梓岩 你要先看一下你前面产生日志的字段,根据字段在做筛选就好了