-
Notifications
You must be signed in to change notification settings - Fork 35
Description
I'm using reactor-netty to make API calls and facing an issue wherein reactor-netty is not able to create new connections for the incoming requests. And I debuged codes and found the result of WIP.getAndIncrement(this) always above zero.
Expected Behavior
Even if the drainLoop is broken by some requests, it shouldn't affect further requests.
Actual Behavior
The darainLoop never called anymore after some exceptions occurred.
Steps to Reproduce
Unfortunately, I don't know how to reproduce this because I don't know the root cause.
If this helps in any manner, I checked the logs on prod and there're some exceptions occurred during drainLoop:
Failed to mark a promise as failure because it has failed already: DefaultChannelPromise@5a854314(failure: io.netty.handler.codec.EncoderException: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1), unnotified cause: io.netty.handler.codec.EncoderException: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:104)
at io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:346)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
at io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:304)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
at io.netty.handler.timeout.WriteTimeoutHandler.write(WriteTimeoutHandler.java:112)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:808)
at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1025)
at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:294)
at reactor.netty.http.HttpOperations.lambda$send$0(HttpOperations.java:123)
at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:151)
at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
at reactor.core.publisher.Mono.subscribe(Mono.java:4198)
at reactor.netty.NettyOutbound.subscribe(NettyOutbound.java:336)
at reactor.core.publisher.MonoSource.subscribe(MonoSource.java:65)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:446)
at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:518)
at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.run(PooledConnectionProvider.java:633)
at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onNext(PooledConnectionProvider.java:506)
at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onNext(PooledConnectionProvider.java:457)
at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.deliver(AbstractPool.java:419)
at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.lambda$drainLoop$15(SimpleDequePool.java:391)
at reactor.core.scheduler.ImmediateScheduler.schedule(ImmediateScheduler.java:47)
at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.drainLoop(SimpleDequePool.java:391)
at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.drain(SimpleDequePool.java:261)
at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.doAcquire(SimpleDequePool.java:256)
at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:382)
at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onSubscribe(PooledConnectionProvider.java:532)
at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool$QueueBorrowerMono.subscribe(SimpleDequePool.java:578)
at reactor.netty.resources.PooledConnectionProvider.disposableAcquire(PooledConnectionProvider.java:218)
at reactor.netty.resources.PooledConnectionProvider.lambda$acquire$3(PooledConnectionProvider.java:182)
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe$0(HttpClientConnect.java:327)
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.resubscribe(FluxRetryWhen.java:204)
at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onNext(FluxRetryWhen.java:250)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:274)
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:851)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:147)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:147)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56)
at reactor.core.publisher.Mono.subscribe(Mono.java:4213)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:441)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:243)
at reactor.core.publisher.DirectProcessor$DirectInner.onNext(DirectProcessor.java:333)
at reactor.core.publisher.DirectProcessor.onNext(DirectProcessor.java:142)
at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:180)
at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:185)
at reactor.netty.http.client.HttpClientConnect$HttpObserver.onUncaughtException(HttpClientConnect.java:407)
at reactor.netty.ReactorNetty$CompositeConnectionObserver.onUncaughtException(ReactorNetty.java:511)
at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onUncaughtException(PooledConnectionProvider.java:549)
at reactor.netty.resources.PooledConnectionProvider$PooledConnection.onUncaughtException(PooledConnectionProvider.java:385)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:230)
at reactor.netty.channel.FluxReceive.onInboundError(FluxReceive.java:435)
at reactor.netty.channel.ChannelOperations.onInboundError(ChannelOperations.java:442)
at reactor.netty.channel.ChannelOperationsHandler.exceptionCaught(ChannelOperationsHandler.java:129)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireExceptionCaught(CombinedChannelDuplexHandler.java:424)
at io.netty.channel.ChannelHandlerAdapter.exceptionCaught(ChannelHandlerAdapter.java:92)
at io.netty.channel.CombinedChannelDuplexHandler$1.fireExceptionCaught(CombinedChannelDuplexHandler.java:145)
at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:143)
at io.netty.channel.CombinedChannelDuplexHandler.exceptionCaught(CombinedChannelDuplexHandler.java:231)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
at io.netty.handler.ssl.SslHandler.exceptionCaught(SslHandler.java:1144)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
at io.netty.handler.proxy.ProxyHandler.exceptionCaught(ProxyHandler.java:241)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireExceptionCaught(CombinedChannelDuplexHandler.java:424)
at io.netty.channel.CombinedChannelDuplexHandler$1.fireExceptionCaught(CombinedChannelDuplexHandler.java:161)
at io.netty.channel.CombinedChannelDuplexHandler.exceptionCaught(CombinedChannelDuplexHandler.java:233)
at io.netty.handler.proxy.HttpProxyHandler$HttpClientCodecWrapper.exceptionCaught(HttpProxyHandler.java:247)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
at io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1377)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
at io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:907)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:728)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:818)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
at io.netty.handler.codec.http.DefaultFullHttpRequest.release(DefaultFullHttpRequest.java:102)
at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88)
at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:91)
... 111 more
io.netty.channel.unix.Errors$NativeIoException: writevAddresses(..) failed: Connection reset by peer
Possible Solution
From what I understand from the source code for SimpleDequePool.java, the method drainLoop() just incremented WIP while checking the (WIP.getAndIncrement(this) == 0) condition and then decrementing WIP for itself.
If some exceptions occured during drainLoop(), it would fail to handle decrementing WIP, the further requests to drain() will never pass the below if condition.
void drain() {
if (WIP.getAndIncrement(this) == 0) {
drainLoop();
}
}So I think it'd be better to implement fallback to decrement the WIP in case something wrong happen during drainLoop.
Your Environment
reactor-netty: 0.9.14.RELEASE
netty: 4.1.51.Final
JVM version: openjdk version "11.0.4"
OS and version: 4.19.118-1.el7.centos.x86_64