From 44dfc951a92fc18587cf4e9cca16adfd4fa39b77 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 23 Apr 2025 13:50:03 +0100 Subject: [PATCH 1/2] Add end-to-end bulk splitting test Today we do not have a test that verifies the Netty HTTP pipeline interacts properly with the incremental bulk handling service and splits requests when the watermark is hit. This commit adds such a test. --- .../Netty4IncrementalRequestHandlingIT.java | 91 ++++++++++++++++++- 1 file changed, 86 insertions(+), 5 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java index 856d6e294f817..fc801ef171990 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -47,7 +47,9 @@ import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; @@ -63,6 +65,7 @@ import org.elasticsearch.http.HttpBodyTracer; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.http.HttpTransportSettings; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.BaseRestHandler; @@ -73,13 +76,17 @@ import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.MockLog; import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.test.rest.ObjectPath; import org.elasticsearch.transport.Transports; import org.elasticsearch.transport.netty4.Netty4Utils; +import org.elasticsearch.xcontent.json.JsonXContent; import java.nio.channels.ClosedChannelException; +import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.List; import java.util.Map; @@ -102,6 +109,7 @@ import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -361,7 +369,12 @@ public void test413TooLargeOnExpect100Continue() throws Exception { // ensures that oversized chunked encoded request has maxContentLength limit and returns 413 public void testOversizedChunkedEncoding() throws Exception { - try (var clientContext = newClientContext(t -> {/* ignore exception from e.g. server closing socket */})) { + try ( + var clientContext = newClientContext( + internalCluster().getRandomNodeName(), + t -> {/* ignore exception from e.g. server closing socket */} + ) + ) { var opaqueId = clientContext.newOpaqueId(); final var requestBodyStream = new HttpChunkedInput( new ChunkedStream(new ByteBufInputStream(Unpooled.wrappedBuffer(randomByteArrayOfLength(MAX_CONTENT_LENGTH + 1)))), @@ -501,6 +514,72 @@ private void assertHttpBodyLogging(Consumer test) throws Exceptio } } + public void testBulkIndexingRequestSplitting() throws Exception { + final var watermarkBytes = between(100, 200); + final var tinyNode = internalCluster().startCoordinatingOnlyNode( + Settings.builder() + .put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK.getKey(), ByteSizeValue.ofBytes(watermarkBytes)) + .put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK_SIZE.getKey(), ByteSizeValue.ofBytes(watermarkBytes)) + .build() + ); + + try (var clientContext = newClientContext(tinyNode, cause -> ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(cause)))) { + final var request = new DefaultHttpRequest(HTTP_1_1, POST, "/_bulk"); + request.headers().add(CONTENT_TYPE, APPLICATION_JSON); + HttpUtil.setTransferEncodingChunked(request, true); + + final var channel = clientContext.channel(); + channel.writeAndFlush(request); + + final var indexName = randomIdentifier(); + final var indexCreatedListener = ClusterServiceUtils.addTemporaryStateListener( + cs -> Iterators.filter( + cs.metadata().indicesAllProjects().iterator(), + indexMetadata -> indexMetadata.getIndex().getName().equals(indexName) + ).hasNext() + ); + + indexCreatedListener.addListener(ActionListener.running(() -> logger.info("--> index created"))); + + final var valueLength = between(10, 30); + final var docSizeBytes = "{'field':''}".length() + valueLength; + final var itemCount = between(watermarkBytes / docSizeBytes + 1, 300); // enough to split at least once + assertThat(itemCount * docSizeBytes, greaterThan(watermarkBytes)); + for (int i = 0; i < itemCount; i++) { + channel.write(new DefaultHttpContent(Unpooled.wrappedBuffer(Strings.format(""" + {"index":{"_index":"%s"}} + {"field":"%s"} + """, indexName, randomAlphaOfLength(valueLength)).getBytes(StandardCharsets.UTF_8)))); + } + + channel.flush(); + safeAwait(indexCreatedListener); + + channel.writeAndFlush(new DefaultLastHttpContent()); + final var response = clientContext.getNextResponse(); + try { + assertEquals(RestStatus.OK.getStatus(), response.status().code()); + final ObjectPath responseBody; + final var copy = response.content().copy(); + try { + responseBody = ObjectPath.createFromXContent(JsonXContent.jsonXContent, Netty4Utils.toBytesReference(copy)); + } finally { + copy.release(); + } + assertFalse(responseBody.evaluate("errors")); + assertEquals(itemCount, responseBody.evaluateArraySize("items")); + for (int i = 0; i < itemCount; i++) { + assertEquals( + RestStatus.CREATED.getStatus(), + (int) asInstanceOf(int.class, responseBody.evaluateExact("items", Integer.toString(i), "index", "status")) + ); + } + } finally { + response.release(); + } + } + } + static FullHttpRequest fullHttpRequest(String opaqueId, ByteBuf content) { var request = new DefaultFullHttpRequest(HTTP_1_1, POST, ControlServerRequestPlugin.ROUTE, Unpooled.wrappedBuffer(content)); request.headers().add(CONTENT_LENGTH, content.readableBytes()); @@ -539,11 +618,13 @@ protected boolean addMockHttpTransport() { private static final LongSupplier idGenerator = new AtomicLong()::getAndIncrement; private ClientContext newClientContext() throws Exception { - return newClientContext(cause -> ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(cause))); + return newClientContext( + internalCluster().getRandomNodeName(), + cause -> ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(cause)) + ); } - private ClientContext newClientContext(Consumer exceptionHandler) throws Exception { - var nodeName = internalCluster().getRandomNodeName(); + private ClientContext newClientContext(String nodeName, Consumer exceptionHandler) throws Exception { var clientResponseQueue = new LinkedBlockingDeque(16); final var httpServerTransport = internalCluster().getInstance(HttpServerTransport.class, nodeName); var remoteAddr = randomFrom(httpServerTransport.boundAddress().boundAddresses()); @@ -556,7 +637,7 @@ private ClientContext newClientContext(Consumer exceptionHandler) thr protected void initChannel(SocketChannel ch) { var p = ch.pipeline(); p.addLast(new HttpClientCodec()); - p.addLast(new HttpObjectAggregator(ByteSizeUnit.KB.toIntBytes(4))); + p.addLast(new HttpObjectAggregator(ByteSizeUnit.MB.toIntBytes(4))); p.addLast(new SimpleChannelInboundHandler() { @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) { From 17959b09b41c076a432f289aaca9da523bf9ef1f Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 23 Apr 2025 14:17:35 +0100 Subject: [PATCH 2/2] Comments --- .../http/netty4/Netty4IncrementalRequestHandlingIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java index fc801ef171990..3072178eaed4e 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -553,14 +553,14 @@ public void testBulkIndexingRequestSplitting() throws Exception { } channel.flush(); - safeAwait(indexCreatedListener); + safeAwait(indexCreatedListener); // index must be created before we finish sending the request channel.writeAndFlush(new DefaultLastHttpContent()); final var response = clientContext.getNextResponse(); try { assertEquals(RestStatus.OK.getStatus(), response.status().code()); final ObjectPath responseBody; - final var copy = response.content().copy(); + final var copy = response.content().copy(); // Netty4Utils doesn't handle direct buffers, so copy to heap first try { responseBody = ObjectPath.createFromXContent(JsonXContent.jsonXContent, Netty4Utils.toBytesReference(copy)); } finally {