本站提供互联网编程技术交流分享,部分技术教程不断更新中,请随时关注或联系我寻求帮助 ,同时也欢迎有兴趣的朋友进行投稿。

Logstash读取Kafka数据写入HDFS详解

ELK 熊哥club 194℃ 0评论

强大的功能,丰富的插件,让logstash在数据处理的行列中出类拔萃

通常日志数据除了要入ES提供实时展示和简单统计外,还需要写入大数据集群来提供更为深入的逻辑处理,前边几篇ELK的文章介绍过利用logstash将kafka的数据写入到elasticsearch集群,这篇文章将会介绍如何通过logstash将数据写入HDFS

本文所有演示均基于logstash 6.6.2版本

数据收集

logstash默认不支持数据直接写入HDFS,官方推荐的output插件是webhdfs,webhdfs使用HDFS提供的API将数据写入HDFS集群

插件安装

插件安装比较简单,直接使用内置命令即可

配置hosts

HDFS集群内通过主机名进行通信所以logstash所在的主机需要配置hadoop集群的hosts信息

如果不配置host信息,可能会报下边的错

logstash配置

kafka里边的源日志格式可以参考这片文章:ELK日志系统之使用Rsyslog快速方便的收集Nginx日志

logstash的配置如下:

logstash配置文件分为三部分:input、filter、output

input指定源在哪里,我们是从kafka取数据,这里就写kafka集群的配置信息,配置解释:

  • bootstrap_servers:指定kafka集群的地址
  • topics:需要读取的topic名字
  • codec:指定下数据的格式,我们写入的时候直接是json格式的,这里也配置json方便后续处理

filter可以对input输入的内容进行过滤或处理,例如格式化,添加字段,删除字段等等

这里我们主要是为了解决生成HDFS文件时因时区不对差8小时导致的文件名不对的问题,后边有详细解释

output指定处理过的日志输出到哪里,可以是ES或者是HDFS等等,可以同时配置多个,webhdfs主要配置解释:

  • host:为hadoop集群namenode节点名称
  • user:为启动hdfs的用户名,不然没有权限写入数据
  • path:指定存储到HDFS上的文件路径,这里我们每日创建目录,并按小时存放文件
  • stdout:打开主要是方便调试,启动logstash时会在控制台打印详细的日志信息并格式化方便查找问题,正式环境建议关闭

webhdfs还有一些其他的参数例如compression,flush_size,standby_host,standby_port等可查看官方文档了解详细用法

启动logstash

因为logstash配置中开了stdout输出,所以能在控制台看到格式化的数据,如下:

查看hdfs发现数据已经按照定义好的路径正常写入

至此kafka到hdfs数据转储完成

遇到的坑

HDFS按小时生成文件名不对

logstash在处理数据时会自动生成一个字段@timestamp,默认情况下这个字段存储的是logstash收到消息的时间,使用的是UTC时区,会跟国内的时间差8小时

我们output到ES或者HDFS时通常会使用类似于rsyslog-nginx-%{+YYYY.MM.dd}这样的变量来动态的设置index或者文件名,方便后续的检索,这里的变量YYYY使用的就是@timestamp中的时间,因为时区的问题生成的index或者文件名就差8小时不是很准确,这个问题在ELK架构中因为全部都是用的UTC时间且最终kibana展示时会自动转换我们无需关心,但这里要生成文件就需要认真对待下了

这里采用的方案是解析日志中的时间字段time_local,然后根据日志中的时间字段添加两个新字段index.date和index.hour来分别标识日期和小时,在output的时候使用这两个新加的字段做变量来生成文件

logstash filter配置如下:

output的path中配置如下

HDFS记录多了时间和host字段

在没有指定codec的情况下,logstash会给每一条日志添加时间和host字段,例如:

源日志格式为

经过logstash处理后多了时间和host字段

如果不需要我们可以指定最终的format只取message,解决方法为在output中添加如下配置:

同时output到ES和HDFS

在实际应用中我们需要同时将日志数据写入ES和HDFS,那么可以直接用下边的配置来处理

这里我使用logstash的date插件将日志中的”time_local”字段直接替换为了@timestamp,这样做有什么好处呢?

logstash默认生成的@timestamp字段记录的时间是logstash接收到消息的时间,这个时间可能与日志产生的时间不同,而我们往往需要关注的时间是日志产生的时间,且在ELK架构中Kibana日志输出的默认顺序就是按照@timestamp来排序的,所以往往我们需要将默认的@timestamp替换成日志产生的时间,替换方法就用到了date插件,date插件的用法如下

match:匹配日志中的时间字段,这里为time_local

target:将match匹配到的时间戳存储到给定的字段中,默认不指定的话就存到@timestamp字段

另外还有参数可以配置:timezone,locale,tag_on_failure等,具体可查看官方文档

转自:运维咖啡吧

本文地址: https://www.xiongge.club/1386.html

转载请注明:熊哥club » Logstash读取Kafka数据写入HDFS详解

喜欢 (3)
[您的支持是我最大的动力]
分享 (0)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
×
订阅图标按钮