Pipe's completion handler possibly executed by the "wrong thread", leading to race conditions #5049

@sauthieg

Version

Vert.x 4.4.6, but likely earlier versions have the issue as well.

Context

It started with a suspicious NPE in our home-made Vert.x metrics plugin:

java.lang.NullPointerException: null
    at org.forgerock.http.vertx.monitoring.VertxHttpClientMetrics$HttpClientMetricsImpl$1.requestEnd(VertxHttpClientMetrics.java:120)
    at org.forgerock.http.vertx.monitoring.VertxHttpClientMetrics$HttpClientMetricsImpl$1.requestEnd(VertxHttpClientMetrics.java:94)
    at io.vertx.core.http.impl.Http1xClientConnection.endRequest(Http1xClientConnection.java:320)
    at io.vertx.core.http.impl.Http1xClientConnection.writeBuffer(Http1xClientConnection.java:307)
    at io.vertx.core.http.impl.Http1xClientConnection.access$1100(Http1xClientConnection.java:108)
    at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.writeBuffer(Http1xClientConnection.java:603)
    at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.writeBuffer(Http1xClientConnection.java:566)
    at io.vertx.core.http.impl.HttpClientRequestImpl.doWrite(HttpClientRequestImpl.java:506)
    at io.vertx.core.http.impl.HttpClientRequestImpl.write(HttpClientRequestImpl.java:471)
    at io.vertx.core.http.impl.HttpClientRequestImpl.end(HttpClientRequestImpl.java:417)
    at io.vertx.core.streams.impl.PipeImpl.handleSuccess(PipeImpl.java:116)
    at io.vertx.core.streams.impl.PipeImpl.lambda$to$2(PipeImpl.java:102)
    at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
    at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
    at io.vertx.core.impl.future.FutureImpl.addListener(FutureImpl.java:196)
    at io.vertx.core.impl.future.PromiseImpl.addListener(PromiseImpl.java:23)
    at io.vertx.core.impl.future.FutureImpl.onComplete(FutureImpl.java:164)
    at io.vertx.core.impl.future.PromiseImpl.onComplete(PromiseImpl.java:23)
    at io.vertx.core.streams.impl.PipeImpl.to(PipeImpl.java:88)
    at io.vertx.core.streams.ReadStream.pipeTo(ReadStream.java:119)
    at io.vertx.core.http.HttpClientRequest.send(HttpClientRequest.java:401)
    at io.vertx.rxjava3.core.http.HttpClientRequest.lambda$rxSend$24(HttpClientRequest.java:614)
   ...

Before you ask: yes, I verified that we NEVER return null from metrics.beginRequest(...).

Then I noticed a second exception, raised during the same request:

java.lang.IllegalStateException: unexpected message type: LastHttpContent$1, state: 0 
    at io.netty.handler.codec.http.HttpObjectEncoder.write(HttpObjectEncoder.java:108) 
    at io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:346) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:940) 
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:966) 
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:934) 
    at io.vertx.core.net.impl.ConnectionBase.write(ConnectionBase.java:180) 
    at io.vertx.core.net.impl.ConnectionBase.writeToChannel(ConnectionBase.java:232) 
    at io.vertx.core.net.impl.ConnectionBase.writeToChannel(ConnectionBase.java:217) 
    at io.vertx.core.net.impl.ConnectionBase.writeToChannel(ConnectionBase.java:213) 
    at io.vertx.core.http.impl.Http1xClientConnection.writeBuffer(Http1xClientConnection.java:305) 
    at io.vertx.core.http.impl.Http1xClientConnection.access$ 
    at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.writeBuffer(Http1xClientConnection.java:603) 
    at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.writeBuffer(Http1xClientConnection.java:566) 
    at io.vertx.core.http.impl.HttpClientRequestImpl.doWrite(HttpClientRequestImpl.java:506) 
    at io.vertx.core.http.impl.HttpClientRequestImpl.write(HttpClientRequestImpl.java:471) 
    at io.vertx.core.http.impl.HttpClientRequestImpl.end(HttpClientRequestImpl.java:417) 
    at io.vertx.core.streams.impl.PipeImpl.handleSuccess(PipeImpl.java:116) 
    at io.vertx.core.streams.impl.PipeImpl.lambda$ 
    at io.vertx.core.streams.impl.PipeImpl$$Lambda$.handle 
    at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141) 
    at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60) 
    at io.vertx.core.impl.future.FutureImpl.addListener(FutureImpl.java:196) 
    at io.vertx.core.impl.future.PromiseImpl.addListener(PromiseImpl.java:23) 
    at io.vertx.core.impl.future.FutureImpl.onComplete(FutureImpl.java:164) 
    at io.vertx.core.impl.future.PromiseImpl.onComplete(PromiseImpl.java:23) 
    at io.vertx.core.streams.impl.PipeImpl.to(PipeImpl.java:88) 
    at io.vertx.core.streams.ReadStream.pipeTo(ReadStream.java:119) 
    at io.vertx.core.http.HttpClientRequest.send(HttpClientRequest.java:401) 
    at io.vertx.rxjava3.core.http.HttpClientRequest.lambda$ 

That ISE happens because the HttpObjectEncoder receives a LastHttpContent item BEFORE being told to write the request's head (request line, headers, ...).

Analysis

The first stack trace is IMHO a consequence of the second one: writing the last content fails, but execution continues, ignoring the failure.
See Http1xClientConnection.writeBuffer() (the frame that shows up in both stack traces):

    writeToChannel(msg, listener);
    if (end) {
      endRequest(s);
    }

If writeToChannel() fails, we still execute endRequest().
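
To make the pattern concrete, here is a minimal plain-Java sketch (no Vert.x or Netty involved). All names in it are hypothetical stand-ins that only mirror the shape of the snippet above, and it assumes the write reports its failure through the listener rather than by throwing, which is what the stack traces suggest:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class IgnoredWriteFailure {

  static final List<String> events = new ArrayList<>();

  // Hypothetical stand-in for writeToChannel(): the failure is handed to the
  // listener instead of being thrown to the caller (assumption).
  static void writeToChannel(Object msg, Consumer<Throwable> listener) {
    listener.accept(new IllegalStateException("unexpected message type"));
  }

  // Hypothetical stand-in for endRequest(): just records that it was called.
  static void endRequest() {
    events.add("endRequest");
  }

  // Same shape as the snippet above: the result of the write is never checked.
  static List<String> writeBuffer(Object msg, boolean end) {
    events.clear();
    writeToChannel(msg, err -> {
      if (err != null) {
        events.add("write failed");
      }
    });
    if (end) {
      endRequest(); // runs even though the write above reported a failure
    }
    return new ArrayList<>(events);
  }

  public static void main(String[] args) {
    System.out.println(writeBuffer("last-content", true)); // [write failed, endRequest]
  }
}
```

In this sketch the caller has no way to know the write failed unless it inspects what the listener received, so the "end of request" bookkeeping (metrics included) runs on a request that was never properly started.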

Why does the write fail in the first place?
I think it all comes down to the fact that the write operations coming from the Pipe are executed on a NON event loop thread (and are therefore placed in a queue of operations), while the end operation is performed by an event loop thread (and is therefore executed immediately).
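
The "queued vs. executed immediately" reordering can be reproduced without Vert.x. The sketch below is only a model of what I think is happening: a single-thread executor plays the event loop, and the hypothetical runOnLoop() helper runs a task inline when called from the loop thread and queues it otherwise. None of these names come from Vert.x.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class EventLoopOrdering {

  // Hypothetical stand-in for an event loop: one thread with a task queue.
  private final ExecutorService loop = Executors.newSingleThreadExecutor();
  private volatile Thread loopThread;
  private final List<String> order = new CopyOnWriteArrayList<>();

  // Run inline when already on the loop thread, otherwise queue the task.
  void runOnLoop(Runnable task) {
    if (Thread.currentThread() == loopThread) {
      task.run();
    } else {
      loop.execute(task);
    }
  }

  List<String> simulate() throws Exception {
    loop.submit(() -> {
      loopThread = Thread.currentThread();
      // A worker thread "writes" while the loop is still busy with this task,
      // so the write can only be queued behind it.
      Thread worker = new Thread(() -> runOnLoop(() -> order.add("writeHead")));
      worker.start();
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      // The loop thread "ends" the stream: this runs inline, ahead of the queued write.
      runOnLoop(() -> order.add("end"));
    }).get();
    loop.shutdown();
    loop.awaitTermination(5, TimeUnit.SECONDS); // let the queued write run
    return order;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(new EventLoopOrdering().simulate()); // [end, writeHead]
  }
}
```

The end is recorded before the head, which is the same inversion that makes HttpObjectEncoder see a LastHttpContent in state 0.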

Do you have a reproducer?

See https://gist.github.com/sauthieg/7a9730c35da43042e7fbaf4d2a1ed1b0
It involves an RX ReadStreamSubscriber because that is what I have in my setup.

In short, it shows that when a ReadStreamSubscriber's source/upstream completes on another thread (after writing one or more elements just before completion, on that same thread), the Pipe's completion handler might be executed by the event loop thread, potentially resulting in a race condition in the destination write stream.

That happens because:

  • PipeImpl.to() is executed on the event loop thread
  • the upstream flowable is ready to emit all its elements (and complete) when the pipe executes src.resume()
  • the upstream flowable emits on a non event loop thread
  • the upstream flowable's onNext() calls the destination's WriteStream.write() (on the non event loop thread)
  • Http1xClientConnection.StreamImpl enforces that writeHead() is executed on the event loop, so that action is queued
  • the upstream flowable calls ReadStreamSubscriber.onComplete(), which marks the Pipe's result future as completed (still on the non event loop thread); because the Pipe's result.future().onComplete(...) has NOT been executed yet, there is no handler to run at that point
  • finally, result.future().onComplete(...) is executed (by the event loop thread), and because the future is already completed, the handler is executed immediately (calling WriteStream.end())
  • that eventually executes Http1xClientConnection.StreamImpl.writeBuffer() (on the event loop), which immediately tries to write the LastHttpContent item to the Netty channel
  • Boom
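
The key step in the list above is the late-registered handler: a future completed on one thread runs a handler added afterwards on the thread that adds it. Here is a minimal sketch of that behaviour. Note that it uses the JDK's CompletableFuture as a stand-in (an assumption for illustration, not the Vert.x Future), relying on the fact that, in OpenJDK, a non-async callback attached to an already-completed CompletableFuture runs synchronously in the attaching thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class LateListener {

  // Returns the name of the thread that ran the completion handler.
  static String handlerThread() throws InterruptedException {
    CompletableFuture<Void> result = new CompletableFuture<>();

    // The "upstream" completes the future on another thread,
    // before any handler has been registered.
    Thread worker = new Thread(() -> result.complete(null), "worker");
    worker.start();
    worker.join();

    // The handler is registered afterwards by the calling thread (playing the
    // event loop here). The future is already complete, so the handler runs
    // right away, on the calling thread and not on "worker".
    AtomicReference<String> ranOn = new AtomicReference<>();
    result.whenComplete((value, error) -> ranOn.set(Thread.currentThread().getName()));
    return ranOn.get();
  }

  public static void main(String[] args) throws InterruptedException {
    String caller = Thread.currentThread().getName();
    System.out.println("handler ran on the registering thread: " + caller.equals(handlerThread()));
  }
}
```

This matches the FutureImpl.addListener -> FutureBase.emitSuccess frames in both stack traces: the handler (and therefore WriteStream.end()) runs inline inside onComplete(...), on whichever thread happens to register it.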
