99
1010package org .elasticsearch .monitor .metrics ;
1111
12+ import org .elasticsearch .action .DocWriteRequest ;
1213import org .elasticsearch .action .bulk .BulkRequest ;
1314import org .elasticsearch .action .bulk .BulkRequestBuilder ;
1415import org .elasticsearch .action .bulk .BulkResponse ;
16+ import org .elasticsearch .action .bulk .IncrementalBulkService ;
1517import org .elasticsearch .action .delete .DeleteRequest ;
1618import org .elasticsearch .action .index .IndexRequest ;
19+ import org .elasticsearch .action .support .PlainActionFuture ;
1720import org .elasticsearch .cluster .metadata .IndexMetadata ;
1821import org .elasticsearch .common .settings .Setting ;
1922import org .elasticsearch .common .settings .Settings ;
23+ import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
2024import org .elasticsearch .common .util .concurrent .EsRejectedExecutionException ;
25+ import org .elasticsearch .core .AbstractRefCounted ;
2126import org .elasticsearch .core .TimeValue ;
27+ import org .elasticsearch .index .IndexingPressure ;
2228import org .elasticsearch .plugins .Plugin ;
2329import org .elasticsearch .plugins .PluginsService ;
2430import org .elasticsearch .rest .RestStatus ;
2531import org .elasticsearch .telemetry .Measurement ;
2632import org .elasticsearch .telemetry .TestTelemetryPlugin ;
2733import org .elasticsearch .test .ESIntegTestCase ;
34+ import org .elasticsearch .threadpool .ThreadPool ;
2835
36+ import java .util .ArrayList ;
2937import java .util .Arrays ;
3038import java .util .Collection ;
3139import java .util .List ;
3240import java .util .Map ;
41+ import java .util .concurrent .CountDownLatch ;
42+ import java .util .concurrent .CyclicBarrier ;
43+ import java .util .concurrent .atomic .AtomicBoolean ;
3344import java .util .function .Function ;
3445
3546import static org .elasticsearch .index .IndexingPressure .MAX_COORDINATING_BYTES ;
3647import static org .elasticsearch .index .IndexingPressure .MAX_PRIMARY_BYTES ;
3748import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
49+ import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertNoFailures ;
3850import static org .hamcrest .Matchers .equalTo ;
3951import static org .hamcrest .Matchers .greaterThan ;
4052import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
53+ import static org .hamcrest .Matchers .lessThan ;
4154
4255@ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .TEST , numDataNodes = 0 , numClientNodes = 0 )
4356public class NodeIndexingMetricsIT extends ESIntegTestCase {
@@ -453,6 +466,211 @@ public void testPrimaryDocumentRejectionMetricsFluctuatingOverTime() throws Exce
453466 }
454467 }
455468
469+ // Borrowed this test from IncrementalBulkIT and added test for metrics to it
470+ public void testIncrementalBulkLowWatermarkSplitMetrics () throws Exception {
471+ final String nodeName = internalCluster ().startNode (
472+ Settings .builder ()
473+ .put (IndexingPressure .SPLIT_BULK_LOW_WATERMARK .getKey (), "512B" )
474+ .put (IndexingPressure .SPLIT_BULK_LOW_WATERMARK_SIZE .getKey (), "2048B" )
475+ .put (IndexingPressure .SPLIT_BULK_HIGH_WATERMARK .getKey (), "4KB" )
476+ .put (IndexingPressure .SPLIT_BULK_HIGH_WATERMARK_SIZE .getKey (), "1024B" )
477+ .build ()
478+ );
479+ ensureStableCluster (1 );
480+
481+ String index = "test" ;
482+ createIndex (index );
483+
484+ IncrementalBulkService incrementalBulkService = internalCluster ().getInstance (IncrementalBulkService .class , nodeName );
485+ IndexingPressure indexingPressure = internalCluster ().getInstance (IndexingPressure .class , nodeName );
486+ final TestTelemetryPlugin testTelemetryPlugin = internalCluster ().getInstance (PluginsService .class , nodeName )
487+ .filterPlugins (TestTelemetryPlugin .class )
488+ .findFirst ()
489+ .orElseThrow ();
490+ testTelemetryPlugin .resetMeter ();
491+
492+ IncrementalBulkService .Handler handler = incrementalBulkService .newBulkRequest ();
493+
494+ AbstractRefCounted refCounted = AbstractRefCounted .of (() -> {});
495+ AtomicBoolean nextPage = new AtomicBoolean (false );
496+
497+ IndexRequest indexRequest = indexRequest (index );
498+ long total = indexRequest .ramBytesUsed ();
499+ while (total < 2048 ) {
500+ refCounted .incRef ();
501+ handler .addItems (List .of (indexRequest ), refCounted ::decRef , () -> nextPage .set (true ));
502+ assertTrue (nextPage .get ());
503+ nextPage .set (false );
504+ indexRequest = indexRequest (index );
505+ total += indexRequest .ramBytesUsed ();
506+ }
507+
508+ assertThat (indexingPressure .stats ().getCurrentCombinedCoordinatingAndPrimaryBytes (), greaterThan (0L ));
509+ assertThat (indexingPressure .stats ().getLowWaterMarkSplits (), equalTo (0L ));
510+
511+ testTelemetryPlugin .collect ();
512+ assertThat (
513+ getSingleRecordedMetric (
514+ testTelemetryPlugin ::getLongAsyncCounterMeasurement ,
515+ "es.indexing.coordinating.low_watermark_splits.total"
516+ ).getLong (),
517+ equalTo (0L )
518+ );
519+ assertThat (
520+ getSingleRecordedMetric (
521+ testTelemetryPlugin ::getLongAsyncCounterMeasurement ,
522+ "es.indexing.coordinating.high_watermark_splits.total"
523+ ).getLong (),
524+ equalTo (0L )
525+ );
526+
527+ refCounted .incRef ();
528+ handler .addItems (List .of (indexRequest (index )), refCounted ::decRef , () -> nextPage .set (true ));
529+
530+ assertBusy (() -> assertThat (indexingPressure .stats ().getCurrentCombinedCoordinatingAndPrimaryBytes (), equalTo (0L )));
531+ assertBusy (() -> assertThat (indexingPressure .stats ().getLowWaterMarkSplits (), equalTo (1L )));
532+ assertThat (indexingPressure .stats ().getHighWaterMarkSplits (), equalTo (0L ));
533+
534+ testTelemetryPlugin .collect ();
535+ assertThat (
536+ getLatestRecordedMetric (
537+ testTelemetryPlugin ::getLongAsyncCounterMeasurement ,
538+ "es.indexing.coordinating.low_watermark_splits.total"
539+ ).getLong (),
540+ equalTo (1L )
541+ );
542+ assertThat (
543+ getLatestRecordedMetric (
544+ testTelemetryPlugin ::getLongAsyncCounterMeasurement ,
545+ "es.indexing.coordinating.high_watermark_splits.total"
546+ ).getLong (),
547+ equalTo (0L )
548+ );
549+
550+ PlainActionFuture <BulkResponse > future = new PlainActionFuture <>();
551+ handler .lastItems (List .of (indexRequest ), refCounted ::decRef , future );
552+
553+ BulkResponse bulkResponse = safeGet (future );
554+ assertNoFailures (bulkResponse );
555+ assertFalse (refCounted .hasReferences ());
556+ }
557+
558+ // Borrowed this test from IncrementalBulkIT and added test for metrics to it
559+ public void testIncrementalBulkHighWatermarkSplitMetrics () throws Exception {
560+ final String nodeName = internalCluster ().startNode (
561+ Settings .builder ()
562+ .put (IndexingPressure .SPLIT_BULK_LOW_WATERMARK .getKey (), "512B" )
563+ .put (IndexingPressure .SPLIT_BULK_LOW_WATERMARK_SIZE .getKey (), "2048B" )
564+ .put (IndexingPressure .SPLIT_BULK_HIGH_WATERMARK .getKey (), "4KB" )
565+ .put (IndexingPressure .SPLIT_BULK_HIGH_WATERMARK_SIZE .getKey (), "1024B" )
566+ .build ()
567+ );
568+ ensureStableCluster (1 );
569+
570+ String index = "test" ;
571+ createIndex (index );
572+
573+ IncrementalBulkService incrementalBulkService = internalCluster ().getInstance (IncrementalBulkService .class , nodeName );
574+ IndexingPressure indexingPressure = internalCluster ().getInstance (IndexingPressure .class , nodeName );
575+ ThreadPool threadPool = internalCluster ().getInstance (ThreadPool .class , nodeName );
576+ final TestTelemetryPlugin testTelemetryPlugin = internalCluster ().getInstance (PluginsService .class , nodeName )
577+ .filterPlugins (TestTelemetryPlugin .class )
578+ .findFirst ()
579+ .orElseThrow ();
580+ testTelemetryPlugin .resetMeter ();
581+
582+ AbstractRefCounted refCounted = AbstractRefCounted .of (() -> {});
583+ AtomicBoolean nextPage = new AtomicBoolean (false );
584+
585+ ArrayList <IncrementalBulkService .Handler > handlers = new ArrayList <>();
586+ for (int i = 0 ; i < 4 ; ++i ) {
587+ ArrayList <DocWriteRequest <?>> requests = new ArrayList <>();
588+ add512BRequests (requests , index );
589+ IncrementalBulkService .Handler handler = incrementalBulkService .newBulkRequest ();
590+ handlers .add (handler );
591+ refCounted .incRef ();
592+ handler .addItems (requests , refCounted ::decRef , () -> nextPage .set (true ));
593+ assertTrue (nextPage .get ());
594+ nextPage .set (false );
595+ }
596+
597+ // Test that a request smaller than SPLIT_BULK_HIGH_WATERMARK_SIZE (1KB) is not throttled
598+ ArrayList <DocWriteRequest <?>> requestsNoThrottle = new ArrayList <>();
599+ add512BRequests (requestsNoThrottle , index );
600+ IncrementalBulkService .Handler handlerNoThrottle = incrementalBulkService .newBulkRequest ();
601+ handlers .add (handlerNoThrottle );
602+ refCounted .incRef ();
603+ handlerNoThrottle .addItems (requestsNoThrottle , refCounted ::decRef , () -> nextPage .set (true ));
604+ assertTrue (nextPage .get ());
605+ nextPage .set (false );
606+ assertThat (indexingPressure .stats ().getHighWaterMarkSplits (), equalTo (0L ));
607+
608+ testTelemetryPlugin .collect ();
609+ assertThat (
610+ getSingleRecordedMetric (
611+ testTelemetryPlugin ::getLongAsyncCounterMeasurement ,
612+ "es.indexing.coordinating.low_watermark_splits.total"
613+ ).getLong (),
614+ equalTo (0L )
615+ );
616+ assertThat (
617+ getSingleRecordedMetric (
618+ testTelemetryPlugin ::getLongAsyncCounterMeasurement ,
619+ "es.indexing.coordinating.high_watermark_splits.total"
620+ ).getLong (),
621+ equalTo (0L )
622+ );
623+
624+ ArrayList <DocWriteRequest <?>> requestsThrottle = new ArrayList <>();
625+ // Test that a request larger than SPLIT_BULK_HIGH_WATERMARK_SIZE (1KB) is throttled
626+ add512BRequests (requestsThrottle , index );
627+ add512BRequests (requestsThrottle , index );
628+
629+ CountDownLatch finishLatch = new CountDownLatch (1 );
630+ blockWritePool (threadPool , finishLatch );
631+ IncrementalBulkService .Handler handlerThrottled = incrementalBulkService .newBulkRequest ();
632+ refCounted .incRef ();
633+ handlerThrottled .addItems (requestsThrottle , refCounted ::decRef , () -> nextPage .set (true ));
634+ assertFalse (nextPage .get ());
635+ finishLatch .countDown ();
636+
637+ handlers .add (handlerThrottled );
638+
639+ // Wait until we are ready for the next page
640+ assertBusy (() -> assertTrue (nextPage .get ()));
641+ assertBusy (() -> assertThat (indexingPressure .stats ().getHighWaterMarkSplits (), equalTo (1L )));
642+ assertThat (indexingPressure .stats ().getLowWaterMarkSplits (), equalTo (0L ));
643+
644+ testTelemetryPlugin .collect ();
645+ assertThat (
646+ getLatestRecordedMetric (
647+ testTelemetryPlugin ::getLongAsyncCounterMeasurement ,
648+ "es.indexing.coordinating.low_watermark_splits.total"
649+ ).getLong (),
650+ equalTo (0L )
651+ );
652+ assertThat (
653+ getLatestRecordedMetric (
654+ testTelemetryPlugin ::getLongAsyncCounterMeasurement ,
655+ "es.indexing.coordinating.high_watermark_splits.total"
656+ ).getLong (),
657+ equalTo (1L )
658+ );
659+
660+ for (IncrementalBulkService .Handler h : handlers ) {
661+ refCounted .incRef ();
662+ PlainActionFuture <BulkResponse > future = new PlainActionFuture <>();
663+ h .lastItems (List .of (indexRequest (index )), refCounted ::decRef , future );
664+ BulkResponse bulkResponse = safeGet (future );
665+ assertNoFailures (bulkResponse );
666+ }
667+
668+ assertBusy (() -> assertThat (indexingPressure .stats ().getCurrentCombinedCoordinatingAndPrimaryBytes (), equalTo (0L )));
669+ refCounted .decRef ();
670+ assertFalse (refCounted .hasReferences ());
671+ testTelemetryPlugin .collect ();
672+ }
673+
456674 private static Measurement getSingleRecordedMetric (Function <String , List <Measurement >> metricGetter , String name ) {
457675 final List <Measurement > measurements = metricGetter .apply (name );
458676 assertFalse ("Indexing metric is not recorded" , measurements .isEmpty ());
@@ -470,4 +688,47 @@ private static boolean doublesEquals(double expected, double actual) {
470688 final double eps = .0000001 ;
471689 return Math .abs (expected - actual ) < eps ;
472690 }
691+
692+ private static IndexRequest indexRequest (String index ) {
693+ IndexRequest indexRequest = new IndexRequest ();
694+ indexRequest .index (index );
695+ indexRequest .source (Map .of ("field" , randomAlphaOfLength (10 )));
696+ return indexRequest ;
697+ }
698+
699+ private static void add512BRequests (ArrayList <DocWriteRequest <?>> requests , String index ) {
700+ long total = 0 ;
701+ while (total < 512 ) {
702+ IndexRequest indexRequest = indexRequest (index );
703+ requests .add (indexRequest );
704+ total += indexRequest .ramBytesUsed ();
705+ }
706+ assertThat (total , lessThan (1024L ));
707+ }
708+
709+ private static void blockWritePool (ThreadPool threadPool , CountDownLatch finishLatch ) {
710+ final var threadCount = threadPool .info (ThreadPool .Names .WRITE ).getMax ();
711+ final var startBarrier = new CyclicBarrier (threadCount + 1 );
712+ final var blockingTask = new AbstractRunnable () {
713+ @ Override
714+ public void onFailure (Exception e ) {
715+ fail (e );
716+ }
717+
718+ @ Override
719+ protected void doRun () {
720+ safeAwait (startBarrier );
721+ safeAwait (finishLatch );
722+ }
723+
724+ @ Override
725+ public boolean isForceExecution () {
726+ return true ;
727+ }
728+ };
729+ for (int i = 0 ; i < threadCount ; i ++) {
730+ threadPool .executor (ThreadPool .Names .WRITE ).execute (blockingTask );
731+ }
732+ safeAwait (startBarrier );
733+ }
473734}
0 commit comments