一次logstash性能排查记录

youncyb 发布于 2023-02-01 2958 次阅读 大数据


一次logstash性能排查记录

1. 前言

数据架构最开始是filebeat直接入ES,但是由于后面扩充了filebeat节点数量,这就导致了2个问题,第一个是如果遇到解析方面的修改,那么需要修改多个filebeat,很麻烦;第二个是多个filebeat入库,时常导致ES集群崩溃。鉴于这两个原因,将架构切换为了filebeat入logstash,logstash再入ES。

logstash的配置如下:

CPU: 32核

JVM:31G

机器内存:96G

带宽:内网千兆

logstash.yml:

平均一条json数据是:90KB。

当架构切换为ELK时,又遇到了一个问题,查看kibana上面的监控显示:logstash到ES的吞吐量远远<=500/s,并且输入数据呈现波浪状,一会高达几百,一会儿变为0。

怀疑是kibana监控的问题,又用Prometheus搭建了一个监控,这个监控可以额外看到heap和GC。但是,该监控的显示的平均速度依然几百/s。当排除了是监控计数的问题后,又通过Events in 、Filtered rate 和 Events out 进行对比,发现filter和out的速度都跟得上in的速度。并且通过htop观察到CPU的占用率只有30%左右,nload只有9左右;通过iftop观察到网络带宽占用基本上没有。

根据监控的数据,推断瓶颈在于输入这端,但是不确定是filebeat造成的,还是logstash造成的,然后开始了试错过程。

 

2. 试错过程

1. 调整filebeat配置

调整了filebeat的queue.mem、bulk_max_size、worker,调整情况如下,这两个参数意思如下:

  • queue.mem.events:表示filebeat可以在内存中存储多少个事件,一条数据算一个事件,如果output消费不赢,并且达到了该值,就会阻塞harvest。将该值调大,可以避免读文件浪费的时间,默认是4096,设置为20000。

  • queue.mem.flush.min_events:表示当满足了该值,就可以将事件发送到output。默认是2048,设置为4096。

  • queue.mem.flush.timeout:表示min_events最多等待时间。默认是1s,设置为2s,如果超过2s,不管是否达到4096,都会发送事件到output。

  • bulk_max_size:output一次性最多可以处理的事件,增大该值可以减少发事件的次数。默认2048,设置为4096。

  • worker:output的给每个host设置的线程数量,默认是1。下面配置中,设置了3个host,两个worker,所以最终的线程数是:2*3=6。

 

这次测试数据总量为4w条,本以为logstash显示Events in rate可以快速增长,但事与愿违,从监控上看到的事件输入和处理速度仍然<=500/s,处理4w条数据,耗时20分钟以上,这不经让我沉思。

首先排除掉是filebeat机器配置的问题,因为filebeat所在机器的CPU和内存占用都很低,并且网络占用,也仅仅只在最开始发送数据时有占用,其他时间基本为0。

再查看filebeat的日志,发现filebeat的第一批数据很快就发送出去了,harvset是在阻塞的状态,再结合网络占用基本为0,由此,确定是output的消费速度没有跟上,即logstash没有跟上filebeat推的速度。

首先排除网络传输原因,再根据之前观察的情况,确定问题出在logstash的input上

2. 排除compression_level

filebeat的压缩等级设置为3,采用的是gzip压缩。怀疑是不是logstash将事件消耗在解压数据上了,于是将其设置为0,发现只是稍微增加了传输耗时,数据依然恰在了logstash的input处,还没进入filter。

3. 是不是logstash的beats处理比较大的数据太慢?

为了验证这个问题,将logstash的input修改为从本机文件中读,配置如下,测试结果显示4w行的json文件,从读文件到输出完成,耗时不到40秒。

可以确定问题就出来logstash的beats插件,接下来就是找到出问题的到底是beats哪部分。

3. 调试logstash-input-beats

beats插件是由ruby和java编写的,java是核心处理代码,ruby是启动代码。首先看看ruby是如何启动beats服务的,在logstash-input-beats-6.1.2-java/lib/logstash/inputs/beats.rb,首先通过注册器注册了一个beats服务,然后在服务中注册了一个监听器,每次到来的数据都会放入MessageListener类处理。

MessageListener获取了每一条数据传入java处理,加上时间打印后,发现瓶颈不在此处。那么问题肯定出在了核心的java代码上。

修改logstash启动的jvm参数,开启远程debug,对Message的getData方法下断点src/main/java/org/logstash/beats/Message.java,发现即使只传输5000条数据,也等了很久,才触发getData方法的断点。可知导致问题出现的地方还在更前面的位置,通过观察调用栈,有一个类引起了我的注意:BytetoMessageDecoder,该方法是对数据进行解码。

image-20220907213929555

此处调用callDecode对数据解码,对该方法下断点,跟进decodeRemovalReentryProtection方法,最终会进入decode方法。在调试时就发现callDecode偶尔会有一点卡顿,而样本数据的某些行,一行数据甚至超过了10m,所以,大概率是decode问题。

image-20220907214204955

 

image-20220907214246391

 

image-20220907214315740

decode会进入处理json的逻辑,addMessage会当每一条消息到来就去扩容一次,而且每次扩容都只扩message.size+8字节,如果碰到一条比较大的消息,那么就会将时间浪费在对interBuffer的扩容上,所以在调试时偶尔会感到卡顿。

这个设计,应该是为了避免浪费JVM内存,所以选择了一次性扩一点。

image-20220907215052267

image-20220907215206440

 

4. 解决方案

通过增加扩容的字节数,减少扩容次数。

最后,当修改完毕,想通过官方推荐的方案使用logstash-plugin install时,遇到了极大的坑,安装显示成功,但是由于没有编译,所以报错了。然后打算通过ruby的bundle install安装依赖,再使用gem build编译,遇到依赖冲突了。折腾了半天,发现ruby编译太麻烦了,突然想到直接替换jar不就好了嘛?然后在本地通过gradle vendor编译。

上线后,速度提升不止10倍。但长期观察后,发现如果ES的吞吐跟不上logstash的接收速度,那么会导致logstash内存崩溃。所以,调整时需根据自身情况调整扩容方式。

5. 后续遇到问题

针对第4部分的,当遇到filebeat推送速度超过了logstash的写入速度,此时瓶颈在es的写入速度上。由于logstash没有速度限制器,会导致出现以下2个问题。这两个问题都是由于Direct buffer溢出导致,根本原因则是没有限速器。

针对出现的两个问题,通过搜索logstash-input-beats的issue,可以找到有前人已经遇到过该类问题,并且已经提出了PR,但该PR还没被合并。

该PR是基于475和410的,通过检查PR475,发现该PR实现了两个功能,其原理可以参考netty4-数据读取性能大于处理时“直接内存水位处理器”使用

  1. 将beats的push-read方式替换为pull-read,可以控制beats端的速度

  2. 当遇到快接近oom时,则丢弃新来的beats链接

由于475是410的增强版,所以我们只需要合并PR475即可。

image-20240218113324132

5.1 修改logstash-input-beats

合并完分支后,还需要注释以下代码src/main/java/org/logstash/beats/OOMConnectionCloser.java,避免过多的日志打印:

image-20240219100624489

5.2 编译与安装

1. 如果是在插件本身的版本是6.8.0,则直接替换vendor

完成代码后,使用./gradlew vendor编译jar,会生成一个叫做vendor的目录,如果是在6.8.0版本的插件上使用,可以直接替换vendor目录。

image-20240219101452363

2. 其他版本

在其他版本的插件修改,需要先通过以下命令编译成gem文件(注:由于logstash的版本不同,可能会遇到Ruby的依赖冲突问题,需自行解决依赖冲突问题,我使用的是logstash7.17.17,未遇到依赖冲突,8.0则有依赖冲突)。

然后在本地环境中,安装一次

安装beats完成后,可在logstash的gemfile中看到插件已被修改为自定义版本,位于localgems下。

image-20240219102603225

接着,可以生成离线版的插件,供服务器使用或者打包整个logstash替换。

生成离线时,(注:由于ruby源在国外,需要在shell中挂上HTTP代理):

安装:

 

5.3 解决仍存在的bug

上线运行几天后,在直接batch.release()会遇到netty的内存释放引用计数错误,该问题可能导致batch未正确的释放,内存泄露。

image-20240219103153138

发现该问题后,为了拿到更全面的日志输出,我将通过--log.level=debug添加到启动命令行,通过对日志进行分析后,发现最终引发问题的原因在于:

错误是通过AbstractByteBuf.checkNewCapacity函数抛出,结合源码得知是申请后的容量超过了最大容量限制,而maxCapacity()的定义是int类型,那么容量最大也就是2GB。

上文中,为了修复扩容过于缓慢的问题,我们将netty.ByteBuf的扩容方式改为了将当前容量*2,这就会导致如果当前容量为1GB时,无法处理完beats发来的一次完整的batch,那么就需要再次扩容,接着就发生容量超过2147483647字节的错误。

所以解决方式有两种:

  1. 降低beats每次发送数据batchSize,可通过估算每条数据byteSize*batchSize低于1GB

  2. 我的filebeat设置的batchSize是4096条,为了避免更改多个filebeat端的配置,修改扩容方式如下:

     

6. 参考

  1. 写了个简单的logstash-output-rocketmq插件

  2. 优雅地修改他人贡献的 Pull Request

  3. Implement a strategy to handle OOM in direct memory