@@ -896,6 +896,106 @@ public void testUnavailableBudgetBlocksNewMergeTasksFromStartingExecution() thro
896896 }
897897 }
898898
899+ public void testEnqueuedMergeTasksAreUnblockedWhenEstimatedMergeSizeChanges () throws Exception {
900+ long diskSpaceLimitBytes = randomLongBetween (10L , 100L );
901+ aFileStore .usableSpace = diskSpaceLimitBytes + randomLongBetween (1L , 100L );
902+ aFileStore .totalSpace = aFileStore .usableSpace + randomLongBetween (1L , 10L );
903+ bFileStore .usableSpace = diskSpaceLimitBytes + randomLongBetween (1L , 100L );
904+ bFileStore .totalSpace = bFileStore .usableSpace + randomLongBetween (1L , 10L );
905+ boolean aHasMoreSpace = aFileStore .usableSpace > bFileStore .usableSpace ;
906+ Settings .Builder settingsBuilder = Settings .builder ().put (settings );
907+ settingsBuilder .put (ThreadPoolMergeExecutorService .INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING .getKey (), diskSpaceLimitBytes + "b" );
908+ try (
909+ ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
910+ .maybeCreateThreadPoolMergeExecutorService (
911+ testThreadPool ,
912+ ClusterSettings .createBuiltInClusterSettings (settingsBuilder .build ()),
913+ nodeEnvironment
914+ )
915+ ) {
916+ assert threadPoolMergeExecutorService != null ;
917+ assertThat (threadPoolMergeExecutorService .getMaxConcurrentMerges (), greaterThanOrEqualTo (1 ));
918+ final long availableBudget = aHasMoreSpace
919+ ? aFileStore .usableSpace - diskSpaceLimitBytes
920+ : bFileStore .usableSpace - diskSpaceLimitBytes ;
921+ final AtomicLong expectedAvailableBudget = new AtomicLong (availableBudget );
922+ assertBusy (
923+ () -> assertThat (threadPoolMergeExecutorService .getDiskSpaceAvailableForNewMergeTasks (), is (expectedAvailableBudget .get ()))
924+ );
925+ List <ThreadPoolMergeScheduler .MergeTask > tasksRunList = new ArrayList <>();
926+ List <ThreadPoolMergeScheduler .MergeTask > tasksAbortList = new ArrayList <>();
927+ int submittedMergesCount = randomIntBetween (1 , 5 );
928+ long [] mergeSizeEstimates = new long [submittedMergesCount ];
929+ for (int i = 0 ; i < submittedMergesCount ; i ++) {
930+ // all these merge estimates are over-budget
931+ mergeSizeEstimates [i ] = availableBudget + randomLongBetween (1L , 10L );
932+ }
933+ for (int i = 0 ; i < submittedMergesCount ;) {
934+ ThreadPoolMergeScheduler .MergeTask mergeTask = mock (ThreadPoolMergeScheduler .MergeTask .class );
935+ when (mergeTask .supportsIOThrottling ()).thenReturn (randomBoolean ());
936+ doAnswer (mock -> {
937+ Schedule schedule = randomFrom (Schedule .values ());
938+ if (schedule == BACKLOG ) {
939+ testThreadPool .executor (ThreadPool .Names .GENERIC ).execute (() -> {
940+ // re-enqueue backlogged merge task
941+ threadPoolMergeExecutorService .reEnqueueBackloggedMergeTask (mergeTask );
942+ });
943+ } else if (schedule == RUN ) {
944+ tasksRunList .add (mergeTask );
945+ } else if (schedule == ABORT ) {
946+ tasksAbortList .add (mergeTask );
947+ }
948+ return schedule ;
949+ }).when (mergeTask ).schedule ();
950+ // randomly let some task complete
951+ if (randomBoolean ()) {
952+ // this task is not blocked
953+ when (mergeTask .estimatedRemainingMergeSize ()).thenReturn (randomLongBetween (0L , availableBudget ));
954+ } else {
955+ // this task will initially be blocked because over-budget
956+ int finalI = i ;
957+ doAnswer (mock -> mergeSizeEstimates [finalI ]).when (mergeTask ).estimatedRemainingMergeSize ();
958+ i ++;
959+ }
960+ assertTrue (threadPoolMergeExecutorService .submitMergeTask (mergeTask ));
961+ }
962+ // assert tasks are blocked because their estimated merge size is over the available budget
963+ assertBusy (() -> {
964+ assertTrue (threadPoolMergeExecutorService .isMergingBlockedDueToInsufficientDiskSpace ());
965+ assertThat (threadPoolMergeExecutorService .getMergeTasksQueueLength (), is (submittedMergesCount ));
966+ assertThat (threadPoolMergeExecutorService .getDiskSpaceAvailableForNewMergeTasks (), is (availableBudget ));
967+ assertThat (threadPoolMergeExecutorService .getRunningMergeTasks ().size (), is (0 ));
968+ });
969+ // change estimates to be under the available budget
970+ for (int i = 0 ; i < submittedMergesCount ; i ++) {
971+ mergeSizeEstimates [i ] = randomLongBetween (0L , availableBudget );
972+ }
973+ // assert tasks are all unblocked because their estimated merge size is now under the available budget
974+ assertBusy (() -> {
975+ assertFalse (threadPoolMergeExecutorService .isMergingBlockedDueToInsufficientDiskSpace ());
976+ assertThat (threadPoolMergeExecutorService .getMergeTasksQueueLength (), is (0 ));
977+ assertThat (threadPoolMergeExecutorService .getDiskSpaceAvailableForNewMergeTasks (), is (availableBudget ));
978+ });
979+ // assert all merge tasks are either run or aborted
980+ assertBusy (() -> {
981+ for (ThreadPoolMergeScheduler .MergeTask mergeTask : tasksRunList ) {
982+ verify (mergeTask , times (1 )).run ();
983+ verify (mergeTask , times (0 )).abort ();
984+ }
985+ for (ThreadPoolMergeScheduler .MergeTask mergeTask : tasksAbortList ) {
986+ verify (mergeTask , times (0 )).run ();
987+ verify (mergeTask , times (1 )).abort ();
988+ }
989+ });
990+ }
991+ if (setThreadPoolMergeSchedulerSetting ) {
992+ assertWarnings (
993+ "[indices.merge.scheduler.use_thread_pool] setting was deprecated in Elasticsearch "
994+ + "and will be removed in a future release. See the breaking changes documentation for the next major version."
995+ );
996+ }
997+ }
998+
899999 public void testMergeTasksAreUnblockedWhenMoreDiskSpaceBecomesAvailable () throws Exception {
9001000 aFileStore .totalSpace = randomLongBetween (300L , 1_000L );
9011001 bFileStore .totalSpace = randomLongBetween (300L , 1_000L );
0 commit comments