From 3fe635b5e6f329e09e58ec154d5d2db40723e52c Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Mon, 7 Apr 2025 22:01:44 -0700 Subject: [PATCH 01/12] add flow-control and remove auto-read --- .../Netty4IncrementalRequestHandlingIT.java | 21 +- .../netty4/Netty4HttpHeaderValidator.java | 269 ++---- .../netty4/Netty4HttpPipeliningHandler.java | 6 +- .../netty4/Netty4HttpRequestBodyStream.java | 107 +-- .../netty4/Netty4HttpServerTransport.java | 4 + .../Netty4HttpHeaderThreadContextTests.java | 7 +- .../Netty4HttpHeaderValidatorTests.java | 781 ++---------------- .../Netty4HttpRequestBodyStreamTests.java | 82 +- 8 files changed, 187 insertions(+), 1090 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java index 0158384b47aa4..1430b884b68b9 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -178,8 +178,6 @@ public void testClientConnectionCloseMidStream() throws Exception { // await stream handler is ready and request full content var handler = ctx.awaitRestChannelAccepted(opaqueId); - assertBusy(() -> assertNotEquals(0, handler.stream.bufSize())); - assertFalse(handler.streamClosed); // terminate client connection @@ -190,10 +188,7 @@ public void testClientConnectionCloseMidStream() throws Exception { handler.stream.next(); // wait for resources to be released - assertBusy(() -> { - assertEquals(0, handler.stream.bufSize()); - assertTrue(handler.streamClosed); - }); + assertBusy(() -> assertTrue(handler.streamClosed)); } } @@ -208,15 +203,11 @@ public void testServerCloseConnectionMidStream() throws Exception { // await stream handler is ready and request full content var handler = ctx.awaitRestChannelAccepted(opaqueId); - assertBusy(() -> assertNotEquals(0, handler.stream.bufSize())); assertFalse(handler.streamClosed); // terminate connection on server and wait resources are released handler.channel.request().getHttpChannel().close(); - assertBusy(() -> { - assertEquals(0, handler.stream.bufSize()); - assertTrue(handler.streamClosed); - }); + assertBusy(() -> assertTrue(handler.streamClosed)); } } @@ -230,16 +221,12 @@ public void testServerExceptionMidStream() throws Exception { // await stream handler is ready and request full content var handler = ctx.awaitRestChannelAccepted(opaqueId); - assertBusy(() -> assertNotEquals(0, handler.stream.bufSize())); assertFalse(handler.streamClosed); handler.shouldThrowInsideHandleChunk = true; handler.stream.next(); - assertBusy(() -> { - assertEquals(0, handler.stream.bufSize()); - assertTrue(handler.streamClosed); - }); + assertBusy(() -> assertTrue(handler.streamClosed)); } } @@ -280,7 +267,7 @@ public void testClientBackpressure() throws Exception { }); handler.readBytes(partSize); } - assertTrue(handler.stream.hasLast()); + assertTrue(handler.recvLast); } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java index 95a68cb52bbdb..e8a84527c2f5f 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java @@ -9,249 +9,106 @@ package org.elasticsearch.http.netty4; -import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.DecoderResult; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.LastHttpContent; -import io.netty.util.ReferenceCountUtil; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.Nullable; import org.elasticsearch.http.netty4.internal.HttpValidator; import org.elasticsearch.transport.Transports; -import java.util.ArrayDeque; - -import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.DROPPING_DATA_PERMANENTLY; -import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.DROPPING_DATA_UNTIL_NEXT_REQUEST; -import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.FORWARDING_DATA_UNTIL_NEXT_REQUEST; -import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.QUEUEING_DATA; -import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.WAITING_TO_START; - -public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter { +public class Netty4HttpHeaderValidator extends ChannelDuplexHandler { private final HttpValidator validator; private final ThreadContext threadContext; - private ArrayDeque pending = new ArrayDeque<>(4); - private State state = WAITING_TO_START; + private boolean droppingContent; + private boolean validatingRequest; public Netty4HttpHeaderValidator(HttpValidator validator, ThreadContext threadContext) { this.validator = validator; this.threadContext = threadContext; } - State getState() { - return state; - } - - @SuppressWarnings("fallthrough") @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - assert msg instanceof HttpObject; - final HttpObject httpObject = (HttpObject) msg; - - switch (state) { - case WAITING_TO_START: - assert pending.isEmpty(); - pending.add(ReferenceCountUtil.retain(httpObject)); - requestStart(ctx); - assert state == QUEUEING_DATA; - assert ctx.channel().config().isAutoRead() == false; - break; - case QUEUEING_DATA: - pending.add(ReferenceCountUtil.retain(httpObject)); - break; - case FORWARDING_DATA_UNTIL_NEXT_REQUEST: - assert pending.isEmpty(); - if (httpObject instanceof LastHttpContent) { - state = WAITING_TO_START; - } - ctx.fireChannelRead(httpObject); - break; - case DROPPING_DATA_UNTIL_NEXT_REQUEST: - assert pending.isEmpty(); - if (httpObject instanceof LastHttpContent) { - state = WAITING_TO_START; - } - ReferenceCountUtil.release(httpObject); - break; - case DROPPING_DATA_PERMANENTLY: - assert pending.isEmpty(); - ReferenceCountUtil.release(httpObject); // consume without enqueuing - ctx.channel().config().setAutoRead(false); - break; - } - } - - private void requestStart(ChannelHandlerContext ctx) { - assert state == WAITING_TO_START; - - if (pending.isEmpty()) { - return; - } - - final HttpObject httpObject = pending.getFirst(); - final HttpRequest httpRequest; - if (httpObject instanceof HttpRequest && httpObject.decoderResult().isSuccess()) { - // a properly decoded HTTP start message is expected to begin validation - // anything else is probably an error that the downstream HTTP message aggregator will have to handle - httpRequest = (HttpRequest) httpObject; - } else { - httpRequest = null; - } - - state = QUEUEING_DATA; - ctx.channel().config().setAutoRead(false); - - if (httpRequest == null) { - // this looks like a malformed request and will forward without validation - ctx.channel().eventLoop().execute(() -> forwardFullRequest(ctx)); - } else { - assert Transports.assertDefaultThreadContext(threadContext); - ActionListener.run( - // this prevents thread-context changes to propagate to the validation listener - // atm, the validation listener submits to the event loop executor, which doesn't know about the ES thread-context, - // so this is just a defensive play, in case the code inside the listener changes to not use the event loop executor - ActionListener.assertOnce( - new ContextPreservingActionListener( - threadContext.wrapRestorable(threadContext.newStoredContext()), - // Always explicitly dispatch back to the event loop to prevent reentrancy concerns if we are still on event loop - new ActionListener<>() { - @Override - public void onResponse(Void unused) { - assert Transports.assertDefaultThreadContext(threadContext); - ctx.channel().eventLoop().execute(() -> forwardFullRequest(ctx)); - } - - @Override - public void onFailure(Exception e) { - assert Transports.assertDefaultThreadContext(threadContext); - ctx.channel().eventLoop().execute(() -> forwardRequestWithDecoderExceptionAndNoContent(ctx, e)); - } - } - ) - ), - listener -> { - // this prevents thread-context changes to propagate beyond the validation, as netty worker threads are reused - try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) { - validator.validate(httpRequest, ctx.channel(), listener); + assert ctx.channel().config().isAutoRead() == false : "auto-read should be always disabled"; + if (msg instanceof HttpObject httpObject) { + if (httpObject.decoderResult().isFailure()) { + ctx.fireChannelRead(httpObject); // pass-through for decoding failures + } else { + if (msg instanceof HttpRequest request) { + validate(ctx, request); + } else if (msg instanceof HttpContent content) { + if (droppingContent) { + content.release(); + } else { + assert validatingRequest == false : "unexpected content before validation completed"; + ctx.fireChannelRead(content); } } - ); - } - } - - private void forwardFullRequest(ChannelHandlerContext ctx) { - Transports.assertDefaultThreadContext(threadContext); - assert ctx.channel().eventLoop().inEventLoop(); - assert ctx.channel().config().isAutoRead() == false; - assert state == QUEUEING_DATA; - - ctx.channel().config().setAutoRead(true); - boolean fullRequestForwarded = forwardData(ctx, pending); - - assert fullRequestForwarded || pending.isEmpty(); - if (fullRequestForwarded) { - state = WAITING_TO_START; - requestStart(ctx); - } else { - state = FORWARDING_DATA_UNTIL_NEXT_REQUEST; - } - - assert state == WAITING_TO_START || state == QUEUEING_DATA || state == FORWARDING_DATA_UNTIL_NEXT_REQUEST; - } - - private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContext ctx, Exception e) { - Transports.assertDefaultThreadContext(threadContext); - assert ctx.channel().eventLoop().inEventLoop(); - assert ctx.channel().config().isAutoRead() == false; - assert state == QUEUEING_DATA; - - HttpObject messageToForward = pending.getFirst(); - boolean fullRequestDropped = dropData(pending); - if (messageToForward instanceof HttpContent toReplace) { - // if the request to forward contained data (which got dropped), replace with empty data - messageToForward = toReplace.replace(Unpooled.EMPTY_BUFFER); - } - messageToForward.setDecoderResult(DecoderResult.failure(e)); - - ctx.channel().config().setAutoRead(true); - ctx.fireChannelRead(messageToForward); - - assert fullRequestDropped || pending.isEmpty(); - if (fullRequestDropped) { - state = WAITING_TO_START; - requestStart(ctx); - } else { - state = DROPPING_DATA_UNTIL_NEXT_REQUEST; + } } - - assert state == WAITING_TO_START || state == QUEUEING_DATA || state == DROPPING_DATA_UNTIL_NEXT_REQUEST; } @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - state = DROPPING_DATA_PERMANENTLY; - while (true) { - if (dropData(pending) == false) { - break; - } + public void read(ChannelHandlerContext ctx) throws Exception { + // until validation is completed we can ignore read calls, + // once validation is finished HttpRequest will be fired and downstream can read from there + if (validatingRequest == false) { + ctx.read(); } - super.channelInactive(ctx); } - private static boolean forwardData(ChannelHandlerContext ctx, ArrayDeque pending) { - final int pendingMessages = pending.size(); - try { - HttpObject toForward; - while ((toForward = pending.poll()) != null) { - ctx.fireChannelRead(toForward); - ReferenceCountUtil.release(toForward); // reference cnt incremented when enqueued - if (toForward instanceof LastHttpContent) { - return true; - } - } - return false; - } finally { - maybeResizePendingDown(pendingMessages, pending); - } - } + void validate(ChannelHandlerContext ctx, HttpRequest request) { + assert Transports.assertDefaultThreadContext(threadContext); + droppingContent = false; + validatingRequest = true; + ActionListener.run( + // this prevents thread-context changes to propagate to the validation listener + // atm, the validation listener submits to the event loop executor, which doesn't know about the ES thread-context, + // so this is just a defensive play, in case the code inside the listener changes to not use the event loop executor + ActionListener.assertOnce( + new ContextPreservingActionListener( + threadContext.wrapRestorable(threadContext.newStoredContext()), + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + handleValidationResult(ctx, request, null); + } - private static boolean dropData(ArrayDeque pending) { - final int pendingMessages = pending.size(); - try { - HttpObject toDrop; - while ((toDrop = pending.poll()) != null) { - ReferenceCountUtil.release(toDrop, 2); // 1 for enqueuing, 1 for consuming - if (toDrop instanceof LastHttpContent) { - return true; + @Override + public void onFailure(Exception e) { + handleValidationResult(ctx, request, e); + } + } + ) + ), + listener -> { + // this prevents thread-context changes to propagate beyond the validation, as netty worker threads are reused + try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) { + validator.validate(request, ctx.channel(), listener); } } - return false; - } finally { - maybeResizePendingDown(pendingMessages, pending); - } + ); } - private static void maybeResizePendingDown(int largeSize, ArrayDeque pending) { - if (pending.size() <= 4 && largeSize > 32) { - // Prevent the ArrayDeque from becoming forever large due to a single large message. - ArrayDeque old = pending; - pending = new ArrayDeque<>(4); - pending.addAll(old); - } + void handleValidationResult(ChannelHandlerContext ctx, HttpRequest request, @Nullable Exception validationError) { + assert Transports.assertDefaultThreadContext(threadContext); + // Always explicitly dispatch back to the event loop to prevent reentrancy concerns if we are still on event loop + ctx.channel().eventLoop().execute(() -> { + if (validationError != null) { + request.setDecoderResult(DecoderResult.failure(validationError)); + droppingContent = true; + } + validatingRequest = false; + ctx.fireChannelRead(request); + }); } - enum State { - WAITING_TO_START, - QUEUEING_DATA, - FORWARDING_DATA_UNTIL_NEXT_REQUEST, - DROPPING_DATA_UNTIL_NEXT_REQUEST, - DROPPING_DATA_PERMANENTLY - } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index 4809f1a1a275b..301d085e262f7 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -139,14 +139,16 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) { } else { var contentStream = new Netty4HttpRequestBodyStream( ctx.channel(), - serverTransport.getThreadPool().getThreadContext(), - activityTracker + serverTransport.getThreadPool().getThreadContext() ); currentRequestStream = contentStream; netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream); } } handlePipelinedRequest(ctx, netty4HttpRequest); + if (request instanceof FullHttpRequest) { + ctx.read(); + } } else { assert msg instanceof HttpContent : "expect HttpContent got " + msg; assert currentRequestStream != null : "current stream must exists before handling http content"; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java index 88b4518c8de89..4d9afd180ac07 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java @@ -9,14 +9,11 @@ package org.elasticsearch.http.netty4; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.LastHttpContent; -import org.elasticsearch.common.network.ThreadWatchdog; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Releasables; import org.elasticsearch.http.HttpBody; @@ -27,34 +24,22 @@ /** * Netty based implementation of {@link HttpBody.Stream}. - * This implementation utilize {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)} - * to prevent entire payload buffering. But sometimes upstream can send few chunks of data despite - * autoRead=off. In this case chunks will be buffered until downstream calls {@link Stream#next()} */ public class Netty4HttpRequestBodyStream implements HttpBody.Stream { private final Channel channel; - private final ChannelFutureListener closeListener = future -> doClose(); private final List tracingHandlers = new ArrayList<>(4); private final ThreadContext threadContext; - private final ThreadWatchdog.ActivityTracker activityTracker; - private ByteBuf buf; - private boolean requested = false; private boolean closing = false; private HttpBody.ChunkHandler handler; private ThreadContext.StoredContext requestContext; + private final ChannelFutureListener closeListener = future -> doClose(); - // used in tests - private volatile int bufSize = 0; - private volatile boolean hasLast = false; - - public Netty4HttpRequestBodyStream(Channel channel, ThreadContext threadContext, ThreadWatchdog.ActivityTracker activityTracker) { + public Netty4HttpRequestBodyStream(Channel channel, ThreadContext threadContext) { this.channel = channel; this.threadContext = threadContext; this.requestContext = threadContext.newStoredContext(); - this.activityTracker = activityTracker; Netty4Utils.addListener(channel.closeFuture(), closeListener); - channel.config().setAutoRead(false); } @Override @@ -77,81 +62,26 @@ public void addTracingHandler(ChunkHandler chunkHandler) { public void next() { assert handler != null : "handler must be set before requesting next chunk"; requestContext = threadContext.newStoredContext(); - channel.eventLoop().submit(() -> { - activityTracker.startActivity(); - requested = true; - try { - if (closing) { - return; - } - if (buf == null) { - channel.read(); - } else { - send(); - } - } catch (Throwable e) { - channel.pipeline().fireExceptionCaught(e); - } finally { - activityTracker.stopActivity(); - } - }); + channel.eventLoop().execute(channel::read); } public void handleNettyContent(HttpContent httpContent) { - assert hasLast == false : "receive http content on completed stream"; - hasLast = httpContent instanceof LastHttpContent; if (closing) { httpContent.release(); + channel.eventLoop().execute(channel::read); } else { - addChunk(httpContent.content()); - if (requested) { - send(); - } - } - } - - // adds chunk to current buffer, will allocate composite buffer when need to hold more than 1 chunk - private void addChunk(ByteBuf chunk) { - assert chunk != null; - if (buf == null) { - buf = chunk; - } else if (buf instanceof CompositeByteBuf comp) { - comp.addComponent(true, chunk); - } else { - var comp = channel.alloc().compositeBuffer(); - comp.addComponent(true, buf); - comp.addComponent(true, chunk); - buf = comp; - } - bufSize = buf.readableBytes(); - } - - // visible for test - int bufSize() { - return bufSize; - } - - // visible for test - boolean hasLast() { - return hasLast; - } - - private void send() { - assert requested; - assert handler != null : "must set handler before receiving next chunk"; - var bytesRef = Netty4Utils.toReleasableBytesReference(buf); - requested = false; - buf = null; - bufSize = 0; - try (var ignored = threadContext.restoreExistingContext(requestContext)) { - for (var tracer : tracingHandlers) { - tracer.onNext(bytesRef, hasLast); + try (var ignored = threadContext.restoreExistingContext(requestContext)) { + var isLast = httpContent instanceof LastHttpContent; + var buf = Netty4Utils.toReleasableBytesReference(httpContent.content()); + for (var tracer : tracingHandlers) { + tracer.onNext(buf, isLast); + } + handler.onNext(buf, isLast); + if (isLast) { + channel.read(); + channel.closeFuture().removeListener(closeListener); + } } - handler.onNext(bytesRef, hasLast); - } - if (hasLast) { - channel.config().setAutoRead(true); - channel.closeFuture().removeListener(closeListener); } } @@ -174,11 +104,6 @@ private void doClose() { handler.close(); } } - if (buf != null) { - buf.release(); - buf = null; - bufSize = 0; - } - channel.config().setAutoRead(true); + channel.eventLoop().execute(channel::read); } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 9ffa4b479be17..c6bfb0ce7b42f 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -29,6 +29,7 @@ import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.flow.FlowControlHandler; import io.netty.handler.ssl.SslHandler; import io.netty.handler.timeout.ReadTimeoutException; import io.netty.handler.timeout.ReadTimeoutHandler; @@ -364,6 +365,7 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception { } decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR); ch.pipeline().addLast("decoder", decoder); // parses the HTTP bytes request into HTTP message pieces + ch.pipeline().addLast(new FlowControlHandler()); if (httpValidator != null) { // runs a validation function on the first HTTP message piece which contains all the headers // if validation passes, the pieces of that particular request are forwarded, otherwise they are discarded @@ -421,6 +423,8 @@ protected Result beginEncode(HttpResponse httpResponse, String acceptEncoding) t new Netty4HttpPipeliningHandler(transport.pipeliningMaxEvents, transport, threadWatchdogActivityTracker) ); transport.serverAcceptedChannel(nettyHttpChannel); + ch.config().setAutoRead(false); + ch.read(); } @Override diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderThreadContextTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderThreadContextTests.java index 9a12ba75d7742..957b5fd066ff2 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderThreadContextTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderThreadContextTests.java @@ -19,6 +19,7 @@ import io.netty.handler.codec.http.DefaultLastHttpContent; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.flow.FlowControlHandler; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -52,7 +53,8 @@ public class Netty4HttpHeaderThreadContextTests extends ESTestCase { @Override public void setUp() throws Exception { super.setUp(); - channel = new EmbeddedChannel(); + channel = new EmbeddedChannel(new FlowControlHandler()); + channel.config().setAutoRead(false); threadPool = new TestThreadPool(TEST_MOCK_TRANSPORT_THREAD_PREFIX); } @@ -181,6 +183,7 @@ private void sendRequestThrough(boolean success, Semaphore validationDone) throw threadPool.generic().submit(() -> { DefaultHttpRequest request1 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); channel.writeInbound(request1); + channel.read(); DefaultHttpContent content1 = randomBoolean() ? new DefaultHttpContent(Unpooled.buffer(4)) : null; if (content1 != null) { channel.writeInbound(content1); @@ -196,9 +199,11 @@ private void sendRequestThrough(boolean success, Semaphore validationDone) throw } channel.runPendingTasks(); assertThat(channel.readInbound(), sameInstance(request1)); + channel.read(); if (content1 != null && success) { assertThat(channel.readInbound(), sameInstance(content1)); } + channel.read(); if (success) { assertThat(channel.readInbound(), sameInstance(lastContent1)); } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java index 1c0b434105f28..6a7cd2e4cfd71 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java @@ -9,766 +9,123 @@ package org.elasticsearch.http.netty4; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; import io.netty.channel.embedded.EmbeddedChannel; -import io.netty.handler.codec.DecoderResult; -import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultHttpContent; import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.DefaultLastHttpContent; -import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.LastHttpContent; -import io.netty.util.AsciiString; +import io.netty.handler.flow.FlowControlHandler; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.http.netty4.internal.HttpValidator; import org.elasticsearch.test.ESTestCase; -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.concurrent.atomic.AtomicReference; - -import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.DROPPING_DATA_UNTIL_NEXT_REQUEST; -import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.FORWARDING_DATA_UNTIL_NEXT_REQUEST; -import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.QUEUEING_DATA; -import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.WAITING_TO_START; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; -import static org.hamcrest.Matchers.sameInstance; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; public class Netty4HttpHeaderValidatorTests extends ESTestCase { - - private final AtomicReference header = new AtomicReference<>(); - private final AtomicReference> listener = new AtomicReference<>(); private EmbeddedChannel channel; - private Netty4HttpHeaderValidator netty4HttpHeaderValidator; - private final AtomicReference validationException = new AtomicReference<>(); + private BlockingQueue validatorRequestQueue; @Override public void setUp() throws Exception { super.setUp(); - reset(); - } - - private void reset() { - channel = new EmbeddedChannel(); - header.set(null); - listener.set(null); - validationException.set(null); - HttpValidator validator = (httpRequest, channel, validationCompleteListener) -> { - header.set(httpRequest); - final var exception = validationException.get(); - if (exception != null) { - throw exception; - } - listener.set(validationCompleteListener); - }; - netty4HttpHeaderValidator = new Netty4HttpHeaderValidator(validator, new ThreadContext(Settings.EMPTY)); - channel.pipeline().addLast(netty4HttpHeaderValidator); - } - - public void testValidationPausesAndResumesData() { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(request); - channel.writeInbound(content); - - assertThat(header.get(), sameInstance(request)); - // channel is paused - assertThat(channel.readInbound(), nullValue()); - assertFalse(channel.config().isAutoRead()); - - // channel is resumed - listener.get().onResponse(null); - channel.runPendingTasks(); - - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST)); - assertThat(channel.readInbound(), sameInstance(request)); - assertThat(channel.readInbound(), sameInstance(content)); - assertThat(channel.readInbound(), nullValue()); - assertThat(content.refCnt(), equalTo(1)); - - // channel continues in resumed state after request finishes - DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4)); - channel.writeInbound(lastContent); - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - assertThat(channel.readInbound(), sameInstance(lastContent)); - assertThat(lastContent.refCnt(), equalTo(1)); - - // channel is again paused while validating next request - channel.writeInbound(request); - assertFalse(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); - } - - public void testValidatorDoesNotTweakAutoReadAfterValidationComplete() { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(request); - channel.writeInbound(content); - - assertThat(header.get(), sameInstance(request)); - // channel is paused - assertThat(channel.readInbound(), nullValue()); - assertFalse(channel.config().isAutoRead()); - - // channel is resumed - listener.get().onResponse(null); - channel.runPendingTasks(); - - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST)); - assertThat(channel.readInbound(), sameInstance(request)); - assertThat(channel.readInbound(), sameInstance(content)); - assertThat(channel.readInbound(), nullValue()); - assertThat(content.refCnt(), equalTo(1)); + validatorRequestQueue = new LinkedBlockingQueue<>(); + channel = new EmbeddedChannel(new Netty4HttpHeaderValidator((httpRequest, channel, listener) -> { + validatorRequestQueue.add(new ValidationRequest(httpRequest, channel, listener)); + }, new ThreadContext(Settings.EMPTY))); channel.config().setAutoRead(false); - - channel.writeOutbound(new DefaultHttpContent(Unpooled.buffer(4))); - assertFalse(channel.config().isAutoRead()); - } - - public void testContentForwardedAfterValidation() { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - channel.writeInbound(request); - - DefaultHttpContent content1 = null; - if (randomBoolean()) { - content1 = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(content1); - } - - assertThat(header.get(), sameInstance(request)); - // channel is paused - assertThat(channel.readInbound(), nullValue()); - assertFalse(channel.config().isAutoRead()); - - // channel is resumed - listener.get().onResponse(null); - channel.runPendingTasks(); - - // resumed channel after successful validation forwards data - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST)); - // write more content to the channel after validation passed - DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(content2); - assertThat(channel.readInbound(), sameInstance(request)); - DefaultHttpContent content3 = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(content3); - if (content1 != null) { - assertThat(channel.readInbound(), sameInstance(content1)); - assertThat(content1.refCnt(), equalTo(1)); - } - assertThat(channel.readInbound(), sameInstance(content2)); - assertThat(content2.refCnt(), equalTo(1)); - DefaultHttpContent content4 = null; - if (randomBoolean()) { - content4 = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(content4); - } - assertThat(channel.readInbound(), sameInstance(content3)); - assertThat(content3.refCnt(), equalTo(1)); - if (content4 != null) { - assertThat(channel.readInbound(), sameInstance(content4)); - assertThat(content4.refCnt(), equalTo(1)); - } - - // channel continues in resumed state after request finishes - DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4)); - channel.writeInbound(lastContent); - - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - assertThat(channel.readInbound(), sameInstance(lastContent)); - assertThat(lastContent.refCnt(), equalTo(1)); - - if (randomBoolean()) { - channel.writeInbound(request); - assertFalse(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); - } } - public void testContentDroppedAfterValidationFailure() { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - channel.writeInbound(request); - - DefaultHttpContent content1 = null; - if (randomBoolean()) { - content1 = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(content1); - } - - assertThat(header.get(), sameInstance(request)); - // channel is paused - assertThat(channel.readInbound(), nullValue()); - assertFalse(channel.config().isAutoRead()); - - // channel is resumed - listener.get().onFailure(new ElasticsearchException("Boom")); - channel.runPendingTasks(); - - // resumed channel after failed validation drops data - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST)); - // write more content to the channel after validation passed - DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(content2); - assertThat(channel.readInbound(), sameInstance(request)); - DefaultHttpContent content3 = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(content3); - if (content1 != null) { - assertThat(channel.readInbound(), nullValue()); - assertThat(content1.refCnt(), equalTo(0)); - } - assertThat(channel.readInbound(), nullValue()); // content2 - assertThat(content2.refCnt(), equalTo(0)); - DefaultHttpContent content4 = null; - if (randomBoolean()) { - content4 = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(content4); - } - assertThat(channel.readInbound(), nullValue()); // content3 - assertThat(content3.refCnt(), equalTo(0)); - if (content4 != null) { - assertThat(channel.readInbound(), nullValue()); - assertThat(content4.refCnt(), equalTo(0)); - } - - assertThat(channel.readInbound(), nullValue()); // extra read still returns "null" - - // channel continues in resumed state after request finishes - DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4)); - channel.writeInbound(lastContent); - - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - assertThat(channel.readInbound(), nullValue()); // lastContent - assertThat(lastContent.refCnt(), equalTo(0)); - - if (randomBoolean()) { - channel.writeInbound(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri")); - assertFalse(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); - } + HttpRequest newHttpRequest() { + return new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, ""); } - public void testValidationErrorForwardsAsDecoderErrorMessage() { - for (Exception exception : List.of( - new Exception("Failure"), - new ElasticsearchException("Failure"), - new ElasticsearchSecurityException("Failure") - )) { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - final DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4)); - - channel.writeInbound(request); - channel.writeInbound(content); - - assertThat(header.get(), sameInstance(request)); - assertThat(channel.readInbound(), nullValue()); - assertFalse(channel.config().isAutoRead()); - - listener.get().onFailure(exception); - channel.runPendingTasks(); - assertTrue(channel.config().isAutoRead()); - DefaultHttpRequest failed = channel.readInbound(); - assertThat(failed, sameInstance(request)); - assertThat(failed.headers().get(HttpHeaderNames.CONNECTION), nullValue()); - assertTrue(failed.decoderResult().isFailure()); - Exception cause = (Exception) failed.decoderResult().cause(); - assertThat(cause, equalTo(exception)); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST)); - - assertThat(channel.readInbound(), nullValue()); - assertThat(content.refCnt(), equalTo(0)); - - DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4)); - channel.writeInbound(lastContent); - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - assertThat(channel.readInbound(), nullValue()); - assertThat(lastContent.refCnt(), equalTo(0)); - - reset(); - } + HttpContent newHttpContent() { + return new DefaultHttpContent(Unpooled.buffer()); } - public void testValidationExceptionForwardsAsDecoderErrorMessage() { - final var exception = new ElasticsearchException("Failure"); - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - - validationException.set(exception); - channel.writeInbound(request); - - assertThat(header.get(), sameInstance(request)); - assertThat(listener.get(), nullValue()); - - channel.runPendingTasks(); - assertTrue(channel.config().isAutoRead()); - DefaultHttpRequest failed = channel.readInbound(); - assertThat(failed, sameInstance(request)); - assertThat(failed.headers().get(HttpHeaderNames.CONNECTION), nullValue()); - assertTrue(failed.decoderResult().isFailure()); - Exception cause = (Exception) failed.decoderResult().cause(); - assertThat(cause, equalTo(exception)); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST)); - - final DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(content); - - assertThat(channel.readInbound(), nullValue()); - assertThat(content.refCnt(), equalTo(0)); - - DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4)); - channel.writeInbound(lastContent); - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - assertThat(channel.readInbound(), nullValue()); - assertThat(lastContent.refCnt(), equalTo(0)); + LastHttpContent newLastHttpContent() { + return new DefaultLastHttpContent(); } - public void testValidationHandlesMultipleQueuedUpMessages() { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - final DefaultHttpRequest request1 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - DefaultHttpContent content1 = new DefaultHttpContent(Unpooled.buffer(4)); - DefaultLastHttpContent lastContent1 = new DefaultLastHttpContent(Unpooled.buffer(4)); - channel.writeInbound(request1); - channel.writeInbound(content1); - channel.writeInbound(lastContent1); - final DefaultHttpRequest request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4)); - DefaultLastHttpContent lastContent2 = new DefaultLastHttpContent(Unpooled.buffer(4)); - channel.writeInbound(request2); - channel.writeInbound(content2); - channel.writeInbound(lastContent2); - - assertThat(header.get(), sameInstance(request1)); - assertThat(channel.readInbound(), nullValue()); - assertFalse(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); - - listener.get().onResponse(null); - channel.runPendingTasks(); - assertThat(channel.readInbound(), sameInstance(request1)); - assertThat(channel.readInbound(), sameInstance(content1)); - assertThat(channel.readInbound(), sameInstance(lastContent1)); - assertThat(content1.refCnt(), equalTo(1)); - assertThat(lastContent1.refCnt(), equalTo(1)); - - assertThat(header.get(), sameInstance(request2)); - - assertFalse(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); - assertThat(channel.readInbound(), nullValue()); - - listener.get().onResponse(null); - channel.runPendingTasks(); - assertThat(channel.readInbound(), sameInstance(request2)); - assertThat(channel.readInbound(), sameInstance(content2)); - assertThat(channel.readInbound(), sameInstance(lastContent2)); - assertThat(content2.refCnt(), equalTo(1)); - assertThat(lastContent2.refCnt(), equalTo(1)); - - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - assertThat(channel.readInbound(), nullValue()); + public void testValidatorReceiveHttpRequest() { + channel.writeInbound(newHttpRequest()); + assertEquals(1, validatorRequestQueue.size()); + assertNull(channel.readInbound()); } - public void testValidationFailureRecoversForEnqueued() { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - // write 2 requests before validation for the first one fails - final DefaultHttpRequest request1 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - DefaultHttpContent content1 = new DefaultHttpContent(Unpooled.buffer(4)); - DefaultLastHttpContent lastContent1 = new DefaultLastHttpContent(Unpooled.buffer(4)); - channel.writeInbound(request1); - channel.writeInbound(content1); - channel.writeInbound(lastContent1); - final DefaultHttpRequest request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4)); - DefaultLastHttpContent lastContent2 = new DefaultLastHttpContent(Unpooled.buffer(4)); - channel.writeInbound(request2); - channel.writeInbound(content2); - - boolean finishSecondRequest = randomBoolean(); - if (finishSecondRequest) { - channel.writeInbound(lastContent2); - } - - // channel is paused and both requests are queued - assertThat(header.get(), sameInstance(request1)); - assertThat(channel.readInbound(), nullValue()); - assertFalse(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); - assertThat(content1.refCnt(), equalTo(2)); - assertThat(lastContent1.refCnt(), equalTo(2)); - assertThat(content2.refCnt(), equalTo(2)); - if (finishSecondRequest) { - assertThat(lastContent2.refCnt(), equalTo(2)); - } + /** + * Sends back-to-back http requests and randomly fail validation. + * Ensures that invalid requests drop content and valid pass through. + */ + public void testMixedValidationResults() { + for (var i = 0; i < 1000; i++) { + var shouldPassValidation = randomBoolean(); + var request = newHttpRequest(); + var content = newHttpContent(); + var last = newLastHttpContent(); - // validation for the 1st request FAILS - Exception exception = new ElasticsearchException("Boom"); - listener.get().onFailure(exception); - channel.runPendingTasks(); - - // request1 becomes a decoder exception and its content is dropped - assertThat(channel.readInbound(), sameInstance(request1)); - assertThat(request1.headers().get(HttpHeaderNames.CONNECTION), nullValue()); - assertTrue(request1.decoderResult().isFailure()); - Exception cause = (Exception) request1.decoderResult().cause(); - assertThat(cause, equalTo(exception)); - assertThat(content1.refCnt(), equalTo(0)); // content is dropped - assertThat(lastContent1.refCnt(), equalTo(0)); // content is dropped - assertThat(channel.readInbound(), nullValue()); - - // channel pauses for the validation of the 2nd request - assertThat(header.get(), sameInstance(request2)); - assertFalse(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); - assertThat(channel.readInbound(), nullValue()); - - // validation for the 2nd request SUCCEEDS - listener.get().onResponse(null); - channel.runPendingTasks(); - - // 2nd request is forwarded correctly - assertThat(channel.readInbound(), sameInstance(request2)); - assertThat(channel.readInbound(), sameInstance(content2)); - assertThat(content2.refCnt(), equalTo(1)); - - if (finishSecondRequest == false) { - assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST)); - assertTrue(channel.config().isAutoRead()); - assertThat(channel.readInbound(), nullValue()); - // while in forwarding state the request can continue - if (randomBoolean()) { - DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(content); - assertThat(channel.readInbound(), sameInstance(content)); - assertThat(content.refCnt(), equalTo(1)); + channel.writeInbound(request); + var validationRequest = validatorRequestQueue.poll(); + assertNotNull(validationRequest); + if (shouldPassValidation) { + validationRequest.listener.onResponse(null); + } else { + validationRequest.listener.onFailure(new ValidationException()); } - channel.writeInbound(lastContent2); - } - - assertThat(channel.readInbound(), sameInstance(lastContent2)); - assertThat(lastContent2.refCnt(), equalTo(1)); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - assertTrue(channel.config().isAutoRead()); - } - - public void testValidationFailureRecoversForInbound() { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - // write a single request, but don't finish it yet, for which the validation fails - final DefaultHttpRequest request1 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - DefaultHttpContent content1 = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(request1); - channel.writeInbound(content1); - - // channel is paused and the request is queued - assertThat(header.get(), sameInstance(request1)); - assertThat(channel.readInbound(), nullValue()); - assertFalse(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); - assertThat(content1.refCnt(), equalTo(2)); - - // validation for the 1st request FAILS - Exception exception = new ElasticsearchException("Boom"); - listener.get().onFailure(exception); - channel.runPendingTasks(); - - // request1 becomes a decoder exception and its content is dropped - assertThat(channel.readInbound(), sameInstance(request1)); - assertThat(request1.headers().get(HttpHeaderNames.CONNECTION), nullValue()); - assertTrue(request1.decoderResult().isFailure()); - Exception cause = (Exception) request1.decoderResult().cause(); - assertThat(cause, equalTo(exception)); - assertThat(content1.refCnt(), equalTo(0)); // content is dropped - assertThat(channel.readInbound(), nullValue()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST)); - - if (randomBoolean()) { - channel.writeInbound(new DefaultHttpContent(Unpooled.buffer(4))); - } - DefaultLastHttpContent lastContent1 = new DefaultLastHttpContent(Unpooled.buffer(4)); - channel.writeInbound(lastContent1); - if (randomBoolean()) { - assertThat(channel.readInbound(), nullValue()); - } - assertThat(lastContent1.refCnt(), equalTo(0)); // content is dropped - - // write 2nd request after the 1st one failed validation - final DefaultHttpRequest request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4)); - DefaultLastHttpContent lastContent2 = new DefaultLastHttpContent(Unpooled.buffer(4)); - channel.writeInbound(request2); - channel.writeInbound(content2); - boolean finishSecondRequest = randomBoolean(); - if (finishSecondRequest) { - channel.writeInbound(lastContent2); - } - - // channel pauses for the validation of the 2nd request - assertThat(header.get(), sameInstance(request2)); - assertFalse(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); - assertThat(channel.readInbound(), nullValue()); - - // validation for the 2nd request SUCCEEDS - listener.get().onResponse(null); - channel.runPendingTasks(); + channel.runPendingTasks(); - // 2nd request is forwarded correctly - assertThat(channel.readInbound(), sameInstance(request2)); - assertThat(channel.readInbound(), sameInstance(content2)); - assertThat(content2.refCnt(), equalTo(1)); + var gotRequest = channel.readInbound(); + assertEquals( + "should set decoder result failure for invalid request", + shouldPassValidation, + ((HttpRequest) gotRequest).decoderResult().isSuccess() + ); + assertEquals(request, gotRequest); - if (finishSecondRequest == false) { - assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST)); - assertTrue(channel.config().isAutoRead()); - assertThat(channel.readInbound(), nullValue()); - // while in forwarding state the request can continue - if (randomBoolean()) { - DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4)); + if (shouldPassValidation) { channel.writeInbound(content); - assertThat(channel.readInbound(), sameInstance(content)); - assertThat(content.refCnt(), equalTo(1)); + channel.writeInbound(last); + assertEquals("should pass content for valid request", content, channel.readInbound()); + assertEquals(last, channel.readInbound()); + } else { + assertNull("should drop content for invalid request", channel.readInbound()); } - channel.writeInbound(lastContent2); - } - - assertThat(channel.readInbound(), sameInstance(lastContent2)); - assertThat(lastContent2.refCnt(), equalTo(1)); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - assertTrue(channel.config().isAutoRead()); - } - - public void testValidationSuccessForLargeMessage() { - assertTrue(channel.config().isAutoRead()); - - final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - channel.writeInbound(request); - - int messageLength = randomIntBetween(32, 128); - for (int i = 0; i < messageLength; ++i) { - channel.writeInbound(new DefaultHttpContent(Unpooled.buffer(4))); } - channel.writeInbound(new DefaultLastHttpContent(Unpooled.buffer(4))); - boolean followupRequest = randomBoolean(); - if (followupRequest) { - channel.writeInbound(request); - } - - assertThat(header.get(), sameInstance(request)); - assertThat(channel.readInbound(), nullValue()); - assertFalse(channel.config().isAutoRead()); - - listener.get().onResponse(null); - channel.runPendingTasks(); - if (followupRequest) { - assertFalse(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); - } else { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - } - assertThat(channel.readInbound(), sameInstance(request)); - for (int i = 0; i < messageLength; ++i) { - Object content = channel.readInbound(); - assertThat(content, instanceOf(DefaultHttpContent.class)); - assertThat(((DefaultHttpContent) content).refCnt(), equalTo(1)); - } - assertThat(channel.readInbound(), instanceOf(LastHttpContent.class)); - assertThat(channel.readInbound(), nullValue()); } - public void testValidationFailureForLargeMessage() { - assertTrue(channel.config().isAutoRead()); - - final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - channel.writeInbound(request); - - int messageLength = randomIntBetween(32, 128); - DefaultHttpContent[] messageContents = new DefaultHttpContent[messageLength]; - for (int i = 0; i < messageLength; ++i) { - messageContents[i] = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(messageContents[i]); - } - DefaultLastHttpContent lastHttpContent = new DefaultLastHttpContent(Unpooled.buffer(4)); - channel.writeInbound(lastHttpContent); - boolean followupRequest = randomBoolean(); - if (followupRequest) { - channel.writeInbound(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri")); - } - - assertThat(header.get(), sameInstance(request)); - assertThat(channel.readInbound(), nullValue()); - assertFalse(channel.config().isAutoRead()); - - Exception exception = new ElasticsearchException("Boom"); - listener.get().onFailure(exception); - channel.runPendingTasks(); - if (followupRequest) { - assertFalse(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); - } else { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - } - assertThat(channel.readInbound(), sameInstance(request)); - assertThat(request.headers().get(HttpHeaderNames.CONNECTION), nullValue()); - assertTrue(request.decoderResult().isFailure()); - Exception cause = (Exception) request.decoderResult().cause(); - assertThat(cause, equalTo(exception)); - for (int i = 0; i < messageLength; ++i) { - assertThat(channel.readInbound(), nullValue()); - assertThat(messageContents[i].refCnt(), equalTo(0)); - } - assertThat(channel.readInbound(), nullValue()); - assertThat(lastHttpContent.refCnt(), equalTo(0)); - assertThat(channel.readInbound(), nullValue()); - } + public void testIgnoreReadWhenValidating() { + channel.pipeline().addFirst(new FlowControlHandler()); // catch all inbound messages - public void testFullRequestValidationFailure() { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + channel.writeInbound(newHttpRequest()); + channel.writeInbound(newLastHttpContent()); // should hold by flow-control-handler + assertNull("nothing should pass yet", channel.readInbound()); - ByteBuf buf = channel.alloc().buffer(); - ByteBufUtil.copy(AsciiString.of("test full http request"), buf); - final DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri", buf); - channel.writeInbound(request); + channel.read(); + var validationRequest = validatorRequestQueue.poll(); + assertNotNull(validationRequest); - // request got through to validation - assertThat(header.get(), sameInstance(request)); - // channel is paused - assertThat(channel.readInbound(), nullValue()); - assertFalse(channel.config().isAutoRead()); + channel.read(); + assertNull("should ignore read while validating", channel.readInbound()); - // validation fails - Exception exception = new ElasticsearchException("Boom"); - listener.get().onFailure(exception); + validationRequest.listener.onResponse(null); channel.runPendingTasks(); + assertTrue("http request should pass", channel.readInbound() instanceof HttpRequest); + assertNull("content should not pass yet, need explicit read", channel.readInbound()); - // channel is resumed and waiting for next request - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - DefaultFullHttpRequest throughRequest = channel.readInbound(); - // "through request" contains a decoder exception - assertThat(throughRequest, not(sameInstance(request))); - assertTrue(throughRequest.decoderResult().isFailure()); - // the content is cleared when validation fails - assertThat(new String(ByteBufUtil.getBytes(throughRequest.content()), StandardCharsets.UTF_8), is("")); - assertThat(buf.refCnt(), is(0)); - Exception cause = (Exception) throughRequest.decoderResult().cause(); - assertThat(cause, equalTo(exception)); + channel.read(); + assertTrue(channel.readInbound() instanceof LastHttpContent); } - public void testFullRequestValidationSuccess() { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - ByteBuf buf = channel.alloc().buffer(); - try { - ByteBufUtil.copy(AsciiString.of("test full http request"), buf); - final DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri", buf); - channel.writeInbound(request); - - // request got through to validation - assertThat(header.get(), sameInstance(request)); - // channel is paused - assertThat(channel.readInbound(), nullValue()); - assertFalse(channel.config().isAutoRead()); - - // validation succeeds - listener.get().onResponse(null); - channel.runPendingTasks(); - - // channel is resumed and waiting for next request - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + record ValidationRequest(HttpRequest request, Channel channel, ActionListener listener) {} - DefaultFullHttpRequest throughRequest = channel.readInbound(); - // request goes through unaltered - assertThat(throughRequest, sameInstance(request)); - assertFalse(throughRequest.decoderResult().isFailure()); - // the content is unaltered - assertThat(new String(ByteBufUtil.getBytes(throughRequest.content()), StandardCharsets.UTF_8), is("test full http request")); - assertThat(buf.refCnt(), is(1)); - assertThat(throughRequest.decoderResult().cause(), nullValue()); - } finally { - buf.release(); - } - } - - public void testFullRequestWithDecoderException() { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - ByteBuf buf = channel.alloc().buffer(); - try { - ByteBufUtil.copy(AsciiString.of("test full http request"), buf); - // a request with a decoder error prior to validation - final DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri", buf); - Exception cause = new ElasticsearchException("Boom"); - request.setDecoderResult(DecoderResult.failure(cause)); - channel.writeInbound(request); - - // request goes through without invoking the validator - assertThat(header.get(), nullValue()); - assertThat(listener.get(), nullValue()); - // channel is NOT paused - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - DefaultFullHttpRequest throughRequest = channel.readInbound(); - // request goes through unaltered - assertThat(throughRequest, sameInstance(request)); - assertTrue(throughRequest.decoderResult().isFailure()); - assertThat(throughRequest.decoderResult().cause(), equalTo(cause)); - // the content is unaltered - assertThat(new String(ByteBufUtil.getBytes(throughRequest.content()), StandardCharsets.UTF_8), is("test full http request")); - assertThat(buf.refCnt(), is(1)); - } finally { - buf.release(); - } - } } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java index 7492737d4f877..04ed049e5cade 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java @@ -12,6 +12,7 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.DefaultEventLoop; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.embedded.EmbeddedChannel; @@ -21,7 +22,6 @@ import io.netty.handler.flow.FlowControlHandler; import org.elasticsearch.common.bytes.ReleasableBytesReference; -import org.elasticsearch.common.network.ThreadWatchdog; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.http.HttpBody; @@ -44,14 +44,14 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase { private final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); private EmbeddedChannel channel; private Netty4HttpRequestBodyStream stream; - private ThreadWatchdog.ActivityTracker activityTracker; @Override public void setUp() throws Exception { super.setUp(); channel = new EmbeddedChannel(); - activityTracker = new ThreadWatchdog.ActivityTracker(); - stream = new Netty4HttpRequestBodyStream(channel, threadContext, activityTracker); + channel.pipeline().addLast("flow", new FlowControlHandler()); + channel.config().setAutoRead(false); + stream = new Netty4HttpRequestBodyStream(channel, threadContext); stream.setHandler(discardHandler); // set default handler, each test might override one channel.pipeline().addLast(new SimpleChannelInboundHandler(false) { @Override @@ -67,15 +67,6 @@ public void tearDown() throws Exception { stream.close(); } - // ensures that no chunks are sent downstream without request - public void testEnqueueChunksBeforeRequest() { - var totalChunks = randomIntBetween(1, 100); - for (int i = 0; i < totalChunks; i++) { - channel.writeInbound(randomContent(1024)); - } - assertEquals(totalChunks * 1024, stream.bufSize()); - } - // ensures all received chunks can be flushed downstream public void testFlushAllReceivedChunks() { var chunks = new ArrayList(); @@ -90,47 +81,27 @@ public void testFlushAllReceivedChunks() { var totalChunks = randomIntBetween(1, 100); for (int i = 0; i < totalChunks; i++) { channel.writeInbound(randomContent(chunkSize)); + stream.next(); + channel.runPendingTasks(); + assertEquals("should receive all chunks as single composite", i + 1, chunks.size()); } - stream.next(); - channel.runPendingTasks(); - assertEquals("should receive all chunks as single composite", 1, chunks.size()); assertEquals(chunkSize * totalChunks, totalBytes.get()); } - // ensures that channel.setAutoRead(true) only when we flush last chunk - public void testSetAutoReadOnLastFlush() { + // ensures that we read from channel after last chunk + public void testChannelReadAfterLastContent() { + var readCounter = new AtomicInteger(); + channel.pipeline().addAfter("flow", "read-counter", new ChannelOutboundHandlerAdapter() { + @Override + public void read(ChannelHandlerContext ctx) throws Exception { + readCounter.incrementAndGet(); + super.read(ctx); + } + }); channel.writeInbound(randomLastContent(10)); - assertFalse("should not auto-read on last content reception", channel.config().isAutoRead()); stream.next(); channel.runPendingTasks(); - assertTrue("should set auto-read once last content is flushed", channel.config().isAutoRead()); - } - - // ensures that we read from channel when no current chunks available - // and pass next chunk downstream without holding - public void testReadFromChannel() { - var gotChunks = new ArrayList(); - var gotLast = new AtomicBoolean(false); - stream.setHandler((chunk, isLast) -> { - gotChunks.add(chunk); - gotLast.set(isLast); - chunk.close(); - }); - channel.pipeline().addFirst(new FlowControlHandler()); // block all incoming messages, need explicit channel.read() - var chunkSize = 1024; - var totalChunks = randomIntBetween(1, 32); - for (int i = 0; i < totalChunks - 1; i++) { - channel.writeInbound(randomContent(chunkSize)); - } - channel.writeInbound(randomLastContent(chunkSize)); - - for (int i = 0; i < totalChunks; i++) { - assertEquals("should not enqueue chunks", 0, stream.bufSize()); - stream.next(); - channel.runPendingTasks(); - assertEquals("each next() should produce single chunk", i + 1, gotChunks.size()); - } - assertTrue("should receive last content", gotLast.get()); + assertEquals("should have at least 2 reads, one for last content, and one after last", 2, readCounter.get()); } public void testReadFromHasCorrectThreadContext() throws InterruptedException { @@ -142,8 +113,9 @@ public void testReadFromHasCorrectThreadContext() throws InterruptedException { try { // activity tracker requires stream execution in the same thread, setting up stream inside event-loop eventLoop.submit(() -> { - channel = new EmbeddedChannel(); - stream = new Netty4HttpRequestBodyStream(channel, threadContext, new ThreadWatchdog.ActivityTracker()); + channel = new EmbeddedChannel(new FlowControlHandler()); + channel.config().setAutoRead(false); + stream = new Netty4HttpRequestBodyStream(channel, threadContext); channel.pipeline().addLast(new SimpleChannelInboundHandler(false) { @Override protected void channelRead0(ChannelHandlerContext ctx, HttpContent msg) { @@ -198,18 +170,6 @@ public void close() { } } - public void testStreamNextActivityTracker() { - var t0 = activityTracker.get(); - var N = between(1, 10); - for (int i = 0; i < N; i++) { - channel.writeInbound(randomContent(1024)); - stream.next(); - channel.runPendingTasks(); - } - var t1 = activityTracker.get(); - assertEquals("stream#next() must trigger activity tracker: N*step=" + N + "*2=" + N * 2L + " times", t1, t0 + N * 2L); - } - // ensure that we catch all exceptions and throw them into channel pipeline public void testCatchExceptions() { var gotExceptions = new CountDownLatch(3); // number of tests below From a89fec25b2192b6f290015a995035a048d70b2c5 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Mon, 7 Apr 2025 22:25:53 -0700 Subject: [PATCH 02/12] non-reentrant reads --- .../http/netty4/Netty4HttpRequestBodyStream.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java index 4d9afd180ac07..13bbb15f16969 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java @@ -58,17 +58,21 @@ public void addTracingHandler(ChunkHandler chunkHandler) { tracingHandlers.add(chunkHandler); } + private void scheduleRead() { + channel.eventLoop().execute(channel::read); + } + @Override public void next() { assert handler != null : "handler must be set before requesting next chunk"; requestContext = threadContext.newStoredContext(); - channel.eventLoop().execute(channel::read); + scheduleRead(); } public void handleNettyContent(HttpContent httpContent) { if (closing) { httpContent.release(); - channel.eventLoop().execute(channel::read); + scheduleRead(); } else { try (var ignored = threadContext.restoreExistingContext(requestContext)) { var isLast = httpContent instanceof LastHttpContent; @@ -78,7 +82,7 @@ public void handleNettyContent(HttpContent httpContent) { } handler.onNext(buf, isLast); if (isLast) { - channel.read(); + scheduleRead(); channel.closeFuture().removeListener(closeListener); } } @@ -104,6 +108,6 @@ private void doClose() { handler.close(); } } - channel.eventLoop().execute(channel::read); + scheduleRead(); } } From d83f45b5bf201b07c91ca322159abb28472677c2 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Mon, 7 Apr 2025 22:27:02 -0700 Subject: [PATCH 03/12] Update docs/changelog/126441.yaml --- docs/changelog/126441.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/126441.yaml diff --git a/docs/changelog/126441.yaml b/docs/changelog/126441.yaml new file mode 100644 index 0000000000000..717289a88c966 --- /dev/null +++ b/docs/changelog/126441.yaml @@ -0,0 +1,5 @@ +pr: 126441 +summary: Add flow-control and remove auto-read in netty4 http pipeline +area: Network +type: enhancement +issues: [] From b749dd37ee4be49c58e7286d363035c99e219c3a Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Mon, 7 Apr 2025 23:43:58 -0700 Subject: [PATCH 04/12] requested reads in validator --- .../http/netty4/Netty4HttpHeaderValidator.java | 11 ++++++++++- .../http/netty4/Netty4HttpPipeliningHandler.java | 8 +++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java index e8a84527c2f5f..4cf1c6a04af60 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java @@ -29,6 +29,7 @@ public class Netty4HttpHeaderValidator extends ChannelDuplexHandler { private final ThreadContext threadContext; private boolean droppingContent; private boolean validatingRequest; + private boolean readRequested; public Netty4HttpHeaderValidator(HttpValidator validator, ThreadContext threadContext) { this.validator = validator; @@ -47,6 +48,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } else if (msg instanceof HttpContent content) { if (droppingContent) { content.release(); + ctx.read(); } else { assert validatingRequest == false : "unexpected content before validation completed"; ctx.fireChannelRead(content); @@ -60,7 +62,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception public void read(ChannelHandlerContext ctx) throws Exception { // until validation is completed we can ignore read calls, // once validation is finished HttpRequest will be fired and downstream can read from there - if (validatingRequest == false) { + if (validatingRequest) { + readRequested = true; + } else { + readRequested = false; ctx.read(); } } @@ -108,6 +113,10 @@ void handleValidationResult(ChannelHandlerContext ctx, HttpRequest request, @Nul } validatingRequest = false; ctx.fireChannelRead(request); + if (readRequested) { + readRequested = false; + ctx.channel().eventLoop().execute(ctx::read); + } }); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index 301d085e262f7..308ccc8f82d8a 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -146,9 +146,6 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) { } } handlePipelinedRequest(ctx, netty4HttpRequest); - if (request instanceof FullHttpRequest) { - ctx.read(); - } } else { assert msg instanceof HttpContent : "expect HttpContent got " + msg; assert currentRequestStream != null : "current stream must exists before handling http content"; @@ -158,6 +155,11 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) { } } } finally { + if (msg instanceof HttpRequest httpRequest) { + if (httpRequest instanceof FullHttpRequest || httpRequest.decoderResult().isFailure()) { + ctx.channel().eventLoop().execute(ctx::read); + } + } activityTracker.stopActivity(); } } From f8d7fb1eccc1136212f63deb50fd13f621470fa0 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Tue, 8 Apr 2025 01:05:48 -0700 Subject: [PATCH 05/12] intercept readComplete --- .../http/netty4/Netty4HttpHeaderValidator.java | 18 +++++++++--------- .../http/netty4/Netty4HttpServerTransport.java | 4 ++-- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java index 4cf1c6a04af60..a01379d0262e0 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java @@ -29,7 +29,6 @@ public class Netty4HttpHeaderValidator extends ChannelDuplexHandler { private final ThreadContext threadContext; private boolean droppingContent; private boolean validatingRequest; - private boolean readRequested; public Netty4HttpHeaderValidator(HttpValidator validator, ThreadContext threadContext) { this.validator = validator; @@ -62,14 +61,18 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception public void read(ChannelHandlerContext ctx) throws Exception { // until validation is completed we can ignore read calls, // once validation is finished HttpRequest will be fired and downstream can read from there - if (validatingRequest) { - readRequested = true; - } else { - readRequested = false; + if (validatingRequest == false) { ctx.read(); } } + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + if (validatingRequest == false) { + ctx.fireChannelReadComplete(); + } + } + void validate(ChannelHandlerContext ctx, HttpRequest request) { assert Transports.assertDefaultThreadContext(threadContext); droppingContent = false; @@ -113,10 +116,7 @@ void handleValidationResult(ChannelHandlerContext ctx, HttpRequest request, @Nul } validatingRequest = false; ctx.fireChannelRead(request); - if (readRequested) { - readRequested = false; - ctx.channel().eventLoop().execute(ctx::read); - } + ctx.fireChannelReadComplete(); }); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index c6bfb0ce7b42f..79fdbfc8693a5 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -366,6 +366,8 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception { decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR); ch.pipeline().addLast("decoder", decoder); // parses the HTTP bytes request into HTTP message pieces ch.pipeline().addLast(new FlowControlHandler()); + ch.config().setAutoRead(false); + ch.read(); if (httpValidator != null) { // runs a validation function on the first HTTP message piece which contains all the headers // if validation passes, the pieces of that particular request are forwarded, otherwise they are discarded @@ -423,8 +425,6 @@ protected Result beginEncode(HttpResponse httpResponse, String acceptEncoding) t new Netty4HttpPipeliningHandler(transport.pipeliningMaxEvents, transport, threadWatchdogActivityTracker) ); transport.serverAcceptedChannel(nettyHttpChannel); - ch.config().setAutoRead(false); - ch.read(); } @Override From fea33edf45305556db4c3e78c985089c9267253e Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Tue, 8 Apr 2025 13:30:38 -0700 Subject: [PATCH 06/12] missing read detector --- .../http/netty4/MissingReadDetector.java | 71 +++++++++++++++++++ .../http/netty4/Netty4HttpAggregator.java | 1 + .../netty4/Netty4HttpServerTransport.java | 10 +++ 3 files changed, 82 insertions(+) create mode 100644 modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java new file mode 100644 index 0000000000000..ab9c73dea2544 --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.http.netty4; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.concurrent.ScheduledFuture; + +import org.elasticsearch.common.time.TimeProvider; +import org.elasticsearch.common.util.concurrent.FutureUtils; + +import java.util.concurrent.TimeUnit; + +/** + * When channel auto-read is disabled handlers are responsible to read from channel. + * But it's hard to detect when read is missing. This helper class throws assertion errors + * when no reads where detected in given time interval. Normally, in tests, 30 seconds is enough + * to avoid test hang for too long, but can be increased if needed. + */ +class MissingReadDetector extends ChannelDuplexHandler { + final long interval; + final TimeProvider timer; + long reqTimeMs; + long respTimeMs; + ScheduledFuture checker; + + MissingReadDetector(TimeProvider timer, long missingReadInterval) { + this.interval = missingReadInterval; + this.timer = timer; + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + checker = ctx.channel().eventLoop().scheduleAtFixedRate(() -> { + if (respTimeMs >= reqTimeMs) { // stale read + long now = timer.absoluteTimeInMillis(); + if (now >= respTimeMs + interval) { + ctx.fireExceptionCaught(new AssertionError("stale channel, no reads for " + (now - respTimeMs) + " ms")); + } + } + }, interval, interval, TimeUnit.MILLISECONDS); + super.channelRegistered(ctx); + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + if (checker != null) { + FutureUtils.cancel(checker); + } + super.channelUnregistered(ctx); + } + + @Override + public void read(ChannelHandlerContext ctx) throws Exception { + reqTimeMs = timer.absoluteTimeInMillis(); + super.read(ctx); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + respTimeMs = timer.absoluteTimeInMillis(); + super.channelRead(ctx, msg); + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java index 0294b4626496c..8500273d90af9 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java @@ -48,6 +48,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } if (aggregating || msg instanceof FullHttpRequest) { super.channelRead(ctx, msg); + ctx.read(); } else { streamContentSizeHandler.channelRead(ctx, msg); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 79fdbfc8693a5..c52c14c60e777 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -47,6 +47,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.Assertions; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.http.AbstractHttpServerTransport; @@ -365,9 +366,18 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception { } decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR); ch.pipeline().addLast("decoder", decoder); // parses the HTTP bytes request into HTTP message pieces + + // from this point in pipeline every handler must call ctx or channel #read() when ready to process next HTTP part ch.pipeline().addLast(new FlowControlHandler()); + if (Assertions.ENABLED) { + // missing reads are hard to catch, but we can detect absence of reads within interval + long missingReadIntervalMs = 30_000; + ch.pipeline().addLast(new MissingReadDetector(transport.threadPool, missingReadIntervalMs)); + } + // disable auto-read and issue first read, following reads must come from handlers ch.config().setAutoRead(false); ch.read(); + if (httpValidator != null) { // runs a validation function on the first HTTP message piece which contains all the headers // if validation passes, the pieces of that particular request are forwarded, otherwise they are discarded From d8c89c2c9245cb9d83b2cd1c1521d81eede9b551 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Tue, 8 Apr 2025 16:08:37 -0700 Subject: [PATCH 07/12] more reads and relax detector --- .../http/netty4/MissingReadDetector.java | 23 +++++++++++-------- .../netty4/Netty4HttpContentSizeHandler.java | 3 +++ .../netty4/Netty4HttpPipeliningHandler.java | 1 + .../netty4/Netty4HttpServerTransport.java | 2 +- ...y4HttpServerTransportCloseNotifyTests.java | 2 +- 5 files changed, 20 insertions(+), 11 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java index ab9c73dea2544..44962d464b65c 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java @@ -13,6 +13,8 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.util.concurrent.ScheduledFuture; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.common.time.TimeProvider; import org.elasticsearch.common.util.concurrent.FutureUtils; @@ -20,16 +22,19 @@ /** * When channel auto-read is disabled handlers are responsible to read from channel. - * But it's hard to detect when read is missing. This helper class throws assertion errors + * But it's hard to detect when read is missing. This helper class print warnings * when no reads where detected in given time interval. Normally, in tests, 30 seconds is enough * to avoid test hang for too long, but can be increased if needed. */ class MissingReadDetector extends ChannelDuplexHandler { - final long interval; - final TimeProvider timer; - long reqTimeMs; - long respTimeMs; - ScheduledFuture checker; + + private static final Logger logger = LogManager.getLogger(MissingReadDetector.class); + + private final long interval; + private final TimeProvider timer; + private long reqTimeMs; + private long respTimeMs; + private ScheduledFuture checker; MissingReadDetector(TimeProvider timer, long missingReadInterval) { this.interval = missingReadInterval; @@ -37,12 +42,12 @@ class MissingReadDetector extends ChannelDuplexHandler { } @Override - public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { checker = ctx.channel().eventLoop().scheduleAtFixedRate(() -> { if (respTimeMs >= reqTimeMs) { // stale read long now = timer.absoluteTimeInMillis(); if (now >= respTimeMs + interval) { - ctx.fireExceptionCaught(new AssertionError("stale channel, no reads for " + (now - respTimeMs) + " ms")); + logger.warn("haven't read from channel for {}", (now - respTimeMs)); } } }, interval, interval, TimeUnit.MILLISECONDS); @@ -50,7 +55,7 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception { } @Override - public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { if (checker != null) { FutureUtils.cancel(checker); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandler.java index fee9d227d8310..8b40361292be2 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandler.java @@ -123,6 +123,7 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) { isContinueExpected = true; } else { ctx.writeAndFlush(EXPECTATION_FAILED_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE); + ctx.read(); return; } } @@ -136,6 +137,7 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) { decoder.reset(); } ctx.writeAndFlush(TOO_LARGE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + ctx.read(); } else { ignoreContent = false; currentContentLength = 0; @@ -155,6 +157,7 @@ private void handleContent(ChannelHandlerContext ctx, HttpContent msg) { if (currentContentLength > maxContentLength) { msg.release(); ctx.writeAndFlush(TOO_LARGE_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE); + ctx.read(); } else { ctx.fireChannelRead(msg); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index 308ccc8f82d8a..ae33a1026e916 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -509,6 +509,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { } else { serverTransport.onException(channel, (Exception) cause); } + ctx.read(); } @Override diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index c52c14c60e777..300f9782f283f 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -371,7 +371,7 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception { ch.pipeline().addLast(new FlowControlHandler()); if (Assertions.ENABLED) { // missing reads are hard to catch, but we can detect absence of reads within interval - long missingReadIntervalMs = 30_000; + long missingReadIntervalMs = 10_000; ch.pipeline().addLast(new MissingReadDetector(transport.threadPool, missingReadIntervalMs)); } // disable auto-read and issue first read, following reads must come from handlers diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportCloseNotifyTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportCloseNotifyTests.java index ec2881b989d0b..6a2a6df28899f 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportCloseNotifyTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportCloseNotifyTests.java @@ -251,7 +251,7 @@ public void close() { server.dispatcher.reqQueue.forEach(r -> r.request.getHttpRequest().release()); server.netty.stop(); server.threadPool.shutdownNow(); - safeAwait(client.netty.config().group().shutdownGracefully()); + safeAwait(client.netty.config().group().shutdownGracefully(0,0, TimeUnit.SECONDS)); } } From c90d4d71d6854dce8afbea708379647f920e65b4 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Tue, 8 Apr 2025 23:15:24 +0000 Subject: [PATCH 08/12] [CI] Auto commit changes from spotless --- .../SecurityNetty4HttpServerTransportCloseNotifyTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportCloseNotifyTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportCloseNotifyTests.java index 6a2a6df28899f..4ab0e53ce2bd5 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportCloseNotifyTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportCloseNotifyTests.java @@ -251,7 +251,7 @@ public void close() { server.dispatcher.reqQueue.forEach(r -> r.request.getHttpRequest().release()); server.netty.stop(); server.threadPool.shutdownNow(); - safeAwait(client.netty.config().group().shutdownGracefully(0,0, TimeUnit.SECONDS)); + safeAwait(client.netty.config().group().shutdownGracefully(0, 0, TimeUnit.SECONDS)); } } From 55681eda07da677ed6caa1df3a4cd783f102c9b6 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Wed, 9 Apr 2025 19:09:25 -0700 Subject: [PATCH 09/12] more tests --- .../Netty4IncrementalRequestHandlingIT.java | 132 +++++++++--------- .../http/netty4/MissingReadDetector.java | 25 ++-- .../http/netty4/Netty4HttpAggregator.java | 5 +- .../netty4/Netty4HttpContentSizeHandler.java | 1 + .../netty4/Netty4HttpHeaderValidator.java | 8 -- .../netty4/Netty4HttpPipeliningHandler.java | 12 +- .../netty4/Netty4HttpRequestBodyStream.java | 28 ++-- .../Netty4HttpContentSizeHandlerTests.java | 17 ++- .../Netty4HttpHeaderValidatorTests.java | 21 +++ .../Netty4HttpRequestBodyStreamTests.java | 16 ++- .../http/netty4/ReadSniffer.java | 43 ++++++ 11 files changed, 195 insertions(+), 113 deletions(-) create mode 100644 modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/ReadSniffer.java diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java index 1430b884b68b9..f9bfe0f296237 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -94,6 +94,64 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase { private static final int MAX_CONTENT_LENGTH = ByteSizeUnit.MB.toIntBytes(50); + private static long transportStatsRequestBytesSize(Ctx ctx) { + var httpTransport = internalCluster().getInstance(HttpServerTransport.class, ctx.nodeName); + var stats = httpTransport.stats().clientStats(); + var bytes = 0L; + for (var s : stats) { + bytes += s.requestSizeBytes(); + } + return bytes; + } + + static int MBytes(int m) { + return m * 1024 * 1024; + } + + static T safePoll(BlockingDeque queue) { + try { + var t = queue.poll(SAFE_AWAIT_TIMEOUT.seconds(), TimeUnit.SECONDS); + assertNotNull("queue is empty", t); + return t; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError(e); + } + } + + private static FullHttpRequest fullHttpRequest(String opaqueId, ByteBuf content) { + var req = new DefaultFullHttpRequest(HTTP_1_1, POST, ControlServerRequestPlugin.ROUTE, Unpooled.wrappedBuffer(content)); + req.headers().add(CONTENT_LENGTH, content.readableBytes()); + req.headers().add(CONTENT_TYPE, APPLICATION_JSON); + req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId); + return req; + } + + private static HttpRequest httpRequest(String opaqueId, int contentLength) { + return httpRequest(ControlServerRequestPlugin.ROUTE, opaqueId, contentLength); + } + + private static HttpRequest httpRequest(String uri, String opaqueId, int contentLength) { + var req = new DefaultHttpRequest(HTTP_1_1, POST, uri); + req.headers().add(CONTENT_LENGTH, contentLength); + req.headers().add(CONTENT_TYPE, APPLICATION_JSON); + req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId); + return req; + } + + private static HttpContent randomContent(int size, boolean isLast) { + var buf = Unpooled.wrappedBuffer(randomByteArrayOfLength(size)); + if (isLast) { + return new DefaultLastHttpContent(buf); + } else { + return new DefaultHttpContent(buf); + } + } + + private static ByteBuf randomByteBuf(int size) { + return Unpooled.wrappedBuffer(randomByteArrayOfLength(size)); + } + @Override protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings)); @@ -372,16 +430,6 @@ public void testBadRequestReleaseQueuedChunks() throws Exception { } } - private static long transportStatsRequestBytesSize(Ctx ctx) { - var httpTransport = internalCluster().getInstance(HttpServerTransport.class, ctx.nodeName); - var stats = httpTransport.stats().clientStats(); - var bytes = 0L; - for (var s : stats) { - bytes += s.requestSizeBytes(); - } - return bytes; - } - /** * ensures that {@link org.elasticsearch.http.HttpClientStatsTracker} counts streamed content bytes */ @@ -476,55 +524,7 @@ private String opaqueId(int reqNo) { return getTestName() + "-" + reqNo; } - static int MBytes(int m) { - return m * 1024 * 1024; - } - - static T safePoll(BlockingDeque queue) { - try { - var t = queue.poll(SAFE_AWAIT_TIMEOUT.seconds(), TimeUnit.SECONDS); - assertNotNull("queue is empty", t); - return t; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new AssertionError(e); - } - } - - static FullHttpRequest fullHttpRequest(String opaqueId, ByteBuf content) { - var req = new DefaultFullHttpRequest(HTTP_1_1, POST, ControlServerRequestPlugin.ROUTE, Unpooled.wrappedBuffer(content)); - req.headers().add(CONTENT_LENGTH, content.readableBytes()); - req.headers().add(CONTENT_TYPE, APPLICATION_JSON); - req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId); - return req; - } - - static HttpRequest httpRequest(String opaqueId, int contentLength) { - return httpRequest(ControlServerRequestPlugin.ROUTE, opaqueId, contentLength); - } - - static HttpRequest httpRequest(String uri, String opaqueId, int contentLength) { - var req = new DefaultHttpRequest(HTTP_1_1, POST, uri); - req.headers().add(CONTENT_LENGTH, contentLength); - req.headers().add(CONTENT_TYPE, APPLICATION_JSON); - req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId); - return req; - } - - static HttpContent randomContent(int size, boolean isLast) { - var buf = Unpooled.wrappedBuffer(randomByteArrayOfLength(size)); - if (isLast) { - return new DefaultLastHttpContent(buf); - } else { - return new DefaultHttpContent(buf); - } - } - - static ByteBuf randomByteBuf(int size) { - return Unpooled.wrappedBuffer(randomByteArrayOfLength(size)); - } - - Ctx setupClientCtx() throws Exception { + private Ctx setupClientCtx() throws Exception { var nodeName = internalCluster().getRandomNodeName(); var clientRespQueue = new LinkedBlockingDeque<>(16); var bootstrap = bootstrapClient(nodeName, clientRespQueue); @@ -532,7 +532,7 @@ Ctx setupClientCtx() throws Exception { return new Ctx(getTestName(), nodeName, bootstrap, channel, clientRespQueue); } - Bootstrap bootstrapClient(String node, BlockingQueue queue) { + private Bootstrap bootstrapClient(String node, BlockingQueue queue) { var httpServer = internalCluster().getInstance(HttpServerTransport.class, node); var remoteAddr = randomFrom(httpServer.boundAddress().boundAddresses()); return new Bootstrap().group(new NioEventLoopGroup(1)) @@ -570,9 +570,13 @@ protected boolean addMockHttpTransport() { return false; // enable http } - record Ctx(String testName, String nodeName, Bootstrap clientBootstrap, Channel clientChannel, BlockingDeque clientRespQueue) - implements - AutoCloseable { + private record Ctx( + String testName, + String nodeName, + Bootstrap clientBootstrap, + Channel clientChannel, + BlockingDeque clientRespQueue + ) implements AutoCloseable { @Override public void close() throws Exception { @@ -597,7 +601,7 @@ ServerRequestHandler awaitRestChannelAccepted(String opaqueId) throws Exception } } - static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkConsumer { + private static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkConsumer { final SubscribableListener channelAccepted = new SubscribableListener<>(); final String opaqueId; final BlockingDeque recvChunks = new LinkedBlockingDeque<>(); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java index 44962d464b65c..4851f1ef7c5eb 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java @@ -23,7 +23,7 @@ /** * When channel auto-read is disabled handlers are responsible to read from channel. * But it's hard to detect when read is missing. This helper class print warnings - * when no reads where detected in given time interval. Normally, in tests, 30 seconds is enough + * when no reads where detected in given time interval. Normally, in tests, 10 seconds is enough * to avoid test hang for too long, but can be increased if needed. */ class MissingReadDetector extends ChannelDuplexHandler { @@ -32,8 +32,8 @@ class MissingReadDetector extends ChannelDuplexHandler { private final long interval; private final TimeProvider timer; - private long reqTimeMs; - private long respTimeMs; + private boolean pendingRead; + private long lastRead; private ScheduledFuture checker; MissingReadDetector(TimeProvider timer, long missingReadInterval) { @@ -44,14 +44,14 @@ class MissingReadDetector extends ChannelDuplexHandler { @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { checker = ctx.channel().eventLoop().scheduleAtFixedRate(() -> { - if (respTimeMs >= reqTimeMs) { // stale read + if (pendingRead == false) { long now = timer.absoluteTimeInMillis(); - if (now >= respTimeMs + interval) { - logger.warn("haven't read from channel for {}", (now - respTimeMs)); + if (now >= lastRead + interval) { + logger.warn("chan-id={} haven't read from channel for [{}ms]", ctx.channel().id(), (now - lastRead)); } } }, interval, interval, TimeUnit.MILLISECONDS); - super.channelRegistered(ctx); + super.handlerAdded(ctx); } @Override @@ -59,18 +59,19 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { if (checker != null) { FutureUtils.cancel(checker); } - super.channelUnregistered(ctx); + super.handlerRemoved(ctx); } @Override public void read(ChannelHandlerContext ctx) throws Exception { - reqTimeMs = timer.absoluteTimeInMillis(); - super.read(ctx); + pendingRead = true; + ctx.read(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - respTimeMs = timer.absoluteTimeInMillis(); - super.channelRead(ctx, msg); + pendingRead = false; + lastRead = timer.absoluteTimeInMillis(); + ctx.fireChannelRead(msg); } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java index 8500273d90af9..479d053937e1a 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java @@ -15,6 +15,7 @@ import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpRequestDecoder; +import io.netty.handler.codec.http.LastHttpContent; import org.elasticsearch.http.HttpPreRequest; import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils; @@ -48,7 +49,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } if (aggregating || msg instanceof FullHttpRequest) { super.channelRead(ctx, msg); - ctx.read(); + if (msg instanceof LastHttpContent == false) { + ctx.read(); // HttpObjectAggregator is tricky with auto-read off, it might not call read again, calling on its behalf + } } else { streamContentSizeHandler.channelRead(ctx, msg); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandler.java index 8b40361292be2..e121d091b1bdb 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandler.java @@ -152,6 +152,7 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) { private void handleContent(ChannelHandlerContext ctx, HttpContent msg) { if (ignoreContent) { msg.release(); + ctx.read(); } else { currentContentLength += msg.content().readableBytes(); if (currentContentLength > maxContentLength) { diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java index a01379d0262e0..9ed69048a5f63 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java @@ -66,13 +66,6 @@ public void read(ChannelHandlerContext ctx) throws Exception { } } - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - if (validatingRequest == false) { - ctx.fireChannelReadComplete(); - } - } - void validate(ChannelHandlerContext ctx, HttpRequest request) { assert Transports.assertDefaultThreadContext(threadContext); droppingContent = false; @@ -116,7 +109,6 @@ void handleValidationResult(ChannelHandlerContext ctx, HttpRequest request, @Nul } validatingRequest = false; ctx.fireChannelRead(request); - ctx.fireChannelReadComplete(); }); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index ae33a1026e916..5bec94c2e8a72 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -137,10 +137,7 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) { netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest); currentRequestStream = null; } else { - var contentStream = new Netty4HttpRequestBodyStream( - ctx.channel(), - serverTransport.getThreadPool().getThreadContext() - ); + var contentStream = new Netty4HttpRequestBodyStream(ctx, serverTransport.getThreadPool().getThreadContext()); currentRequestStream = contentStream; netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream); } @@ -155,10 +152,8 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) { } } } finally { - if (msg instanceof HttpRequest httpRequest) { - if (httpRequest instanceof FullHttpRequest || httpRequest.decoderResult().isFailure()) { - ctx.channel().eventLoop().execute(ctx::read); - } + if (currentRequestStream == null) { + ctx.channel().eventLoop().execute(ctx::read); } activityTracker.stopActivity(); } @@ -509,7 +504,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { } else { serverTransport.onException(channel, (Exception) cause); } - ctx.read(); } @Override diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java index 13bbb15f16969..b4396569ae35e 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java @@ -9,8 +9,8 @@ package org.elasticsearch.http.netty4; -import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.LastHttpContent; @@ -27,19 +27,19 @@ */ public class Netty4HttpRequestBodyStream implements HttpBody.Stream { - private final Channel channel; private final List tracingHandlers = new ArrayList<>(4); private final ThreadContext threadContext; + private final ChannelHandlerContext ctx; private boolean closing = false; private HttpBody.ChunkHandler handler; private ThreadContext.StoredContext requestContext; private final ChannelFutureListener closeListener = future -> doClose(); - public Netty4HttpRequestBodyStream(Channel channel, ThreadContext threadContext) { - this.channel = channel; + public Netty4HttpRequestBodyStream(ChannelHandlerContext ctx, ThreadContext threadContext) { + this.ctx = ctx; this.threadContext = threadContext; this.requestContext = threadContext.newStoredContext(); - Netty4Utils.addListener(channel.closeFuture(), closeListener); + Netty4Utils.addListener(ctx.channel().closeFuture(), closeListener); } @Override @@ -58,21 +58,21 @@ public void addTracingHandler(ChunkHandler chunkHandler) { tracingHandlers.add(chunkHandler); } - private void scheduleRead() { - channel.eventLoop().execute(channel::read); + private void read() { + ctx.channel().eventLoop().execute(ctx::read); } @Override public void next() { assert handler != null : "handler must be set before requesting next chunk"; requestContext = threadContext.newStoredContext(); - scheduleRead(); + read(); } public void handleNettyContent(HttpContent httpContent) { if (closing) { httpContent.release(); - scheduleRead(); + read(); } else { try (var ignored = threadContext.restoreExistingContext(requestContext)) { var isLast = httpContent instanceof LastHttpContent; @@ -82,8 +82,8 @@ public void handleNettyContent(HttpContent httpContent) { } handler.onNext(buf, isLast); if (isLast) { - scheduleRead(); - channel.closeFuture().removeListener(closeListener); + read(); + ctx.channel().closeFuture().removeListener(closeListener); } } } @@ -91,10 +91,10 @@ public void handleNettyContent(HttpContent httpContent) { @Override public void close() { - if (channel.eventLoop().inEventLoop()) { + if (ctx.channel().eventLoop().inEventLoop()) { doClose(); } else { - channel.eventLoop().submit(this::doClose); + ctx.channel().eventLoop().submit(this::doClose); } } @@ -108,6 +108,6 @@ private void doClose() { handler.close(); } } - scheduleRead(); + read(); } } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandlerTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandlerTests.java index 36399c8d6d7a5..3ffda91359395 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandlerTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandlerTests.java @@ -40,6 +40,7 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase { private static final int REPS = 1000; private EmbeddedChannel channel; private EmbeddedChannel encoder; // channel to encode HTTP objects into bytes + private ReadSniffer readSniffer; private static HttpContent httpContent(int size) { return new DefaultHttpContent(Unpooled.wrappedBuffer(randomByteArrayOfLength(size))); @@ -68,7 +69,10 @@ public void setUp() throws Exception { super.setUp(); var decoder = new HttpRequestDecoder(); encoder = new EmbeddedChannel(new HttpRequestEncoder()); - channel = new EmbeddedChannel(decoder, new Netty4HttpContentSizeHandler(decoder, MAX_CONTENT_LENGTH)); + readSniffer = new ReadSniffer(); + channel = new EmbeddedChannel(); + channel.config().setAutoRead(false); + channel.pipeline().addLast(decoder, readSniffer, new Netty4HttpContentSizeHandler(decoder, MAX_CONTENT_LENGTH)); } /** @@ -85,6 +89,7 @@ public void testContinue() { assertFalse(HttpUtil.is100ContinueExpected(recvRequest)); channel.writeInbound(encode(LastHttpContent.EMPTY_LAST_CONTENT)); assertEquals(LastHttpContent.EMPTY_LAST_CONTENT, channel.readInbound()); + assertEquals("must not read from channel", 0, readSniffer.readCnt); } } @@ -99,6 +104,7 @@ public void testWithoutContinue() { assertNotNull("request should pass", channel.readInbound()); channel.writeInbound(encode(LastHttpContent.EMPTY_LAST_CONTENT)); assertEquals(LastHttpContent.EMPTY_LAST_CONTENT, channel.readInbound()); + assertEquals("must not read from channel", 0, readSniffer.readCnt); } } @@ -121,6 +127,7 @@ public void testContinueWithContent() { assertNotNull(recvContent); assertEquals(MAX_CONTENT_LENGTH, recvContent.content().readableBytes()); recvContent.release(); + assertEquals("must not read from channel", 0, readSniffer.readCnt); } } @@ -134,6 +141,7 @@ public void testExpectationFailed() { channel.writeInbound(encode(sendRequest)); var resp = (FullHttpResponse) channel.readOutbound(); assertEquals(HttpResponseStatus.EXPECTATION_FAILED, resp.status()); + assertEquals("expect 2 reads, one from size handler and HTTP decoder will emit LastHttpContent", 2, readSniffer.readCnt); assertFalse(channel.isOpen()); resp.release(); } @@ -152,6 +160,7 @@ public void testEntityTooLarge() { assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status()); assertNull("request should not pass", channel.readInbound()); assertTrue("should not close channel", channel.isOpen()); + assertEquals("must read from channel", i + 1, readSniffer.readCnt); resp.release(); } } @@ -160,11 +169,13 @@ public void testEntityTooLarge() { * Mixed load of oversized and normal requests with Exepct:100-Continue. */ public void testMixedContent() { + var expectReadCnt = 0; for (int i = 0; i < REPS; i++) { var isOversized = randomBoolean(); var sendRequest = httpRequest(); HttpUtil.set100ContinueExpected(sendRequest, true); if (isOversized) { + expectReadCnt++; HttpUtil.setContentLength(sendRequest, OVERSIZED_LENGTH); channel.writeInbound(encode(sendRequest)); var resp = (FullHttpResponse) channel.readOutbound(); @@ -188,6 +199,7 @@ public void testMixedContent() { assertEquals("actual content size should match", normalSize, recvContent.content().readableBytes()); recvContent.release(); } + assertEquals(expectReadCnt, readSniffer.readCnt); } } @@ -205,6 +217,7 @@ public void testEntityTooLargeWithContentWithoutExpect() { resp.release(); assertNull("request and content should not pass", channel.readInbound()); assertTrue("should not close channel", channel.isOpen()); + assertEquals("expect two reads per loop, one for request and one for content", (i + 1) * 2, readSniffer.readCnt); } } @@ -234,7 +247,7 @@ public void testEntityTooLargeWithChunkedContent() { var resp = (FullHttpResponse) channel.readOutbound(); assertEquals("should respond with 413", HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status()); assertFalse("should close channel", channel.isOpen()); + assertEquals("expect read after response", 1, readSniffer.readCnt); resp.release(); } - } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java index 6a7cd2e4cfd71..f5215e1505ba3 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java @@ -15,9 +15,11 @@ import io.netty.handler.codec.http.DefaultHttpContent; import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.DefaultLastHttpContent; +import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.flow.FlowControlHandler; @@ -126,6 +128,25 @@ public void testIgnoreReadWhenValidating() { assertTrue(channel.readInbound() instanceof LastHttpContent); } + public void testWithFlowControlAndAggregator() { + channel.pipeline().addFirst(new FlowControlHandler()); + channel.pipeline().addLast(new Netty4HttpAggregator(8192, (req) -> true, new HttpRequestDecoder())); + + channel.writeInbound(newHttpRequest()); + channel.writeInbound(newHttpContent()); + channel.writeInbound(newLastHttpContent()); + + channel.read(); + assertNull("should ignore read while validating", channel.readInbound()); + + var validationRequest = validatorRequestQueue.poll(); + assertNotNull(validationRequest); + validationRequest.listener.onResponse(null); + channel.runPendingTasks(); + + assertTrue(channel.readInbound() instanceof FullHttpRequest); + } + record ValidationRequest(HttpRequest request, Channel channel, ActionListener listener) {} } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java index 04ed049e5cade..fdd4cb8008b68 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java @@ -51,9 +51,14 @@ public void setUp() throws Exception { channel = new EmbeddedChannel(); channel.pipeline().addLast("flow", new FlowControlHandler()); channel.config().setAutoRead(false); - stream = new Netty4HttpRequestBodyStream(channel, threadContext); - stream.setHandler(discardHandler); // set default handler, each test might override one channel.pipeline().addLast(new SimpleChannelInboundHandler(false) { + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + stream = new Netty4HttpRequestBodyStream(ctx, threadContext); + stream.setHandler(discardHandler); // set default handler, each test might override one + super.handlerAdded(ctx); + } + @Override protected void channelRead0(ChannelHandlerContext ctx, HttpContent msg) { stream.handleNettyContent(msg); @@ -115,8 +120,13 @@ public void testReadFromHasCorrectThreadContext() throws InterruptedException { eventLoop.submit(() -> { channel = new EmbeddedChannel(new FlowControlHandler()); channel.config().setAutoRead(false); - stream = new Netty4HttpRequestBodyStream(channel, threadContext); channel.pipeline().addLast(new SimpleChannelInboundHandler(false) { + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + stream = new Netty4HttpRequestBodyStream(ctx, threadContext); + super.handlerAdded(ctx); + } + @Override protected void channelRead0(ChannelHandlerContext ctx, HttpContent msg) { stream.handleNettyContent(msg); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/ReadSniffer.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/ReadSniffer.java new file mode 100644 index 0000000000000..12453affa5db1 --- /dev/null +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/ReadSniffer.java @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.http.netty4; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; + +/** + * Sniffs channel reads, helps detect missing or unexpected ones. + * Plug-in into channel and checks at the end test. + *
+ *     {@code
+ *     chan = new EmbeddedChannel();
+ *     chan.config().setAutoRead(false);
+ *     readSniffer = new ReadSniffer();
+ *     chan.pipeline().addLast(readSniffer, ...otherHandlers);
+ *     ...
+ *     // run test
+ *     ...
+ *     assertEquals("unexpected read", 0, readSniffer.readCnt)
+ *     // or
+ *     assertEquals("exact number of reads", 2, readSniffer.readCnt)
+ *     }
+ * 
+ * + */ +public class ReadSniffer extends ChannelOutboundHandlerAdapter { + + int readCnt; + + @Override + public void read(ChannelHandlerContext ctx) throws Exception { + readCnt++; + super.read(ctx); + } +} From 2a6a7922dfadec02a5c80afa447c89d0b09b6998 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Wed, 9 Apr 2025 19:16:35 -0700 Subject: [PATCH 10/12] comment --- .../src/test/java/org/elasticsearch/http/netty4/ReadSniffer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/ReadSniffer.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/ReadSniffer.java index 12453affa5db1..a9e593da55599 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/ReadSniffer.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/ReadSniffer.java @@ -14,7 +14,6 @@ /** * Sniffs channel reads, helps detect missing or unexpected ones. - * Plug-in into channel and checks at the end test. *
  *     {@code
  *     chan = new EmbeddedChannel();

From 34f5884e6dcaa373601fdd418cacf097988bbd13 Mon Sep 17 00:00:00 2001
From: Mikhail Berezovskiy 
Date: Thu, 10 Apr 2025 17:11:16 -0700
Subject: [PATCH 11/12] feedback

---
 .../http/netty4/MissingReadDetector.java      |  5 +--
 .../netty4/Netty4HttpHeaderValidator.java     |  1 -
 .../netty4/Netty4HttpServerTransport.java     |  9 +++--
 .../Netty4HttpContentSizeHandlerTests.java    | 27 ++++++++++-----
 .../Netty4HttpHeaderValidatorTests.java       | 23 ++++++++++---
 .../Netty4HttpRequestBodyStreamTests.java     | 34 +++++++++++--------
 .../http/netty4/ReadSniffer.java              |  4 +--
 7 files changed, 67 insertions(+), 36 deletions(-)

diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java
index 4851f1ef7c5eb..f456bba8064bd 100644
--- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java
+++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java
@@ -36,8 +36,8 @@ class MissingReadDetector extends ChannelDuplexHandler {
     private long lastRead;
     private ScheduledFuture checker;
 
-    MissingReadDetector(TimeProvider timer, long missingReadInterval) {
-        this.interval = missingReadInterval;
+    MissingReadDetector(TimeProvider timer, long missingReadIntervalMillis) {
+        this.interval = missingReadIntervalMillis;
         this.timer = timer;
     }
 
@@ -70,6 +70,7 @@ public void read(ChannelHandlerContext ctx) throws Exception {
 
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        assert ctx.channel().config().isAutoRead() == false : "auto-read must be always disabled";
         pendingRead = false;
         lastRead = timer.absoluteTimeInMillis();
         ctx.fireChannelRead(msg);
diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java
index 9ed69048a5f63..99ad4932799cd 100644
--- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java
+++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java
@@ -37,7 +37,6 @@ public Netty4HttpHeaderValidator(HttpValidator validator, ThreadContext threadCo
 
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        assert ctx.channel().config().isAutoRead() == false : "auto-read should be always disabled";
         if (msg instanceof HttpObject httpObject) {
             if (httpObject.decoderResult().isFailure()) {
                 ctx.fireChannelRead(httpObject); // pass-through for decoding failures
diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java
index 300f9782f283f..df2201af586e4 100644
--- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java
+++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java
@@ -319,6 +319,9 @@ protected HttpChannelHandler(
 
         @Override
         protected void initChannel(Channel ch) throws Exception {
+            // auto-read must be disabled all the time
+            ch.config().setAutoRead(false);
+
             Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
             ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
             if (acceptChannelPredicate != null) {
@@ -374,9 +377,6 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception {
                 long missingReadIntervalMs = 10_000;
                 ch.pipeline().addLast(new MissingReadDetector(transport.threadPool, missingReadIntervalMs));
             }
-            // disable auto-read and issue first read, following reads must come from handlers
-            ch.config().setAutoRead(false);
-            ch.read();
 
             if (httpValidator != null) {
                 // runs a validation function on the first HTTP message piece which contains all the headers
@@ -435,6 +435,9 @@ protected Result beginEncode(HttpResponse httpResponse, String acceptEncoding) t
                     new Netty4HttpPipeliningHandler(transport.pipeliningMaxEvents, transport, threadWatchdogActivityTracker)
                 );
             transport.serverAcceptedChannel(nettyHttpChannel);
+
+            // make very first read call, since auto-read is disabled; following reads must come from the handlers
+            ch.read();
         }
 
         @Override
diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandlerTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandlerTests.java
index 3ffda91359395..edbaa3b62d187 100644
--- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandlerTests.java
+++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandlerTests.java
@@ -12,6 +12,7 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.DecoderResult;
 import io.netty.handler.codec.http.DefaultHttpContent;
 import io.netty.handler.codec.http.DefaultHttpRequest;
 import io.netty.handler.codec.http.DefaultLastHttpContent;
@@ -75,6 +76,16 @@ public void setUp() throws Exception {
         channel.pipeline().addLast(decoder, readSniffer, new Netty4HttpContentSizeHandler(decoder, MAX_CONTENT_LENGTH));
     }
 
+    public void testDecodingFailurePassThrough() {
+        for (var i = 0; i < REPS; i++) {
+            var sendReq = httpRequest();
+            sendReq.setDecoderResult(DecoderResult.failure(new Exception("bad")));
+            channel.writeInbound(sendReq);
+            assertEquals(sendReq, channel.readInbound());
+        }
+        assertEquals("should not read from channel, failures are handled downstream", 0, readSniffer.readCount);
+    }
+
     /**
      * Assert that handler replies 100-continue for acceptable request and pass request further.
      */
@@ -89,7 +100,7 @@ public void testContinue() {
             assertFalse(HttpUtil.is100ContinueExpected(recvRequest));
             channel.writeInbound(encode(LastHttpContent.EMPTY_LAST_CONTENT));
             assertEquals(LastHttpContent.EMPTY_LAST_CONTENT, channel.readInbound());
-            assertEquals("must not read from channel", 0, readSniffer.readCnt);
+            assertEquals("must not read from channel", 0, readSniffer.readCount);
         }
     }
 
@@ -104,7 +115,7 @@ public void testWithoutContinue() {
             assertNotNull("request should pass", channel.readInbound());
             channel.writeInbound(encode(LastHttpContent.EMPTY_LAST_CONTENT));
             assertEquals(LastHttpContent.EMPTY_LAST_CONTENT, channel.readInbound());
-            assertEquals("must not read from channel", 0, readSniffer.readCnt);
+            assertEquals("must not read from channel", 0, readSniffer.readCount);
         }
     }
 
@@ -127,7 +138,7 @@ public void testContinueWithContent() {
             assertNotNull(recvContent);
             assertEquals(MAX_CONTENT_LENGTH, recvContent.content().readableBytes());
             recvContent.release();
-            assertEquals("must not read from channel", 0, readSniffer.readCnt);
+            assertEquals("must not read from channel", 0, readSniffer.readCount);
         }
     }
 
@@ -141,7 +152,7 @@ public void testExpectationFailed() {
         channel.writeInbound(encode(sendRequest));
         var resp = (FullHttpResponse) channel.readOutbound();
         assertEquals(HttpResponseStatus.EXPECTATION_FAILED, resp.status());
-        assertEquals("expect 2 reads, one from size handler and HTTP decoder will emit LastHttpContent", 2, readSniffer.readCnt);
+        assertEquals("expect 2 reads, one from size handler and HTTP decoder will emit LastHttpContent", 2, readSniffer.readCount);
         assertFalse(channel.isOpen());
         resp.release();
     }
@@ -160,7 +171,7 @@ public void testEntityTooLarge() {
             assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status());
             assertNull("request should not pass", channel.readInbound());
             assertTrue("should not close channel", channel.isOpen());
-            assertEquals("must read from channel", i + 1, readSniffer.readCnt);
+            assertEquals("must read from channel", i + 1, readSniffer.readCount);
             resp.release();
         }
     }
@@ -199,7 +210,7 @@ public void testMixedContent() {
                 assertEquals("actual content size should match", normalSize, recvContent.content().readableBytes());
                 recvContent.release();
             }
-            assertEquals(expectReadCnt, readSniffer.readCnt);
+            assertEquals(expectReadCnt, readSniffer.readCount);
         }
     }
 
@@ -217,7 +228,7 @@ public void testEntityTooLargeWithContentWithoutExpect() {
             resp.release();
             assertNull("request and content should not pass", channel.readInbound());
             assertTrue("should not close channel", channel.isOpen());
-            assertEquals("expect two reads per loop, one for request and one for content", (i + 1) * 2, readSniffer.readCnt);
+            assertEquals("expect two reads per loop, one for request and one for content", (i + 1) * 2, readSniffer.readCount);
         }
     }
 
@@ -247,7 +258,7 @@ public void testEntityTooLargeWithChunkedContent() {
         var resp = (FullHttpResponse) channel.readOutbound();
         assertEquals("should respond with 413", HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status());
         assertFalse("should close channel", channel.isOpen());
-        assertEquals("expect read after response", 1, readSniffer.readCnt);
+        assertEquals("expect read after response", 1, readSniffer.readCount);
         resp.release();
     }
 }
diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java
index f5215e1505ba3..82bfec06fffe0 100644
--- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java
+++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java
@@ -12,6 +12,7 @@
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.DecoderResult;
 import io.netty.handler.codec.http.DefaultHttpContent;
 import io.netty.handler.codec.http.DefaultHttpRequest;
 import io.netty.handler.codec.http.DefaultLastHttpContent;
@@ -41,9 +42,12 @@ public class Netty4HttpHeaderValidatorTests extends ESTestCase {
     public void setUp() throws Exception {
         super.setUp();
         validatorRequestQueue = new LinkedBlockingQueue<>();
-        channel = new EmbeddedChannel(new Netty4HttpHeaderValidator((httpRequest, channel, listener) -> {
-            validatorRequestQueue.add(new ValidationRequest(httpRequest, channel, listener));
-        }, new ThreadContext(Settings.EMPTY)));
+        channel = new EmbeddedChannel(
+            new Netty4HttpHeaderValidator(
+                (httpRequest, channel, listener) -> validatorRequestQueue.add(new ValidationRequest(httpRequest, channel, listener)),
+                new ThreadContext(Settings.EMPTY)
+            )
+        );
         channel.config().setAutoRead(false);
     }
 
@@ -65,6 +69,15 @@ public void testValidatorReceiveHttpRequest() {
         assertNull(channel.readInbound());
     }
 
+    public void testDecoderFailurePassThrough() {
+        for (var i = 0; i < 1000; i++) {
+            var httpRequest = newHttpRequest();
+            httpRequest.setDecoderResult(DecoderResult.failure(new Exception("bad")));
+            channel.writeInbound(httpRequest);
+            assertEquals(httpRequest, channel.readInbound());
+        }
+    }
+
     /**
      * Sends back-to-back http requests and randomly fail validation.
      * Ensures that invalid requests drop content and valid pass through.
@@ -94,9 +107,9 @@ public void testMixedValidationResults() {
             );
             assertEquals(request, gotRequest);
 
+            channel.writeInbound(content);
+            channel.writeInbound(last);
             if (shouldPassValidation) {
-                channel.writeInbound(content);
-                channel.writeInbound(last);
                 assertEquals("should pass content for valid request", content, channel.readInbound());
                 assertEquals(last, channel.readInbound());
             } else {
diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java
index fdd4cb8008b68..b2e91e49a0746 100644
--- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java
+++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java
@@ -12,7 +12,6 @@
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelOutboundHandlerAdapter;
 import io.netty.channel.DefaultEventLoop;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.embedded.EmbeddedChannel;
@@ -43,13 +42,15 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase {
     static HttpBody.ChunkHandler discardHandler = (chunk, isLast) -> chunk.close();
     private final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
     private EmbeddedChannel channel;
+    private ReadSniffer readSniffer;
     private Netty4HttpRequestBodyStream stream;
 
     @Override
     public void setUp() throws Exception {
         super.setUp();
         channel = new EmbeddedChannel();
-        channel.pipeline().addLast("flow", new FlowControlHandler());
+        readSniffer = new ReadSniffer();
+        channel.pipeline().addLast(new FlowControlHandler(), readSniffer);
         channel.config().setAutoRead(false);
         channel.pipeline().addLast(new SimpleChannelInboundHandler(false) {
             @Override
@@ -72,8 +73,8 @@ public void tearDown() throws Exception {
         stream.close();
     }
 
-    // ensures all received chunks can be flushed downstream
-    public void testFlushAllReceivedChunks() {
+    // ensures all chunks are passed to downstream
+    public void testPassAllChunks() {
         var chunks = new ArrayList();
         var totalBytes = new AtomicInteger();
         stream.setHandler((chunk, isLast) -> {
@@ -81,32 +82,35 @@ public void testFlushAllReceivedChunks() {
             totalBytes.addAndGet(chunk.length());
             chunk.close();
         });
-
         var chunkSize = 1024;
         var totalChunks = randomIntBetween(1, 100);
         for (int i = 0; i < totalChunks; i++) {
             channel.writeInbound(randomContent(chunkSize));
             stream.next();
             channel.runPendingTasks();
-            assertEquals("should receive all chunks as single composite", i + 1, chunks.size());
+
         }
+        assertEquals(totalChunks, chunks.size());
         assertEquals(chunkSize * totalChunks, totalBytes.get());
     }
 
     // ensures that we read from channel after last chunk
     public void testChannelReadAfterLastContent() {
-        var readCounter = new AtomicInteger();
-        channel.pipeline().addAfter("flow", "read-counter", new ChannelOutboundHandlerAdapter() {
-            @Override
-            public void read(ChannelHandlerContext ctx) throws Exception {
-                readCounter.incrementAndGet();
-                super.read(ctx);
-            }
-        });
         channel.writeInbound(randomLastContent(10));
         stream.next();
         channel.runPendingTasks();
-        assertEquals("should have at least 2 reads, one for last content, and one after last", 2, readCounter.get());
+        assertEquals("should have at least 2 reads, one for last content, and one after last", 2, readSniffer.readCount);
+    }
+
+    // ensures when stream is closing we read and discard chunks
+    public void testReadAndReleaseOnClosing() {
+        var unexpectedChunk = new AtomicBoolean();
+        stream.setHandler((chunk, isLast) -> unexpectedChunk.set(true));
+        stream.close();
+        channel.writeInbound(randomContent(1024));
+        channel.writeInbound(randomLastContent(0));
+        assertFalse("chunk should be discarded", unexpectedChunk.get());
+        assertEquals("expect 3 reads, a first from stream.close, and other two after chunks", 3, readSniffer.readCount);
     }
 
     public void testReadFromHasCorrectThreadContext() throws InterruptedException {
diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/ReadSniffer.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/ReadSniffer.java
index a9e593da55599..af6844883b68f 100644
--- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/ReadSniffer.java
+++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/ReadSniffer.java
@@ -32,11 +32,11 @@
  */
 public class ReadSniffer extends ChannelOutboundHandlerAdapter {
 
-    int readCnt;
+    int readCount;
 
     @Override
     public void read(ChannelHandlerContext ctx) throws Exception {
-        readCnt++;
+        readCount++;
         super.read(ctx);
     }
 }

From 6b3083e2e90e7afd54f70fc857beba24b6225c9d Mon Sep 17 00:00:00 2001
From: Mikhail Berezovskiy 
Date: Fri, 11 Apr 2025 11:28:21 -0700
Subject: [PATCH 12/12] feedback

---
 .../netty4/Netty4HttpHeaderValidator.java     | 47 +++++++++++--------
 .../netty4/Netty4HttpPipeliningHandler.java   |  5 +-
 2 files changed, 31 insertions(+), 21 deletions(-)

diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java
index 99ad4932799cd..668780fc90665 100644
--- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java
+++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java
@@ -27,8 +27,7 @@ public class Netty4HttpHeaderValidator extends ChannelDuplexHandler {
 
     private final HttpValidator validator;
     private final ThreadContext threadContext;
-    private boolean droppingContent;
-    private boolean validatingRequest;
+    private State state;
 
     public Netty4HttpHeaderValidator(HttpValidator validator, ThreadContext threadContext) {
         this.validator = validator;
@@ -37,20 +36,22 @@ public Netty4HttpHeaderValidator(HttpValidator validator, ThreadContext threadCo
 
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        if (msg instanceof HttpObject httpObject) {
-            if (httpObject.decoderResult().isFailure()) {
-                ctx.fireChannelRead(httpObject); // pass-through for decoding failures
+        assert msg instanceof HttpObject;
+        var httpObject = (HttpObject) msg;
+        if (httpObject.decoderResult().isFailure()) {
+            ctx.fireChannelRead(httpObject); // pass-through for decoding failures
+        } else {
+            if (msg instanceof HttpRequest request) {
+                validate(ctx, request);
             } else {
-                if (msg instanceof HttpRequest request) {
-                    validate(ctx, request);
-                } else if (msg instanceof HttpContent content) {
-                    if (droppingContent) {
-                        content.release();
-                        ctx.read();
-                    } else {
-                        assert validatingRequest == false : "unexpected content before validation completed";
-                        ctx.fireChannelRead(content);
-                    }
+                assert msg instanceof HttpContent;
+                var content = (HttpContent) msg;
+                if (state == State.DROPPING) {
+                    content.release();
+                    ctx.read();
+                } else {
+                    assert state == State.PASSING : "unexpected content before validation completed";
+                    ctx.fireChannelRead(content);
                 }
             }
         }
@@ -60,15 +61,14 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
     public void read(ChannelHandlerContext ctx) throws Exception {
         // until validation is completed we can ignore read calls,
         // once validation is finished HttpRequest will be fired and downstream can read from there
-        if (validatingRequest == false) {
+        if (state != State.VALIDATING) {
             ctx.read();
         }
     }
 
     void validate(ChannelHandlerContext ctx, HttpRequest request) {
         assert Transports.assertDefaultThreadContext(threadContext);
-        droppingContent = false;
-        validatingRequest = true;
+        state = State.VALIDATING;
         ActionListener.run(
             // this prevents thread-context changes to propagate to the validation listener
             // atm, the validation listener submits to the event loop executor, which doesn't know about the ES thread-context,
@@ -104,11 +104,18 @@ void handleValidationResult(ChannelHandlerContext ctx, HttpRequest request, @Nul
         ctx.channel().eventLoop().execute(() -> {
             if (validationError != null) {
                 request.setDecoderResult(DecoderResult.failure(validationError));
-                droppingContent = true;
+                state = State.DROPPING;
+            } else {
+                state = State.PASSING;
             }
-            validatingRequest = false;
             ctx.fireChannelRead(request);
         });
     }
 
+    private enum State {
+        PASSING,
+        VALIDATING,
+        DROPPING
+    }
+
 }
diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java
index 5bec94c2e8a72..227b925b79276 100644
--- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java
+++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java
@@ -118,6 +118,7 @@ public Netty4HttpPipeliningHandler(
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
         activityTracker.startActivity();
+        boolean shouldRead = true;
         try {
             if (msg instanceof HttpRequest request) {
                 final Netty4HttpRequest netty4HttpRequest;
@@ -140,19 +141,21 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
                         var contentStream = new Netty4HttpRequestBodyStream(ctx, serverTransport.getThreadPool().getThreadContext());
                         currentRequestStream = contentStream;
                         netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream);
+                        shouldRead = false;
                     }
                 }
                 handlePipelinedRequest(ctx, netty4HttpRequest);
             } else {
                 assert msg instanceof HttpContent : "expect HttpContent got " + msg;
                 assert currentRequestStream != null : "current stream must exists before handling http content";
+                shouldRead = false;
                 currentRequestStream.handleNettyContent((HttpContent) msg);
                 if (msg instanceof LastHttpContent) {
                     currentRequestStream = null;
                 }
             }
         } finally {
-            if (currentRequestStream == null) {
+            if (shouldRead) {
                 ctx.channel().eventLoop().execute(ctx::read);
             }
             activityTracker.stopActivity();