2222import org .elasticsearch .threadpool .ThreadPool ;
2323import org .mockito .ArgumentCaptor ;
2424
25+ import java .io .IOException ;
2526import java .util .ArrayList ;
2627import java .util .Collection ;
2728import java .util .Comparator ;
5758
5859public class ThreadPoolMergeExecutorServiceTests extends ESTestCase {
5960
60- public void testNewMergeTaskIsAbortedWhenThreadPoolIsShutdown () {
61- TestThreadPool testThreadPool = new TestThreadPool ("test" );
62- ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService (testThreadPool );
61+ public void testNewMergeTaskIsAbortedWhenThreadPoolIsShutdown () throws IOException {
62+ TestThreadPool testThreadPool = new TestThreadPool ("test" , Settings .EMPTY );
63+ ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService (
64+ testThreadPool ,
65+ Settings .EMPTY ,
66+ newNodeEnvironment (Settings .EMPTY )
67+ );
6368 // shutdown the thread pool
6469 testThreadPool .shutdown ();
6570 MergeTask mergeTask = mock (MergeTask .class );
@@ -81,7 +86,11 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
8186 .put (EsExecutors .NODE_PROCESSORS_SETTING .getKey (), mergeExecutorThreadCount )
8287 .build ();
8388 TestThreadPool testThreadPool = new TestThreadPool ("test" , settings );
84- ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService (testThreadPool );
89+ ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService (
90+ testThreadPool ,
91+ settings ,
92+ newNodeEnvironment (settings )
93+ );
8594 var countingListener = new CountingMergeEventListener ();
8695 threadPoolMergeExecutorService .registerMergeEventListener (countingListener );
8796 assertThat (threadPoolMergeExecutorService .getMaxConcurrentMerges (), equalTo (mergeExecutorThreadCount ));
@@ -192,7 +201,11 @@ public void testTargetIORateChangesWhenSubmittingMergeTasks() throws Exception {
192201 .put (EsExecutors .NODE_PROCESSORS_SETTING .getKey (), mergeExecutorThreadCount )
193202 .build ();
194203 try (TestThreadPool testThreadPool = new TestThreadPool ("test" , settings )) {
195- ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService (testThreadPool );
204+ ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService (
205+ testThreadPool ,
206+ settings ,
207+ newNodeEnvironment (settings )
208+ );
196209 assertThat (threadPoolMergeExecutorService .getMaxConcurrentMerges (), equalTo (mergeExecutorThreadCount ));
197210 Semaphore runMergeSemaphore = new Semaphore (0 );
198211 AtomicInteger submittedIOThrottledMergeTasks = new AtomicInteger ();
@@ -272,7 +285,11 @@ public void testIORateIsAdjustedForRunningMergeTasks() throws Exception {
272285 .put (EsExecutors .NODE_PROCESSORS_SETTING .getKey (), mergeExecutorThreadCount )
273286 .build ();
274287 try (TestThreadPool testThreadPool = new TestThreadPool ("test" , settings )) {
275- ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService (testThreadPool );
288+ ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService (
289+ testThreadPool ,
290+ settings ,
291+ newNodeEnvironment (settings )
292+ );
276293 assertThat (threadPoolMergeExecutorService .getMaxConcurrentMerges (), equalTo (mergeExecutorThreadCount ));
277294 ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor ) testThreadPool .executor (ThreadPool .Names .MERGE );
278295 Semaphore runMergeSemaphore = new Semaphore (0 );
@@ -334,23 +351,23 @@ public void testIORateIsAdjustedForRunningMergeTasks() throws Exception {
334351 }
335352 }
336353
337- public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsSpeedy () {
354+ public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsSpeedy () throws IOException {
338355 // the executor runs merge tasks at a faster rate than the rate that merge tasks are submitted
339356 int submittedVsExecutedRateOutOf1000 = randomIntBetween (0 , 250 );
340357 testIORateAdjustedForSubmittedTasks (randomIntBetween (50 , 1000 ), submittedVsExecutedRateOutOf1000 , randomIntBetween (0 , 5 ));
341358 // executor starts running merges only after a considerable amount of merge tasks have already been submitted
342359 testIORateAdjustedForSubmittedTasks (randomIntBetween (50 , 1000 ), submittedVsExecutedRateOutOf1000 , randomIntBetween (5 , 50 ));
343360 }
344361
345- public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsSluggish () {
362+ public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsSluggish () throws IOException {
346363 // the executor runs merge tasks at a faster rate than the rate that merge tasks are submitted
347364 int submittedVsExecutedRateOutOf1000 = randomIntBetween (750 , 1000 );
348365 testIORateAdjustedForSubmittedTasks (randomIntBetween (50 , 1000 ), submittedVsExecutedRateOutOf1000 , randomIntBetween (0 , 5 ));
349366 // executor starts running merges only after a considerable amount of merge tasks have already been submitted
350367 testIORateAdjustedForSubmittedTasks (randomIntBetween (50 , 1000 ), submittedVsExecutedRateOutOf1000 , randomIntBetween (5 , 50 ));
351368 }
352369
353- public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsOnPar () {
370+ public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsOnPar () throws IOException {
354371 // the executor runs merge tasks at a faster rate than the rate that merge tasks are submitted
355372 int submittedVsExecutedRateOutOf1000 = randomIntBetween (250 , 750 );
356373 testIORateAdjustedForSubmittedTasks (randomIntBetween (50 , 1000 ), submittedVsExecutedRateOutOf1000 , randomIntBetween (0 , 5 ));
@@ -362,10 +379,11 @@ private void testIORateAdjustedForSubmittedTasks(
362379 int totalTasksToSubmit ,
363380 int submittedVsExecutedRateOutOf1000 ,
364381 int initialTasksToSubmit
365- ) {
382+ ) throws IOException {
366383 DeterministicTaskQueue mergeExecutorTaskQueue = new DeterministicTaskQueue ();
367384 ThreadPool mergeExecutorThreadPool = mergeExecutorTaskQueue .getThreadPool ();
368- ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService (mergeExecutorThreadPool );
385+ ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService (mergeExecutorThreadPool ,
386+ Settings .EMPTY , newNodeEnvironment (Settings .EMPTY ));
369387 final AtomicInteger currentlySubmittedMergeTaskCount = new AtomicInteger ();
370388 final AtomicLong targetIORateLimit = new AtomicLong (ThreadPoolMergeExecutorService .START_IO_RATE .getBytes ());
371389 final AtomicReference <MergeTask > lastRunTask = new AtomicReference <>();
@@ -425,7 +443,11 @@ public void testMergeTasksRunConcurrently() throws Exception {
425443 .put (EsExecutors .NODE_PROCESSORS_SETTING .getKey (), mergeExecutorThreadCount )
426444 .build ();
427445 try (TestThreadPool testThreadPool = new TestThreadPool ("test" , settings )) {
428- ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService (testThreadPool );
446+ ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService (
447+ testThreadPool ,
448+ settings ,
449+ newNodeEnvironment (settings )
450+ );
429451 assertThat (threadPoolMergeExecutorService .getMaxConcurrentMerges (), equalTo (mergeExecutorThreadCount ));
430452 // more merge tasks than max concurrent merges allowed to run concurrently
431453 int totalMergeTasksCount = mergeExecutorThreadCount + randomIntBetween (1 , 5 );
@@ -505,7 +527,8 @@ public void testThreadPoolStatsWithBackloggedMergeTasks() throws Exception {
505527 .put (EsExecutors .NODE_PROCESSORS_SETTING .getKey (), mergeExecutorThreadCount )
506528 .build ();
507529 try (TestThreadPool testThreadPool = new TestThreadPool ("test" , settings )) {
508- ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService (testThreadPool );
530+ ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService (testThreadPool , settings ,
531+ newNodeEnvironment (settings ));
509532 assertThat (threadPoolMergeExecutorService .getMaxConcurrentMerges (), equalTo (mergeExecutorThreadCount ));
510533 int totalMergeTasksCount = randomIntBetween (1 , 10 );
511534 ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor ) testThreadPool .executor (ThreadPool .Names .MERGE );
@@ -558,7 +581,11 @@ public void testBackloggedMergeTasksExecuteExactlyOnce() throws Exception {
558581 .put (EsExecutors .NODE_PROCESSORS_SETTING .getKey (), mergeExecutorThreadCount )
559582 .build ();
560583 try (TestThreadPool testThreadPool = new TestThreadPool ("test" , settings )) {
561- ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService (testThreadPool );
584+ ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService (
585+ testThreadPool ,
586+ settings ,
587+ newNodeEnvironment (settings )
588+ );
562589 assertThat (threadPoolMergeExecutorService .getMaxConcurrentMerges (), equalTo (mergeExecutorThreadCount ));
563590 // many merge tasks concurrently
564591 int mergeTaskCount = randomIntBetween (10 , 100 );
@@ -614,10 +641,14 @@ public void testBackloggedMergeTasksExecuteExactlyOnce() throws Exception {
614641 }
615642 }
616643
617- public void testMergeTasksExecuteInSizeOrder () {
644+ public void testMergeTasksExecuteInSizeOrder () throws IOException {
618645 DeterministicTaskQueue mergeExecutorTaskQueue = new DeterministicTaskQueue ();
619646 ThreadPool mergeExecutorThreadPool = mergeExecutorTaskQueue .getThreadPool ();
620- ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService (mergeExecutorThreadPool );
647+ ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService (
648+ mergeExecutorThreadPool ,
649+ Settings .EMPTY ,
650+ newNodeEnvironment (Settings .EMPTY )
651+ );
621652 DeterministicTaskQueue reEnqueueBackloggedTaskQueue = new DeterministicTaskQueue ();
622653 int mergeTaskCount = randomIntBetween (10 , 100 );
623654 // sort merge tasks available to run by size
@@ -697,14 +728,21 @@ public void onMergeAborted(OnGoingMerge merge) {
697728 }
698729 }
699730
700- static ThreadPoolMergeExecutorService getThreadPoolMergeExecutorService (ThreadPool threadPool ) {
731+ static ThreadPoolMergeExecutorService getThreadPoolMergeExecutorService (
732+ ThreadPool threadPool ,
733+ Settings settings ,
734+ NodeEnvironment nodeEnvironment
735+ ) {
701736 ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
702737 .maybeCreateThreadPoolMergeExecutorService (
703738 threadPool ,
704739 randomBoolean ()
705- ? Settings .EMPTY
706- : Settings .builder ().put (ThreadPoolMergeScheduler .USE_THREAD_POOL_MERGE_SCHEDULER_SETTING .getKey (), true ).build (),
707- mock (NodeEnvironment .class )
740+ ? settings
741+ : Settings .builder ()
742+ .put (settings )
743+ .put (ThreadPoolMergeScheduler .USE_THREAD_POOL_MERGE_SCHEDULER_SETTING .getKey (), true )
744+ .build (),
745+ nodeEnvironment
708746 );
709747 assertNotNull (threadPoolMergeExecutorService );
710748 assertTrue (threadPoolMergeExecutorService .allDone ());
0 commit comments