@@ -329,45 +329,26 @@ public void testIORateIsAdjustedForAllRunningMergeTasks() throws Exception {
329329 ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor ) testThreadPool .executor (ThreadPool .Names .MERGE );
330330 Semaphore runMergeSemaphore = new Semaphore (0 );
331331 Set <MergeTask > currentlyRunningMergeTasksSet = ConcurrentCollections .newConcurrentSet ();
332- Set <MergeTask > currentlyRunningOrAbortingMergeTasksSet = ConcurrentCollections .newConcurrentSet ();
333332 while (mergesStillToComplete > 0 ) {
334333 if (mergesStillToSubmit > 0 && (currentlyRunningMergeTasksSet .isEmpty () || randomBoolean ())) {
335334 MergeTask mergeTask = mock (MergeTask .class );
336335 // all tasks support IO throttling in this test case
337336 when (mergeTask .supportsIOThrottling ()).thenReturn (true );
338- doAnswer (mock -> {
339- Schedule schedule = randomFrom (Schedule .values ());
340- if (schedule == BACKLOG ) {
341- testThreadPool .executor (ThreadPool .Names .GENERIC ).execute (() -> {
342- // reenqueue backlogged merge task
343- threadPoolMergeExecutorService .reEnqueueBackloggedMergeTask (mergeTask );
344- });
345- }
346- return schedule ;
347- }).when (mergeTask ).schedule ();
337+ // {@link Schedule.BACKLOG} complicates the test too much because the set of running merge tasks is not stable
338+ when (mergeTask .schedule ()).thenReturn (randomFrom (RUN , ABORT ));
348339 doAnswer (mock -> {
349340 currentlyRunningMergeTasksSet .add (mergeTask );
350- currentlyRunningOrAbortingMergeTasksSet .add (mergeTask );
351341 // wait to be signalled before completing
352342 runMergeSemaphore .acquire ();
353- currentlyRunningOrAbortingMergeTasksSet .remove (mergeTask );
354343 currentlyRunningMergeTasksSet .remove (mergeTask );
355344 return null ;
356345 }).when (mergeTask ).run ();
357346 doAnswer (mock -> {
358- currentlyRunningOrAbortingMergeTasksSet .add (mergeTask );
359347 // wait to be signalled before completing
360348 runMergeSemaphore .acquire ();
361- currentlyRunningOrAbortingMergeTasksSet .remove (mergeTask );
362349 return null ;
363350 }).when (mergeTask ).abort ();
364- assertThat (runMergeSemaphore .availablePermits (), is (0 ));
365- boolean isAnyExecutorAvailable = currentlyRunningOrAbortingMergeTasksSet .size () < mergeExecutorThreadCount ;
366- boolean mergeTaskSubmitted = threadPoolMergeExecutorService .submitMergeTask (mergeTask );
367- assertTrue (mergeTaskSubmitted );
368- if (isAnyExecutorAvailable ) {
369- assertBusy (() -> assertThat (currentlyRunningOrAbortingMergeTasksSet , hasItem (mergeTask )));
370- }
351+ assertTrue (threadPoolMergeExecutorService .submitMergeTask (mergeTask ));
371352 long latestIORate = threadPoolMergeExecutorService .getTargetIORateBytesPerSec ();
372353 // all currently running merge tasks must be IO throttled to the latest IO Rate
373354 assertBusy (() -> {
0 commit comments