From 7e7e624e1f415f96f7be5b43960cb6e7c63259bf Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Wed, 26 Mar 2025 13:08:17 +0200 Subject: [PATCH 01/13] Only start indexing throttling when disk throttling is lowest --- .../index/engine/ThreadPoolMergeExecutorService.java | 4 ++++ .../index/engine/ThreadPoolMergeScheduler.java | 8 ++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java index 5217edb5490dc..7c78698ac6f66 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java @@ -272,6 +272,10 @@ interface UpdateConsumer { } } + public boolean usingMaxTargetIORateBytesPerSec() { + return MAX_IO_RATE.getBytes() == targetIORateBytesPerSec.get(); + } + // exposed for tests Set getRunningMergeTasks() { return runningMergeTasks; diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java index 8cfdc59268365..d60e24b33131b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java @@ -191,14 +191,18 @@ private void checkMergeTaskThrottling() { int configuredMaxMergeCount = config.getMaxMergeCount(); // both currently running and enqueued merge tasks are considered "active" for throttling purposes int activeMerges = (int) (submittedMergesCount - doneMergesCount); - if (activeMerges > configuredMaxMergeCount && shouldThrottleIncomingMerges.get() == false) { + if (activeMerges > configuredMaxMergeCount + && threadPoolMergeExecutorService.usingMaxTargetIORateBytesPerSec() + && shouldThrottleIncomingMerges.get() == false) { // maybe enable merge task throttling synchronized (shouldThrottleIncomingMerges) { if (shouldThrottleIncomingMerges.getAndSet(true) == false) { enableIndexingThrottling(runningMergesCount, activeMerges - runningMergesCount, configuredMaxMergeCount); } } - } else if (activeMerges <= configuredMaxMergeCount && shouldThrottleIncomingMerges.get()) { + } else if (activeMerges <= configuredMaxMergeCount + && threadPoolMergeExecutorService.usingMaxTargetIORateBytesPerSec() + && shouldThrottleIncomingMerges.get()) { // maybe disable merge task throttling synchronized (shouldThrottleIncomingMerges) { if (shouldThrottleIncomingMerges.getAndSet(false)) { From 5c3ef425e416dab1f643826974fff0833cf0feca Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Wed, 26 Mar 2025 11:21:29 +0000 Subject: [PATCH 02/13] [CI] Auto commit changes from spotless --- .../index/engine/ThreadPoolMergeScheduler.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java index d60e24b33131b..4984aca78c046 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java @@ -203,13 +203,13 @@ private void checkMergeTaskThrottling() { } else if (activeMerges <= configuredMaxMergeCount && threadPoolMergeExecutorService.usingMaxTargetIORateBytesPerSec() && shouldThrottleIncomingMerges.get()) { - // maybe disable merge task throttling - synchronized (shouldThrottleIncomingMerges) { - if (shouldThrottleIncomingMerges.getAndSet(false)) { - disableIndexingThrottling(runningMergesCount, activeMerges - runningMergesCount, configuredMaxMergeCount); + // maybe disable merge task throttling + synchronized (shouldThrottleIncomingMerges) { + if (shouldThrottleIncomingMerges.getAndSet(false)) { + disableIndexingThrottling(runningMergesCount, activeMerges - runningMergesCount, configuredMaxMergeCount); + } } } - } } // exposed for tests From 54cc5abfb945a40cc6a9e5159d196ebd03fd6c05 Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Wed, 26 Mar 2025 13:27:54 +0200 Subject: [PATCH 03/13] Ooops --- .../elasticsearch/index/engine/ThreadPoolMergeScheduler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java index 4984aca78c046..02efb30def0a1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java @@ -201,7 +201,7 @@ private void checkMergeTaskThrottling() { } } } else if (activeMerges <= configuredMaxMergeCount - && threadPoolMergeExecutorService.usingMaxTargetIORateBytesPerSec() + && threadPoolMergeExecutorService.usingMaxTargetIORateBytesPerSec() == false && shouldThrottleIncomingMerges.get()) { // maybe disable merge task throttling synchronized (shouldThrottleIncomingMerges) { From d1b819374796feeec736ecb2762427633033caf2 Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Thu, 27 Mar 2025 15:43:37 +0200 Subject: [PATCH 04/13] testIndexingThrottling --- .../engine/ThreadPoolMergeScheduler.java | 7 +- .../engine/ThreadPoolMergeSchedulerTests.java | 140 ++++++++++++++++++ 2 files changed, 143 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java index 02efb30def0a1..6df75429d0250 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java @@ -50,7 +50,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics ); private final ShardId shardId; private final MergeSchedulerConfig config; - private final Logger logger; + protected final Logger logger; private final MergeTracking mergeTracking; private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService; private final PriorityQueue backloggedMergeTasks = new PriorityQueue<>( @@ -192,6 +192,7 @@ private void checkMergeTaskThrottling() { // both currently running and enqueued merge tasks are considered "active" for throttling purposes int activeMerges = (int) (submittedMergesCount - doneMergesCount); if (activeMerges > configuredMaxMergeCount + // only throttle indexing if disk IO is un-throttled, and we still can't keep up with the merge load && threadPoolMergeExecutorService.usingMaxTargetIORateBytesPerSec() && shouldThrottleIncomingMerges.get() == false) { // maybe enable merge task throttling @@ -200,9 +201,7 @@ private void checkMergeTaskThrottling() { enableIndexingThrottling(runningMergesCount, activeMerges - runningMergesCount, configuredMaxMergeCount); } } - } else if (activeMerges <= configuredMaxMergeCount - && threadPoolMergeExecutorService.usingMaxTargetIORateBytesPerSec() == false - && shouldThrottleIncomingMerges.get()) { + } else if (activeMerges <= configuredMaxMergeCount && shouldThrottleIncomingMerges.get()) { // maybe disable merge task throttling synchronized (shouldThrottleIncomingMerges) { if (shouldThrottleIncomingMerges.getAndSet(false)) { diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java index 5e085c083b785..013346a078e3b 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java @@ -38,6 +38,8 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.Consumer; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; @@ -173,6 +175,117 @@ public void testSimpleMergeTaskReEnqueueingBySize() { } } + public void testIndexingThrottling() { + final int maxThreadCount = randomIntBetween(1, 5); + // settings validation requires maxMergeCount >= maxThreadCount + final int maxMergeCount = maxThreadCount + randomIntBetween(0, 5); + List submittedMergeTasks = new ArrayList<>(); + List scheduledToRunMergeTasks = new ArrayList<>(); + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class); + doAnswer(invocation -> { + MergeTask mergeTask = (MergeTask) invocation.getArguments()[0]; + submittedMergeTasks.add(mergeTask); + return null; + }).when(threadPoolMergeExecutorService).submitMergeTask(any(MergeTask.class)); + doAnswer(invocation -> { + MergeTask mergeTask = (MergeTask) invocation.getArguments()[0]; + submittedMergeTasks.add(mergeTask); + return null; + }).when(threadPoolMergeExecutorService).reEnqueueBackloggedMergeTask(any(MergeTask.class)); + AtomicBoolean isUsingMaxTargetIORate = new AtomicBoolean(false); + doAnswer(invocation -> isUsingMaxTargetIORate.get()).when(threadPoolMergeExecutorService).usingMaxTargetIORateBytesPerSec(); + Settings mergeSchedulerSettings = Settings.builder() + .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), maxThreadCount) + .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), maxMergeCount) + .build(); + AtomicBoolean isIndexingThrottlingEnabled = new AtomicBoolean(false); + ThreadPoolMergeScheduler threadPoolMergeScheduler = new TestThreadPoolMergeScheduler( + new ShardId("index", "_na_", 1), + IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings), + threadPoolMergeExecutorService, + (numRunningMerges, numQueuedMerges) -> isIndexingThrottlingEnabled.set(true), + (numRunningMerges, numQueuedMerges) -> isIndexingThrottlingEnabled.set(false) + ); + int mergesToRun = randomIntBetween(0, 5); + // make sure there are more merges submitted and not run + int excessMerges = randomIntBetween(1, 10); + int mergesToSubmit = maxMergeCount + mergesToRun + excessMerges; + int mergesOutstanding = 0; + boolean expectIndexThrottling = false; + // simulate merge load, while also scheduling and running merges + while (mergesToSubmit > 0 || mergesToRun > 0) { + // simulate that the {@link ThreadPoolMergeExecutorService} maybe peaked IO un-throttling + isUsingMaxTargetIORate.set(randomBoolean()); + if (mergesToRun > 0 && scheduledToRunMergeTasks.isEmpty() == false && randomBoolean()) { + // maybe run one scheduled merge + MergeTask mergeTask = randomFrom(scheduledToRunMergeTasks); + scheduledToRunMergeTasks.remove(mergeTask); + mergeTask.run(); + mergesToRun--; + mergesOutstanding--; + if (isUsingMaxTargetIORate.get() && mergesOutstanding > maxMergeCount) { + expectIndexThrottling = true; + } else if (mergesOutstanding <= maxMergeCount) { + expectIndexThrottling = false; + } + } else if (submittedMergeTasks.isEmpty() == false && (mergesToSubmit == 0 || randomBoolean())) { + // maybe schedule one submitted merge + MergeTask mergeTask = randomFrom(submittedMergeTasks); + submittedMergeTasks.remove(mergeTask); + Schedule schedule = mergeTask.schedule(); + if (schedule == Schedule.RUN) { + scheduledToRunMergeTasks.add(mergeTask); + } + } else if (mergesToSubmit > 0) { + // submit one merge + MergeSource mergeSource = mock(MergeSource.class); + OneMerge oneMerge = mock(OneMerge.class); + when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L))); + when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress()); + when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null); + threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); + mergesToSubmit--; + mergesOutstanding++; + if (isUsingMaxTargetIORate.get() && mergesOutstanding > maxMergeCount) { + expectIndexThrottling = true; + } else if (mergesOutstanding <= maxMergeCount) { + expectIndexThrottling = false; + } + } + // assert IO throttle state + assertThat(isIndexingThrottlingEnabled.get(), is(expectIndexThrottling)); + } + // execute all remaining merges (submitted or scheduled) + while (submittedMergeTasks.isEmpty() == false || scheduledToRunMergeTasks.isEmpty() == false) { + // simulate that the {@link ThreadPoolMergeExecutorService} maybe peaked IO un-throttling + isUsingMaxTargetIORate.set(randomBoolean()); + if (submittedMergeTasks.isEmpty() == false && (scheduledToRunMergeTasks.isEmpty() || randomBoolean())) { + // maybe schedule one submitted merge + MergeTask mergeTask = randomFrom(submittedMergeTasks); + submittedMergeTasks.remove(mergeTask); + Schedule schedule = mergeTask.schedule(); + if (schedule == Schedule.RUN) { + scheduledToRunMergeTasks.add(mergeTask); + } + } else { + // maybe run one scheduled merge + MergeTask mergeTask = randomFrom(scheduledToRunMergeTasks); + scheduledToRunMergeTasks.remove(mergeTask); + mergeTask.run(); + mergesOutstanding--; + if (isUsingMaxTargetIORate.get() && mergesOutstanding > maxMergeCount) { + expectIndexThrottling = true; + } else if (mergesOutstanding <= maxMergeCount) { + expectIndexThrottling = false; + } + } + // assert IO throttle state + assertThat(isIndexingThrottlingEnabled.get(), is(expectIndexThrottling)); + } + // all merges done + assertThat(isIndexingThrottlingEnabled.get(), is(false)); + } + public void testMergeSourceWithFollowUpMergesRunSequentially() throws Exception { // test with min 2 allowed concurrent merges int mergeExecutorThreadCount = randomIntBetween(2, 5); @@ -493,4 +606,31 @@ private static MergeInfo getNewMergeInfo(long estimatedMergeBytes) { private static MergeInfo getNewMergeInfo(long estimatedMergeBytes, int maxNumSegments) { return new MergeInfo(randomNonNegativeInt(), estimatedMergeBytes, randomBoolean(), maxNumSegments); } + + static class TestThreadPoolMergeScheduler extends ThreadPoolMergeScheduler { + private BiConsumer enableIndexingThrottlingHook; + private BiConsumer disableIndexingThrottlingHook; + + public TestThreadPoolMergeScheduler( + ShardId shardId, + IndexSettings indexSettings, + ThreadPoolMergeExecutorService threadPoolMergeExecutorService, + BiConsumer enableIndexingThrottlingHook, + BiConsumer disableIndexingThrottlingHook + ) { + super(shardId, indexSettings, threadPoolMergeExecutorService); + this.enableIndexingThrottlingHook = enableIndexingThrottlingHook; + this.disableIndexingThrottlingHook = disableIndexingThrottlingHook; + } + + @Override + protected void enableIndexingThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) { + enableIndexingThrottlingHook.accept(numRunningMerges, numQueuedMerges); + } + + @Override + protected void disableIndexingThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) { + disableIndexingThrottlingHook.accept(numRunningMerges, numQueuedMerges); + } + } } From 67fe301b51f5735aac3eb6bc591cac414d69da5c Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 27 Mar 2025 13:51:25 +0000 Subject: [PATCH 05/13] [CI] Auto commit changes from spotless --- .../index/engine/ThreadPoolMergeScheduler.java | 10 +++++----- .../index/engine/ThreadPoolMergeSchedulerTests.java | 1 - 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java index 6df75429d0250..f645edaff64a8 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java @@ -202,13 +202,13 @@ private void checkMergeTaskThrottling() { } } } else if (activeMerges <= configuredMaxMergeCount && shouldThrottleIncomingMerges.get()) { - // maybe disable merge task throttling - synchronized (shouldThrottleIncomingMerges) { - if (shouldThrottleIncomingMerges.getAndSet(false)) { - disableIndexingThrottling(runningMergesCount, activeMerges - runningMergesCount, configuredMaxMergeCount); - } + // maybe disable merge task throttling + synchronized (shouldThrottleIncomingMerges) { + if (shouldThrottleIncomingMerges.getAndSet(false)) { + disableIndexingThrottling(runningMergesCount, activeMerges - runningMergesCount, configuredMaxMergeCount); } } + } } // exposed for tests diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java index 013346a078e3b..24d0d73eea26f 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java @@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; -import java.util.function.Consumer; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; From 774389db9962c16d59fba9e32bc2f4ac95e8f23e Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Thu, 27 Mar 2025 16:05:54 +0200 Subject: [PATCH 06/13] Checkstyle --- .../index/engine/ThreadPoolMergeSchedulerTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java index 24d0d73eea26f..ef0da08ec091f 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java @@ -610,7 +610,7 @@ static class TestThreadPoolMergeScheduler extends ThreadPoolMergeScheduler { private BiConsumer enableIndexingThrottlingHook; private BiConsumer disableIndexingThrottlingHook; - public TestThreadPoolMergeScheduler( + TestThreadPoolMergeScheduler( ShardId shardId, IndexSettings indexSettings, ThreadPoolMergeExecutorService threadPoolMergeExecutorService, From 7864c46b34d065d4c523aa97bfbd49e90d1e9aea Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Fri, 28 Mar 2025 08:11:52 +0200 Subject: [PATCH 07/13] TestThreadPoolMergeScheduler --- .../engine/ThreadPoolMergeSchedulerTests.java | 30 ++++++++----------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java index ef0da08ec091f..140925c27e99d 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java @@ -197,13 +197,10 @@ public void testIndexingThrottling() { .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), maxThreadCount) .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), maxMergeCount) .build(); - AtomicBoolean isIndexingThrottlingEnabled = new AtomicBoolean(false); - ThreadPoolMergeScheduler threadPoolMergeScheduler = new TestThreadPoolMergeScheduler( + TestThreadPoolMergeScheduler threadPoolMergeScheduler = new TestThreadPoolMergeScheduler( new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings), - threadPoolMergeExecutorService, - (numRunningMerges, numQueuedMerges) -> isIndexingThrottlingEnabled.set(true), - (numRunningMerges, numQueuedMerges) -> isIndexingThrottlingEnabled.set(false) + threadPoolMergeExecutorService ); int mergesToRun = randomIntBetween(0, 5); // make sure there are more merges submitted and not run @@ -252,7 +249,7 @@ public void testIndexingThrottling() { } } // assert IO throttle state - assertThat(isIndexingThrottlingEnabled.get(), is(expectIndexThrottling)); + assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(expectIndexThrottling)); } // execute all remaining merges (submitted or scheduled) while (submittedMergeTasks.isEmpty() == false || scheduledToRunMergeTasks.isEmpty() == false) { @@ -279,10 +276,10 @@ public void testIndexingThrottling() { } } // assert IO throttle state - assertThat(isIndexingThrottlingEnabled.get(), is(expectIndexThrottling)); + assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(expectIndexThrottling)); } // all merges done - assertThat(isIndexingThrottlingEnabled.get(), is(false)); + assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(false)); } public void testMergeSourceWithFollowUpMergesRunSequentially() throws Exception { @@ -607,29 +604,28 @@ private static MergeInfo getNewMergeInfo(long estimatedMergeBytes, int maxNumSeg } static class TestThreadPoolMergeScheduler extends ThreadPoolMergeScheduler { - private BiConsumer enableIndexingThrottlingHook; - private BiConsumer disableIndexingThrottlingHook; + AtomicBoolean isIndexingThrottlingEnabled = new AtomicBoolean(false); TestThreadPoolMergeScheduler( ShardId shardId, IndexSettings indexSettings, - ThreadPoolMergeExecutorService threadPoolMergeExecutorService, - BiConsumer enableIndexingThrottlingHook, - BiConsumer disableIndexingThrottlingHook + ThreadPoolMergeExecutorService threadPoolMergeExecutorService ) { super(shardId, indexSettings, threadPoolMergeExecutorService); - this.enableIndexingThrottlingHook = enableIndexingThrottlingHook; - this.disableIndexingThrottlingHook = disableIndexingThrottlingHook; } @Override protected void enableIndexingThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) { - enableIndexingThrottlingHook.accept(numRunningMerges, numQueuedMerges); + isIndexingThrottlingEnabled.set(true); } @Override protected void disableIndexingThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) { - disableIndexingThrottlingHook.accept(numRunningMerges, numQueuedMerges); + isIndexingThrottlingEnabled.set(false); + } + + boolean isIndexingThrottlingEnabled() { + return isIndexingThrottlingEnabled.get(); } } } From 948da8117e6f758551e589ddd39c61a5811a8aef Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Fri, 28 Mar 2025 08:14:45 +0200 Subject: [PATCH 08/13] mockThreadPoolMergeExecutorService --- .../engine/ThreadPoolMergeSchedulerTests.java | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java index 140925c27e99d..9283dd362cacc 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java @@ -180,17 +180,7 @@ public void testIndexingThrottling() { final int maxMergeCount = maxThreadCount + randomIntBetween(0, 5); List submittedMergeTasks = new ArrayList<>(); List scheduledToRunMergeTasks = new ArrayList<>(); - ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class); - doAnswer(invocation -> { - MergeTask mergeTask = (MergeTask) invocation.getArguments()[0]; - submittedMergeTasks.add(mergeTask); - return null; - }).when(threadPoolMergeExecutorService).submitMergeTask(any(MergeTask.class)); - doAnswer(invocation -> { - MergeTask mergeTask = (MergeTask) invocation.getArguments()[0]; - submittedMergeTasks.add(mergeTask); - return null; - }).when(threadPoolMergeExecutorService).reEnqueueBackloggedMergeTask(any(MergeTask.class)); + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mockThreadPoolMergeExecutorService(submittedMergeTasks); AtomicBoolean isUsingMaxTargetIORate = new AtomicBoolean(false); doAnswer(invocation -> isUsingMaxTargetIORate.get()).when(threadPoolMergeExecutorService).usingMaxTargetIORateBytesPerSec(); Settings mergeSchedulerSettings = Settings.builder() @@ -628,4 +618,19 @@ boolean isIndexingThrottlingEnabled() { return isIndexingThrottlingEnabled.get(); } } + + static ThreadPoolMergeExecutorService mockThreadPoolMergeExecutorService(List submittedMergeTasks) { + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class); + doAnswer(invocation -> { + MergeTask mergeTask = (MergeTask) invocation.getArguments()[0]; + submittedMergeTasks.add(mergeTask); + return null; + }).when(threadPoolMergeExecutorService).submitMergeTask(any(MergeTask.class)); + doAnswer(invocation -> { + MergeTask mergeTask = (MergeTask) invocation.getArguments()[0]; + submittedMergeTasks.add(mergeTask); + return null; + }).when(threadPoolMergeExecutorService).reEnqueueBackloggedMergeTask(any(MergeTask.class)); + return threadPoolMergeExecutorService; + } } From e3816349ab8764fad334f1dfff1c41b0f76077bc Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Fri, 28 Mar 2025 10:11:20 +0200 Subject: [PATCH 09/13] testIndexingThrottlingWhileMergesAreRunning --- .../engine/ThreadPoolMergeSchedulerTests.java | 53 +++++++++---------- 1 file changed, 25 insertions(+), 28 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java index 9283dd362cacc..d5b410880846e 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java @@ -174,7 +174,7 @@ public void testSimpleMergeTaskReEnqueueingBySize() { } } - public void testIndexingThrottling() { + public void testIndexingThrottlingWhileMergesAreRunning() { final int maxThreadCount = randomIntBetween(1, 5); // settings validation requires maxMergeCount >= maxThreadCount final int maxMergeCount = maxThreadCount + randomIntBetween(0, 5); @@ -198,23 +198,10 @@ public void testIndexingThrottling() { int mergesToSubmit = maxMergeCount + mergesToRun + excessMerges; int mergesOutstanding = 0; boolean expectIndexThrottling = false; - // simulate merge load, while also scheduling and running merges - while (mergesToSubmit > 0 || mergesToRun > 0) { - // simulate that the {@link ThreadPoolMergeExecutorService} maybe peaked IO un-throttling + // merges are submitted, while some are also scheduled and run + while (mergesToSubmit > 0) { isUsingMaxTargetIORate.set(randomBoolean()); - if (mergesToRun > 0 && scheduledToRunMergeTasks.isEmpty() == false && randomBoolean()) { - // maybe run one scheduled merge - MergeTask mergeTask = randomFrom(scheduledToRunMergeTasks); - scheduledToRunMergeTasks.remove(mergeTask); - mergeTask.run(); - mergesToRun--; - mergesOutstanding--; - if (isUsingMaxTargetIORate.get() && mergesOutstanding > maxMergeCount) { - expectIndexThrottling = true; - } else if (mergesOutstanding <= maxMergeCount) { - expectIndexThrottling = false; - } - } else if (submittedMergeTasks.isEmpty() == false && (mergesToSubmit == 0 || randomBoolean())) { + if (submittedMergeTasks.isEmpty() == false && randomBoolean()) { // maybe schedule one submitted merge MergeTask mergeTask = randomFrom(submittedMergeTasks); submittedMergeTasks.remove(mergeTask); @@ -222,16 +209,25 @@ public void testIndexingThrottling() { if (schedule == Schedule.RUN) { scheduledToRunMergeTasks.add(mergeTask); } - } else if (mergesToSubmit > 0) { - // submit one merge - MergeSource mergeSource = mock(MergeSource.class); - OneMerge oneMerge = mock(OneMerge.class); - when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L))); - when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress()); - when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null); - threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); - mergesToSubmit--; - mergesOutstanding++; + } else { + if (mergesToRun > 0 && scheduledToRunMergeTasks.isEmpty() == false && randomBoolean()) { + // maybe run one scheduled merge + MergeTask mergeTask = randomFrom(scheduledToRunMergeTasks); + scheduledToRunMergeTasks.remove(mergeTask); + mergeTask.run(); + mergesToRun--; + mergesOutstanding--; + } else { + // submit one merge + MergeSource mergeSource = mock(MergeSource.class); + OneMerge oneMerge = mock(OneMerge.class); + when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L))); + when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress()); + when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null); + threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); + mergesToSubmit--; + mergesOutstanding++; + } if (isUsingMaxTargetIORate.get() && mergesOutstanding > maxMergeCount) { expectIndexThrottling = true; } else if (mergesOutstanding <= maxMergeCount) { @@ -242,7 +238,7 @@ public void testIndexingThrottling() { assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(expectIndexThrottling)); } // execute all remaining merges (submitted or scheduled) - while (submittedMergeTasks.isEmpty() == false || scheduledToRunMergeTasks.isEmpty() == false) { + while (mergesToRun > 0 || submittedMergeTasks.isEmpty() == false || scheduledToRunMergeTasks.isEmpty() == false) { // simulate that the {@link ThreadPoolMergeExecutorService} maybe peaked IO un-throttling isUsingMaxTargetIORate.set(randomBoolean()); if (submittedMergeTasks.isEmpty() == false && (scheduledToRunMergeTasks.isEmpty() || randomBoolean())) { @@ -258,6 +254,7 @@ public void testIndexingThrottling() { MergeTask mergeTask = randomFrom(scheduledToRunMergeTasks); scheduledToRunMergeTasks.remove(mergeTask); mergeTask.run(); + mergesToRun--; mergesOutstanding--; if (isUsingMaxTargetIORate.get() && mergesOutstanding > maxMergeCount) { expectIndexThrottling = true; From f41cb27008da7389f79681d77a6cd55e1d135af1 Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Fri, 28 Mar 2025 10:12:45 +0200 Subject: [PATCH 10/13] isUsingMaxTargetIORate --- .../index/engine/ThreadPoolMergeSchedulerTests.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java index d5b410880846e..9fa9122a0e083 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java @@ -38,7 +38,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiConsumer; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; @@ -180,9 +179,11 @@ public void testIndexingThrottlingWhileMergesAreRunning() { final int maxMergeCount = maxThreadCount + randomIntBetween(0, 5); List submittedMergeTasks = new ArrayList<>(); List scheduledToRunMergeTasks = new ArrayList<>(); - ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mockThreadPoolMergeExecutorService(submittedMergeTasks); AtomicBoolean isUsingMaxTargetIORate = new AtomicBoolean(false); - doAnswer(invocation -> isUsingMaxTargetIORate.get()).when(threadPoolMergeExecutorService).usingMaxTargetIORateBytesPerSec(); + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mockThreadPoolMergeExecutorService( + submittedMergeTasks, + isUsingMaxTargetIORate + ); Settings mergeSchedulerSettings = Settings.builder() .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), maxThreadCount) .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), maxMergeCount) @@ -616,7 +617,7 @@ boolean isIndexingThrottlingEnabled() { } } - static ThreadPoolMergeExecutorService mockThreadPoolMergeExecutorService(List submittedMergeTasks) { + static ThreadPoolMergeExecutorService mockThreadPoolMergeExecutorService(List submittedMergeTasks, AtomicBoolean isUsingMaxTargetIORate) { ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class); doAnswer(invocation -> { MergeTask mergeTask = (MergeTask) invocation.getArguments()[0]; @@ -628,6 +629,7 @@ static ThreadPoolMergeExecutorService mockThreadPoolMergeExecutorService(List isUsingMaxTargetIORate.get()).when(threadPoolMergeExecutorService).usingMaxTargetIORateBytesPerSec(); return threadPoolMergeExecutorService; } } From 5472e6a7ea39a4986f99b72e60e770683c28f4ea Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Fri, 28 Mar 2025 08:22:17 +0000 Subject: [PATCH 11/13] [CI] Auto commit changes from spotless --- .../index/engine/ThreadPoolMergeSchedulerTests.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java index 9fa9122a0e083..4a4e3b7ba45b4 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java @@ -617,7 +617,10 @@ boolean isIndexingThrottlingEnabled() { } } - static ThreadPoolMergeExecutorService mockThreadPoolMergeExecutorService(List submittedMergeTasks, AtomicBoolean isUsingMaxTargetIORate) { + static ThreadPoolMergeExecutorService mockThreadPoolMergeExecutorService( + List submittedMergeTasks, + AtomicBoolean isUsingMaxTargetIORate + ) { ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class); doAnswer(invocation -> { MergeTask mergeTask = (MergeTask) invocation.getArguments()[0]; From a4199173f0bfd6f50bdac7a7dacf73e0265a31c1 Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Fri, 28 Mar 2025 10:35:39 +0200 Subject: [PATCH 12/13] testIndexingThrottlingWhenSubmittingMerges --- .../engine/ThreadPoolMergeSchedulerTests.java | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java index 4a4e3b7ba45b4..ae9168357eb32 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java @@ -173,6 +173,68 @@ public void testSimpleMergeTaskReEnqueueingBySize() { } } + public void testIndexingThrottlingWhenSubmittingMerges() { + final int maxThreadCount = randomIntBetween(1, 5); + // settings validation requires maxMergeCount >= maxThreadCount + final int maxMergeCount = maxThreadCount + randomIntBetween(0, 5); + List submittedMergeTasks = new ArrayList<>(); + AtomicBoolean isUsingMaxTargetIORate = new AtomicBoolean(false); + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mockThreadPoolMergeExecutorService( + submittedMergeTasks, + isUsingMaxTargetIORate + ); + Settings mergeSchedulerSettings = Settings.builder() + .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), maxThreadCount) + .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), maxMergeCount) + .build(); + TestThreadPoolMergeScheduler threadPoolMergeScheduler = new TestThreadPoolMergeScheduler( + new ShardId("index", "_na_", 1), + IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings), + threadPoolMergeExecutorService + ); + // make sure there are more merges submitted than the max merge count limit (which triggers IO throttling) + int excessMerges = randomIntBetween(1, 10); + int mergesToSubmit = maxMergeCount + excessMerges; + boolean expectIndexThrottling = false; + int submittedMerges = 0; + // merges are submitted, while some are also scheduled (but none is run) + while (submittedMerges < mergesToSubmit - 1) { + isUsingMaxTargetIORate.set(randomBoolean()); + if (submittedMergeTasks.isEmpty() == false && randomBoolean()) { + // maybe schedule one submitted merge + MergeTask mergeTask = randomFrom(submittedMergeTasks); + submittedMergeTasks.remove(mergeTask); + mergeTask.schedule(); + } else { + // submit one merge + MergeSource mergeSource = mock(MergeSource.class); + OneMerge oneMerge = mock(OneMerge.class); + when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L))); + when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress()); + when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null); + threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); + submittedMerges++; + if (isUsingMaxTargetIORate.get() && submittedMerges > maxMergeCount) { + expectIndexThrottling = true; + } else if (submittedMerges <= maxMergeCount) { + expectIndexThrottling = false; + } + } + // assert IO throttle state + assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(expectIndexThrottling)); + } + // submit one last merge when IO throttling is at max value + isUsingMaxTargetIORate.set(true); + MergeSource mergeSource = mock(MergeSource.class); + OneMerge oneMerge = mock(OneMerge.class); + when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L))); + when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress()); + when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null); + threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); + // assert index throttling because IO throttling is at max value + assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(true)); + } + public void testIndexingThrottlingWhileMergesAreRunning() { final int maxThreadCount = randomIntBetween(1, 5); // settings validation requires maxMergeCount >= maxThreadCount From c4cf8b0aa58cedd42b439332ccde2795745509d4 Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Sun, 30 Mar 2025 23:16:49 +0300 Subject: [PATCH 13/13] Fix IndexStatsIT.testThrottleStats --- .../indices/stats/IndexStatsIT.java | 52 +++++++++++-------- 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/stats/IndexStatsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/stats/IndexStatsIT.java index e956999ebf2d2..b5f10002e217e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/stats/IndexStatsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/stats/IndexStatsIT.java @@ -79,6 +79,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; @@ -466,7 +467,7 @@ public void testNonThrottleStats() throws Exception { assertThat(stats.getPrimaries().getIndexing().getTotal().getThrottleTime().millis(), equalTo(0L)); } - public void testThrottleStats() throws Exception { + public void testThrottleStats() { assertAcked( prepareCreate("test").setSettings( settingsBuilder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1") @@ -479,31 +480,38 @@ public void testThrottleStats() throws Exception { ) ); ensureGreen(); - long termUpto = 0; - IndicesStatsResponse stats; // make sure we see throttling kicking in: - boolean done = false; + AtomicBoolean done = new AtomicBoolean(); + AtomicLong termUpTo = new AtomicLong(); long start = System.currentTimeMillis(); - while (done == false) { - for (int i = 0; i < 100; i++) { - // Provoke slowish merging by making many unique terms: - StringBuilder sb = new StringBuilder(); - for (int j = 0; j < 100; j++) { - sb.append(' '); - sb.append(termUpto++); - } - prepareIndex("test").setId("" + termUpto).setSource("field" + (i % 10), sb.toString()).get(); - if (i % 2 == 0) { + for (int threadIdx = 0; threadIdx < 5; threadIdx++) { + int finalThreadIdx = threadIdx; + new Thread(() -> { + IndicesStatsResponse stats; + while (done.get() == false) { + for (int i = 0; i < 100; i++) { + // Provoke slowish merging by making many unique terms: + StringBuilder sb = new StringBuilder(); + for (int j = 0; j < 100; j++) { + sb.append(' '); + sb.append(termUpTo.incrementAndGet()); + } + prepareIndex("test").setId("" + termUpTo.get()).setSource("field" + (i % 10), sb.toString()).get(); + if (i % 2 == 0) { + refresh(); + } + } refresh(); + if (finalThreadIdx == 0) { + stats = indicesAdmin().prepareStats().get(); + done.set(stats.getPrimaries().getIndexing().getTotal().getThrottleTime().millis() > 0); + } + if (System.currentTimeMillis() - start > 300 * 1000) { // Wait 5 minutes for throttling to kick in + done.set(true); + fail("index throttling didn't kick in after 5 minutes of intense merging"); + } } - } - refresh(); - stats = indicesAdmin().prepareStats().get(); - // nodesStats = clusterAdmin().prepareNodesStats().setIndices(true).get(); - done = stats.getPrimaries().getIndexing().getTotal().getThrottleTime().millis() > 0; - if (System.currentTimeMillis() - start > 300 * 1000) { // Wait 5 minutes for throttling to kick in - fail("index throttling didn't kick in after 5 minutes of intense merging"); - } + }).start(); } // Optimize & flush and wait; else we sometimes get a "Delete Index failed - not acked"