4747import org .elasticsearch .client .internal .node .NodeClient ;
4848import org .elasticsearch .cluster .metadata .IndexNameExpressionResolver ;
4949import org .elasticsearch .cluster .node .DiscoveryNodes ;
50+ import org .elasticsearch .common .Strings ;
5051import org .elasticsearch .common .bytes .ReleasableBytesReference ;
52+ import org .elasticsearch .common .collect .Iterators ;
5153import org .elasticsearch .common .component .AbstractLifecycleComponent ;
5254import org .elasticsearch .common .io .stream .NamedWriteableRegistry ;
5355import org .elasticsearch .common .settings .ClusterSettings ;
6365import org .elasticsearch .http .HttpBodyTracer ;
6466import org .elasticsearch .http .HttpServerTransport ;
6567import org .elasticsearch .http .HttpTransportSettings ;
68+ import org .elasticsearch .index .IndexingPressure ;
6669import org .elasticsearch .plugins .ActionPlugin ;
6770import org .elasticsearch .plugins .Plugin ;
6871import org .elasticsearch .rest .BaseRestHandler ;
7376import org .elasticsearch .rest .RestResponse ;
7477import org .elasticsearch .rest .RestStatus ;
7578import org .elasticsearch .tasks .Task ;
79+ import org .elasticsearch .test .ClusterServiceUtils ;
7680import org .elasticsearch .test .ESIntegTestCase ;
7781import org .elasticsearch .test .MockLog ;
7882import org .elasticsearch .test .junit .annotations .TestLogging ;
83+ import org .elasticsearch .test .rest .ObjectPath ;
7984import org .elasticsearch .transport .Transports ;
8085import org .elasticsearch .transport .netty4 .Netty4Utils ;
86+ import org .elasticsearch .xcontent .json .JsonXContent ;
8187
8288import java .nio .channels .ClosedChannelException ;
89+ import java .nio .charset .StandardCharsets ;
8390import java .util .Collection ;
8491import java .util .List ;
8592import java .util .Map ;
102109import static io .netty .handler .codec .http .HttpVersion .HTTP_1_1 ;
103110import static org .hamcrest .Matchers .anEmptyMap ;
104111import static org .hamcrest .Matchers .empty ;
112+ import static org .hamcrest .Matchers .greaterThan ;
105113import static org .hamcrest .Matchers .instanceOf ;
106114import static org .hamcrest .Matchers .lessThanOrEqualTo ;
107115
@@ -361,7 +369,12 @@ public void test413TooLargeOnExpect100Continue() throws Exception {
361369
362370 // ensures that oversized chunked encoded request has maxContentLength limit and returns 413
363371 public void testOversizedChunkedEncoding () throws Exception {
364- try (var clientContext = newClientContext (t -> {/* ignore exception from e.g. server closing socket */ })) {
372+ try (
373+ var clientContext = newClientContext (
374+ internalCluster ().getRandomNodeName (),
375+ t -> {/* ignore exception from e.g. server closing socket */ }
376+ )
377+ ) {
365378 var opaqueId = clientContext .newOpaqueId ();
366379 final var requestBodyStream = new HttpChunkedInput (
367380 new ChunkedStream (new ByteBufInputStream (Unpooled .wrappedBuffer (randomByteArrayOfLength (MAX_CONTENT_LENGTH + 1 )))),
@@ -501,6 +514,72 @@ private void assertHttpBodyLogging(Consumer<ClientContext> test) throws Exceptio
501514 }
502515 }
503516
517+ public void testBulkIndexingRequestSplitting () throws Exception {
518+ final var watermarkBytes = between (100 , 200 );
519+ final var tinyNode = internalCluster ().startCoordinatingOnlyNode (
520+ Settings .builder ()
521+ .put (IndexingPressure .SPLIT_BULK_LOW_WATERMARK .getKey (), ByteSizeValue .ofBytes (watermarkBytes ))
522+ .put (IndexingPressure .SPLIT_BULK_LOW_WATERMARK_SIZE .getKey (), ByteSizeValue .ofBytes (watermarkBytes ))
523+ .build ()
524+ );
525+
526+ try (var clientContext = newClientContext (tinyNode , cause -> ExceptionsHelper .maybeDieOnAnotherThread (new AssertionError (cause )))) {
527+ final var request = new DefaultHttpRequest (HTTP_1_1 , POST , "/_bulk" );
528+ request .headers ().add (CONTENT_TYPE , APPLICATION_JSON );
529+ HttpUtil .setTransferEncodingChunked (request , true );
530+
531+ final var channel = clientContext .channel ();
532+ channel .writeAndFlush (request );
533+
534+ final var indexName = randomIdentifier ();
535+ final var indexCreatedListener = ClusterServiceUtils .addTemporaryStateListener (
536+ cs -> Iterators .filter (
537+ cs .metadata ().indicesAllProjects ().iterator (),
538+ indexMetadata -> indexMetadata .getIndex ().getName ().equals (indexName )
539+ ).hasNext ()
540+ );
541+
542+ indexCreatedListener .addListener (ActionListener .running (() -> logger .info ("--> index created" )));
543+
544+ final var valueLength = between (10 , 30 );
545+ final var docSizeBytes = "{'field':''}" .length () + valueLength ;
546+ final var itemCount = between (watermarkBytes / docSizeBytes + 1 , 300 ); // enough to split at least once
547+ assertThat (itemCount * docSizeBytes , greaterThan (watermarkBytes ));
548+ for (int i = 0 ; i < itemCount ; i ++) {
549+ channel .write (new DefaultHttpContent (Unpooled .wrappedBuffer (Strings .format ("""
550+ {"index":{"_index":"%s"}}
551+ {"field":"%s"}
552+ """ , indexName , randomAlphaOfLength (valueLength )).getBytes (StandardCharsets .UTF_8 ))));
553+ }
554+
555+ channel .flush ();
556+ safeAwait (indexCreatedListener ); // index must be created before we finish sending the request
557+
558+ channel .writeAndFlush (new DefaultLastHttpContent ());
559+ final var response = clientContext .getNextResponse ();
560+ try {
561+ assertEquals (RestStatus .OK .getStatus (), response .status ().code ());
562+ final ObjectPath responseBody ;
563+ final var copy = response .content ().copy (); // Netty4Utils doesn't handle direct buffers, so copy to heap first
564+ try {
565+ responseBody = ObjectPath .createFromXContent (JsonXContent .jsonXContent , Netty4Utils .toBytesReference (copy ));
566+ } finally {
567+ copy .release ();
568+ }
569+ assertFalse (responseBody .evaluate ("errors" ));
570+ assertEquals (itemCount , responseBody .evaluateArraySize ("items" ));
571+ for (int i = 0 ; i < itemCount ; i ++) {
572+ assertEquals (
573+ RestStatus .CREATED .getStatus (),
574+ (int ) asInstanceOf (int .class , responseBody .evaluateExact ("items" , Integer .toString (i ), "index" , "status" ))
575+ );
576+ }
577+ } finally {
578+ response .release ();
579+ }
580+ }
581+ }
582+
504583 static FullHttpRequest fullHttpRequest (String opaqueId , ByteBuf content ) {
505584 var request = new DefaultFullHttpRequest (HTTP_1_1 , POST , ControlServerRequestPlugin .ROUTE , Unpooled .wrappedBuffer (content ));
506585 request .headers ().add (CONTENT_LENGTH , content .readableBytes ());
@@ -539,11 +618,13 @@ protected boolean addMockHttpTransport() {
539618 private static final LongSupplier idGenerator = new AtomicLong ()::getAndIncrement ;
540619
541620 private ClientContext newClientContext () throws Exception {
542- return newClientContext (cause -> ExceptionsHelper .maybeDieOnAnotherThread (new AssertionError (cause )));
621+ return newClientContext (
622+ internalCluster ().getRandomNodeName (),
623+ cause -> ExceptionsHelper .maybeDieOnAnotherThread (new AssertionError (cause ))
624+ );
543625 }
544626
545- private ClientContext newClientContext (Consumer <Throwable > exceptionHandler ) throws Exception {
546- var nodeName = internalCluster ().getRandomNodeName ();
627+ private ClientContext newClientContext (String nodeName , Consumer <Throwable > exceptionHandler ) throws Exception {
547628 var clientResponseQueue = new LinkedBlockingDeque <FullHttpResponse >(16 );
548629 final var httpServerTransport = internalCluster ().getInstance (HttpServerTransport .class , nodeName );
549630 var remoteAddr = randomFrom (httpServerTransport .boundAddress ().boundAddresses ());
@@ -556,7 +637,7 @@ private ClientContext newClientContext(Consumer<Throwable> exceptionHandler) thr
556637 protected void initChannel (SocketChannel ch ) {
557638 var p = ch .pipeline ();
558639 p .addLast (new HttpClientCodec ());
559- p .addLast (new HttpObjectAggregator (ByteSizeUnit .KB .toIntBytes (4 )));
640+ p .addLast (new HttpObjectAggregator (ByteSizeUnit .MB .toIntBytes (4 )));
560641 p .addLast (new SimpleChannelInboundHandler <FullHttpResponse >() {
561642 @ Override
562643 protected void channelRead0 (ChannelHandlerContext ctx , FullHttpResponse msg ) {
0 commit comments