@@ -118,9 +118,9 @@ public FileStore getFileStore(Path path) {
118118
119119 static class TestMockFileStore extends FileStore {
120120
121- public long totalSpace ;
122- public long freeSpace ;
123- public long usableSpace ;
121+ public volatile long totalSpace ;
122+ public volatile long freeSpace ;
123+ public volatile long usableSpace ;
124124
125125 private final String desc ;
126126
@@ -665,4 +665,156 @@ public void testUnavailableBudgetBlocksNewMergeTasksFromStartingExecution() thro
665665 });
666666 }
667667 }
668+
669+ public void testMergeTasksAreUnblockedWhenMoreDiskSpaceBecomesAvailable () throws Exception {
670+ aFileStore .totalSpace = randomLongBetween (300L , 1_000L );
671+ bFileStore .totalSpace = randomLongBetween (300L , 1_000L );
672+ long grantedUsableSpaceBuffer = randomLongBetween (10L , 50L );
673+ aFileStore .usableSpace = randomLongBetween (200L , aFileStore .totalSpace - grantedUsableSpaceBuffer );
674+ bFileStore .usableSpace = randomLongBetween (200L , bFileStore .totalSpace - grantedUsableSpaceBuffer );
675+ boolean aHasMoreSpace = aFileStore .usableSpace > bFileStore .usableSpace ;
676+ Settings .Builder settingsBuilder = Settings .builder ()
677+ .put (settings );
678+ // change the watermark level, just for coverage and it's easier with the calculations
679+ if (randomBoolean ()) {
680+ settingsBuilder .put (ThreadPoolMergeExecutorService .INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING .getKey (), "90%" );
681+ } else {
682+ settingsBuilder .put (DiskThresholdSettings .CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING .getKey (), "90%" );
683+ }
684+ try (
685+ ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
686+ .maybeCreateThreadPoolMergeExecutorService (
687+ testThreadPool ,
688+ ClusterSettings .createBuiltInClusterSettings (settingsBuilder .build ()),
689+ nodeEnvironment
690+ )
691+ ) {
692+ assert threadPoolMergeExecutorService != null ;
693+ assertThat (threadPoolMergeExecutorService .getMaxConcurrentMerges (), greaterThanOrEqualTo (1 ));
694+ // uses the 10% watermark limit
695+ final long availableInitialBudget = aHasMoreSpace
696+ ? aFileStore .usableSpace - aFileStore .totalSpace / 10
697+ : bFileStore .usableSpace - bFileStore .totalSpace / 10 ;
698+ final AtomicLong expectedAvailableBudget = new AtomicLong (availableInitialBudget );
699+ assertBusy (
700+ () -> assertThat (threadPoolMergeExecutorService .getDiskSpaceAvailableForNewMergeTasks (), is (expectedAvailableBudget .get ()))
701+ );
702+ // maybe let some merge tasks hold up some budget
703+ // take care that there's still at least one thread available to run merges
704+ int maxBlockingTasksToSubmit = mergeExecutorThreadCount - 1 ;
705+ // first maybe submit some running or aborting merge tasks that hold up some budget while running or aborting
706+ List <ThreadPoolMergeScheduler .MergeTask > runningMergeTasks = new ArrayList <>();
707+ List <ThreadPoolMergeScheduler .MergeTask > abortingMergeTasks = new ArrayList <>();
708+ CountDownLatch testDoneLatch = new CountDownLatch (1 );
709+ while (expectedAvailableBudget .get () > 0L && maxBlockingTasksToSubmit -- > 0 && randomBoolean ()) {
710+ ThreadPoolMergeScheduler .MergeTask mergeTask = mock (ThreadPoolMergeScheduler .MergeTask .class );
711+ long taskBudget = randomLongBetween (1L , expectedAvailableBudget .get ());
712+ when (mergeTask .estimatedRemainingMergeSize ()).thenReturn (taskBudget );
713+ when (mergeTask .schedule ()).thenReturn (randomFrom (RUN , ABORT ));
714+ // this task runs/aborts, and it's going to hold up some budget for it
715+ expectedAvailableBudget .set (expectedAvailableBudget .get () - taskBudget );
716+ // this task will hold up budget because it blocks when it runs (to simulate it running for a long time)
717+ doAnswer (mock -> {
718+ // wait to be signalled before completing (this holds up budget)
719+ testDoneLatch .await ();
720+ return null ;
721+ }).when (mergeTask ).run ();
722+ doAnswer (mock -> {
723+ // wait to be signalled before completing (this holds up budget)
724+ testDoneLatch .await ();
725+ return null ;
726+ }).when (mergeTask ).abort ();
727+ threadPoolMergeExecutorService .submitMergeTask (mergeTask );
728+ if (mergeTask .schedule () == RUN ) {
729+ runningMergeTasks .add (mergeTask );
730+ } else {
731+ abortingMergeTasks .add (mergeTask );
732+ }
733+ }
734+ assertBusy (() -> {
735+ assertThat (threadPoolMergeExecutorService .getMergeTasksQueueLength (), is (0 ));
736+ assertThat (threadPoolMergeExecutorService .getDiskSpaceAvailableForNewMergeTasks (), is (expectedAvailableBudget .get ()));
737+ });
738+ // send some runnable merge tasks that although runnable are currently over budget
739+ int overBudgetTaskCount = randomIntBetween (1 , 5 );
740+ List <ThreadPoolMergeScheduler .MergeTask > overBudgetTasksToRunList = new ArrayList <>();
741+ List <ThreadPoolMergeScheduler .MergeTask > overBudgetTasksToAbortList = new ArrayList <>();
742+ while (overBudgetTaskCount -- > 0 ) {
743+ ThreadPoolMergeScheduler .MergeTask mergeTask = mock (ThreadPoolMergeScheduler .MergeTask .class );
744+ // currently over-budget
745+ long taskBudget = randomLongBetween (
746+ expectedAvailableBudget .get () + 1L ,
747+ expectedAvailableBudget .get () + grantedUsableSpaceBuffer
748+ );
749+ when (mergeTask .estimatedRemainingMergeSize ()).thenReturn (taskBudget );
750+ Schedule schedule = randomFrom (RUN , ABORT );
751+ when (mergeTask .schedule ()).thenReturn (schedule );
752+ threadPoolMergeExecutorService .submitMergeTask (mergeTask );
753+ if (schedule == RUN ) {
754+ overBudgetTasksToRunList .add (mergeTask );
755+ } else {
756+ overBudgetTasksToAbortList .add (mergeTask );
757+ }
758+ }
759+ // over-budget tasks did not run, are enqueued, and budget is unchanged
760+ assertBusy (() -> {
761+ for (ThreadPoolMergeScheduler .MergeTask mergeTask : overBudgetTasksToAbortList ) {
762+ verify (mergeTask , times (0 )).schedule ();
763+ verify (mergeTask , times (0 )).run ();
764+ verify (mergeTask , times (0 )).abort ();
765+ }
766+ for (ThreadPoolMergeScheduler .MergeTask mergeTask : overBudgetTasksToRunList ) {
767+ verify (mergeTask , times (0 )).schedule ();
768+ verify (mergeTask , times (0 )).run ();
769+ verify (mergeTask , times (0 )).abort ();
770+ }
771+ assertThat (
772+ threadPoolMergeExecutorService .getMergeTasksQueueLength (),
773+ is (overBudgetTasksToAbortList .size () + overBudgetTasksToRunList .size ())
774+ );
775+ assertThat (threadPoolMergeExecutorService .getDiskSpaceAvailableForNewMergeTasks (), is (expectedAvailableBudget .get ()));
776+ });
777+ // more disk space becomes available
778+ if (aHasMoreSpace ) {
779+ aFileStore .usableSpace += grantedUsableSpaceBuffer ;
780+ } else {
781+ bFileStore .usableSpace += grantedUsableSpaceBuffer ;
782+ }
783+ expectedAvailableBudget .set (expectedAvailableBudget .get () + grantedUsableSpaceBuffer );
784+ // all over-budget tasks can now run because more disk space became available
785+ assertBusy (() -> {
786+ for (ThreadPoolMergeScheduler .MergeTask mergeTask : overBudgetTasksToRunList ) {
787+ verify (mergeTask ).schedule ();
788+ verify (mergeTask ).run ();
789+ verify (mergeTask , times (0 )).abort ();
790+ }
791+ for (ThreadPoolMergeScheduler .MergeTask mergeTask : overBudgetTasksToAbortList ) {
792+ verify (mergeTask ).schedule ();
793+ verify (mergeTask , times (0 )).run ();
794+ verify (mergeTask ).abort ();
795+ }
796+ assertThat (threadPoolMergeExecutorService .getMergeTasksQueueLength (), is (0 ));
797+ assertThat (threadPoolMergeExecutorService .getDiskSpaceAvailableForNewMergeTasks (), is (expectedAvailableBudget .get ()));
798+ });
799+ // let test finish cleanly
800+ testDoneLatch .countDown ();
801+ assertBusy (() -> {
802+ for (ThreadPoolMergeScheduler .MergeTask mergeTask : runningMergeTasks ) {
803+ verify (mergeTask ).run ();
804+ }
805+ for (ThreadPoolMergeScheduler .MergeTask mergeTask : abortingMergeTasks ) {
806+ verify (mergeTask ).abort ();
807+ }
808+ assertThat (
809+ threadPoolMergeExecutorService .getDiskSpaceAvailableForNewMergeTasks (),
810+ is (availableInitialBudget + grantedUsableSpaceBuffer )
811+ );
812+ assertThat (threadPoolMergeExecutorService .allDone (), is (true ));
813+ assertThat (
814+ threadPoolMergeExecutorService .getDiskSpaceAvailableForNewMergeTasks (),
815+ is (availableInitialBudget + grantedUsableSpaceBuffer )
816+ );
817+ });
818+ }
819+ }
668820}
0 commit comments