Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,10 @@ interface UpdateConsumer {
}
}

/**
 * Reports whether the current target IO rate equals the maximum IO rate.
 *
 * @return {@code true} if the value currently held by {@code targetIORateBytesPerSec}
 *         is equal to {@code MAX_IO_RATE.getBytes()}, {@code false} otherwise
 */
public boolean usingMaxTargetIORateBytesPerSec() {
    final boolean atMaxTargetRate = targetIORateBytesPerSec.get() == MAX_IO_RATE.getBytes();
    return atMaxTargetRate;
}

// exposed for tests
Set<MergeTask> getRunningMergeTasks() {
return runningMergeTasks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
);
private final ShardId shardId;
private final MergeSchedulerConfig config;
private final Logger logger;
protected final Logger logger;
Copy link
Contributor Author

@albertzaharovits albertzaharovits Mar 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated change, in order to let these log messages go to the ThreadPoolMergeScheduler's logger instead of the InternalEngine's.

private final MergeTracking mergeTracking;
private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService;
private final PriorityQueue<MergeTask> backloggedMergeTasks = new PriorityQueue<>(
Expand Down Expand Up @@ -191,21 +191,24 @@ private void checkMergeTaskThrottling() {
int configuredMaxMergeCount = config.getMaxMergeCount();
// both currently running and enqueued merge tasks are considered "active" for throttling purposes
int activeMerges = (int) (submittedMergesCount - doneMergesCount);
if (activeMerges > configuredMaxMergeCount && shouldThrottleIncomingMerges.get() == false) {
if (activeMerges > configuredMaxMergeCount
// only throttle indexing if disk IO is un-throttled, and we still can't keep up with the merge load
&& threadPoolMergeExecutorService.usingMaxTargetIORateBytesPerSec()
&& shouldThrottleIncomingMerges.get() == false) {
// maybe enable merge task throttling
synchronized (shouldThrottleIncomingMerges) {
if (shouldThrottleIncomingMerges.getAndSet(true) == false) {
enableIndexingThrottling(runningMergesCount, activeMerges - runningMergesCount, configuredMaxMergeCount);
}
}
} else if (activeMerges <= configuredMaxMergeCount && shouldThrottleIncomingMerges.get()) {
// maybe disable merge task throttling
synchronized (shouldThrottleIncomingMerges) {
if (shouldThrottleIncomingMerges.getAndSet(false)) {
disableIndexingThrottling(runningMergesCount, activeMerges - runningMergesCount, configuredMaxMergeCount);
// maybe disable merge task throttling
synchronized (shouldThrottleIncomingMerges) {
if (shouldThrottleIncomingMerges.getAndSet(false)) {
disableIndexingThrottling(runningMergesCount, activeMerges - runningMergesCount, configuredMaxMergeCount);
}
}
}
}
}

// exposed for tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -173,6 +175,117 @@ public void testSimpleMergeTaskReEnqueueingBySize() {
}
}

public void testIndexingThrottling() {
final int maxThreadCount = randomIntBetween(1, 5);
// settings validation requires maxMergeCount >= maxThreadCount
final int maxMergeCount = maxThreadCount + randomIntBetween(0, 5);
List<MergeTask> submittedMergeTasks = new ArrayList<>();
List<MergeTask> scheduledToRunMergeTasks = new ArrayList<>();
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class);
doAnswer(invocation -> {
MergeTask mergeTask = (MergeTask) invocation.getArguments()[0];
submittedMergeTasks.add(mergeTask);
return null;
}).when(threadPoolMergeExecutorService).submitMergeTask(any(MergeTask.class));
doAnswer(invocation -> {
MergeTask mergeTask = (MergeTask) invocation.getArguments()[0];
submittedMergeTasks.add(mergeTask);
return null;
}).when(threadPoolMergeExecutorService).reEnqueueBackloggedMergeTask(any(MergeTask.class));
AtomicBoolean isUsingMaxTargetIORate = new AtomicBoolean(false);
doAnswer(invocation -> isUsingMaxTargetIORate.get()).when(threadPoolMergeExecutorService).usingMaxTargetIORateBytesPerSec();
Settings mergeSchedulerSettings = Settings.builder()
.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), maxThreadCount)
.put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), maxMergeCount)
.build();
AtomicBoolean isIndexingThrottlingEnabled = new AtomicBoolean(false);
ThreadPoolMergeScheduler threadPoolMergeScheduler = new TestThreadPoolMergeScheduler(
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings),
threadPoolMergeExecutorService,
(numRunningMerges, numQueuedMerges) -> isIndexingThrottlingEnabled.set(true),
(numRunningMerges, numQueuedMerges) -> isIndexingThrottlingEnabled.set(false)
);
int mergesToRun = randomIntBetween(0, 5);
// make sure there are more merges submitted and not run
int excessMerges = randomIntBetween(1, 10);
int mergesToSubmit = maxMergeCount + mergesToRun + excessMerges;
int mergesOutstanding = 0;
boolean expectIndexThrottling = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not clear to me that every test run invokes index throttling. I wonder if we are overcomplicating the test a little here and whether we should simply make it have X merges submitted and then verify that index-throttling kicks in (twice, one with the max-io-rate-simulation set to false first, then one where it is set to true)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, it's complicated and it does not always test that index throttling kicks in when IO throttling is at max level.
But, I still think the test is valuable because it constantly asserts the indexing throttling state while merges are being submitted, scheduled, and run at the same time (and it's deterministic).

I've added another test that asserts indexing throttling kicks in when more than max_merge_count merges are submitted (some are scheduled, but none are run). It's simpler and it always asserts that index throttling is toggled, but it covers less.

Let me know if this now looks OK to you.

// simulate merge load, while also scheduling and running merges
while (mergesToSubmit > 0 || mergesToRun > 0) {
// simulate that the {@link ThreadPoolMergeExecutorService} maybe peaked IO un-throttling
isUsingMaxTargetIORate.set(randomBoolean());
if (mergesToRun > 0 && scheduledToRunMergeTasks.isEmpty() == false && randomBoolean()) {
// maybe run one scheduled merge
MergeTask mergeTask = randomFrom(scheduledToRunMergeTasks);
scheduledToRunMergeTasks.remove(mergeTask);
mergeTask.run();
mergesToRun--;
mergesOutstanding--;
if (isUsingMaxTargetIORate.get() && mergesOutstanding > maxMergeCount) {
expectIndexThrottling = true;
} else if (mergesOutstanding <= maxMergeCount) {
expectIndexThrottling = false;
}
} else if (submittedMergeTasks.isEmpty() == false && (mergesToSubmit == 0 || randomBoolean())) {
// maybe schedule one submitted merge
MergeTask mergeTask = randomFrom(submittedMergeTasks);
submittedMergeTasks.remove(mergeTask);
Schedule schedule = mergeTask.schedule();
if (schedule == Schedule.RUN) {
scheduledToRunMergeTasks.add(mergeTask);
}
} else if (mergesToSubmit > 0) {
// submit one merge
MergeSource mergeSource = mock(MergeSource.class);
OneMerge oneMerge = mock(OneMerge.class);
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress());
when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null);
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
mergesToSubmit--;
mergesOutstanding++;
if (isUsingMaxTargetIORate.get() && mergesOutstanding > maxMergeCount) {
expectIndexThrottling = true;
} else if (mergesOutstanding <= maxMergeCount) {
expectIndexThrottling = false;
}
}
// assert indexing throttle state
assertThat(isIndexingThrottlingEnabled.get(), is(expectIndexThrottling));
}
// execute all remaining merges (submitted or scheduled)
while (submittedMergeTasks.isEmpty() == false || scheduledToRunMergeTasks.isEmpty() == false) {
// simulate that the {@link ThreadPoolMergeExecutorService} maybe peaked IO un-throttling
isUsingMaxTargetIORate.set(randomBoolean());
if (submittedMergeTasks.isEmpty() == false && (scheduledToRunMergeTasks.isEmpty() || randomBoolean())) {
// maybe schedule one submitted merge
MergeTask mergeTask = randomFrom(submittedMergeTasks);
submittedMergeTasks.remove(mergeTask);
Schedule schedule = mergeTask.schedule();
if (schedule == Schedule.RUN) {
scheduledToRunMergeTasks.add(mergeTask);
}
} else {
// maybe run one scheduled merge
MergeTask mergeTask = randomFrom(scheduledToRunMergeTasks);
scheduledToRunMergeTasks.remove(mergeTask);
mergeTask.run();
mergesOutstanding--;
if (isUsingMaxTargetIORate.get() && mergesOutstanding > maxMergeCount) {
expectIndexThrottling = true;
} else if (mergesOutstanding <= maxMergeCount) {
expectIndexThrottling = false;
}
}
// assert indexing throttle state
assertThat(isIndexingThrottlingEnabled.get(), is(expectIndexThrottling));
}
// all merges done
assertThat(isIndexingThrottlingEnabled.get(), is(false));
}

public void testMergeSourceWithFollowUpMergesRunSequentially() throws Exception {
// test with min 2 allowed concurrent merges
int mergeExecutorThreadCount = randomIntBetween(2, 5);
Expand Down Expand Up @@ -493,4 +606,31 @@ private static MergeInfo getNewMergeInfo(long estimatedMergeBytes) {
private static MergeInfo getNewMergeInfo(long estimatedMergeBytes, int maxNumSegments) {
return new MergeInfo(randomNonNegativeInt(), estimatedMergeBytes, randomBoolean(), maxNumSegments);
}

static class TestThreadPoolMergeScheduler extends ThreadPoolMergeScheduler {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you have to override a class in order to test it (or for it), what you probably need in this case is a field in the original class itself that you can change to trigger a different behaviour. The reason is that a refactor of the code could have unexpected side effects that are not easy to spot when you are not testing against the actual implementation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We typically try to avoid such artificial test pieces in production code. I think such an override as here is quite common so do not mind the current form. Feel free to maybe open a draft PR after this is merged to illustrate your point more (I am not sure I fully understood it).

private BiConsumer<Integer, Integer> enableIndexingThrottlingHook;
private BiConsumer<Integer, Integer> disableIndexingThrottlingHook;

public TestThreadPoolMergeScheduler(
ShardId shardId,
IndexSettings indexSettings,
ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
BiConsumer<Integer, Integer> enableIndexingThrottlingHook,
BiConsumer<Integer, Integer> disableIndexingThrottlingHook
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like these can be just Runnable which would remove a bit of code elsewhere too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed these; the TestThreadPoolMergeScheduler directly exposes "if index throttling" as a boolean.

) {
super(shardId, indexSettings, threadPoolMergeExecutorService);
this.enableIndexingThrottlingHook = enableIndexingThrottlingHook;
this.disableIndexingThrottlingHook = disableIndexingThrottlingHook;
}

/**
 * Test override that does not call the superclass implementation; instead it notifies the
 * {@code enableIndexingThrottlingHook} that was passed to this class's constructor.
 *
 * @param numRunningMerges        forwarded to the hook as its first argument
 * @param numQueuedMerges         forwarded to the hook as its second argument
 * @param configuredMaxMergeCount not forwarded to the hook
 */
@Override
protected void enableIndexingThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) {
enableIndexingThrottlingHook.accept(numRunningMerges, numQueuedMerges);
}

/**
 * Test override that does not call the superclass implementation; instead it notifies the
 * {@code disableIndexingThrottlingHook} that was passed to this class's constructor.
 *
 * @param numRunningMerges        forwarded to the hook as its first argument
 * @param numQueuedMerges         forwarded to the hook as its second argument
 * @param configuredMaxMergeCount not forwarded to the hook
 */
@Override
protected void disableIndexingThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) {
disableIndexingThrottlingHook.accept(numRunningMerges, numQueuedMerges);
}
}
}