diff --git a/docs/changelog/127817.yaml b/docs/changelog/127817.yaml new file mode 100644 index 0000000000000..7c0f78b39a809 --- /dev/null +++ b/docs/changelog/127817.yaml @@ -0,0 +1,5 @@ +pr: 127817 +summary: Replace auto-read with proper flow-control in HTTP pipeline +area: Network +type: enhancement +issues: [] diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java index 9615286c65235..ef9cd8fb5ced9 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -208,7 +208,6 @@ public void testClientConnectionCloseMidStream() throws Exception { // await stream handler is ready and request full content var handler = clientContext.awaitRestChannelAccepted(opaqueId); - assertBusy(() -> assertNotEquals(0, handler.stream.bufSize())); assertFalse(handler.isClosed()); @@ -218,7 +217,6 @@ public void testClientConnectionCloseMidStream() throws Exception { assertEquals(requestTransmittedLength, handler.readUntilClose()); assertTrue(handler.isClosed()); - assertEquals(0, handler.stream.bufSize()); } } @@ -235,7 +233,6 @@ public void testServerCloseConnectionMidStream() throws Exception { // await stream handler is ready and request full content var handler = clientContext.awaitRestChannelAccepted(opaqueId); - assertBusy(() -> assertNotEquals(0, handler.stream.bufSize())); assertFalse(handler.isClosed()); // terminate connection on server and wait resources are released @@ -244,7 +241,6 @@ public void testServerCloseConnectionMidStream() throws Exception { handler.channel.request().getHttpChannel().close(); assertThat(safeGet(exceptionFuture), instanceOf(ClosedChannelException.class)); assertTrue(handler.isClosed()); - assertBusy(() -> assertEquals(0, handler.stream.bufSize())); } } @@ -260,7 +256,6 @@ public void testServerExceptionMidStream() throws Exception { // await stream handler is ready and request full content var handler = clientContext.awaitRestChannelAccepted(opaqueId); - assertBusy(() -> assertNotEquals(0, handler.stream.bufSize())); assertFalse(handler.isClosed()); // terminate connection on server and wait resources are released @@ -272,7 +267,6 @@ public void testServerExceptionMidStream() throws Exception { final var exception = asInstanceOf(RuntimeException.class, safeGet(exceptionFuture)); assertEquals(ServerRequestHandler.SIMULATED_EXCEPTION_MESSAGE, exception.getMessage()); safeAwait(handler.closedLatch); - assertBusy(() -> assertEquals(0, handler.stream.bufSize())); } } @@ -313,7 +307,7 @@ public void testClientBackpressure() throws Exception { }); handler.readBytes(partSize); } - assertTrue(handler.stream.hasLast()); + assertTrue(handler.receivedLastChunk); } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java new file mode 100644 index 0000000000000..f456bba8064bd --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java @@ -0,0 +1,78 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.http.netty4; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.concurrent.ScheduledFuture; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.time.TimeProvider; +import org.elasticsearch.common.util.concurrent.FutureUtils; + +import java.util.concurrent.TimeUnit; + +/** + * When channel auto-read is disabled handlers are responsible to read from channel. + * But it's hard to detect when read is missing. This helper class print warnings + * when no reads where detected in given time interval. Normally, in tests, 10 seconds is enough + * to avoid test hang for too long, but can be increased if needed. + */ +class MissingReadDetector extends ChannelDuplexHandler { + + private static final Logger logger = LogManager.getLogger(MissingReadDetector.class); + + private final long interval; + private final TimeProvider timer; + private boolean pendingRead; + private long lastRead; + private ScheduledFuture checker; + + MissingReadDetector(TimeProvider timer, long missingReadIntervalMillis) { + this.interval = missingReadIntervalMillis; + this.timer = timer; + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + checker = ctx.channel().eventLoop().scheduleAtFixedRate(() -> { + if (pendingRead == false) { + long now = timer.absoluteTimeInMillis(); + if (now >= lastRead + interval) { + logger.warn("chan-id={} haven't read from channel for [{}ms]", ctx.channel().id(), (now - lastRead)); + } + } + }, interval, interval, TimeUnit.MILLISECONDS); + super.handlerAdded(ctx); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + if (checker != null) { + FutureUtils.cancel(checker); + } + super.handlerRemoved(ctx); + } + + @Override + public void read(ChannelHandlerContext ctx) throws Exception { + pendingRead = true; + ctx.read(); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + assert ctx.channel().config().isAutoRead() == false : "auto-read must be always disabled"; + pendingRead = false; + lastRead = timer.absoluteTimeInMillis(); + ctx.fireChannelRead(msg); + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java index 0294b4626496c..479d053937e1a 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java @@ -15,6 +15,7 @@ import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpRequestDecoder; +import io.netty.handler.codec.http.LastHttpContent; import org.elasticsearch.http.HttpPreRequest; import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils; @@ -48,6 +49,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } if (aggregating || msg instanceof FullHttpRequest) { super.channelRead(ctx, msg); + if (msg instanceof LastHttpContent == false) { + ctx.read(); // HttpObjectAggregator is tricky with auto-read off, it might not call read again, calling on its behalf + } } else { streamContentSizeHandler.channelRead(ctx, msg); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandler.java index fee9d227d8310..e121d091b1bdb 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandler.java @@ -123,6 +123,7 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) { isContinueExpected = true; } else { ctx.writeAndFlush(EXPECTATION_FAILED_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE); + ctx.read(); return; } } @@ -136,6 +137,7 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) { decoder.reset(); } ctx.writeAndFlush(TOO_LARGE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + ctx.read(); } else { ignoreContent = false; currentContentLength = 0; @@ -150,11 +152,13 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) { private void handleContent(ChannelHandlerContext ctx, HttpContent msg) { if (ignoreContent) { msg.release(); + ctx.read(); } else { currentContentLength += msg.content().readableBytes(); if (currentContentLength > maxContentLength) { msg.release(); ctx.writeAndFlush(TOO_LARGE_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE); + ctx.read(); } else { ctx.fireChannelRead(msg); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java index 95a68cb52bbdb..668780fc90665 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java @@ -9,249 +9,113 @@ package org.elasticsearch.http.netty4; -import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.DecoderResult; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.LastHttpContent; -import io.netty.util.ReferenceCountUtil; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.Nullable; import org.elasticsearch.http.netty4.internal.HttpValidator; import org.elasticsearch.transport.Transports; -import java.util.ArrayDeque; - -import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.DROPPING_DATA_PERMANENTLY; -import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.DROPPING_DATA_UNTIL_NEXT_REQUEST; -import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.FORWARDING_DATA_UNTIL_NEXT_REQUEST; -import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.QUEUEING_DATA; -import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.WAITING_TO_START; - -public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter { +public class Netty4HttpHeaderValidator extends ChannelDuplexHandler { private final HttpValidator validator; private final ThreadContext threadContext; - private ArrayDeque pending = new ArrayDeque<>(4); - private State state = WAITING_TO_START; + private State state; public Netty4HttpHeaderValidator(HttpValidator validator, ThreadContext threadContext) { this.validator = validator; this.threadContext = threadContext; } - State getState() { - return state; - } - - @SuppressWarnings("fallthrough") @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { assert msg instanceof HttpObject; - final HttpObject httpObject = (HttpObject) msg; - - switch (state) { - case WAITING_TO_START: - assert pending.isEmpty(); - pending.add(ReferenceCountUtil.retain(httpObject)); - requestStart(ctx); - assert state == QUEUEING_DATA; - assert ctx.channel().config().isAutoRead() == false; - break; - case QUEUEING_DATA: - pending.add(ReferenceCountUtil.retain(httpObject)); - break; - case FORWARDING_DATA_UNTIL_NEXT_REQUEST: - assert pending.isEmpty(); - if (httpObject instanceof LastHttpContent) { - state = WAITING_TO_START; - } - ctx.fireChannelRead(httpObject); - break; - case DROPPING_DATA_UNTIL_NEXT_REQUEST: - assert pending.isEmpty(); - if (httpObject instanceof LastHttpContent) { - state = WAITING_TO_START; - } - ReferenceCountUtil.release(httpObject); - break; - case DROPPING_DATA_PERMANENTLY: - assert pending.isEmpty(); - ReferenceCountUtil.release(httpObject); // consume without enqueuing - ctx.channel().config().setAutoRead(false); - break; - } - } - - private void requestStart(ChannelHandlerContext ctx) { - assert state == WAITING_TO_START; - - if (pending.isEmpty()) { - return; - } - - final HttpObject httpObject = pending.getFirst(); - final HttpRequest httpRequest; - if (httpObject instanceof HttpRequest && httpObject.decoderResult().isSuccess()) { - // a properly decoded HTTP start message is expected to begin validation - // anything else is probably an error that the downstream HTTP message aggregator will have to handle - httpRequest = (HttpRequest) httpObject; - } else { - httpRequest = null; - } - - state = QUEUEING_DATA; - ctx.channel().config().setAutoRead(false); - - if (httpRequest == null) { - // this looks like a malformed request and will forward without validation - ctx.channel().eventLoop().execute(() -> forwardFullRequest(ctx)); + var httpObject = (HttpObject) msg; + if (httpObject.decoderResult().isFailure()) { + ctx.fireChannelRead(httpObject); // pass-through for decoding failures } else { - assert Transports.assertDefaultThreadContext(threadContext); - ActionListener.run( - // this prevents thread-context changes to propagate to the validation listener - // atm, the validation listener submits to the event loop executor, which doesn't know about the ES thread-context, - // so this is just a defensive play, in case the code inside the listener changes to not use the event loop executor - ActionListener.assertOnce( - new ContextPreservingActionListener( - threadContext.wrapRestorable(threadContext.newStoredContext()), - // Always explicitly dispatch back to the event loop to prevent reentrancy concerns if we are still on event loop - new ActionListener<>() { - @Override - public void onResponse(Void unused) { - assert Transports.assertDefaultThreadContext(threadContext); - ctx.channel().eventLoop().execute(() -> forwardFullRequest(ctx)); - } - - @Override - public void onFailure(Exception e) { - assert Transports.assertDefaultThreadContext(threadContext); - ctx.channel().eventLoop().execute(() -> forwardRequestWithDecoderExceptionAndNoContent(ctx, e)); - } - } - ) - ), - listener -> { - // this prevents thread-context changes to propagate beyond the validation, as netty worker threads are reused - try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) { - validator.validate(httpRequest, ctx.channel(), listener); - } + if (msg instanceof HttpRequest request) { + validate(ctx, request); + } else { + assert msg instanceof HttpContent; + var content = (HttpContent) msg; + if (state == State.DROPPING) { + content.release(); + ctx.read(); + } else { + assert state == State.PASSING : "unexpected content before validation completed"; + ctx.fireChannelRead(content); } - ); - } - } - - private void forwardFullRequest(ChannelHandlerContext ctx) { - Transports.assertDefaultThreadContext(threadContext); - assert ctx.channel().eventLoop().inEventLoop(); - assert ctx.channel().config().isAutoRead() == false; - assert state == QUEUEING_DATA; - - ctx.channel().config().setAutoRead(true); - boolean fullRequestForwarded = forwardData(ctx, pending); - - assert fullRequestForwarded || pending.isEmpty(); - if (fullRequestForwarded) { - state = WAITING_TO_START; - requestStart(ctx); - } else { - state = FORWARDING_DATA_UNTIL_NEXT_REQUEST; + } } - - assert state == WAITING_TO_START || state == QUEUEING_DATA || state == FORWARDING_DATA_UNTIL_NEXT_REQUEST; - } - - private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContext ctx, Exception e) { - Transports.assertDefaultThreadContext(threadContext); - assert ctx.channel().eventLoop().inEventLoop(); - assert ctx.channel().config().isAutoRead() == false; - assert state == QUEUEING_DATA; - - HttpObject messageToForward = pending.getFirst(); - boolean fullRequestDropped = dropData(pending); - if (messageToForward instanceof HttpContent toReplace) { - // if the request to forward contained data (which got dropped), replace with empty data - messageToForward = toReplace.replace(Unpooled.EMPTY_BUFFER); - } - messageToForward.setDecoderResult(DecoderResult.failure(e)); - - ctx.channel().config().setAutoRead(true); - ctx.fireChannelRead(messageToForward); - - assert fullRequestDropped || pending.isEmpty(); - if (fullRequestDropped) { - state = WAITING_TO_START; - requestStart(ctx); - } else { - state = DROPPING_DATA_UNTIL_NEXT_REQUEST; - } - - assert state == WAITING_TO_START || state == QUEUEING_DATA || state == DROPPING_DATA_UNTIL_NEXT_REQUEST; } @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - state = DROPPING_DATA_PERMANENTLY; - while (true) { - if (dropData(pending) == false) { - break; - } + public void read(ChannelHandlerContext ctx) throws Exception { + // until validation is completed we can ignore read calls, + // once validation is finished HttpRequest will be fired and downstream can read from there + if (state != State.VALIDATING) { + ctx.read(); } - super.channelInactive(ctx); } - private static boolean forwardData(ChannelHandlerContext ctx, ArrayDeque pending) { - final int pendingMessages = pending.size(); - try { - HttpObject toForward; - while ((toForward = pending.poll()) != null) { - ctx.fireChannelRead(toForward); - ReferenceCountUtil.release(toForward); // reference cnt incremented when enqueued - if (toForward instanceof LastHttpContent) { - return true; + void validate(ChannelHandlerContext ctx, HttpRequest request) { + assert Transports.assertDefaultThreadContext(threadContext); + state = State.VALIDATING; + ActionListener.run( + // this prevents thread-context changes to propagate to the validation listener + // atm, the validation listener submits to the event loop executor, which doesn't know about the ES thread-context, + // so this is just a defensive play, in case the code inside the listener changes to not use the event loop executor + ActionListener.assertOnce( + new ContextPreservingActionListener( + threadContext.wrapRestorable(threadContext.newStoredContext()), + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + handleValidationResult(ctx, request, null); + } + + @Override + public void onFailure(Exception e) { + handleValidationResult(ctx, request, e); + } + } + ) + ), + listener -> { + // this prevents thread-context changes to propagate beyond the validation, as netty worker threads are reused + try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) { + validator.validate(request, ctx.channel(), listener); } } - return false; - } finally { - maybeResizePendingDown(pendingMessages, pending); - } + ); } - private static boolean dropData(ArrayDeque pending) { - final int pendingMessages = pending.size(); - try { - HttpObject toDrop; - while ((toDrop = pending.poll()) != null) { - ReferenceCountUtil.release(toDrop, 2); // 1 for enqueuing, 1 for consuming - if (toDrop instanceof LastHttpContent) { - return true; - } + void handleValidationResult(ChannelHandlerContext ctx, HttpRequest request, @Nullable Exception validationError) { + assert Transports.assertDefaultThreadContext(threadContext); + // Always explicitly dispatch back to the event loop to prevent reentrancy concerns if we are still on event loop + ctx.channel().eventLoop().execute(() -> { + if (validationError != null) { + request.setDecoderResult(DecoderResult.failure(validationError)); + state = State.DROPPING; + } else { + state = State.PASSING; } - return false; - } finally { - maybeResizePendingDown(pendingMessages, pending); - } + ctx.fireChannelRead(request); + }); } - private static void maybeResizePendingDown(int largeSize, ArrayDeque pending) { - if (pending.size() <= 4 && largeSize > 32) { - // Prevent the ArrayDeque from becoming forever large due to a single large message. - ArrayDeque old = pending; - pending = new ArrayDeque<>(4); - pending.addAll(old); - } + private enum State { + PASSING, + VALIDATING, + DROPPING } - enum State { - WAITING_TO_START, - QUEUEING_DATA, - FORWARDING_DATA_UNTIL_NEXT_REQUEST, - DROPPING_DATA_UNTIL_NEXT_REQUEST, - DROPPING_DATA_PERMANENTLY - } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index 4809f1a1a275b..227b925b79276 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -118,6 +118,7 @@ public Netty4HttpPipeliningHandler( @Override public void channelRead(final ChannelHandlerContext ctx, final Object msg) { activityTracker.startActivity(); + boolean shouldRead = true; try { if (msg instanceof HttpRequest request) { final Netty4HttpRequest netty4HttpRequest; @@ -137,25 +138,26 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) { netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest); currentRequestStream = null; } else { - var contentStream = new Netty4HttpRequestBodyStream( - ctx.channel(), - serverTransport.getThreadPool().getThreadContext(), - activityTracker - ); + var contentStream = new Netty4HttpRequestBodyStream(ctx, serverTransport.getThreadPool().getThreadContext()); currentRequestStream = contentStream; netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream); + shouldRead = false; } } handlePipelinedRequest(ctx, netty4HttpRequest); } else { assert msg instanceof HttpContent : "expect HttpContent got " + msg; assert currentRequestStream != null : "current stream must exists before handling http content"; + shouldRead = false; currentRequestStream.handleNettyContent((HttpContent) msg); if (msg instanceof LastHttpContent) { currentRequestStream = null; } } } finally { + if (shouldRead) { + ctx.channel().eventLoop().execute(ctx::read); + } activityTracker.stopActivity(); } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java index 88b4518c8de89..f7c9d1404c612 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java @@ -9,14 +9,11 @@ package org.elasticsearch.http.netty4; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; -import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.LastHttpContent; -import org.elasticsearch.common.network.ThreadWatchdog; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Releasables; import org.elasticsearch.http.HttpBody; @@ -27,34 +24,23 @@ /** * Netty based implementation of {@link HttpBody.Stream}. - * This implementation utilize {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)} - * to prevent entire payload buffering. But sometimes upstream can send few chunks of data despite - * autoRead=off. In this case chunks will be buffered until downstream calls {@link Stream#next()} */ public class Netty4HttpRequestBodyStream implements HttpBody.Stream { - private final Channel channel; - private final ChannelFutureListener closeListener = future -> doClose(); private final List tracingHandlers = new ArrayList<>(4); private final ThreadContext threadContext; - private final ThreadWatchdog.ActivityTracker activityTracker; - private ByteBuf buf; - private boolean requested = false; + private final ChannelHandlerContext ctx; private boolean closing = false; + private boolean readLastChunk = false; private HttpBody.ChunkHandler handler; private ThreadContext.StoredContext requestContext; + private final ChannelFutureListener closeListener = future -> doClose(); - // used in tests - private volatile int bufSize = 0; - private volatile boolean hasLast = false; - - public Netty4HttpRequestBodyStream(Channel channel, ThreadContext threadContext, ThreadWatchdog.ActivityTracker activityTracker) { - this.channel = channel; + public Netty4HttpRequestBodyStream(ChannelHandlerContext ctx, ThreadContext threadContext) { + this.ctx = ctx; this.threadContext = threadContext; this.requestContext = threadContext.newStoredContext(); - this.activityTracker = activityTracker; - Netty4Utils.addListener(channel.closeFuture(), closeListener); - channel.config().setAutoRead(false); + Netty4Utils.addListener(ctx.channel().closeFuture(), closeListener); } @Override @@ -64,6 +50,7 @@ public ChunkHandler handler() { @Override public void setHandler(ChunkHandler chunkHandler) { + assert ctx.channel().eventLoop().inEventLoop() : Thread.currentThread().getName(); this.handler = chunkHandler; } @@ -73,98 +60,51 @@ public void addTracingHandler(ChunkHandler chunkHandler) { tracingHandlers.add(chunkHandler); } + private void read() { + ctx.channel().eventLoop().execute(ctx::read); + } + @Override public void next() { assert handler != null : "handler must be set before requesting next chunk"; requestContext = threadContext.newStoredContext(); - channel.eventLoop().submit(() -> { - activityTracker.startActivity(); - requested = true; - try { - if (closing) { - return; - } - if (buf == null) { - channel.read(); - } else { - send(); - } - } catch (Throwable e) { - channel.pipeline().fireExceptionCaught(e); - } finally { - activityTracker.stopActivity(); - } - }); + read(); } public void handleNettyContent(HttpContent httpContent) { - assert hasLast == false : "receive http content on completed stream"; - hasLast = httpContent instanceof LastHttpContent; + assert ctx.channel().eventLoop().inEventLoop() : Thread.currentThread().getName(); if (closing) { httpContent.release(); + read(); } else { - addChunk(httpContent.content()); - if (requested) { - send(); - } - } - } - - // adds chunk to current buffer, will allocate composite buffer when need to hold more than 1 chunk - private void addChunk(ByteBuf chunk) { - assert chunk != null; - if (buf == null) { - buf = chunk; - } else if (buf instanceof CompositeByteBuf comp) { - comp.addComponent(true, chunk); - } else { - var comp = channel.alloc().compositeBuffer(); - comp.addComponent(true, buf); - comp.addComponent(true, chunk); - buf = comp; - } - bufSize = buf.readableBytes(); - } - - // visible for test - int bufSize() { - return bufSize; - } - - // visible for test - boolean hasLast() { - return hasLast; - } - - private void send() { - assert requested; - assert handler != null : "must set handler before receiving next chunk"; - var bytesRef = Netty4Utils.toReleasableBytesReference(buf); - requested = false; - buf = null; - bufSize = 0; - try (var ignored = threadContext.restoreExistingContext(requestContext)) { - for (var tracer : tracingHandlers) { - tracer.onNext(bytesRef, hasLast); + assert readLastChunk == false; + try (var ignored = threadContext.restoreExistingContext(requestContext)) { + var isLast = httpContent instanceof LastHttpContent; + var buf = Netty4Utils.toReleasableBytesReference(httpContent.content()); + for (var tracer : tracingHandlers) { + tracer.onNext(buf, isLast); + } + handler.onNext(buf, isLast); + if (isLast) { + readLastChunk = true; + ctx.channel().closeFuture().removeListener(closeListener); + read(); + } } - handler.onNext(bytesRef, hasLast); - } - if (hasLast) { - channel.config().setAutoRead(true); - channel.closeFuture().removeListener(closeListener); } } @Override public void close() { - if (channel.eventLoop().inEventLoop()) { + if (ctx.channel().eventLoop().inEventLoop()) { doClose(); } else { - channel.eventLoop().submit(this::doClose); + ctx.channel().eventLoop().submit(this::doClose); } } private void doClose() { + assert ctx.channel().eventLoop().inEventLoop() : Thread.currentThread().getName(); closing = true; try (var ignored = threadContext.restoreExistingContext(requestContext)) { for (var tracer : tracingHandlers) { @@ -174,11 +114,8 @@ private void doClose() { handler.close(); } } - if (buf != null) { - buf.release(); - buf = null; - bufSize = 0; + if (readLastChunk == false) { + read(); } - channel.config().setAutoRead(true); } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 9ffa4b479be17..c8f2d75d18a6f 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -29,6 +29,7 @@ import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.flow.FlowControlHandler; import io.netty.handler.ssl.SslHandler; import io.netty.handler.timeout.ReadTimeoutException; import io.netty.handler.timeout.ReadTimeoutHandler; @@ -46,6 +47,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.Assertions; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.http.AbstractHttpServerTransport; @@ -317,6 +319,9 @@ protected HttpChannelHandler( @Override protected void initChannel(Channel ch) throws Exception { + // auto-read must be disabled all the time + ch.config().setAutoRead(false); + Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch); ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel); if (acceptChannelPredicate != null) { @@ -364,6 +369,15 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception { } decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR); ch.pipeline().addLast("decoder", decoder); // parses the HTTP bytes request into HTTP message pieces + + // from this point in pipeline every handler must call ctx or channel #read() when ready to process next HTTP part + ch.pipeline().addLast(new FlowControlHandler()); + if (Assertions.ENABLED) { + // missing reads are hard to catch, but we can detect absence of reads within interval + long missingReadIntervalMs = 10_000; + ch.pipeline().addLast(new MissingReadDetector(transport.threadPool, missingReadIntervalMs)); + } + if (httpValidator != null) { // runs a validation function on the first HTTP message piece which contains all the headers // if validation passes, the pieces of that particular request are forwarded, otherwise they are discarded @@ -415,12 +429,19 @@ protected Result beginEncode(HttpResponse httpResponse, String acceptEncoding) t if (ResourceLeakDetector.isEnabled()) { ch.pipeline().addLast(new Netty4LeakDetectionHandler()); } + // See https://github.com/netty/netty/issues/15053: the combination of FlowControlHandler and HttpContentDecompressor above + // can emit multiple chunks per read, but HttpBody.Stream requires chunks to arrive one-at-a-time so until that issue is + // resolved we must add another flow controller here: + ch.pipeline().addLast(new FlowControlHandler()); ch.pipeline() .addLast( "pipelining", new Netty4HttpPipeliningHandler(transport.pipeliningMaxEvents, transport, threadWatchdogActivityTracker) ); transport.serverAcceptedChannel(nettyHttpChannel); + + // make very first read call, since auto-read is disabled; following reads must come from the handlers + ch.read(); } @Override diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandlerTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandlerTests.java index 36399c8d6d7a5..edbaa3b62d187 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandlerTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandlerTests.java @@ -12,6 +12,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.DecoderResult; import io.netty.handler.codec.http.DefaultHttpContent; import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.DefaultLastHttpContent; @@ -40,6 +41,7 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase { private static final int REPS = 1000; private EmbeddedChannel channel; private EmbeddedChannel encoder; // channel to encode HTTP objects into bytes + private ReadSniffer readSniffer; private static HttpContent httpContent(int size) { return new DefaultHttpContent(Unpooled.wrappedBuffer(randomByteArrayOfLength(size))); @@ -68,7 +70,20 @@ public void setUp() throws Exception { super.setUp(); var decoder = new HttpRequestDecoder(); encoder = new EmbeddedChannel(new HttpRequestEncoder()); - channel = new EmbeddedChannel(decoder, new Netty4HttpContentSizeHandler(decoder, MAX_CONTENT_LENGTH)); + readSniffer = new ReadSniffer(); + channel = new EmbeddedChannel(); + channel.config().setAutoRead(false); + channel.pipeline().addLast(decoder, readSniffer, new Netty4HttpContentSizeHandler(decoder, MAX_CONTENT_LENGTH)); + } + + public void testDecodingFailurePassThrough() { + for (var i = 0; i < REPS; i++) { + var sendReq = httpRequest(); + sendReq.setDecoderResult(DecoderResult.failure(new Exception("bad"))); + channel.writeInbound(sendReq); + assertEquals(sendReq, channel.readInbound()); + } + assertEquals("should not read from channel, failures are handled downstream", 0, readSniffer.readCount); } /** @@ -85,6 +100,7 @@ public void testContinue() { assertFalse(HttpUtil.is100ContinueExpected(recvRequest)); channel.writeInbound(encode(LastHttpContent.EMPTY_LAST_CONTENT)); assertEquals(LastHttpContent.EMPTY_LAST_CONTENT, channel.readInbound()); + assertEquals("must not read from channel", 0, readSniffer.readCount); } } @@ -99,6 +115,7 @@ public void testWithoutContinue() { assertNotNull("request should pass", channel.readInbound()); channel.writeInbound(encode(LastHttpContent.EMPTY_LAST_CONTENT)); assertEquals(LastHttpContent.EMPTY_LAST_CONTENT, channel.readInbound()); + assertEquals("must not read from channel", 0, readSniffer.readCount); } } @@ -121,6 +138,7 @@ public void testContinueWithContent() { assertNotNull(recvContent); assertEquals(MAX_CONTENT_LENGTH, recvContent.content().readableBytes()); recvContent.release(); + assertEquals("must not read from channel", 0, readSniffer.readCount); } } @@ -134,6 +152,7 @@ public void testExpectationFailed() { channel.writeInbound(encode(sendRequest)); var resp = (FullHttpResponse) channel.readOutbound(); assertEquals(HttpResponseStatus.EXPECTATION_FAILED, resp.status()); + assertEquals("expect 2 reads, one from size handler and HTTP decoder will emit LastHttpContent", 2, readSniffer.readCount); assertFalse(channel.isOpen()); resp.release(); } @@ -152,6 +171,7 @@ public void testEntityTooLarge() { assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status()); assertNull("request should not pass", channel.readInbound()); assertTrue("should not close channel", channel.isOpen()); + assertEquals("must read from channel", i + 1, readSniffer.readCount); resp.release(); } } @@ -160,11 +180,13 @@ public void testEntityTooLarge() { * Mixed load of oversized and normal requests with Exepct:100-Continue. */ public void testMixedContent() { + var expectReadCnt = 0; for (int i = 0; i < REPS; i++) { var isOversized = randomBoolean(); var sendRequest = httpRequest(); HttpUtil.set100ContinueExpected(sendRequest, true); if (isOversized) { + expectReadCnt++; HttpUtil.setContentLength(sendRequest, OVERSIZED_LENGTH); channel.writeInbound(encode(sendRequest)); var resp = (FullHttpResponse) channel.readOutbound(); @@ -188,6 +210,7 @@ public void testMixedContent() { assertEquals("actual content size should match", normalSize, recvContent.content().readableBytes()); recvContent.release(); } + assertEquals(expectReadCnt, readSniffer.readCount); } } @@ -205,6 +228,7 @@ public void testEntityTooLargeWithContentWithoutExpect() { resp.release(); assertNull("request and content should not pass", channel.readInbound()); assertTrue("should not close channel", channel.isOpen()); + assertEquals("expect two reads per loop, one for request and one for content", (i + 1) * 2, readSniffer.readCount); } } @@ -234,7 +258,7 @@ public void testEntityTooLargeWithChunkedContent() { var resp = (FullHttpResponse) channel.readOutbound(); assertEquals("should respond with 413", HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status()); assertFalse("should close channel", channel.isOpen()); + assertEquals("expect read after response", 1, readSniffer.readCount); resp.release(); } - } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderThreadContextTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderThreadContextTests.java index 9a12ba75d7742..957b5fd066ff2 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderThreadContextTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderThreadContextTests.java @@ -19,6 +19,7 @@ import io.netty.handler.codec.http.DefaultLastHttpContent; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.flow.FlowControlHandler; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -52,7 +53,8 @@ public class Netty4HttpHeaderThreadContextTests extends ESTestCase { @Override public void setUp() throws Exception { super.setUp(); - channel = new EmbeddedChannel(); + channel = new EmbeddedChannel(new FlowControlHandler()); + channel.config().setAutoRead(false); threadPool = new TestThreadPool(TEST_MOCK_TRANSPORT_THREAD_PREFIX); } @@ -181,6 +183,7 @@ private void sendRequestThrough(boolean success, Semaphore validationDone) throw threadPool.generic().submit(() -> { DefaultHttpRequest request1 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); channel.writeInbound(request1); + channel.read(); DefaultHttpContent content1 = randomBoolean() ? new DefaultHttpContent(Unpooled.buffer(4)) : null; if (content1 != null) { channel.writeInbound(content1); @@ -196,9 +199,11 @@ private void sendRequestThrough(boolean success, Semaphore validationDone) throw } channel.runPendingTasks(); assertThat(channel.readInbound(), sameInstance(request1)); + channel.read(); if (content1 != null && success) { assertThat(channel.readInbound(), sameInstance(content1)); } + channel.read(); if (success) { assertThat(channel.readInbound(), sameInstance(lastContent1)); } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java index 1c0b434105f28..d29894a149a4f 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java @@ -9,766 +9,158 @@ package org.elasticsearch.http.netty4; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.DecoderResult; -import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultHttpContent; import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.DefaultLastHttpContent; -import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.LastHttpContent; -import io.netty.util.AsciiString; +import io.netty.handler.flow.FlowControlHandler; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.http.netty4.internal.HttpValidator; import org.elasticsearch.test.ESTestCase; -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.concurrent.atomic.AtomicReference; - -import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.DROPPING_DATA_UNTIL_NEXT_REQUEST; -import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.FORWARDING_DATA_UNTIL_NEXT_REQUEST; -import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.QUEUEING_DATA; -import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.WAITING_TO_START; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; -import static org.hamcrest.Matchers.sameInstance; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; public class Netty4HttpHeaderValidatorTests extends ESTestCase { - - private final AtomicReference header = new AtomicReference<>(); - private final AtomicReference> listener = new AtomicReference<>(); private EmbeddedChannel channel; - private Netty4HttpHeaderValidator netty4HttpHeaderValidator; - private final AtomicReference validationException = new AtomicReference<>(); + private BlockingQueue validatorRequestQueue; @Override public void setUp() throws Exception { super.setUp(); - reset(); + validatorRequestQueue = new LinkedBlockingQueue<>(); + channel = new EmbeddedChannel( + new Netty4HttpHeaderValidator( + (httpRequest, channel, listener) -> validatorRequestQueue.add(new ValidationRequest(httpRequest, channel, listener)), + new ThreadContext(Settings.EMPTY) + ) + ); + channel.config().setAutoRead(false); } - private void reset() { - channel = new EmbeddedChannel(); - header.set(null); - listener.set(null); - validationException.set(null); - HttpValidator validator = (httpRequest, channel, validationCompleteListener) -> { - header.set(httpRequest); - final var exception = validationException.get(); - if (exception != null) { - throw exception; - } - listener.set(validationCompleteListener); - }; - netty4HttpHeaderValidator = new Netty4HttpHeaderValidator(validator, new ThreadContext(Settings.EMPTY)); - channel.pipeline().addLast(netty4HttpHeaderValidator); + HttpRequest newHttpRequest() { + return new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, ""); } - public void testValidationPausesAndResumesData() { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(request); - channel.writeInbound(content); - - assertThat(header.get(), sameInstance(request)); - // channel is paused - assertThat(channel.readInbound(), nullValue()); - assertFalse(channel.config().isAutoRead()); - - // channel is resumed - listener.get().onResponse(null); - channel.runPendingTasks(); - - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST)); - assertThat(channel.readInbound(), sameInstance(request)); - assertThat(channel.readInbound(), sameInstance(content)); - assertThat(channel.readInbound(), nullValue()); - assertThat(content.refCnt(), equalTo(1)); - - // channel continues in resumed state after request finishes - DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4)); - channel.writeInbound(lastContent); - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - assertThat(channel.readInbound(), sameInstance(lastContent)); - assertThat(lastContent.refCnt(), equalTo(1)); - - // channel is again paused while validating next request - channel.writeInbound(request); - assertFalse(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); + HttpContent newHttpContent() { + return new DefaultHttpContent(Unpooled.buffer()); } - public void testValidatorDoesNotTweakAutoReadAfterValidationComplete() { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(request); - channel.writeInbound(content); - - assertThat(header.get(), sameInstance(request)); - // channel is paused - assertThat(channel.readInbound(), nullValue()); - assertFalse(channel.config().isAutoRead()); - - // channel is resumed - listener.get().onResponse(null); - channel.runPendingTasks(); - - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST)); - assertThat(channel.readInbound(), sameInstance(request)); - assertThat(channel.readInbound(), sameInstance(content)); - assertThat(channel.readInbound(), nullValue()); - assertThat(content.refCnt(), equalTo(1)); - channel.config().setAutoRead(false); - - channel.writeOutbound(new DefaultHttpContent(Unpooled.buffer(4))); - assertFalse(channel.config().isAutoRead()); + LastHttpContent newLastHttpContent() { + return new DefaultLastHttpContent(); } - public void testContentForwardedAfterValidation() { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - channel.writeInbound(request); - - DefaultHttpContent content1 = null; - if (randomBoolean()) { - content1 = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(content1); - } - - assertThat(header.get(), sameInstance(request)); - // channel is paused - assertThat(channel.readInbound(), nullValue()); - assertFalse(channel.config().isAutoRead()); - - // channel is resumed - listener.get().onResponse(null); - channel.runPendingTasks(); - - // resumed channel after successful validation forwards data - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST)); - // write more content to the channel after validation passed - DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(content2); - assertThat(channel.readInbound(), sameInstance(request)); - DefaultHttpContent content3 = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(content3); - if (content1 != null) { - assertThat(channel.readInbound(), sameInstance(content1)); - assertThat(content1.refCnt(), equalTo(1)); - } - assertThat(channel.readInbound(), sameInstance(content2)); - assertThat(content2.refCnt(), equalTo(1)); - DefaultHttpContent content4 = null; - if (randomBoolean()) { - content4 = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(content4); - } - assertThat(channel.readInbound(), sameInstance(content3)); - assertThat(content3.refCnt(), equalTo(1)); - if (content4 != null) { - assertThat(channel.readInbound(), sameInstance(content4)); - assertThat(content4.refCnt(), equalTo(1)); - } - - // channel continues in resumed state after request finishes - DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4)); - channel.writeInbound(lastContent); - - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - assertThat(channel.readInbound(), sameInstance(lastContent)); - assertThat(lastContent.refCnt(), equalTo(1)); - - if (randomBoolean()) { - channel.writeInbound(request); - assertFalse(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); - } + public void testValidatorReceiveHttpRequest() { + channel.writeInbound(newHttpRequest()); + assertEquals(1, validatorRequestQueue.size()); + assertNull(channel.readInbound()); } - public void testContentDroppedAfterValidationFailure() { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - channel.writeInbound(request); - - DefaultHttpContent content1 = null; - if (randomBoolean()) { - content1 = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(content1); - } - - assertThat(header.get(), sameInstance(request)); - // channel is paused - assertThat(channel.readInbound(), nullValue()); - assertFalse(channel.config().isAutoRead()); - - // channel is resumed - listener.get().onFailure(new ElasticsearchException("Boom")); - channel.runPendingTasks(); - - // resumed channel after failed validation drops data - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST)); - // write more content to the channel after validation passed - DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(content2); - assertThat(channel.readInbound(), sameInstance(request)); - DefaultHttpContent content3 = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(content3); - if (content1 != null) { - assertThat(channel.readInbound(), nullValue()); - assertThat(content1.refCnt(), equalTo(0)); - } - assertThat(channel.readInbound(), nullValue()); // content2 - assertThat(content2.refCnt(), equalTo(0)); - DefaultHttpContent content4 = null; - if (randomBoolean()) { - content4 = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(content4); - } - assertThat(channel.readInbound(), nullValue()); // content3 - assertThat(content3.refCnt(), equalTo(0)); - if (content4 != null) { - assertThat(channel.readInbound(), nullValue()); - assertThat(content4.refCnt(), equalTo(0)); - } - - assertThat(channel.readInbound(), nullValue()); // extra read still returns "null" - - // channel continues in resumed state after request finishes - DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4)); - channel.writeInbound(lastContent); - - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - assertThat(channel.readInbound(), nullValue()); // lastContent - assertThat(lastContent.refCnt(), equalTo(0)); - - if (randomBoolean()) { - channel.writeInbound(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri")); - assertFalse(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); + public void testDecoderFailurePassThrough() { + for (var i = 0; i < 1000; i++) { + var httpRequest = newHttpRequest(); + httpRequest.setDecoderResult(DecoderResult.failure(new Exception("bad"))); + channel.writeInbound(httpRequest); + assertEquals(httpRequest, channel.readInbound()); } } - public void testValidationErrorForwardsAsDecoderErrorMessage() { - for (Exception exception : List.of( - new Exception("Failure"), - new ElasticsearchException("Failure"), - new ElasticsearchSecurityException("Failure") - )) { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - final DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4)); + /** + * Sends back-to-back http requests and randomly fail validation. + * Ensures that invalid requests drop content and valid pass through. + */ + public void testMixedValidationResults() { + for (var i = 0; i < 1000; i++) { + var shouldPassValidation = randomBoolean(); + var request = newHttpRequest(); + var content = newHttpContent(); + var last = newLastHttpContent(); channel.writeInbound(request); - channel.writeInbound(content); - - assertThat(header.get(), sameInstance(request)); - assertThat(channel.readInbound(), nullValue()); - assertFalse(channel.config().isAutoRead()); - - listener.get().onFailure(exception); - channel.runPendingTasks(); - assertTrue(channel.config().isAutoRead()); - DefaultHttpRequest failed = channel.readInbound(); - assertThat(failed, sameInstance(request)); - assertThat(failed.headers().get(HttpHeaderNames.CONNECTION), nullValue()); - assertTrue(failed.decoderResult().isFailure()); - Exception cause = (Exception) failed.decoderResult().cause(); - assertThat(cause, equalTo(exception)); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST)); - - assertThat(channel.readInbound(), nullValue()); - assertThat(content.refCnt(), equalTo(0)); - - DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4)); - channel.writeInbound(lastContent); - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - assertThat(channel.readInbound(), nullValue()); - assertThat(lastContent.refCnt(), equalTo(0)); - - reset(); - } - } - - public void testValidationExceptionForwardsAsDecoderErrorMessage() { - final var exception = new ElasticsearchException("Failure"); - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - - validationException.set(exception); - channel.writeInbound(request); - - assertThat(header.get(), sameInstance(request)); - assertThat(listener.get(), nullValue()); - - channel.runPendingTasks(); - assertTrue(channel.config().isAutoRead()); - DefaultHttpRequest failed = channel.readInbound(); - assertThat(failed, sameInstance(request)); - assertThat(failed.headers().get(HttpHeaderNames.CONNECTION), nullValue()); - assertTrue(failed.decoderResult().isFailure()); - Exception cause = (Exception) failed.decoderResult().cause(); - assertThat(cause, equalTo(exception)); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST)); - - final DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(content); - - assertThat(channel.readInbound(), nullValue()); - assertThat(content.refCnt(), equalTo(0)); - - DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4)); - channel.writeInbound(lastContent); - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - assertThat(channel.readInbound(), nullValue()); - assertThat(lastContent.refCnt(), equalTo(0)); - } - - public void testValidationHandlesMultipleQueuedUpMessages() { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - final DefaultHttpRequest request1 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - DefaultHttpContent content1 = new DefaultHttpContent(Unpooled.buffer(4)); - DefaultLastHttpContent lastContent1 = new DefaultLastHttpContent(Unpooled.buffer(4)); - channel.writeInbound(request1); - channel.writeInbound(content1); - channel.writeInbound(lastContent1); - final DefaultHttpRequest request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4)); - DefaultLastHttpContent lastContent2 = new DefaultLastHttpContent(Unpooled.buffer(4)); - channel.writeInbound(request2); - channel.writeInbound(content2); - channel.writeInbound(lastContent2); - - assertThat(header.get(), sameInstance(request1)); - assertThat(channel.readInbound(), nullValue()); - assertFalse(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); - - listener.get().onResponse(null); - channel.runPendingTasks(); - assertThat(channel.readInbound(), sameInstance(request1)); - assertThat(channel.readInbound(), sameInstance(content1)); - assertThat(channel.readInbound(), sameInstance(lastContent1)); - assertThat(content1.refCnt(), equalTo(1)); - assertThat(lastContent1.refCnt(), equalTo(1)); - - assertThat(header.get(), sameInstance(request2)); - - assertFalse(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); - assertThat(channel.readInbound(), nullValue()); - - listener.get().onResponse(null); - channel.runPendingTasks(); - assertThat(channel.readInbound(), sameInstance(request2)); - assertThat(channel.readInbound(), sameInstance(content2)); - assertThat(channel.readInbound(), sameInstance(lastContent2)); - assertThat(content2.refCnt(), equalTo(1)); - assertThat(lastContent2.refCnt(), equalTo(1)); - - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - assertThat(channel.readInbound(), nullValue()); - } - - public void testValidationFailureRecoversForEnqueued() { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - // write 2 requests before validation for the first one fails - final DefaultHttpRequest request1 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - DefaultHttpContent content1 = new DefaultHttpContent(Unpooled.buffer(4)); - DefaultLastHttpContent lastContent1 = new DefaultLastHttpContent(Unpooled.buffer(4)); - channel.writeInbound(request1); - channel.writeInbound(content1); - channel.writeInbound(lastContent1); - final DefaultHttpRequest request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4)); - DefaultLastHttpContent lastContent2 = new DefaultLastHttpContent(Unpooled.buffer(4)); - channel.writeInbound(request2); - channel.writeInbound(content2); - - boolean finishSecondRequest = randomBoolean(); - if (finishSecondRequest) { - channel.writeInbound(lastContent2); - } - - // channel is paused and both requests are queued - assertThat(header.get(), sameInstance(request1)); - assertThat(channel.readInbound(), nullValue()); - assertFalse(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); - assertThat(content1.refCnt(), equalTo(2)); - assertThat(lastContent1.refCnt(), equalTo(2)); - assertThat(content2.refCnt(), equalTo(2)); - if (finishSecondRequest) { - assertThat(lastContent2.refCnt(), equalTo(2)); - } - - // validation for the 1st request FAILS - Exception exception = new ElasticsearchException("Boom"); - listener.get().onFailure(exception); - channel.runPendingTasks(); - - // request1 becomes a decoder exception and its content is dropped - assertThat(channel.readInbound(), sameInstance(request1)); - assertThat(request1.headers().get(HttpHeaderNames.CONNECTION), nullValue()); - assertTrue(request1.decoderResult().isFailure()); - Exception cause = (Exception) request1.decoderResult().cause(); - assertThat(cause, equalTo(exception)); - assertThat(content1.refCnt(), equalTo(0)); // content is dropped - assertThat(lastContent1.refCnt(), equalTo(0)); // content is dropped - assertThat(channel.readInbound(), nullValue()); - - // channel pauses for the validation of the 2nd request - assertThat(header.get(), sameInstance(request2)); - assertFalse(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); - assertThat(channel.readInbound(), nullValue()); - - // validation for the 2nd request SUCCEEDS - listener.get().onResponse(null); - channel.runPendingTasks(); - - // 2nd request is forwarded correctly - assertThat(channel.readInbound(), sameInstance(request2)); - assertThat(channel.readInbound(), sameInstance(content2)); - assertThat(content2.refCnt(), equalTo(1)); - - if (finishSecondRequest == false) { - assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST)); - assertTrue(channel.config().isAutoRead()); - assertThat(channel.readInbound(), nullValue()); - // while in forwarding state the request can continue - if (randomBoolean()) { - DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(content); - assertThat(channel.readInbound(), sameInstance(content)); - assertThat(content.refCnt(), equalTo(1)); + var validationRequest = validatorRequestQueue.poll(); + assertNotNull(validationRequest); + if (shouldPassValidation) { + validationRequest.listener.onResponse(null); + } else { + validationRequest.listener.onFailure(new ValidationException()); } - channel.writeInbound(lastContent2); - } - - assertThat(channel.readInbound(), sameInstance(lastContent2)); - assertThat(lastContent2.refCnt(), equalTo(1)); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - assertTrue(channel.config().isAutoRead()); - } - - public void testValidationFailureRecoversForInbound() { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - // write a single request, but don't finish it yet, for which the validation fails - final DefaultHttpRequest request1 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - DefaultHttpContent content1 = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(request1); - channel.writeInbound(content1); - - // channel is paused and the request is queued - assertThat(header.get(), sameInstance(request1)); - assertThat(channel.readInbound(), nullValue()); - assertFalse(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); - assertThat(content1.refCnt(), equalTo(2)); - - // validation for the 1st request FAILS - Exception exception = new ElasticsearchException("Boom"); - listener.get().onFailure(exception); - channel.runPendingTasks(); - - // request1 becomes a decoder exception and its content is dropped - assertThat(channel.readInbound(), sameInstance(request1)); - assertThat(request1.headers().get(HttpHeaderNames.CONNECTION), nullValue()); - assertTrue(request1.decoderResult().isFailure()); - Exception cause = (Exception) request1.decoderResult().cause(); - assertThat(cause, equalTo(exception)); - assertThat(content1.refCnt(), equalTo(0)); // content is dropped - assertThat(channel.readInbound(), nullValue()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST)); - - if (randomBoolean()) { - channel.writeInbound(new DefaultHttpContent(Unpooled.buffer(4))); - } - DefaultLastHttpContent lastContent1 = new DefaultLastHttpContent(Unpooled.buffer(4)); - channel.writeInbound(lastContent1); - if (randomBoolean()) { - assertThat(channel.readInbound(), nullValue()); - } - assertThat(lastContent1.refCnt(), equalTo(0)); // content is dropped - - // write 2nd request after the 1st one failed validation - final DefaultHttpRequest request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4)); - DefaultLastHttpContent lastContent2 = new DefaultLastHttpContent(Unpooled.buffer(4)); - channel.writeInbound(request2); - channel.writeInbound(content2); - boolean finishSecondRequest = randomBoolean(); - if (finishSecondRequest) { - channel.writeInbound(lastContent2); - } - - // channel pauses for the validation of the 2nd request - assertThat(header.get(), sameInstance(request2)); - assertFalse(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); - assertThat(channel.readInbound(), nullValue()); - - // validation for the 2nd request SUCCEEDS - listener.get().onResponse(null); - channel.runPendingTasks(); + channel.runPendingTasks(); - // 2nd request is forwarded correctly - assertThat(channel.readInbound(), sameInstance(request2)); - assertThat(channel.readInbound(), sameInstance(content2)); - assertThat(content2.refCnt(), equalTo(1)); + var gotRequest = channel.readInbound(); + assertEquals( + "should set decoder result failure for invalid request", + shouldPassValidation, + ((HttpRequest) gotRequest).decoderResult().isSuccess() + ); + assertEquals(request, gotRequest); - if (finishSecondRequest == false) { - assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST)); - assertTrue(channel.config().isAutoRead()); - assertThat(channel.readInbound(), nullValue()); - // while in forwarding state the request can continue - if (randomBoolean()) { - DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(content); - assertThat(channel.readInbound(), sameInstance(content)); - assertThat(content.refCnt(), equalTo(1)); + channel.writeInbound(content); + channel.writeInbound(last); + if (shouldPassValidation) { + assertEquals("should pass content for valid request", content, channel.readInbound()); + content.release(); + assertEquals(last, channel.readInbound()); + last.release(); + } else { + assertNull("should drop content for invalid request", channel.readInbound()); } - channel.writeInbound(lastContent2); } - - assertThat(channel.readInbound(), sameInstance(lastContent2)); - assertThat(lastContent2.refCnt(), equalTo(1)); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - assertTrue(channel.config().isAutoRead()); } - public void testValidationSuccessForLargeMessage() { - assertTrue(channel.config().isAutoRead()); + public void testIgnoreReadWhenValidating() { + channel.pipeline().addFirst(new FlowControlHandler()); // catch all inbound messages - final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - channel.writeInbound(request); + channel.writeInbound(newHttpRequest()); + channel.writeInbound(newLastHttpContent()); // should hold by flow-control-handler + assertNull("nothing should pass yet", channel.readInbound()); - int messageLength = randomIntBetween(32, 128); - for (int i = 0; i < messageLength; ++i) { - channel.writeInbound(new DefaultHttpContent(Unpooled.buffer(4))); - } - channel.writeInbound(new DefaultLastHttpContent(Unpooled.buffer(4))); - boolean followupRequest = randomBoolean(); - if (followupRequest) { - channel.writeInbound(request); - } + channel.read(); + var validationRequest = validatorRequestQueue.poll(); + assertNotNull(validationRequest); - assertThat(header.get(), sameInstance(request)); - assertThat(channel.readInbound(), nullValue()); - assertFalse(channel.config().isAutoRead()); + channel.read(); + assertNull("should ignore read while validating", channel.readInbound()); - listener.get().onResponse(null); + validationRequest.listener.onResponse(null); channel.runPendingTasks(); - if (followupRequest) { - assertFalse(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); - } else { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - } - assertThat(channel.readInbound(), sameInstance(request)); - for (int i = 0; i < messageLength; ++i) { - Object content = channel.readInbound(); - assertThat(content, instanceOf(DefaultHttpContent.class)); - assertThat(((DefaultHttpContent) content).refCnt(), equalTo(1)); - } - assertThat(channel.readInbound(), instanceOf(LastHttpContent.class)); - assertThat(channel.readInbound(), nullValue()); - } - - public void testValidationFailureForLargeMessage() { - assertTrue(channel.config().isAutoRead()); - - final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - channel.writeInbound(request); + assertTrue("http request should pass", channel.readInbound() instanceof HttpRequest); + assertNull("content should not pass yet, need explicit read", channel.readInbound()); - int messageLength = randomIntBetween(32, 128); - DefaultHttpContent[] messageContents = new DefaultHttpContent[messageLength]; - for (int i = 0; i < messageLength; ++i) { - messageContents[i] = new DefaultHttpContent(Unpooled.buffer(4)); - channel.writeInbound(messageContents[i]); - } - DefaultLastHttpContent lastHttpContent = new DefaultLastHttpContent(Unpooled.buffer(4)); - channel.writeInbound(lastHttpContent); - boolean followupRequest = randomBoolean(); - if (followupRequest) { - channel.writeInbound(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri")); - } - - assertThat(header.get(), sameInstance(request)); - assertThat(channel.readInbound(), nullValue()); - assertFalse(channel.config().isAutoRead()); - - Exception exception = new ElasticsearchException("Boom"); - listener.get().onFailure(exception); - channel.runPendingTasks(); - if (followupRequest) { - assertFalse(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); - } else { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - } - assertThat(channel.readInbound(), sameInstance(request)); - assertThat(request.headers().get(HttpHeaderNames.CONNECTION), nullValue()); - assertTrue(request.decoderResult().isFailure()); - Exception cause = (Exception) request.decoderResult().cause(); - assertThat(cause, equalTo(exception)); - for (int i = 0; i < messageLength; ++i) { - assertThat(channel.readInbound(), nullValue()); - assertThat(messageContents[i].refCnt(), equalTo(0)); - } - assertThat(channel.readInbound(), nullValue()); - assertThat(lastHttpContent.refCnt(), equalTo(0)); - assertThat(channel.readInbound(), nullValue()); + channel.read(); + asInstanceOf(LastHttpContent.class, channel.readInbound()).release(); } - public void testFullRequestValidationFailure() { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + public void testWithFlowControlAndAggregator() { + channel.pipeline().addFirst(new FlowControlHandler()); + channel.pipeline().addLast(new Netty4HttpAggregator(8192, (req) -> true, new HttpRequestDecoder())); - ByteBuf buf = channel.alloc().buffer(); - ByteBufUtil.copy(AsciiString.of("test full http request"), buf); - final DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri", buf); - channel.writeInbound(request); + channel.writeInbound(newHttpRequest()); + channel.writeInbound(newHttpContent()); + channel.writeInbound(newLastHttpContent()); - // request got through to validation - assertThat(header.get(), sameInstance(request)); - // channel is paused - assertThat(channel.readInbound(), nullValue()); - assertFalse(channel.config().isAutoRead()); + channel.read(); + assertNull("should ignore read while validating", channel.readInbound()); - // validation fails - Exception exception = new ElasticsearchException("Boom"); - listener.get().onFailure(exception); + var validationRequest = validatorRequestQueue.poll(); + assertNotNull(validationRequest); + validationRequest.listener.onResponse(null); channel.runPendingTasks(); - // channel is resumed and waiting for next request - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - DefaultFullHttpRequest throughRequest = channel.readInbound(); - // "through request" contains a decoder exception - assertThat(throughRequest, not(sameInstance(request))); - assertTrue(throughRequest.decoderResult().isFailure()); - // the content is cleared when validation fails - assertThat(new String(ByteBufUtil.getBytes(throughRequest.content()), StandardCharsets.UTF_8), is("")); - assertThat(buf.refCnt(), is(0)); - Exception cause = (Exception) throughRequest.decoderResult().cause(); - assertThat(cause, equalTo(exception)); - } - - public void testFullRequestValidationSuccess() { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - ByteBuf buf = channel.alloc().buffer(); - try { - ByteBufUtil.copy(AsciiString.of("test full http request"), buf); - final DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri", buf); - channel.writeInbound(request); - - // request got through to validation - assertThat(header.get(), sameInstance(request)); - // channel is paused - assertThat(channel.readInbound(), nullValue()); - assertFalse(channel.config().isAutoRead()); - - // validation succeeds - listener.get().onResponse(null); - channel.runPendingTasks(); - - // channel is resumed and waiting for next request - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - DefaultFullHttpRequest throughRequest = channel.readInbound(); - // request goes through unaltered - assertThat(throughRequest, sameInstance(request)); - assertFalse(throughRequest.decoderResult().isFailure()); - // the content is unaltered - assertThat(new String(ByteBufUtil.getBytes(throughRequest.content()), StandardCharsets.UTF_8), is("test full http request")); - assertThat(buf.refCnt(), is(1)); - assertThat(throughRequest.decoderResult().cause(), nullValue()); - } finally { - buf.release(); - } + asInstanceOf(FullHttpRequest.class, channel.readInbound()).release(); } - public void testFullRequestWithDecoderException() { - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - ByteBuf buf = channel.alloc().buffer(); - try { - ByteBufUtil.copy(AsciiString.of("test full http request"), buf); - // a request with a decoder error prior to validation - final DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri", buf); - Exception cause = new ElasticsearchException("Boom"); - request.setDecoderResult(DecoderResult.failure(cause)); - channel.writeInbound(request); - - // request goes through without invoking the validator - assertThat(header.get(), nullValue()); - assertThat(listener.get(), nullValue()); - // channel is NOT paused - assertTrue(channel.config().isAutoRead()); - assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - - DefaultFullHttpRequest throughRequest = channel.readInbound(); - // request goes through unaltered - assertThat(throughRequest, sameInstance(request)); - assertTrue(throughRequest.decoderResult().isFailure()); - assertThat(throughRequest.decoderResult().cause(), equalTo(cause)); - // the content is unaltered - assertThat(new String(ByteBufUtil.getBytes(throughRequest.content()), StandardCharsets.UTF_8), is("test full http request")); - assertThat(buf.refCnt(), is(1)); - } finally { - buf.release(); - } - } + record ValidationRequest(HttpRequest request, Channel channel, ActionListener listener) {} } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java index 7492737d4f877..b2e91e49a0746 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java @@ -21,7 +21,6 @@ import io.netty.handler.flow.FlowControlHandler; import org.elasticsearch.common.bytes.ReleasableBytesReference; -import org.elasticsearch.common.network.ThreadWatchdog; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.http.HttpBody; @@ -43,17 +42,24 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase { static HttpBody.ChunkHandler discardHandler = (chunk, isLast) -> chunk.close(); private final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); private EmbeddedChannel channel; + private ReadSniffer readSniffer; private Netty4HttpRequestBodyStream stream; - private ThreadWatchdog.ActivityTracker activityTracker; @Override public void setUp() throws Exception { super.setUp(); channel = new EmbeddedChannel(); - activityTracker = new ThreadWatchdog.ActivityTracker(); - stream = new Netty4HttpRequestBodyStream(channel, threadContext, activityTracker); - stream.setHandler(discardHandler); // set default handler, each test might override one + readSniffer = new ReadSniffer(); + channel.pipeline().addLast(new FlowControlHandler(), readSniffer); + channel.config().setAutoRead(false); channel.pipeline().addLast(new SimpleChannelInboundHandler(false) { + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + stream = new Netty4HttpRequestBodyStream(ctx, threadContext); + stream.setHandler(discardHandler); // set default handler, each test might override one + super.handlerAdded(ctx); + } + @Override protected void channelRead0(ChannelHandlerContext ctx, HttpContent msg) { stream.handleNettyContent(msg); @@ -67,17 +73,8 @@ public void tearDown() throws Exception { stream.close(); } - // ensures that no chunks are sent downstream without request - public void testEnqueueChunksBeforeRequest() { - var totalChunks = randomIntBetween(1, 100); - for (int i = 0; i < totalChunks; i++) { - channel.writeInbound(randomContent(1024)); - } - assertEquals(totalChunks * 1024, stream.bufSize()); - } - - // ensures all received chunks can be flushed downstream - public void testFlushAllReceivedChunks() { + // ensures all chunks are passed to downstream + public void testPassAllChunks() { var chunks = new ArrayList(); var totalBytes = new AtomicInteger(); stream.setHandler((chunk, isLast) -> { @@ -85,52 +82,35 @@ public void testFlushAllReceivedChunks() { totalBytes.addAndGet(chunk.length()); chunk.close(); }); - var chunkSize = 1024; var totalChunks = randomIntBetween(1, 100); for (int i = 0; i < totalChunks; i++) { channel.writeInbound(randomContent(chunkSize)); + stream.next(); + channel.runPendingTasks(); + } - stream.next(); - channel.runPendingTasks(); - assertEquals("should receive all chunks as single composite", 1, chunks.size()); + assertEquals(totalChunks, chunks.size()); assertEquals(chunkSize * totalChunks, totalBytes.get()); } - // ensures that channel.setAutoRead(true) only when we flush last chunk - public void testSetAutoReadOnLastFlush() { + // ensures that we read from channel after last chunk + public void testChannelReadAfterLastContent() { channel.writeInbound(randomLastContent(10)); - assertFalse("should not auto-read on last content reception", channel.config().isAutoRead()); stream.next(); channel.runPendingTasks(); - assertTrue("should set auto-read once last content is flushed", channel.config().isAutoRead()); + assertEquals("should have at least 2 reads, one for last content, and one after last", 2, readSniffer.readCount); } - // ensures that we read from channel when no current chunks available - // and pass next chunk downstream without holding - public void testReadFromChannel() { - var gotChunks = new ArrayList(); - var gotLast = new AtomicBoolean(false); - stream.setHandler((chunk, isLast) -> { - gotChunks.add(chunk); - gotLast.set(isLast); - chunk.close(); - }); - channel.pipeline().addFirst(new FlowControlHandler()); // block all incoming messages, need explicit channel.read() - var chunkSize = 1024; - var totalChunks = randomIntBetween(1, 32); - for (int i = 0; i < totalChunks - 1; i++) { - channel.writeInbound(randomContent(chunkSize)); - } - channel.writeInbound(randomLastContent(chunkSize)); - - for (int i = 0; i < totalChunks; i++) { - assertEquals("should not enqueue chunks", 0, stream.bufSize()); - stream.next(); - channel.runPendingTasks(); - assertEquals("each next() should produce single chunk", i + 1, gotChunks.size()); - } - assertTrue("should receive last content", gotLast.get()); + // ensures when stream is closing we read and discard chunks + public void testReadAndReleaseOnClosing() { + var unexpectedChunk = new AtomicBoolean(); + stream.setHandler((chunk, isLast) -> unexpectedChunk.set(true)); + stream.close(); + channel.writeInbound(randomContent(1024)); + channel.writeInbound(randomLastContent(0)); + assertFalse("chunk should be discarded", unexpectedChunk.get()); + assertEquals("expect 3 reads, a first from stream.close, and other two after chunks", 3, readSniffer.readCount); } public void testReadFromHasCorrectThreadContext() throws InterruptedException { @@ -142,9 +122,15 @@ public void testReadFromHasCorrectThreadContext() throws InterruptedException { try { // activity tracker requires stream execution in the same thread, setting up stream inside event-loop eventLoop.submit(() -> { - channel = new EmbeddedChannel(); - stream = new Netty4HttpRequestBodyStream(channel, threadContext, new ThreadWatchdog.ActivityTracker()); + channel = new EmbeddedChannel(new FlowControlHandler()); + channel.config().setAutoRead(false); channel.pipeline().addLast(new SimpleChannelInboundHandler(false) { + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + stream = new Netty4HttpRequestBodyStream(ctx, threadContext); + super.handlerAdded(ctx); + } + @Override protected void channelRead0(ChannelHandlerContext ctx, HttpContent msg) { stream.handleNettyContent(msg); @@ -198,18 +184,6 @@ public void close() { } } - public void testStreamNextActivityTracker() { - var t0 = activityTracker.get(); - var N = between(1, 10); - for (int i = 0; i < N; i++) { - channel.writeInbound(randomContent(1024)); - stream.next(); - channel.runPendingTasks(); - } - var t1 = activityTracker.get(); - assertEquals("stream#next() must trigger activity tracker: N*step=" + N + "*2=" + N * 2L + " times", t1, t0 + N * 2L); - } - // ensure that we catch all exceptions and throw them into channel pipeline public void testCatchExceptions() { var gotExceptions = new CountDownLatch(3); // number of tests below diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/ReadSniffer.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/ReadSniffer.java new file mode 100644 index 0000000000000..af6844883b68f --- /dev/null +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/ReadSniffer.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.http.netty4; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; + +/** + * Sniffs channel reads, helps detect missing or unexpected ones. + *
+ *     {@code
+ *     chan = new EmbeddedChannel();
+ *     chan.config().setAutoRead(false);
+ *     readSniffer = new ReadSniffer();
+ *     chan.pipeline().addLast(readSniffer, ...otherHandlers);
+ *     ...
+ *     // run test
+ *     ...
+ *     assertEquals("unexpected read", 0, readSniffer.readCnt)
+ *     // or
+ *     assertEquals("exact number of reads", 2, readSniffer.readCnt)
+ *     }
+ * 
+ * + */ +public class ReadSniffer extends ChannelOutboundHandlerAdapter { + + int readCount; + + @Override + public void read(ChannelHandlerContext ctx) throws Exception { + readCount++; + super.read(ctx); + } +} diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportCloseNotifyTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportCloseNotifyTests.java index ec2881b989d0b..4ab0e53ce2bd5 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportCloseNotifyTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportCloseNotifyTests.java @@ -251,7 +251,7 @@ public void close() { server.dispatcher.reqQueue.forEach(r -> r.request.getHttpRequest().release()); server.netty.stop(); server.threadPool.shutdownNow(); - safeAwait(client.netty.config().group().shutdownGracefully()); + safeAwait(client.netty.config().group().shutdownGracefully(0, 0, TimeUnit.SECONDS)); } }