@@ -173,6 +173,165 @@ public void testSimpleMergeTaskReEnqueueingBySize() {
173173 }
174174 }
175175
176+ public void testIndexingThrottlingWhenSubmittingMerges () {
177+ final int maxThreadCount = randomIntBetween (1 , 5 );
178+ // settings validation requires maxMergeCount >= maxThreadCount
179+ final int maxMergeCount = maxThreadCount + randomIntBetween (0 , 5 );
180+ List <MergeTask > submittedMergeTasks = new ArrayList <>();
181+ AtomicBoolean isUsingMaxTargetIORate = new AtomicBoolean (false );
182+ ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mockThreadPoolMergeExecutorService (
183+ submittedMergeTasks ,
184+ isUsingMaxTargetIORate
185+ );
186+ Settings mergeSchedulerSettings = Settings .builder ()
187+ .put (MergeSchedulerConfig .MAX_THREAD_COUNT_SETTING .getKey (), maxThreadCount )
188+ .put (MergeSchedulerConfig .MAX_MERGE_COUNT_SETTING .getKey (), maxMergeCount )
189+ .build ();
190+ TestThreadPoolMergeScheduler threadPoolMergeScheduler = new TestThreadPoolMergeScheduler (
191+ new ShardId ("index" , "_na_" , 1 ),
192+ IndexSettingsModule .newIndexSettings ("index" , mergeSchedulerSettings ),
193+ threadPoolMergeExecutorService
194+ );
195+ // make sure there are more merges submitted than the max merge count limit (which triggers IO throttling)
196+ int excessMerges = randomIntBetween (1 , 10 );
197+ int mergesToSubmit = maxMergeCount + excessMerges ;
198+ boolean expectIndexThrottling = false ;
199+ int submittedMerges = 0 ;
200+ // merges are submitted, while some are also scheduled (but none is run)
201+ while (submittedMerges < mergesToSubmit - 1 ) {
202+ isUsingMaxTargetIORate .set (randomBoolean ());
203+ if (submittedMergeTasks .isEmpty () == false && randomBoolean ()) {
204+ // maybe schedule one submitted merge
205+ MergeTask mergeTask = randomFrom (submittedMergeTasks );
206+ submittedMergeTasks .remove (mergeTask );
207+ mergeTask .schedule ();
208+ } else {
209+ // submit one merge
210+ MergeSource mergeSource = mock (MergeSource .class );
211+ OneMerge oneMerge = mock (OneMerge .class );
212+ when (oneMerge .getStoreMergeInfo ()).thenReturn (getNewMergeInfo (randomLongBetween (1L , 10L )));
213+ when (oneMerge .getMergeProgress ()).thenReturn (new MergePolicy .OneMergeProgress ());
214+ when (mergeSource .getNextMerge ()).thenReturn (oneMerge , (OneMerge ) null );
215+ threadPoolMergeScheduler .merge (mergeSource , randomFrom (MergeTrigger .values ()));
216+ submittedMerges ++;
217+ if (isUsingMaxTargetIORate .get () && submittedMerges > maxMergeCount ) {
218+ expectIndexThrottling = true ;
219+ } else if (submittedMerges <= maxMergeCount ) {
220+ expectIndexThrottling = false ;
221+ }
222+ }
223+ // assert IO throttle state
224+ assertThat (threadPoolMergeScheduler .isIndexingThrottlingEnabled (), is (expectIndexThrottling ));
225+ }
226+ // submit one last merge when IO throttling is at max value
227+ isUsingMaxTargetIORate .set (true );
228+ MergeSource mergeSource = mock (MergeSource .class );
229+ OneMerge oneMerge = mock (OneMerge .class );
230+ when (oneMerge .getStoreMergeInfo ()).thenReturn (getNewMergeInfo (randomLongBetween (1L , 10L )));
231+ when (oneMerge .getMergeProgress ()).thenReturn (new MergePolicy .OneMergeProgress ());
232+ when (mergeSource .getNextMerge ()).thenReturn (oneMerge , (OneMerge ) null );
233+ threadPoolMergeScheduler .merge (mergeSource , randomFrom (MergeTrigger .values ()));
234+ // assert index throttling because IO throttling is at max value
235+ assertThat (threadPoolMergeScheduler .isIndexingThrottlingEnabled (), is (true ));
236+ }
237+
238+ public void testIndexingThrottlingWhileMergesAreRunning () {
239+ final int maxThreadCount = randomIntBetween (1 , 5 );
240+ // settings validation requires maxMergeCount >= maxThreadCount
241+ final int maxMergeCount = maxThreadCount + randomIntBetween (0 , 5 );
242+ List <MergeTask > submittedMergeTasks = new ArrayList <>();
243+ List <MergeTask > scheduledToRunMergeTasks = new ArrayList <>();
244+ AtomicBoolean isUsingMaxTargetIORate = new AtomicBoolean (false );
245+ ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mockThreadPoolMergeExecutorService (
246+ submittedMergeTasks ,
247+ isUsingMaxTargetIORate
248+ );
249+ Settings mergeSchedulerSettings = Settings .builder ()
250+ .put (MergeSchedulerConfig .MAX_THREAD_COUNT_SETTING .getKey (), maxThreadCount )
251+ .put (MergeSchedulerConfig .MAX_MERGE_COUNT_SETTING .getKey (), maxMergeCount )
252+ .build ();
253+ TestThreadPoolMergeScheduler threadPoolMergeScheduler = new TestThreadPoolMergeScheduler (
254+ new ShardId ("index" , "_na_" , 1 ),
255+ IndexSettingsModule .newIndexSettings ("index" , mergeSchedulerSettings ),
256+ threadPoolMergeExecutorService
257+ );
258+ int mergesToRun = randomIntBetween (0 , 5 );
259+ // make sure there are more merges submitted and not run
260+ int excessMerges = randomIntBetween (1 , 10 );
261+ int mergesToSubmit = maxMergeCount + mergesToRun + excessMerges ;
262+ int mergesOutstanding = 0 ;
263+ boolean expectIndexThrottling = false ;
264+ // merges are submitted, while some are also scheduled and run
265+ while (mergesToSubmit > 0 ) {
266+ isUsingMaxTargetIORate .set (randomBoolean ());
267+ if (submittedMergeTasks .isEmpty () == false && randomBoolean ()) {
268+ // maybe schedule one submitted merge
269+ MergeTask mergeTask = randomFrom (submittedMergeTasks );
270+ submittedMergeTasks .remove (mergeTask );
271+ Schedule schedule = mergeTask .schedule ();
272+ if (schedule == Schedule .RUN ) {
273+ scheduledToRunMergeTasks .add (mergeTask );
274+ }
275+ } else {
276+ if (mergesToRun > 0 && scheduledToRunMergeTasks .isEmpty () == false && randomBoolean ()) {
277+ // maybe run one scheduled merge
278+ MergeTask mergeTask = randomFrom (scheduledToRunMergeTasks );
279+ scheduledToRunMergeTasks .remove (mergeTask );
280+ mergeTask .run ();
281+ mergesToRun --;
282+ mergesOutstanding --;
283+ } else {
284+ // submit one merge
285+ MergeSource mergeSource = mock (MergeSource .class );
286+ OneMerge oneMerge = mock (OneMerge .class );
287+ when (oneMerge .getStoreMergeInfo ()).thenReturn (getNewMergeInfo (randomLongBetween (1L , 10L )));
288+ when (oneMerge .getMergeProgress ()).thenReturn (new MergePolicy .OneMergeProgress ());
289+ when (mergeSource .getNextMerge ()).thenReturn (oneMerge , (OneMerge ) null );
290+ threadPoolMergeScheduler .merge (mergeSource , randomFrom (MergeTrigger .values ()));
291+ mergesToSubmit --;
292+ mergesOutstanding ++;
293+ }
294+ if (isUsingMaxTargetIORate .get () && mergesOutstanding > maxMergeCount ) {
295+ expectIndexThrottling = true ;
296+ } else if (mergesOutstanding <= maxMergeCount ) {
297+ expectIndexThrottling = false ;
298+ }
299+ }
300+ // assert IO throttle state
301+ assertThat (threadPoolMergeScheduler .isIndexingThrottlingEnabled (), is (expectIndexThrottling ));
302+ }
303+ // execute all remaining merges (submitted or scheduled)
304+ while (mergesToRun > 0 || submittedMergeTasks .isEmpty () == false || scheduledToRunMergeTasks .isEmpty () == false ) {
305+ // simulate that the {@link ThreadPoolMergeExecutorService} maybe peaked IO un-throttling
306+ isUsingMaxTargetIORate .set (randomBoolean ());
307+ if (submittedMergeTasks .isEmpty () == false && (scheduledToRunMergeTasks .isEmpty () || randomBoolean ())) {
308+ // maybe schedule one submitted merge
309+ MergeTask mergeTask = randomFrom (submittedMergeTasks );
310+ submittedMergeTasks .remove (mergeTask );
311+ Schedule schedule = mergeTask .schedule ();
312+ if (schedule == Schedule .RUN ) {
313+ scheduledToRunMergeTasks .add (mergeTask );
314+ }
315+ } else {
316+ // maybe run one scheduled merge
317+ MergeTask mergeTask = randomFrom (scheduledToRunMergeTasks );
318+ scheduledToRunMergeTasks .remove (mergeTask );
319+ mergeTask .run ();
320+ mergesToRun --;
321+ mergesOutstanding --;
322+ if (isUsingMaxTargetIORate .get () && mergesOutstanding > maxMergeCount ) {
323+ expectIndexThrottling = true ;
324+ } else if (mergesOutstanding <= maxMergeCount ) {
325+ expectIndexThrottling = false ;
326+ }
327+ }
328+ // assert IO throttle state
329+ assertThat (threadPoolMergeScheduler .isIndexingThrottlingEnabled (), is (expectIndexThrottling ));
330+ }
331+ // all merges done
332+ assertThat (threadPoolMergeScheduler .isIndexingThrottlingEnabled (), is (false ));
333+ }
334+
176335 public void testMergeSourceWithFollowUpMergesRunSequentially () throws Exception {
177336 // test with min 2 allowed concurrent merges
178337 int mergeExecutorThreadCount = randomIntBetween (2 , 5 );
@@ -493,4 +652,49 @@ private static MergeInfo getNewMergeInfo(long estimatedMergeBytes) {
493652 private static MergeInfo getNewMergeInfo (long estimatedMergeBytes , int maxNumSegments ) {
494653 return new MergeInfo (randomNonNegativeInt (), estimatedMergeBytes , randomBoolean (), maxNumSegments );
495654 }
655+
656+ static class TestThreadPoolMergeScheduler extends ThreadPoolMergeScheduler {
657+ AtomicBoolean isIndexingThrottlingEnabled = new AtomicBoolean (false );
658+
659+ TestThreadPoolMergeScheduler (
660+ ShardId shardId ,
661+ IndexSettings indexSettings ,
662+ ThreadPoolMergeExecutorService threadPoolMergeExecutorService
663+ ) {
664+ super (shardId , indexSettings , threadPoolMergeExecutorService );
665+ }
666+
667+ @ Override
668+ protected void enableIndexingThrottling (int numRunningMerges , int numQueuedMerges , int configuredMaxMergeCount ) {
669+ isIndexingThrottlingEnabled .set (true );
670+ }
671+
672+ @ Override
673+ protected void disableIndexingThrottling (int numRunningMerges , int numQueuedMerges , int configuredMaxMergeCount ) {
674+ isIndexingThrottlingEnabled .set (false );
675+ }
676+
677+ boolean isIndexingThrottlingEnabled () {
678+ return isIndexingThrottlingEnabled .get ();
679+ }
680+ }
681+
682+ static ThreadPoolMergeExecutorService mockThreadPoolMergeExecutorService (
683+ List <MergeTask > submittedMergeTasks ,
684+ AtomicBoolean isUsingMaxTargetIORate
685+ ) {
686+ ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock (ThreadPoolMergeExecutorService .class );
687+ doAnswer (invocation -> {
688+ MergeTask mergeTask = (MergeTask ) invocation .getArguments ()[0 ];
689+ submittedMergeTasks .add (mergeTask );
690+ return null ;
691+ }).when (threadPoolMergeExecutorService ).submitMergeTask (any (MergeTask .class ));
692+ doAnswer (invocation -> {
693+ MergeTask mergeTask = (MergeTask ) invocation .getArguments ()[0 ];
694+ submittedMergeTasks .add (mergeTask );
695+ return null ;
696+ }).when (threadPoolMergeExecutorService ).reEnqueueBackloggedMergeTask (any (MergeTask .class ));
697+ doAnswer (invocation -> isUsingMaxTargetIORate .get ()).when (threadPoolMergeExecutorService ).usingMaxTargetIORateBytesPerSec ();
698+ return threadPoolMergeExecutorService ;
699+ }
496700}
0 commit comments