Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/126441.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 126441
summary: Add flow-control and remove auto-read in netty4 http pipeline
area: Network
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,64 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {

private static final int MAX_CONTENT_LENGTH = ByteSizeUnit.MB.toIntBytes(50);

private static long transportStatsRequestBytesSize(Ctx ctx) {
var httpTransport = internalCluster().getInstance(HttpServerTransport.class, ctx.nodeName);
var stats = httpTransport.stats().clientStats();
var bytes = 0L;
for (var s : stats) {
bytes += s.requestSizeBytes();
}
return bytes;
}

static int MBytes(int m) {
return m * 1024 * 1024;
}

static <T> T safePoll(BlockingDeque<T> queue) {
try {
var t = queue.poll(SAFE_AWAIT_TIMEOUT.seconds(), TimeUnit.SECONDS);
assertNotNull("queue is empty", t);
return t;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError(e);
}
}

private static FullHttpRequest fullHttpRequest(String opaqueId, ByteBuf content) {
var req = new DefaultFullHttpRequest(HTTP_1_1, POST, ControlServerRequestPlugin.ROUTE, Unpooled.wrappedBuffer(content));
req.headers().add(CONTENT_LENGTH, content.readableBytes());
req.headers().add(CONTENT_TYPE, APPLICATION_JSON);
req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId);
return req;
}

private static HttpRequest httpRequest(String opaqueId, int contentLength) {
return httpRequest(ControlServerRequestPlugin.ROUTE, opaqueId, contentLength);
}

private static HttpRequest httpRequest(String uri, String opaqueId, int contentLength) {
var req = new DefaultHttpRequest(HTTP_1_1, POST, uri);
req.headers().add(CONTENT_LENGTH, contentLength);
req.headers().add(CONTENT_TYPE, APPLICATION_JSON);
req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId);
return req;
}

private static HttpContent randomContent(int size, boolean isLast) {
var buf = Unpooled.wrappedBuffer(randomByteArrayOfLength(size));
if (isLast) {
return new DefaultLastHttpContent(buf);
} else {
return new DefaultHttpContent(buf);
}
}

private static ByteBuf randomByteBuf(int size) {
return Unpooled.wrappedBuffer(randomByteArrayOfLength(size));
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
Expand Down Expand Up @@ -178,8 +236,6 @@ public void testClientConnectionCloseMidStream() throws Exception {

// await stream handler is ready and request full content
var handler = ctx.awaitRestChannelAccepted(opaqueId);
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));

assertFalse(handler.streamClosed);

// terminate client connection
Expand All @@ -190,10 +246,7 @@ public void testClientConnectionCloseMidStream() throws Exception {
handler.stream.next();

// wait for resources to be released
assertBusy(() -> {
assertEquals(0, handler.stream.bufSize());
assertTrue(handler.streamClosed);
});
assertBusy(() -> assertTrue(handler.streamClosed));
}
}

Expand All @@ -208,15 +261,11 @@ public void testServerCloseConnectionMidStream() throws Exception {

// await stream handler is ready and request full content
var handler = ctx.awaitRestChannelAccepted(opaqueId);
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
assertFalse(handler.streamClosed);

// terminate connection on server and wait resources are released
handler.channel.request().getHttpChannel().close();
assertBusy(() -> {
assertEquals(0, handler.stream.bufSize());
assertTrue(handler.streamClosed);
});
assertBusy(() -> assertTrue(handler.streamClosed));
}
}

Expand All @@ -230,16 +279,12 @@ public void testServerExceptionMidStream() throws Exception {

// await stream handler is ready and request full content
var handler = ctx.awaitRestChannelAccepted(opaqueId);
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
assertFalse(handler.streamClosed);

handler.shouldThrowInsideHandleChunk = true;
handler.stream.next();

assertBusy(() -> {
assertEquals(0, handler.stream.bufSize());
assertTrue(handler.streamClosed);
});
assertBusy(() -> assertTrue(handler.streamClosed));
}
}

Expand Down Expand Up @@ -280,7 +325,7 @@ public void testClientBackpressure() throws Exception {
});
handler.readBytes(partSize);
}
assertTrue(handler.stream.hasLast());
assertTrue(handler.recvLast);
}
}

Expand Down Expand Up @@ -385,16 +430,6 @@ public void testBadRequestReleaseQueuedChunks() throws Exception {
}
}

private static long transportStatsRequestBytesSize(Ctx ctx) {
var httpTransport = internalCluster().getInstance(HttpServerTransport.class, ctx.nodeName);
var stats = httpTransport.stats().clientStats();
var bytes = 0L;
for (var s : stats) {
bytes += s.requestSizeBytes();
}
return bytes;
}

/**
* ensures that {@link org.elasticsearch.http.HttpClientStatsTracker} counts streamed content bytes
*/
Expand Down Expand Up @@ -489,63 +524,15 @@ private String opaqueId(int reqNo) {
return getTestName() + "-" + reqNo;
}

static int MBytes(int m) {
return m * 1024 * 1024;
}

static <T> T safePoll(BlockingDeque<T> queue) {
try {
var t = queue.poll(SAFE_AWAIT_TIMEOUT.seconds(), TimeUnit.SECONDS);
assertNotNull("queue is empty", t);
return t;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError(e);
}
}

static FullHttpRequest fullHttpRequest(String opaqueId, ByteBuf content) {
var req = new DefaultFullHttpRequest(HTTP_1_1, POST, ControlServerRequestPlugin.ROUTE, Unpooled.wrappedBuffer(content));
req.headers().add(CONTENT_LENGTH, content.readableBytes());
req.headers().add(CONTENT_TYPE, APPLICATION_JSON);
req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId);
return req;
}

static HttpRequest httpRequest(String opaqueId, int contentLength) {
return httpRequest(ControlServerRequestPlugin.ROUTE, opaqueId, contentLength);
}

static HttpRequest httpRequest(String uri, String opaqueId, int contentLength) {
var req = new DefaultHttpRequest(HTTP_1_1, POST, uri);
req.headers().add(CONTENT_LENGTH, contentLength);
req.headers().add(CONTENT_TYPE, APPLICATION_JSON);
req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId);
return req;
}

static HttpContent randomContent(int size, boolean isLast) {
var buf = Unpooled.wrappedBuffer(randomByteArrayOfLength(size));
if (isLast) {
return new DefaultLastHttpContent(buf);
} else {
return new DefaultHttpContent(buf);
}
}

static ByteBuf randomByteBuf(int size) {
return Unpooled.wrappedBuffer(randomByteArrayOfLength(size));
}

Ctx setupClientCtx() throws Exception {
private Ctx setupClientCtx() throws Exception {
var nodeName = internalCluster().getRandomNodeName();
var clientRespQueue = new LinkedBlockingDeque<>(16);
var bootstrap = bootstrapClient(nodeName, clientRespQueue);
var channel = bootstrap.connect().sync().channel();
return new Ctx(getTestName(), nodeName, bootstrap, channel, clientRespQueue);
}

Bootstrap bootstrapClient(String node, BlockingQueue<Object> queue) {
private Bootstrap bootstrapClient(String node, BlockingQueue<Object> queue) {
var httpServer = internalCluster().getInstance(HttpServerTransport.class, node);
var remoteAddr = randomFrom(httpServer.boundAddress().boundAddresses());
return new Bootstrap().group(new NioEventLoopGroup(1))
Expand Down Expand Up @@ -583,9 +570,13 @@ protected boolean addMockHttpTransport() {
return false; // enable http
}

record Ctx(String testName, String nodeName, Bootstrap clientBootstrap, Channel clientChannel, BlockingDeque<Object> clientRespQueue)
implements
AutoCloseable {
private record Ctx(
String testName,
String nodeName,
Bootstrap clientBootstrap,
Channel clientChannel,
BlockingDeque<Object> clientRespQueue
) implements AutoCloseable {

@Override
public void close() throws Exception {
Expand All @@ -610,7 +601,7 @@ ServerRequestHandler awaitRestChannelAccepted(String opaqueId) throws Exception
}
}

static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkConsumer {
private static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkConsumer {
final SubscribableListener<Void> channelAccepted = new SubscribableListener<>();
final String opaqueId;
final BlockingDeque<Chunk> recvChunks = new LinkedBlockingDeque<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.http.netty4;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.ScheduledFuture;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.time.TimeProvider;
import org.elasticsearch.common.util.concurrent.FutureUtils;

import java.util.concurrent.TimeUnit;

/**
* When channel auto-read is disabled handlers are responsible to read from channel.
* But it's hard to detect when read is missing. This helper class print warnings
* when no reads where detected in given time interval. Normally, in tests, 10 seconds is enough
* to avoid test hang for too long, but can be increased if needed.
*/
class MissingReadDetector extends ChannelDuplexHandler {

private static final Logger logger = LogManager.getLogger(MissingReadDetector.class);

private final long interval;
private final TimeProvider timer;
private boolean pendingRead;
private long lastRead;
private ScheduledFuture<?> checker;

MissingReadDetector(TimeProvider timer, long missingReadIntervalMillis) {
this.interval = missingReadIntervalMillis;
this.timer = timer;
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
checker = ctx.channel().eventLoop().scheduleAtFixedRate(() -> {
if (pendingRead == false) {
long now = timer.absoluteTimeInMillis();
if (now >= lastRead + interval) {
logger.warn("chan-id={} haven't read from channel for [{}ms]", ctx.channel().id(), (now - lastRead));
Comment on lines +49 to +50
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether it is worthwhile to warn faster if the last seen message is not either a FullHttpRequest or LastHttpContent?

Copy link
Contributor Author

@mhl-b mhl-b Apr 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is use-case for this? Not reading from stream at anytime sounds equally bad. Our transport code that missed read at the end of request or stream handler that forgot to read chunk or close stream, both problematic.

}
}
}, interval, interval, TimeUnit.MILLISECONDS);
super.handlerAdded(ctx);
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
if (checker != null) {
FutureUtils.cancel(checker);
}
super.handlerRemoved(ctx);
}

@Override
public void read(ChannelHandlerContext ctx) throws Exception {
pendingRead = true;
ctx.read();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
assert ctx.channel().config().isAutoRead() == false : "auto-read must be always disabled";
pendingRead = false;
lastRead = timer.absoluteTimeInMillis();
ctx.fireChannelRead(msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.LastHttpContent;

import org.elasticsearch.http.HttpPreRequest;
import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils;
Expand Down Expand Up @@ -48,6 +49,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
if (aggregating || msg instanceof FullHttpRequest) {
super.channelRead(ctx, msg);
if (msg instanceof LastHttpContent == false) {
ctx.read(); // HttpObjectAggregator is tricky with auto-read off, it might not call read again, calling on its behalf
}
} else {
streamContentSizeHandler.channelRead(ctx, msg);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This branch is apparently not covered by the unit test suite, could we add a test that hits it?

Copy link
Contributor Author

@mhl-b mhl-b Apr 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

org.elasticsearch.http.netty4.Netty4HttpHeaderValidatorTests#testWithFlowControlAndAggregator is covering it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, next step is to remove aggregator from netty. There is no incentive to add more tests here, more to remove in coming days.

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) {
isContinueExpected = true;
} else {
ctx.writeAndFlush(EXPECTATION_FAILED_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE);
ctx.read();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The request.decoderResult().isFailure() case above is also not covered by unit tests, can we add that?

return;
}
}
Expand All @@ -136,6 +137,7 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) {
decoder.reset();
}
ctx.writeAndFlush(TOO_LARGE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
ctx.read();
} else {
ignoreContent = false;
currentContentLength = 0;
Expand All @@ -150,11 +152,13 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) {
private void handleContent(ChannelHandlerContext ctx, HttpContent msg) {
if (ignoreContent) {
msg.release();
ctx.read();
} else {
currentContentLength += msg.content().readableBytes();
if (currentContentLength > maxContentLength) {
msg.release();
ctx.writeAndFlush(TOO_LARGE_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE);
ctx.read();
} else {
ctx.fireChannelRead(msg);
}
Expand Down
Loading
Loading