Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions docs/changelog/126441.yaml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -94,64 +94,6 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {

private static final int MAX_CONTENT_LENGTH = ByteSizeUnit.MB.toIntBytes(50);

private static long transportStatsRequestBytesSize(Ctx ctx) {
var httpTransport = internalCluster().getInstance(HttpServerTransport.class, ctx.nodeName);
var stats = httpTransport.stats().clientStats();
var bytes = 0L;
for (var s : stats) {
bytes += s.requestSizeBytes();
}
return bytes;
}

static int MBytes(int m) {
return m * 1024 * 1024;
}

static <T> T safePoll(BlockingDeque<T> queue) {
try {
var t = queue.poll(SAFE_AWAIT_TIMEOUT.seconds(), TimeUnit.SECONDS);
assertNotNull("queue is empty", t);
return t;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError(e);
}
}

private static FullHttpRequest fullHttpRequest(String opaqueId, ByteBuf content) {
var req = new DefaultFullHttpRequest(HTTP_1_1, POST, ControlServerRequestPlugin.ROUTE, Unpooled.wrappedBuffer(content));
req.headers().add(CONTENT_LENGTH, content.readableBytes());
req.headers().add(CONTENT_TYPE, APPLICATION_JSON);
req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId);
return req;
}

private static HttpRequest httpRequest(String opaqueId, int contentLength) {
return httpRequest(ControlServerRequestPlugin.ROUTE, opaqueId, contentLength);
}

private static HttpRequest httpRequest(String uri, String opaqueId, int contentLength) {
var req = new DefaultHttpRequest(HTTP_1_1, POST, uri);
req.headers().add(CONTENT_LENGTH, contentLength);
req.headers().add(CONTENT_TYPE, APPLICATION_JSON);
req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId);
return req;
}

private static HttpContent randomContent(int size, boolean isLast) {
var buf = Unpooled.wrappedBuffer(randomByteArrayOfLength(size));
if (isLast) {
return new DefaultLastHttpContent(buf);
} else {
return new DefaultHttpContent(buf);
}
}

private static ByteBuf randomByteBuf(int size) {
return Unpooled.wrappedBuffer(randomByteArrayOfLength(size));
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
Expand Down Expand Up @@ -236,6 +178,8 @@ public void testClientConnectionCloseMidStream() throws Exception {

// await stream handler is ready and request full content
var handler = ctx.awaitRestChannelAccepted(opaqueId);
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));

assertFalse(handler.streamClosed);

// terminate client connection
Expand All @@ -246,7 +190,10 @@ public void testClientConnectionCloseMidStream() throws Exception {
handler.stream.next();

// wait for resources to be released
assertBusy(() -> assertTrue(handler.streamClosed));
assertBusy(() -> {
assertEquals(0, handler.stream.bufSize());
assertTrue(handler.streamClosed);
});
}
}

Expand All @@ -261,11 +208,15 @@ public void testServerCloseConnectionMidStream() throws Exception {

// await stream handler is ready and request full content
var handler = ctx.awaitRestChannelAccepted(opaqueId);
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
assertFalse(handler.streamClosed);

// terminate connection on server and wait resources are released
handler.channel.request().getHttpChannel().close();
assertBusy(() -> assertTrue(handler.streamClosed));
assertBusy(() -> {
assertEquals(0, handler.stream.bufSize());
assertTrue(handler.streamClosed);
});
}
}

Expand All @@ -279,12 +230,16 @@ public void testServerExceptionMidStream() throws Exception {

// await stream handler is ready and request full content
var handler = ctx.awaitRestChannelAccepted(opaqueId);
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
assertFalse(handler.streamClosed);

handler.shouldThrowInsideHandleChunk = true;
handler.stream.next();

assertBusy(() -> assertTrue(handler.streamClosed));
assertBusy(() -> {
assertEquals(0, handler.stream.bufSize());
assertTrue(handler.streamClosed);
});
}
}

Expand Down Expand Up @@ -325,7 +280,7 @@ public void testClientBackpressure() throws Exception {
});
handler.readBytes(partSize);
}
assertTrue(handler.recvLast);
assertTrue(handler.stream.hasLast());
}
}

Expand Down Expand Up @@ -430,6 +385,16 @@ public void testBadRequestReleaseQueuedChunks() throws Exception {
}
}

private static long transportStatsRequestBytesSize(Ctx ctx) {
var httpTransport = internalCluster().getInstance(HttpServerTransport.class, ctx.nodeName);
var stats = httpTransport.stats().clientStats();
var bytes = 0L;
for (var s : stats) {
bytes += s.requestSizeBytes();
}
return bytes;
}

/**
* ensures that {@link org.elasticsearch.http.HttpClientStatsTracker} counts streamed content bytes
*/
Expand Down Expand Up @@ -524,15 +489,63 @@ private String opaqueId(int reqNo) {
return getTestName() + "-" + reqNo;
}

private Ctx setupClientCtx() throws Exception {
static int MBytes(int m) {
return m * 1024 * 1024;
}

static <T> T safePoll(BlockingDeque<T> queue) {
try {
var t = queue.poll(SAFE_AWAIT_TIMEOUT.seconds(), TimeUnit.SECONDS);
assertNotNull("queue is empty", t);
return t;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError(e);
}
}

static FullHttpRequest fullHttpRequest(String opaqueId, ByteBuf content) {
var req = new DefaultFullHttpRequest(HTTP_1_1, POST, ControlServerRequestPlugin.ROUTE, Unpooled.wrappedBuffer(content));
req.headers().add(CONTENT_LENGTH, content.readableBytes());
req.headers().add(CONTENT_TYPE, APPLICATION_JSON);
req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId);
return req;
}

static HttpRequest httpRequest(String opaqueId, int contentLength) {
return httpRequest(ControlServerRequestPlugin.ROUTE, opaqueId, contentLength);
}

static HttpRequest httpRequest(String uri, String opaqueId, int contentLength) {
var req = new DefaultHttpRequest(HTTP_1_1, POST, uri);
req.headers().add(CONTENT_LENGTH, contentLength);
req.headers().add(CONTENT_TYPE, APPLICATION_JSON);
req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId);
return req;
}

static HttpContent randomContent(int size, boolean isLast) {
var buf = Unpooled.wrappedBuffer(randomByteArrayOfLength(size));
if (isLast) {
return new DefaultLastHttpContent(buf);
} else {
return new DefaultHttpContent(buf);
}
}

static ByteBuf randomByteBuf(int size) {
return Unpooled.wrappedBuffer(randomByteArrayOfLength(size));
}

Ctx setupClientCtx() throws Exception {
var nodeName = internalCluster().getRandomNodeName();
var clientRespQueue = new LinkedBlockingDeque<>(16);
var bootstrap = bootstrapClient(nodeName, clientRespQueue);
var channel = bootstrap.connect().sync().channel();
return new Ctx(getTestName(), nodeName, bootstrap, channel, clientRespQueue);
}

private Bootstrap bootstrapClient(String node, BlockingQueue<Object> queue) {
Bootstrap bootstrapClient(String node, BlockingQueue<Object> queue) {
var httpServer = internalCluster().getInstance(HttpServerTransport.class, node);
var remoteAddr = randomFrom(httpServer.boundAddress().boundAddresses());
return new Bootstrap().group(new NioEventLoopGroup(1))
Expand Down Expand Up @@ -570,13 +583,9 @@ protected boolean addMockHttpTransport() {
return false; // enable http
}

private record Ctx(
String testName,
String nodeName,
Bootstrap clientBootstrap,
Channel clientChannel,
BlockingDeque<Object> clientRespQueue
) implements AutoCloseable {
record Ctx(String testName, String nodeName, Bootstrap clientBootstrap, Channel clientChannel, BlockingDeque<Object> clientRespQueue)
implements
AutoCloseable {

@Override
public void close() throws Exception {
Expand All @@ -601,7 +610,7 @@ ServerRequestHandler awaitRestChannelAccepted(String opaqueId) throws Exception
}
}

private static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkConsumer {
static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkConsumer {
final SubscribableListener<Void> channelAccepted = new SubscribableListener<>();
final String opaqueId;
final BlockingDeque<Chunk> recvChunks = new LinkedBlockingDeque<>();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.LastHttpContent;

import org.elasticsearch.http.HttpPreRequest;
import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils;
Expand Down Expand Up @@ -49,9 +48,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
if (aggregating || msg instanceof FullHttpRequest) {
super.channelRead(ctx, msg);
if (msg instanceof LastHttpContent == false) {
ctx.read(); // HttpObjectAggregator is tricky with auto-read off, it might not call read again, calling on its behalf
}
} else {
streamContentSizeHandler.channelRead(ctx, msg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) {
isContinueExpected = true;
} else {
ctx.writeAndFlush(EXPECTATION_FAILED_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE);
ctx.read();
return;
}
}
Expand All @@ -137,7 +136,6 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) {
decoder.reset();
}
ctx.writeAndFlush(TOO_LARGE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
ctx.read();
} else {
ignoreContent = false;
currentContentLength = 0;
Expand All @@ -152,13 +150,11 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) {
private void handleContent(ChannelHandlerContext ctx, HttpContent msg) {
if (ignoreContent) {
msg.release();
ctx.read();
} else {
currentContentLength += msg.content().readableBytes();
if (currentContentLength > maxContentLength) {
msg.release();
ctx.writeAndFlush(TOO_LARGE_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE);
ctx.read();
} else {
ctx.fireChannelRead(msg);
}
Expand Down
Loading
Loading