5151import static org .hamcrest .Matchers .equalTo ;
5252import static org .hamcrest .Matchers .greaterThan ;
5353import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
54+ import static org .hamcrest .Matchers .hasItem ;
5455import static org .hamcrest .Matchers .is ;
5556import static org .hamcrest .Matchers .lessThan ;
5657import static org .hamcrest .Matchers .lessThanOrEqualTo ;
@@ -299,9 +300,9 @@ public void testTargetIORateChangesWhenSubmittingMergeTasks() throws Exception {
299300 }
300301 }
301302
302- public void testIORateIsAdjustedForRunningMergeTasks () throws Exception {
303- int mergeExecutorThreadCount = randomIntBetween (1 , 3 );
304- int mergesStillToSubmit = randomIntBetween (1 , 10 );
303+ public void testIORateIsAdjustedForAllRunningMergeTasks () throws Exception {
304+ int mergeExecutorThreadCount = randomIntBetween (1 , 5 );
305+ int mergesStillToSubmit = randomIntBetween (1 , 20 );
305306 int mergesStillToComplete = mergesStillToSubmit ;
306307 Settings settings = Settings .builder ()
307308 .put (ThreadPoolMergeScheduler .USE_THREAD_POOL_MERGE_SCHEDULER_SETTING .getKey (), true )
@@ -320,6 +321,7 @@ public void testIORateIsAdjustedForRunningMergeTasks() throws Exception {
320321 ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor ) testThreadPool .executor (ThreadPool .Names .MERGE );
321322 Semaphore runMergeSemaphore = new Semaphore (0 );
322323 Set <MergeTask > currentlyRunningMergeTasksSet = ConcurrentCollections .newConcurrentSet ();
324+ Set <MergeTask > currentlyRunningOrAbortingMergeTasksSet = ConcurrentCollections .newConcurrentSet ();
323325 while (mergesStillToComplete > 0 ) {
324326 if (mergesStillToSubmit > 0 && (currentlyRunningMergeTasksSet .isEmpty () || randomBoolean ())) {
325327 MergeTask mergeTask = mock (MergeTask .class );
@@ -337,39 +339,48 @@ public void testIORateIsAdjustedForRunningMergeTasks() throws Exception {
337339 }).when (mergeTask ).schedule ();
338340 doAnswer (mock -> {
339341 currentlyRunningMergeTasksSet .add (mergeTask );
342+ currentlyRunningOrAbortingMergeTasksSet .add (mergeTask );
340343 // wait to be signalled before completing
341344 runMergeSemaphore .acquire ();
345+ currentlyRunningOrAbortingMergeTasksSet .remove (mergeTask );
342346 currentlyRunningMergeTasksSet .remove (mergeTask );
343347 return null ;
344348 }).when (mergeTask ).run ();
345349 doAnswer (mock -> {
350+ currentlyRunningOrAbortingMergeTasksSet .add (mergeTask );
346351 // wait to be signalled before completing
347352 runMergeSemaphore .acquire ();
353+ currentlyRunningOrAbortingMergeTasksSet .remove (mergeTask );
348354 return null ;
349355 }).when (mergeTask ).abort ();
350- int activeMergeTasksCount = threadPoolExecutor .getActiveCount ();
351- threadPoolMergeExecutorService .submitMergeTask (mergeTask );
352- long newIORate = threadPoolMergeExecutorService .getTargetIORateBytesPerSec ();
353- // all currently running merge tasks must be IO throttled
356+ assertThat (runMergeSemaphore .availablePermits (), is (0 ));
357+ boolean isAnyExecutorAvailable = currentlyRunningOrAbortingMergeTasksSet .size () < mergeExecutorThreadCount ;
358+ boolean mergeTaskSubmitted = threadPoolMergeExecutorService .submitMergeTask (mergeTask );
359+ assertTrue (mergeTaskSubmitted );
360+ if (isAnyExecutorAvailable ) {
361+ assertBusy (() -> assertThat (currentlyRunningOrAbortingMergeTasksSet , hasItem (mergeTask )));
362+ }
363+ long latestIORate = threadPoolMergeExecutorService .getTargetIORateBytesPerSec ();
364+ // all currently running merge tasks must be IO throttled to the latest IO Rate
354365 assertBusy (() -> {
355- // await new merge to start executing
356- if (activeMergeTasksCount < mergeExecutorThreadCount ) {
357- assertThat (threadPoolExecutor .getActiveCount (), is (activeMergeTasksCount + 1 ));
358- }
359- // assert IO throttle is set on the running merge tasks
366+ // assert IO throttle is set on ALL the running merge tasks
360367 for (MergeTask currentlyRunningMergeTask : currentlyRunningMergeTasksSet ) {
361- var ioRateCaptor = ArgumentCaptor . forClass ( Long . class );
368+ verify ( currentlyRunningMergeTask ). run ( );
362369 // only interested in the last invocation
370+ var ioRateCaptor = ArgumentCaptor .forClass (Long .class );
363371 verify (currentlyRunningMergeTask , atLeastOnce ()).setIORateLimit (ioRateCaptor .capture ());
364- assertThat (ioRateCaptor .getValue (), is (newIORate ));
372+ assertThat (ioRateCaptor .getValue (), is (latestIORate ));
365373 }
366374 });
367375 mergesStillToSubmit --;
368376 } else {
369377 long completedMerges = threadPoolExecutor .getCompletedTaskCount ();
370378 runMergeSemaphore .release ();
371379 // await merge to finish
372- assertBusy (() -> assertThat (threadPoolExecutor .getCompletedTaskCount (), is (completedMerges + 1 )));
380+ assertBusy (() -> {
381+ assertThat (threadPoolExecutor .getCompletedTaskCount (), is (completedMerges + 1 ));
382+ assertThat (runMergeSemaphore .availablePermits (), is (0 ));
383+ });
373384 mergesStillToComplete --;
374385 }
375386 }
0 commit comments