1. 前言
数据架构最开始是filebeat直接入ES,但是由于后面扩充了filebeat节点数量,这就导致了2个问题,第一个是如果遇到解析方面的修改,那么需要修改多个filebeat,很麻烦;第二个是多个filebeat入库,时常导致ES集群崩溃。鉴于这两个原因,将架构切换为了filebeat入logstash,logstash再入ES。
logstash的配置如下:
CPU: 32核
JVM:31G
机器内存:96G
带宽:内网千兆
logstash.yml:
xxxxxxxxxx
71http.host"0.0.0.0"
2pipeline.workers32
3pipeline
4 batch
5 size3000
6 delay5
7pipeline.orderedfalse
平均一条json数据是:90KB。
当架构切换为ELK时,又遇到了一个问题,查看kibana上面的监控显示:logstash到ES的吞吐量远远<=500/s,并且输入数据呈现波浪状,一会高达几百,一会儿变为0。
怀疑是kibana监控的问题,又用Prometheus搭建了一个监控,这个监控可以额外看到heap和GC。但是,该监控的显示的平均速度依然几百/s。当排除了是监控计数的问题后,又通过Events in 、Filtered rate 和 Events out 进行对比,发现filter和out的速度都跟得上in的速度。并且通过htop观察到CPU的占用率只有30%左右,nload只有9左右;通过iftop观察到网络带宽占用基本上没有。
根据监控的数据,推断瓶颈在于输入这端,但是不确定是filebeat造成的,还是logstash造成的,然后开始了试错过程。
2. 试错过程
1. 调整filebeat配置
调整了filebeat的queue.mem、bulk_max_size、worker,调整情况如下,这两个参数意思如下:
queue.mem.events
:表示filebeat可以在内存中存储多少个事件,一条数据算一个事件,如果output消费不赢,并且达到了该值,就会阻塞harvest。将该值调大,可以避免读文件浪费的时间,默认是4096,设置为20000。queue.mem.flush.min_events
:表示当满足了该值,就可以将事件发送到output。默认是2048,设置为4096。queue.mem.flush.timeout
:表示min_events最多等待时间。默认是1s,设置为2s,如果超过2s,不管是否达到4096,都会发送事件到output。bulk_max_size
:output一次性最多可以处理的事件,增大该值可以减少发事件的次数。默认2048,设置为4096。worker
:output的给每个host设置的线程数量,默认是1。下面配置中,设置了3个host,两个worker,所以最终的线程数是:2*3=6。
xxxxxxxxxx
251filebeat.inputs
2type log
3 max_bytes104857600
4 paths
5 /data/hcl/a.json
6 close_eoftrue
7
8logging.level info
9
10
11queue.mem
12 events20000
13 flush.min_events4096
14 flush.timeout 2s
15
16
17output.logstash
18 hosts"172.16.80.x:8100" "172.16.80.x:8101" "172.16.80.x:8099"
19 loadbalancetrue
20 worker2
21 compression_level3
22 timeout 80s
23 bulk_max_size4096
24 ssl.certificate_authorities"/data/hcl/logstash.crt"
25 ssl.verification_mode none
这次测试数据总量为4w条,本以为logstash显示Events in rate可以快速增长,但事与愿违,从监控上看到的事件输入和处理速度仍然<=500/s,处理4w条数据,耗时20分钟以上,这不经让我沉思。
首先排除掉是filebeat机器配置的问题,因为filebeat所在机器的CPU和内存占用都很低,并且网络占用,也仅仅只在最开始发送数据时有占用,其他时间基本为0。
再查看filebeat的日志,发现filebeat的第一批数据很快就发送出去了,harvset是在阻塞的状态,再结合网络占用基本为0,由此,确定是output的消费速度没有跟上,即logstash没有跟上filebeat推的速度。
首先排除网络传输原因,再根据之前观察的情况,确定问题出在logstash的input上
2. 排除compression_level
filebeat的压缩等级设置为3,采用的是gzip压缩。怀疑是不是logstash将事件消耗在解压数据上了,于是将其设置为0,发现只是稍微增加了传输耗时,数据依然恰在了logstash的input处,还没进入filter。
3. 是不是logstash的beats处理比较大的数据太慢?
为了验证这个问题,将logstash的input修改为从本机文件中读,配置如下,测试结果显示4w行的json文件,从读文件到输出完成,耗时不到40秒。
xxxxxxxxxx
291input
2 file
3 path => "/data1/logstash/a.json"
4 mode => "tail"
5 start_position => "beginning"
6 close_older => "5 sec"
7
8
9
10
11
12filter
13 mutate
14 remove_field => "@version""host""path""ecs""tags""fields""log""input""agent"
15
16 json
17 source => "message"
18
19 mutate
20 remove_field => "message"
21
22
23
24
25output
26 stdout
27
28
29
可以确定问题就出来logstash的beats插件,接下来就是找到出问题的到底是beats哪部分。
3. 调试logstash-input-beats
beats插件是由ruby和java编写的,java是核心处理代码,ruby是启动代码。首先看看ruby是如何启动beats服务的,在logstash-input-beats-6.1.2-java/lib/logstash/inputs/beats.rb
,首先通过注册器注册了一个beats服务,然后在服务中注册了一个监听器,每次到来的数据都会放入MessageListener类处理。
xxxxxxxxxx
221 def create_server
2 server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @executor_threads)
3 if @ssl
4 ssl_context_builder = new_ssl_context_builder
5 if client_authentification?
6 if @ssl_verify_mode == "force_peer"
7 ssl_context_builder.setVerifyMode(org.logstash.netty.SslContextBuilder::SslClientVerifyMode::FORCE_PEER)
8 elsif @ssl_verify_mode == "peer"
9 ssl_context_builder.setVerifyMode(org.logstash.netty.SslContextBuilder::SslClientVerifyMode::VERIFY_PEER)
10 end
11 ssl_context_builder.setCertificateAuthorities(@ssl_certificate_authorities)
12 end
13 server.setSslHandlerProvider(new_ssl_handshake_provider(ssl_context_builder))
14 end
15 server
16 end
17
18 def run(output_queue)
19 message_listener = MessageListener.new(output_queue, self)
20 @server.setMessageListener(message_listener)
21 @server.listen
22 end # def run
MessageListener获取了每一条数据传入java处理,加上时间打印后,发现瓶颈不在此处。那么问题肯定出在了核心的java代码上。
xxxxxxxxxx
251 def onNewMessage(ctx, message)
2 puts "onNewMessage start:" + Time.new.inspect
3 hash = message.getData
4 ip_address = ip_address(ctx)
5
6 unless ip_address.nil? || hash['@metadata'].nil?
7 set_nested(hash, @input.field_hostip, ip_address)
8 end
9 target_field = extract_target_field(hash)
10
11 extract_tls_peer(hash, ctx)
12
13 if target_field.nil?
14 event = LogStash::Event.new(hash)
15 @nocodec_transformer.transform(event)
16 @queue << event
17 else
18 puts "onNewMessage enter call:" + Time.new.inspect
19 codec(ctx).accept(CodecCallbackListener.new(target_field,
20 hash,
21 message.getIdentityStream(),
22 @codec_transformer,
23 @queue))
24 end
25 end
修改logstash启动的jvm参数,开启远程debug,对Message的getData方法下断点src/main/java/org/logstash/beats/Message.java
,发现即使只传输5000条数据,也等了很久,才触发getData方法的断点。可知导致问题出现的地方还在更前面的位置,通过观察调用栈,有一个类引起了我的注意:BytetoMessageDecoder
,该方法是对数据进行解码。
此处调用callDecode
对数据解码,对该方法下断点,跟进decodeRemovalReentryProtection
方法,最终会进入decode
方法。在调试时就发现callDecode偶尔会有一点卡顿,而样本数据的某些行,一行数据甚至超过了10m,所以,大概率是decode问题。
decode会进入处理json的逻辑,addMessage会当每一条消息到来就去扩容一次,而且每次扩容都只扩
这个设计,应该是为了避免浪费JVM内存,所以选择了一次性扩一点。
4. 解决方案
通过增加扩容的字节数,减少扩容次数。
最后,当修改完毕,想通过官方推荐的方案使用logstash-plugin install时,遇到了极大的坑,安装显示成功,但是由于没有编译,所以报错了。然后打算通过ruby的bundle install安装依赖,再使用gem build编译,遇到依赖冲突了。折腾了半天,发现ruby编译太麻烦了,突然想到直接替换jar不就好了嘛?然后在本地通过gradle vendor编译。
上线后,速度提升不止10倍。但长期观察后,发现如果ES的吞吐跟不上logstash的接收速度,那么会导致logstash内存崩溃。所以,调整时需根据自身情况调整扩容方式。
5. 后续遇到问题
针对第4部分的,当遇到filebeat推送速度超过了logstash的写入速度,此时瓶颈在es的写入速度上。由于logstash没有速度限制器,会导致出现以下2个问题。这两个问题都是由于Direct buffer溢出导致,根本原因则是没有限速器。
xxxxxxxxxx
61io.netty.handler.codec.DecoderException: java.lang.IllegalArgumentException: newCapacity: -2147483648
2
3LEAK: ByteBuf.release() was
4not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
5...
6logstash java.lang.OutOfMemoryError: Direct buffer memory
针对出现的两个问题,通过搜索logstash-input-beats的issue,可以找到有前人已经遇到过该类问题,并且已经提出了PR,但该PR还没被合并。
该PR是基于475和410的,通过检查PR475,发现该PR实现了两个功能,其原理可以参考netty4-数据读取性能大于处理时“直接内存水位处理器”使用:
将beats的push-read方式替换为pull-read,可以控制beats端的速度
当遇到快接近oom时,则丢弃新来的beats链接
由于475是410的增强版,所以我们只需要合并PR475即可。
5.1 修改logstash-input-beats
xxxxxxxxxx
151# 1. clone 主分支
2git clone https://github.com/logstash-plugins/logstash-input-beats.git
3
4# 2. 拉取提交的PR
5git pull origin pull/475/head:mitigate_thundering_herd_2
6
7# 3. 合并PR
8git merge mitigate_thundering_herd_2
9
10# 4. 存在冲突需要手动修改代码(由于我使用的主分支是6.8.0和作者andsel不是同一个主分支,需要需要修改代码)
11
12# 5. 继续合并
13git merge --continue
14
15# 6. git add&git commit提交到本地仓库
合并完分支后,还需要注释以下代码src/main/java/org/logstash/beats/OOMConnectionCloser.java
,避免过多的日志打印:
5.2 编译与安装
1. 如果是在插件本身的版本是6.8.0,则直接替换vendor
完成代码后,使用./gradlew vendor
编译jar,会生成一个叫做vendor
的目录,如果是在6.8.0版本的插件上使用,可以直接替换vendor目录。
2. 其他版本
在其他版本的插件修改,需要先通过以下命令编译成gem文件(注:由于logstash的版本不同,可能会遇到Ruby的依赖冲突问题,需自行解决依赖冲突问题,我使用的是logstash7.17.17,未遇到依赖冲突,8.0则有依赖冲突)。
xxxxxxxxxx
11gem build logstash-input-beats.gemspec
然后在本地环境中,安装一次
xxxxxxxxxx
11./bin/logstash-plugin install logstash-input-beats-6.8.0-java.gem
安装beats完成后,可在logstash的gemfile中看到插件已被修改为自定义版本,位于localgems
下。
接着,可以生成离线版的插件,供服务器使用或者打包整个logstash替换。
生成离线时,(注:由于ruby源在国外,需要在shell中挂上HTTP代理):
xxxxxxxxxx
11bin/logstash-plugin prepare-offline-pack logstash-input-beats
安装:
xxxxxxxxxx
11bin/logstash-plugin install file:///path/to/logstash-input-beats-6.8.0.zip
5.3 解决仍存在的bug
上线运行几天后,在直接batch.release()
会遇到netty的内存释放引用计数错误,该问题可能导致batch未正确的释放,内存泄露。
xxxxxxxxxx
51Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
2at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:83) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
3at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:148) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
4at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101) ~[netty-buffer-4.1.100.Final.jar:4.1.100.Final]
5at org.logstash.beats.V2Batch.release(V2Batch.java:105) ~[logstash-input-beats-6.8.0.jar:?]
发现该问题后,为了拿到更全面的日志输出,我将通过--log.level=debug
添加到启动命令行,通过对日志进行分析后,发现最终引发问题的原因在于:
xxxxxxxxxx
91Caused by: java.lang.IllegalArgumentException: newCapacity: -2147483648 (expected: 0-2147483647)
2at io.netty.buffer.AbstractByteBuf.checkNewCapacity(AbstractByteBuf.java:1435) ~[netty-buffer-4.1.100.Final.jar:4.1.100.Final]
3at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:104) ~[netty-buffer-4.1.100.Final.jar:4.1.100.Final]
4at org.logstash.beats.V2Batch.addMessage(V2Batch.java:93) ~[logstash-input-beats-6.8.0.jar:?]
5at org.logstash.beats.BeatsParser.decode(BeatsParser.java:208) ~[logstash-input-beats-6.8.0.jar:?]
6at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529) ~[netty-codec-4.1
7.100.Final.jar:4.1.100.Final]
8at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468) ~[netty-codec-4.1.100.Final.jar:4.1.1
900.Final]
错误是通过AbstractByteBuf.checkNewCapacity
函数抛出,结合源码得知是申请后的容量超过了最大容量限制,而maxCapacity()
的定义是int类型,那么容量最大也就是2GB。
xxxxxxxxxx
71protected final void checkNewCapacity(int newCapacity) {
2 ensureAccessible();
3 if (checkBounds && (newCapacity < 0 || newCapacity > maxCapacity())) {
4 throw new IllegalArgumentException("newCapacity: " + newCapacity +
5 " (expected: 0-" + maxCapacity() + ')');
6 }
7}
上文中,为了修复扩容过于缓慢的问题,我们将netty.ByteBuf的扩容方式改为了将当前容量*2,这就会导致如果当前容量为1GB时,无法处理完beats发来的一次完整的batch,那么就需要再次扩容,接着就发生容量超过2147483647字节的错误。
xxxxxxxxxx
11internalBuffer.capacity(internalBuffer.capacity() * 2)
所以解决方式有两种:
降低beats每次发送数据batchSize,可通过估算每条数据byteSize*batchSize低于1GB
我的filebeat设置的batchSize是4096条,为了避免更改多个filebeat端的配置,修改扩容方式如下:
xxxxxxxxxx
191void addMessage(int sequenceNumber, ByteBuf buffer, int size) {
2written++;
3// if (internalBuffer.writableBytes() < size + (2 * SIZE_OF_INT)){
4// internalBuffer.capacity(internalBuffer.capacity() + size + (2 * SIZE_OF_INT));
5// }
6while (internalBuffer.writableBytes() < size + (2 * SIZE_OF_INT)){
7// Avoid memory overflow in extreme situations
8if (internalBuffer.capacity() * 2 > 2147483647) {
9internalBuffer.capacity(internalBuffer.capacity() + (13421772 * SIZE_OF_INT));
10}
11else internalBuffer.capacity(internalBuffer.capacity() * 2);
12}
13internalBuffer.writeInt(sequenceNumber);
14internalBuffer.writeInt(size);
15buffer.readBytes(internalBuffer, size);
16if (sequenceNumber > highestSequence){
17highestSequence = sequenceNumber;
18}
19}
Comments NOTHING