4646import static org .hamcrest .Matchers .is ;
4747import static org .hamcrest .Matchers .lessThanOrEqualTo ;
4848import static org .mockito .ArgumentMatchers .any ;
49+ import static org .mockito .ArgumentMatchers .anyLong ;
4950import static org .mockito .Mockito .doAnswer ;
5051import static org .mockito .Mockito .mock ;
5152import static org .mockito .Mockito .times ;
@@ -74,13 +75,14 @@ public void testMergesExecuteInSizeOrder() throws IOException {
7475 nodeEnvironment = newNodeEnvironment (settings );
7576 ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorServiceTests
7677 .getThreadPoolMergeExecutorService (threadPoolTaskQueue .getThreadPool (), settings , nodeEnvironment );
78+ var mergeMetrics = mock (MergeMetrics .class );
7779 try (
7880 ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler (
7981 new ShardId ("index" , "_na_" , 1 ),
8082 IndexSettingsModule .newIndexSettings ("index" , Settings .EMPTY ),
8183 threadPoolMergeExecutorService ,
8284 merge -> 0 ,
83- MergeMetrics . NOOP
85+ mergeMetrics
8486 )
8587 ) {
8688 List <OneMerge > executedMergesList = new ArrayList <>();
@@ -98,9 +100,19 @@ public void testMergesExecuteInSizeOrder() throws IOException {
98100 return null ;
99101 }).when (mergeSource ).merge (any (OneMerge .class ));
100102 threadPoolMergeScheduler .merge (mergeSource , randomFrom (MergeTrigger .values ()));
103+
104+ // verify queued byte metric is recorded for each merge
105+ verify (mergeMetrics , times (i + 1 )).incrementQueuedMergeBytes (any (), anyLong ());
101106 }
107+
102108 threadPoolTaskQueue .runAllTasks ();
103109 assertThat (executedMergesList .size (), is (mergeCount ));
110+
111+ // verify metrics are reported for each merge
112+ verify (mergeMetrics , times (mergeCount )).moveQueuedMergeBytesToRunning (any (), anyLong ());
113+ verify (mergeMetrics , times (mergeCount )).decrementRunningMergeBytes (any ());
114+ verify (mergeMetrics , times (mergeCount )).markMergeMetrics (any (), anyLong (), anyLong ());
115+
104116 // assert merges are executed in ascending size order
105117 for (int i = 1 ; i < mergeCount ; i ++) {
106118 assertThat (
@@ -114,6 +126,7 @@ public void testMergesExecuteInSizeOrder() throws IOException {
114126
115127 public void testSimpleMergeTaskBacklogging () {
116128 int mergeExecutorThreadCount = randomIntBetween (1 , 5 );
129+ var mergeMetrics = mock (MergeMetrics .class );
117130 Settings mergeSchedulerSettings = Settings .builder ()
118131 .put (MergeSchedulerConfig .MAX_THREAD_COUNT_SETTING .getKey (), mergeExecutorThreadCount )
119132 .build ();
@@ -124,7 +137,7 @@ public void testSimpleMergeTaskBacklogging() {
124137 IndexSettingsModule .newIndexSettings ("index" , mergeSchedulerSettings ),
125138 threadPoolMergeExecutorService ,
126139 merge -> 0 ,
127- MergeMetrics . NOOP
140+ mergeMetrics
128141 );
129142 // more merge tasks than merge threads
130143 int mergeCount = mergeExecutorThreadCount + randomIntBetween (1 , 5 );
@@ -145,6 +158,9 @@ public void testSimpleMergeTaskBacklogging() {
145158 }
146159 assertThat (threadPoolMergeScheduler .getRunningMergeTasks ().size (), is (mergeExecutorThreadCount ));
147160 assertThat (threadPoolMergeScheduler .getBackloggedMergeTasks ().size (), is (mergeCount - mergeExecutorThreadCount ));
161+
162+ // verify no metrics are recorded as no merges have been queued or executed through the merge scheduler
163+ verifyNoInteractions (mergeMetrics );
148164 }
149165
150166 public void testSimpleMergeTaskReEnqueueingBySize () {
@@ -444,6 +460,7 @@ public void testMergesRunConcurrently() throws Exception {
444460 // disable fs available disk space feature for this test
445461 .put (ThreadPoolMergeExecutorService .INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING .getKey (), "0s" )
446462 .build ();
463+ var mergeMetrics = mock (MergeMetrics .class );
447464 nodeEnvironment = newNodeEnvironment (settings );
448465 try (TestThreadPool testThreadPool = new TestThreadPool ("test" , settings )) {
449466 ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorServiceTests
@@ -456,13 +473,13 @@ public void testMergesRunConcurrently() throws Exception {
456473 IndexSettingsModule .newIndexSettings ("index" , settings ),
457474 threadPoolMergeExecutorService ,
458475 merge -> 0 ,
459- MergeMetrics . NOOP
476+ mergeMetrics
460477 )
461478 ) {
462479 // at least 1 extra merge than there are concurrently allowed
463480 int mergeCount = mergeExecutorThreadCount + randomIntBetween (1 , 10 );
464481 Semaphore runMergeSemaphore = new Semaphore (0 );
465- for (int i = 0 ; i < mergeCount ; i ++) {
482+ for (int i = 1 ; i <= mergeCount ; i ++) {
466483 MergeSource mergeSource = mock (MergeSource .class );
467484 OneMerge oneMerge = mock (OneMerge .class );
468485 when (oneMerge .getStoreMergeInfo ()).thenReturn (getNewMergeInfo (randomLongBetween (1L , 10L )));
@@ -476,7 +493,11 @@ public void testMergesRunConcurrently() throws Exception {
476493 return null ;
477494 }).when (mergeSource ).merge (any (OneMerge .class ));
478495 threadPoolMergeScheduler .merge (mergeSource , randomFrom (MergeTrigger .values ()));
496+
497+ // verify queued byte metric is recorded for each merge
498+ verify (mergeMetrics , times (i )).incrementQueuedMergeBytes (any (), anyLong ());
479499 }
500+
480501 for (int completedMergesCount = 0 ; completedMergesCount < mergeCount
481502 - mergeSchedulerMaxThreadCount ; completedMergesCount ++) {
482503 int finalCompletedMergesCount = completedMergesCount ;
@@ -521,6 +542,11 @@ public void testMergesRunConcurrently() throws Exception {
521542 runMergeSemaphore .release ();
522543 }
523544 assertBusy (() -> assertTrue (threadPoolMergeExecutorService .allDone ()));
545+
546+ // verify metrics are recorded for each merge
547+ verify (mergeMetrics , times (mergeCount )).moveQueuedMergeBytesToRunning (any (), anyLong ());
548+ verify (mergeMetrics , times (mergeCount )).decrementRunningMergeBytes (any ());
549+ verify (mergeMetrics , times (mergeCount )).markMergeMetrics (any (), anyLong (), anyLong ());
524550 }
525551 }
526552 }
@@ -701,30 +727,63 @@ public void testAutoIOThrottleForMergeTasks() throws Exception {
701727 }
702728 }
703729
704- public void testMergeSchedulerAbortsMergeWhenShouldSkipMergeIsTrue () {
705- ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock (ThreadPoolMergeExecutorService .class );
706- // build a scheduler that always returns true for shouldSkipMerge
707- ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler (
708- new ShardId ("index" , "_na_" , 1 ),
709- IndexSettingsModule .newIndexSettings ("index" , Settings .builder ().build ()),
710- threadPoolMergeExecutorService ,
711- merge -> 0 ,
712- MergeMetrics .NOOP
730+ public void testMergeSchedulerAbortsMergeWhenShouldSkipMergeIsTrue () throws IOException {
731+ DeterministicTaskQueue threadPoolTaskQueue = new DeterministicTaskQueue ();
732+ Settings settings = Settings .builder ()
733+ // disable fs available disk space feature for this test
734+ .put (ThreadPoolMergeExecutorService .INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING .getKey (), "0s" )
735+ .build ();
736+ nodeEnvironment = newNodeEnvironment (settings );
737+ ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorServiceTests
738+ .getThreadPoolMergeExecutorService (threadPoolTaskQueue .getThreadPool (), settings , nodeEnvironment );
739+ var mergeMetrics = mock (MergeMetrics .class );
740+ try (
741+ // build a scheduler that always returns true for shouldSkipMerge
742+ ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler (
743+ new ShardId ("index" , "_na_" , 1 ),
744+ IndexSettingsModule .newIndexSettings ("index" , Settings .EMPTY ),
745+ threadPoolMergeExecutorService ,
746+ merge -> 0 ,
747+ mergeMetrics
748+ ) {
749+ @ Override
750+ protected boolean shouldSkipMerge () {
751+ return true ;
752+ }
753+ }
713754 ) {
714- @ Override
715- protected boolean shouldSkipMerge () {
716- return true ;
755+ int mergeCount = randomIntBetween (2 , 10 );
756+ for (int i = 0 ; i < mergeCount ; i ++) {
757+ MergeSource mergeSource = mock (MergeSource .class );
758+ OneMerge oneMerge = mock (OneMerge .class );
759+ when (oneMerge .getStoreMergeInfo ()).thenReturn (getNewMergeInfo (randomLongBetween (1L , 10L )));
760+ when (oneMerge .getMergeProgress ()).thenReturn (new MergePolicy .OneMergeProgress ());
761+ when (mergeSource .getNextMerge ()).thenReturn (oneMerge , (OneMerge ) null );
762+
763+ // create the merge task
764+ MergeTask mergeTask = threadPoolMergeScheduler .newMergeTask (mergeSource , oneMerge , randomFrom (MergeTrigger .values ()));
765+
766+ // verify that calling schedule on the merge task indicates the merge should be aborted
767+ Schedule schedule = threadPoolMergeScheduler .schedule (mergeTask );
768+ assertThat (schedule , is (Schedule .ABORT ));
769+
770+ // run the merge through the scheduler
771+ threadPoolMergeScheduler .merge (mergeSource , randomFrom (MergeTrigger .values ()));
772+
773+ // verify queued merge byte metrics are still recorded for each merge
774+ verify (mergeMetrics , times (i + 1 )).incrementQueuedMergeBytes (any (), anyLong ());
717775 }
718- };
719- MergeSource mergeSource = mock (MergeSource .class );
720- OneMerge oneMerge = mock (OneMerge .class );
721- when (oneMerge .getStoreMergeInfo ()).thenReturn (getNewMergeInfo (randomLongBetween (1L , 10L )));
722- when (oneMerge .getMergeProgress ()).thenReturn (new MergePolicy .OneMergeProgress ());
723- when (mergeSource .getNextMerge ()).thenReturn (oneMerge , (OneMerge ) null );
724- MergeTask mergeTask = threadPoolMergeScheduler .newMergeTask (mergeSource , oneMerge , randomFrom (MergeTrigger .values ()));
725- // verify that calling schedule on the merge task indicates the merge should be aborted
726- Schedule schedule = threadPoolMergeScheduler .schedule (mergeTask );
727- assertThat (schedule , is (Schedule .ABORT ));
776+
777+ // run all merges; they should all be aborted
778+ threadPoolTaskQueue .runAllTasks ();
779+
780+ // verify queued bytes metrics are moved to running and decremented
781+ verify (mergeMetrics , times (mergeCount )).moveQueuedMergeBytesToRunning (any (), anyLong ());
782+ verify (mergeMetrics , times (mergeCount )).decrementRunningMergeBytes (any ());
783+
784+ // verify we did not mark the merges as merged
785+ verify (mergeMetrics , times (0 )).markMergeMetrics (any (), anyLong (), anyLong ());
786+ }
728787 }
729788
730789 private static MergeInfo getNewMergeInfo (long estimatedMergeBytes ) {
0 commit comments