diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java index 856d6e294f817..3072178eaed4e 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -47,7 +47,9 @@ import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; @@ -63,6 +65,7 @@ import org.elasticsearch.http.HttpBodyTracer; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.http.HttpTransportSettings; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.BaseRestHandler; @@ -73,13 +76,17 @@ import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.MockLog; import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.test.rest.ObjectPath; import org.elasticsearch.transport.Transports; import org.elasticsearch.transport.netty4.Netty4Utils; +import org.elasticsearch.xcontent.json.JsonXContent; import java.nio.channels.ClosedChannelException; +import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.List; import java.util.Map; @@ -102,6 +109,7 @@ import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -361,7 +369,12 @@ public void test413TooLargeOnExpect100Continue() throws Exception { // ensures that oversized chunked encoded request has maxContentLength limit and returns 413 public void testOversizedChunkedEncoding() throws Exception { - try (var clientContext = newClientContext(t -> {/* ignore exception from e.g. server closing socket */})) { + try ( + var clientContext = newClientContext( + internalCluster().getRandomNodeName(), + t -> {/* ignore exception from e.g. server closing socket */} + ) + ) { var opaqueId = clientContext.newOpaqueId(); final var requestBodyStream = new HttpChunkedInput( new ChunkedStream(new ByteBufInputStream(Unpooled.wrappedBuffer(randomByteArrayOfLength(MAX_CONTENT_LENGTH + 1)))), @@ -501,6 +514,72 @@ private void assertHttpBodyLogging(Consumer test) throws Exceptio } } + public void testBulkIndexingRequestSplitting() throws Exception { + final var watermarkBytes = between(100, 200); + final var tinyNode = internalCluster().startCoordinatingOnlyNode( + Settings.builder() + .put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK.getKey(), ByteSizeValue.ofBytes(watermarkBytes)) + .put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK_SIZE.getKey(), ByteSizeValue.ofBytes(watermarkBytes)) + .build() + ); + + try (var clientContext = newClientContext(tinyNode, cause -> ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(cause)))) { + final var request = new DefaultHttpRequest(HTTP_1_1, POST, "/_bulk"); + request.headers().add(CONTENT_TYPE, APPLICATION_JSON); + HttpUtil.setTransferEncodingChunked(request, true); + + final var channel = clientContext.channel(); + channel.writeAndFlush(request); + + final var indexName = randomIdentifier(); + final var indexCreatedListener = ClusterServiceUtils.addTemporaryStateListener( + cs -> Iterators.filter( + cs.metadata().indicesAllProjects().iterator(), + indexMetadata -> indexMetadata.getIndex().getName().equals(indexName) + ).hasNext() + ); + + indexCreatedListener.addListener(ActionListener.running(() -> logger.info("--> index created"))); + + final var valueLength = between(10, 30); + final var docSizeBytes = "{'field':''}".length() + valueLength; + final var itemCount = between(watermarkBytes / docSizeBytes + 1, 300); // enough to split at least once + assertThat(itemCount * docSizeBytes, greaterThan(watermarkBytes)); + for (int i = 0; i < itemCount; i++) { + channel.write(new DefaultHttpContent(Unpooled.wrappedBuffer(Strings.format(""" + {"index":{"_index":"%s"}} + {"field":"%s"} + """, indexName, randomAlphaOfLength(valueLength)).getBytes(StandardCharsets.UTF_8)))); + } + + channel.flush(); + safeAwait(indexCreatedListener); // index must be created before we finish sending the request + + channel.writeAndFlush(new DefaultLastHttpContent()); + final var response = clientContext.getNextResponse(); + try { + assertEquals(RestStatus.OK.getStatus(), response.status().code()); + final ObjectPath responseBody; + final var copy = response.content().copy(); // Netty4Utils doesn't handle direct buffers, so copy to heap first + try { + responseBody = ObjectPath.createFromXContent(JsonXContent.jsonXContent, Netty4Utils.toBytesReference(copy)); + } finally { + copy.release(); + } + assertFalse(responseBody.evaluate("errors")); + assertEquals(itemCount, responseBody.evaluateArraySize("items")); + for (int i = 0; i < itemCount; i++) { + assertEquals( + RestStatus.CREATED.getStatus(), + (int) asInstanceOf(int.class, responseBody.evaluateExact("items", Integer.toString(i), "index", "status")) + ); + } + } finally { + response.release(); + } + } + } + static FullHttpRequest fullHttpRequest(String opaqueId, ByteBuf content) { var request = new DefaultFullHttpRequest(HTTP_1_1, POST, ControlServerRequestPlugin.ROUTE, Unpooled.wrappedBuffer(content)); request.headers().add(CONTENT_LENGTH, content.readableBytes()); @@ -539,11 +618,13 @@ protected boolean addMockHttpTransport() { private static final LongSupplier idGenerator = new AtomicLong()::getAndIncrement; private ClientContext newClientContext() throws Exception { - return newClientContext(cause -> ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(cause))); + return newClientContext( + internalCluster().getRandomNodeName(), + cause -> ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(cause)) + ); } - private ClientContext newClientContext(Consumer exceptionHandler) throws Exception { - var nodeName = internalCluster().getRandomNodeName(); + private ClientContext newClientContext(String nodeName, Consumer exceptionHandler) throws Exception { var clientResponseQueue = new LinkedBlockingDeque(16); final var httpServerTransport = internalCluster().getInstance(HttpServerTransport.class, nodeName); var remoteAddr = randomFrom(httpServerTransport.boundAddress().boundAddresses()); @@ -556,7 +637,7 @@ private ClientContext newClientContext(Consumer exceptionHandler) thr protected void initChannel(SocketChannel ch) { var p = ch.pipeline(); p.addLast(new HttpClientCodec()); - p.addLast(new HttpObjectAggregator(ByteSizeUnit.KB.toIntBytes(4))); + p.addLast(new HttpObjectAggregator(ByteSizeUnit.MB.toIntBytes(4))); p.addLast(new SimpleChannelInboundHandler() { @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) {