@@ -70,7 +70,83 @@ public void testNewMergeTaskIsAbortedWhenThreadPoolIsShutdown() {
7070 assertTrue (threadPoolMergeExecutorService .allDone ());
7171 }
7272
73- public void testIORateAdjustedForRunningTasks () throws Exception {
73+ public void testTargetIORateChangesWhenSubmittingMergeTasks () throws Exception {
74+ int mergeExecutorThreadCount = randomIntBetween (1 , 5 );
75+ int mergesStillToSubmit = randomIntBetween (1 , 100 );
76+ int mergesStillToComplete = mergesStillToSubmit ;
77+ Settings settings = Settings .builder ()
78+ .put (ThreadPoolMergeScheduler .USE_THREAD_POOL_MERGE_SCHEDULER_SETTING .getKey (), true )
79+ .put (EsExecutors .NODE_PROCESSORS_SETTING .getKey (), mergeExecutorThreadCount )
80+ .build ();
81+ try (TestThreadPool testThreadPool = new TestThreadPool ("test" , settings )) {
82+ ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
83+ .maybeCreateThreadPoolMergeExecutorService (testThreadPool , settings );
84+ assertNotNull (threadPoolMergeExecutorService );
85+ assertThat (threadPoolMergeExecutorService .getMaxConcurrentMerges (), equalTo (mergeExecutorThreadCount ));
86+ Semaphore runMergeSemaphore = new Semaphore (0 );
87+ AtomicInteger submittedIOThrottledMergeTasks = new AtomicInteger ();
88+ while (mergesStillToComplete > 0 ) {
89+ if (mergesStillToSubmit > 0
90+ && (threadPoolMergeExecutorService .getCurrentlyRunningMergeTasks ().isEmpty () || randomBoolean ())) {
91+ // submit new merge task
92+ MergeTask mergeTask = mock (MergeTask .class );
93+ boolean supportsIOThrottling = randomBoolean ();
94+ when (mergeTask .supportsIOThrottling ()).thenReturn (supportsIOThrottling );
95+ doAnswer (mock -> {
96+ // each individual merge task can either "run" or be "backlogged"
97+ boolean runNowOrBacklog = randomBoolean ();
98+ if (runNowOrBacklog == false ) {
99+ testThreadPool .executor (ThreadPool .Names .GENERIC ).execute (() -> {
100+ // reenqueue backlogged merge task
101+ threadPoolMergeExecutorService .reEnqueueBackloggedMergeTask (mergeTask );
102+ });
103+ }
104+ return runNowOrBacklog ;
105+ }).when (mergeTask ).runNowOrBacklog ();
106+ doAnswer (mock -> {
107+ // wait to be signalled before completing
108+ runMergeSemaphore .acquire ();
109+ if (supportsIOThrottling ) {
110+ submittedIOThrottledMergeTasks .decrementAndGet ();
111+ }
112+ return null ;
113+ }).when (mergeTask ).run ();
114+ long currentIORate = threadPoolMergeExecutorService .getTargetIORateBytesPerSec ();
115+ threadPoolMergeExecutorService .submitMergeTask (mergeTask );
116+ if (supportsIOThrottling ) {
117+ submittedIOThrottledMergeTasks .incrementAndGet ();
118+ }
119+ long newIORate = threadPoolMergeExecutorService .getTargetIORateBytesPerSec ();
120+ if (supportsIOThrottling ) {
121+ if (submittedIOThrottledMergeTasks .get () < threadPoolMergeExecutorService .getConcurrentMergesFloorLimitForThrottling ()) {
122+ // assert the IO rate decreases, with a floor limit, when there are few merge tasks enqueued
123+ assertThat (newIORate , either (is (MIN_IO_RATE .getBytes ())).or (lessThan (currentIORate )));
124+ } else if (submittedIOThrottledMergeTasks .get () > threadPoolMergeExecutorService .getConcurrentMergesCeilLimitForThrottling ()) {
125+ // assert the IO rate increases, with a ceiling limit, when there are many merge tasks enqueued
126+ assertThat (newIORate , either (is (MAX_IO_RATE .getBytes ())).or (greaterThan (currentIORate )));
127+ } else {
128+ // assert the IO rate does NOT change when there are a couple of merge tasks enqueued
129+ assertThat (newIORate , equalTo (currentIORate ));
130+ }
131+ } else {
132+ // assert the IO rate does change, when the merge task doesn't support IO throttling
133+ assertThat (newIORate , equalTo (currentIORate ));
134+ }
135+ mergesStillToSubmit --;
136+ } else {
137+ ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor ) testThreadPool .executor (ThreadPool .Names .MERGE );
138+ long completedMerges = threadPoolExecutor .getCompletedTaskCount ();
139+ runMergeSemaphore .release ();
140+ // await merge to finish
141+ assertBusy (() -> assertThat (threadPoolExecutor .getCompletedTaskCount (), is (completedMerges + 1 )));
142+ mergesStillToComplete --;
143+ }
144+ }
145+ assertBusy (() -> assertTrue (threadPoolMergeExecutorService .allDone ()));
146+ }
147+ }
148+
149+ public void testIORateIsAdjustedForRunningMergeTasks () throws Exception {
74150 int mergeExecutorThreadCount = randomIntBetween (1 , 3 );
75151 int mergesStillToSubmit = randomIntBetween (1 , 10 );
76152 int mergesStillToComplete = mergesStillToSubmit ;
@@ -111,21 +187,10 @@ public void testIORateAdjustedForRunningTasks() throws Exception {
111187 currentlyRunningMergeTasksSet .remove (mergeTask );
112188 return null ;
113189 }).when (mergeTask ).run ();
114- long currentIORate = threadPoolMergeExecutorService .getTargetIORateBytesPerSec ();
115190 int activeMergeTasksCount = threadPoolExecutor .getActiveCount ();
116191 threadPoolMergeExecutorService .submitMergeTask (mergeTask );
117192 currentlySubmittedTasks .incrementAndGet ();
118193 long newIORate = threadPoolMergeExecutorService .getTargetIORateBytesPerSec ();
119- if (currentlySubmittedTasks .get () < threadPoolMergeExecutorService .getConcurrentMergesFloorLimitForThrottling ()) {
120- // assert the IO rate decreases, with a floor limit, when there are few merge tasks enqueued
121- assertThat (newIORate , either (is (MIN_IO_RATE .getBytes ())).or (lessThan (currentIORate )));
122- } else if (currentlySubmittedTasks .get () > threadPoolMergeExecutorService .getConcurrentMergesCeilLimitForThrottling ()) {
123- // assert the IO rate increases, with a ceiling limit, when there are many merge tasks enqueued
124- assertThat (newIORate , either (is (MAX_IO_RATE .getBytes ())).or (greaterThan (currentIORate )));
125- } else {
126- // assert the IO rate does change, when there are a couple of merge tasks enqueued
127- assertThat (newIORate , equalTo (currentIORate ));
128- }
129194 // all currently running merge tasks must be IO throttled
130195 assertBusy (() -> {
131196 // await new merge to start executing
0 commit comments