51
51
import static org .hamcrest .Matchers .equalTo ;
52
52
import static org .hamcrest .Matchers .greaterThan ;
53
53
import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
54
+ import static org .hamcrest .Matchers .hasItem ;
54
55
import static org .hamcrest .Matchers .is ;
55
56
import static org .hamcrest .Matchers .lessThan ;
56
57
import static org .hamcrest .Matchers .lessThanOrEqualTo ;
@@ -299,9 +300,9 @@ public void testTargetIORateChangesWhenSubmittingMergeTasks() throws Exception {
299
300
}
300
301
}
301
302
302
- public void testIORateIsAdjustedForRunningMergeTasks () throws Exception {
303
- int mergeExecutorThreadCount = randomIntBetween (1 , 3 );
304
- int mergesStillToSubmit = randomIntBetween (1 , 10 );
303
+ public void testIORateIsAdjustedForAllRunningMergeTasks () throws Exception {
304
+ int mergeExecutorThreadCount = randomIntBetween (1 , 5 );
305
+ int mergesStillToSubmit = randomIntBetween (1 , 20 );
305
306
int mergesStillToComplete = mergesStillToSubmit ;
306
307
Settings settings = Settings .builder ()
307
308
.put (ThreadPoolMergeScheduler .USE_THREAD_POOL_MERGE_SCHEDULER_SETTING .getKey (), true )
@@ -320,6 +321,7 @@ public void testIORateIsAdjustedForRunningMergeTasks() throws Exception {
320
321
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor ) testThreadPool .executor (ThreadPool .Names .MERGE );
321
322
Semaphore runMergeSemaphore = new Semaphore (0 );
322
323
Set <MergeTask > currentlyRunningMergeTasksSet = ConcurrentCollections .newConcurrentSet ();
324
+ Set <MergeTask > currentlyRunningOrAbortingMergeTasksSet = ConcurrentCollections .newConcurrentSet ();
323
325
while (mergesStillToComplete > 0 ) {
324
326
if (mergesStillToSubmit > 0 && (currentlyRunningMergeTasksSet .isEmpty () || randomBoolean ())) {
325
327
MergeTask mergeTask = mock (MergeTask .class );
@@ -337,39 +339,48 @@ public void testIORateIsAdjustedForRunningMergeTasks() throws Exception {
337
339
}).when (mergeTask ).schedule ();
338
340
doAnswer (mock -> {
339
341
currentlyRunningMergeTasksSet .add (mergeTask );
342
+ currentlyRunningOrAbortingMergeTasksSet .add (mergeTask );
340
343
// wait to be signalled before completing
341
344
runMergeSemaphore .acquire ();
345
+ currentlyRunningOrAbortingMergeTasksSet .remove (mergeTask );
342
346
currentlyRunningMergeTasksSet .remove (mergeTask );
343
347
return null ;
344
348
}).when (mergeTask ).run ();
345
349
doAnswer (mock -> {
350
+ currentlyRunningOrAbortingMergeTasksSet .add (mergeTask );
346
351
// wait to be signalled before completing
347
352
runMergeSemaphore .acquire ();
353
+ currentlyRunningOrAbortingMergeTasksSet .remove (mergeTask );
348
354
return null ;
349
355
}).when (mergeTask ).abort ();
350
- int activeMergeTasksCount = threadPoolExecutor .getActiveCount ();
351
- threadPoolMergeExecutorService .submitMergeTask (mergeTask );
352
- long newIORate = threadPoolMergeExecutorService .getTargetIORateBytesPerSec ();
353
- // all currently running merge tasks must be IO throttled
356
+ assertThat (runMergeSemaphore .availablePermits (), is (0 ));
357
+ boolean isAnyExecutorAvailable = currentlyRunningOrAbortingMergeTasksSet .size () < mergeExecutorThreadCount ;
358
+ boolean mergeTaskSubmitted = threadPoolMergeExecutorService .submitMergeTask (mergeTask );
359
+ assertTrue (mergeTaskSubmitted );
360
+ if (isAnyExecutorAvailable ) {
361
+ assertBusy (() -> assertThat (currentlyRunningOrAbortingMergeTasksSet , hasItem (mergeTask )));
362
+ }
363
+ long latestIORate = threadPoolMergeExecutorService .getTargetIORateBytesPerSec ();
364
+ // all currently running merge tasks must be IO throttled to the latest IO Rate
354
365
assertBusy (() -> {
355
- // await new merge to start executing
356
- if (activeMergeTasksCount < mergeExecutorThreadCount ) {
357
- assertThat (threadPoolExecutor .getActiveCount (), is (activeMergeTasksCount + 1 ));
358
- }
359
- // assert IO throttle is set on the running merge tasks
366
+ // assert IO throttle is set on ALL the running merge tasks
360
367
for (MergeTask currentlyRunningMergeTask : currentlyRunningMergeTasksSet ) {
361
- var ioRateCaptor = ArgumentCaptor . forClass ( Long . class );
368
+ verify ( currentlyRunningMergeTask ). run ( );
362
369
// only interested in the last invocation
370
+ var ioRateCaptor = ArgumentCaptor .forClass (Long .class );
363
371
verify (currentlyRunningMergeTask , atLeastOnce ()).setIORateLimit (ioRateCaptor .capture ());
364
- assertThat (ioRateCaptor .getValue (), is (newIORate ));
372
+ assertThat (ioRateCaptor .getValue (), is (latestIORate ));
365
373
}
366
374
});
367
375
mergesStillToSubmit --;
368
376
} else {
369
377
long completedMerges = threadPoolExecutor .getCompletedTaskCount ();
370
378
runMergeSemaphore .release ();
371
379
// await merge to finish
372
- assertBusy (() -> assertThat (threadPoolExecutor .getCompletedTaskCount (), is (completedMerges + 1 )));
380
+ assertBusy (() -> {
381
+ assertThat (threadPoolExecutor .getCompletedTaskCount (), is (completedMerges + 1 ));
382
+ assertThat (runMergeSemaphore .availablePermits (), is (0 ));
383
+ });
373
384
mergesStillToComplete --;
374
385
}
375
386
}
0 commit comments