2222import org .junit .Before ;
2323
2424import java .util .Collection ;
25- import java .util .HashSet ;
25+ import java .util .Comparator ;
2626import java .util .PriorityQueue ;
27- import java .util .Set ;
2827import java .util .concurrent .CountDownLatch ;
29- import java .util .concurrent .atomic .AtomicReference ;
30- import java .util .function .Consumer ;
3128
3229import static org .hamcrest .Matchers .equalTo ;
33- import static org .mockito .ArgumentMatchers .any ;
3430import static org .mockito .ArgumentMatchers .anyDouble ;
3531import static org .mockito .Mockito .doAnswer ;
3632import static org .mockito .Mockito .mock ;
@@ -63,7 +59,6 @@ public void testMergeTasksAreAbortedWhenThreadPoolIsShutdown() {
6359 // shutdown the thread pool
6460 testThreadPool .shutdown ();
6561 MergeTask mergeTask = mock (MergeTask .class );
66- when (mergeTask .isRunning ()).thenReturn (false );
6762 boolean mergeTaskSupportsIOThrottling = randomBoolean ();
6863 when (mergeTask .supportsIOThrottling ()).thenReturn (mergeTaskSupportsIOThrottling );
6964 assertFalse (threadPoolMergeExecutorService .submitMergeTask (mergeTask ));
@@ -73,37 +68,34 @@ public void testMergeTasksAreAbortedWhenThreadPoolIsShutdown() {
7368 assertTrue (threadPoolMergeExecutorService .allDone ());
7469 }
7570
76- public void testBackloggedMergeTasksAreAllExecutedExactlyOnce () throws Exception {
77- int mergeExecutorThreadCount = randomIntBetween (1 , 2 );
71+ public void testBackloggedMergeTasksExecuteExactlyOnce () throws Exception {
72+ int mergeExecutorThreadCount = randomIntBetween (1 , 3 );
7873 Settings settings = Settings .builder ()
7974 .put (ThreadPoolMergeScheduler .USE_THREAD_POOL_MERGE_SCHEDULER_SETTING .getKey (), true )
80- // results in few merge threads, in order to increase contention
75+ // few merge threads, in order to increase contention
8176 .put (EsExecutors .NODE_PROCESSORS_SETTING .getKey (), mergeExecutorThreadCount )
8277 .build ();
8378 try (TestThreadPool testThreadPool = new TestThreadPool ("test" , settings )) {
8479 ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
8580 .maybeCreateThreadPoolMergeExecutorService (testThreadPool , settings );
8681 assertNotNull (threadPoolMergeExecutorService );
8782 assertThat (threadPoolMergeExecutorService .getMaxConcurrentMerges (), equalTo (mergeExecutorThreadCount ));
88- int mergeTaskCount = randomIntBetween ( 5 , 50 );
89- CountDownLatch mergeTasksDoneLatch = new CountDownLatch ( mergeTaskCount );
83+ // many merge tasks concurrently
84+ int mergeTaskCount = randomIntBetween ( 10 , 100 );
9085 CountDownLatch mergeTasksReadyLatch = new CountDownLatch (mergeTaskCount );
9186 CountDownLatch submitTaskLatch = new CountDownLatch (1 );
9287 Collection <MergeTask > generatedMergeTasks = ConcurrentCollections .newConcurrentSet ();
9388 for (int i = 0 ; i < mergeTaskCount ; i ++) {
9489 new Thread (() -> {
9590 MergeTask mergeTask = mock (MergeTask .class );
96- when (mergeTask .isRunning ()).thenReturn (false );
9791 boolean supportsIOThrottling = randomBoolean ();
9892 when (mergeTask .supportsIOThrottling ()).thenReturn (supportsIOThrottling );
9993 long mergeSize = randomNonNegativeLong ();
10094 when (mergeTask .estimatedMergeSize ()).thenReturn (mergeSize );
10195 doAnswer (mock -> {
10296 // each individual merge task can either "run" or be "backlogged"
10397 boolean runNowOrBacklog = randomBoolean ();
104- if (runNowOrBacklog ) {
105- mergeTasksDoneLatch .countDown ();
106- } else {
98+ if (runNowOrBacklog == false ) {
10799 testThreadPool .executor (ThreadPool .Names .GENERIC ).execute (() -> {
108100 // reenqueue backlogged merge task
109101 threadPoolMergeExecutorService .reEnqueueBackloggedMergeTask (mergeTask );
@@ -120,13 +112,10 @@ public void testBackloggedMergeTasksAreAllExecutedExactlyOnce() throws Exception
120112 }
121113 safeAwait (mergeTasksReadyLatch );
122114 submitTaskLatch .countDown ();
123- safeAwait (mergeTasksDoneLatch );
124115 assertBusy (() -> {
125116 for (MergeTask mergeTask : generatedMergeTasks ) {
126117 verify (mergeTask , times (1 )).run ();
127- if (mergeTask .supportsIOThrottling ()) {
128- verify (mergeTask ).setIORateLimit (anyDouble ());
129- } else {
118+ if (mergeTask .supportsIOThrottling () == false ) {
130119 verify (mergeTask , times (0 )).setIORateLimit (anyDouble ());
131120 }
132121 }
@@ -135,57 +124,82 @@ public void testBackloggedMergeTasksAreAllExecutedExactlyOnce() throws Exception
135124 }
136125 }
137126
138- public void testMergeTasksRunInSizeOrderWithBacklog () {
127+ public void testBackloggedMergeTasksExecuteInSizeOrder () {
139128 ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
140129 .maybeCreateThreadPoolMergeExecutorService (
141130 testThreadPool ,
142131 Settings .builder ().put (ThreadPoolMergeScheduler .USE_THREAD_POOL_MERGE_SCHEDULER_SETTING .getKey (), true ).build ()
143132 );
144133 assertNotNull (threadPoolMergeExecutorService );
145- threadPoolMergeExecutorService .submitMergeTask ()
146- // int mergeTaskCount = randomIntBetween(5, 50);
147- int mergeTaskCount = 4 ;
148- PriorityQueue <MergeTask > mergeTasksStillToRun = new PriorityQueue <>();
134+ DeterministicTaskQueue reEnqueueBackloggedTaskQueue = new DeterministicTaskQueue ();
135+ int mergeTaskCount = randomIntBetween (10 , 100 );
136+ PriorityQueue <MergeTask > mergeTasksAvailableToRun = new PriorityQueue <>(
137+ mergeTaskCount ,
138+ Comparator .comparingLong (MergeTask ::estimatedMergeSize )
139+ );
149140 for (int i = 0 ; i < mergeTaskCount ; i ++) {
150141 MergeTask mergeTask = mock (MergeTask .class );
151142 when (mergeTask .isRunning ()).thenReturn (false );
152143 boolean supportsIOThrottling = randomBoolean ();
153144 when (mergeTask .supportsIOThrottling ()).thenReturn (supportsIOThrottling );
154- long mergeSize = randomLongBetween (1 , 10 );
145+ // merge tasks of various sizes (0 might be a valid value)
146+ long mergeSize = randomLongBetween (0 , 10 );
155147 when (mergeTask .estimatedMergeSize ()).thenReturn (mergeSize );
156148 doAnswer (mock -> {
157- @ SuppressWarnings ("unchecked" )
158- MergeTask other = (MergeTask ) mock .getArguments ()[1 ];
159- return Long .com
160- }).when (mergeTask ).compareTo (any (MergeTask .class ));
161- doAnswer (mock -> {
162- mergeTasksStillToRun .remove (mergeTask );
163- // each individual merge task can either "run" or be "backlogged"
149+ // each individual merge task can either "run" or be "backlogged" at any point in time
164150 boolean runNowOrBacklog = randomBoolean ();
151+ // in either case, the merge task is, at least temporarily, not "available" to run
152+ mergeTasksAvailableToRun .remove (mergeTask );
153+ // if merge task cannot run, it is backlogged, and should be re enqueued some time in the future
165154 if (runNowOrBacklog == false ) {
166- if (mergeTasksStillToRun .isEmpty ()) {
167- // reenqueue backlogged merge task now, otherwise the task won't finish
155+ // reenqueue backlogged merge task sometime in the future
156+ reEnqueueBackloggedTaskQueue .scheduleNow (() -> {
157+ // reenqueue backlogged merge task sometime in the future
168158 threadPoolMergeExecutorService .reEnqueueBackloggedMergeTask (mergeTask );
169- mergeTasksStillToRun .add (mergeTask );
170- } else {
171- testThreadPool .executor (ThreadPool .Names .GENERIC ).execute (() -> {
172- // reenqueue backlogged merge task sometime in the future
173- threadPoolMergeExecutorService .reEnqueueBackloggedMergeTask (mergeTask );
174- mergeTasksStillToRun .add (mergeTask );
175- });
176- }
159+ // the merge task should once again be "available" to run
160+ mergeTasksAvailableToRun .add (mergeTask );
161+ });
162+ }
163+ // avoid blocking for unavailable merge task by running one re-enqueuing task now
164+ if (runNowOrBacklog == false && mergeTasksAvailableToRun .isEmpty ()) {
165+ assertTrue (runOneTask (reEnqueueBackloggedTaskQueue ));
166+ }
167+ if (runNowOrBacklog && mergeTasksAvailableToRun .isEmpty () == false ) {
168+ // assert the merge task that's now going to run is the smallest of the ones currently available to run
169+ assertTrue (mergeTask .estimatedMergeSize () <= mergeTasksAvailableToRun .peek ().estimatedMergeSize ());
177170 }
178171 return runNowOrBacklog ;
179172 }).when (mergeTask ).runNowOrBacklog ();
180- doAnswer (mock -> {
181- if (mergeTasksStillToRun .isEmpty () == false ) {
182- assertTrue (mergeTask .estimatedMergeSize () <= mergeTasksStillToRun .peek ().estimatedMergeSize ());
183- }
184- return null ;
185- }).when (mergeTask ).run ();
186- mergeTasksStillToRun .add (mergeTask );
173+ mergeTasksAvailableToRun .add (mergeTask );
187174 threadPoolMergeExecutorService .submitMergeTask (mergeTask );
188175 }
189- deterministicTaskQueue .runAllTasks ();
176+ while (true ) {
177+ // re-enqueue merge tasks
178+ if (mergeTasksAvailableToRun .isEmpty () || randomBoolean ()) {
179+ boolean backlogReEnqueued = runOneTask (reEnqueueBackloggedTaskQueue );
180+ if (mergeTasksAvailableToRun .isEmpty () && backlogReEnqueued == false ) {
181+ // test complete, all merges ran, and none is backlogged
182+ assertFalse (deterministicTaskQueue .hasAnyTasks ());
183+ assertFalse (reEnqueueBackloggedTaskQueue .hasAnyTasks ());
184+ assertTrue (threadPoolMergeExecutorService .allDone ());
185+ break ;
186+ }
187+ } else {
188+ // run one merge task
189+ runOneTask (deterministicTaskQueue );
190+ }
191+ }
192+ }
193+
194+ private boolean runOneTask (DeterministicTaskQueue deterministicTaskQueue ) {
195+ while (deterministicTaskQueue .hasAnyTasks ()) {
196+ if (deterministicTaskQueue .hasRunnableTasks ()) {
197+ deterministicTaskQueue .runRandomTask ();
198+ return true ;
199+ } else {
200+ deterministicTaskQueue .advanceTime ();
201+ }
202+ }
203+ return false ;
190204 }
191205}
0 commit comments