Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -1053,7 +1053,13 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
flushThresholdAge = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_AGE_SETTING);
generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING);
flushAfterMergeThresholdSize = scopedSettings.get(INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING);
mergeSchedulerConfig = new MergeSchedulerConfig(this);
if (DiscoveryNode.isStateless(nodeSettings)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to avoid this if we can and added suggestions to use a sub-class for the merge scheduler in stateless.

// disable IO throttling, and thread and merge count throttling for stateless. By setting max merge threads and count to
// Integer.MAX_VALUE we effectively disable them
mergeSchedulerConfig = new MergeSchedulerConfig(false, Integer.MAX_VALUE, Integer.MAX_VALUE);
} else {
mergeSchedulerConfig = new MergeSchedulerConfig(this);
}
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
softDeleteEnabled = scopedSettings.get(INDEX_SOFT_DELETES_SETTING);
assert softDeleteEnabled || version.before(IndexVersions.V_8_0_0) : "soft deletes must be enabled in version " + version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ public final class MergeSchedulerConfig {
this.autoThrottle = indexSettings.getValue(AUTO_THROTTLE_SETTING);
}

MergeSchedulerConfig(boolean autoThrottle, int maxThreadCount, int maxMergeCount) {
this.autoThrottle = autoThrottle;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than overriding it like this, could we instead add ThreadPoolMergeScheduler.isAutoThrottle() that returns config.isAutoThrottle() by default but is overloaded in stateless to return false?

Similar for the two other fields.

this.maxThreadCount = maxThreadCount;
this.maxMergeCount = maxMergeCount;
setMaxThreadAndMergeCount(maxThreadCount, maxMergeCount);
}

/**
* Returns <code>true</code> iff auto throttle is enabled.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2909,7 +2909,7 @@ private final class EngineThreadPoolMergeScheduler extends ThreadPoolMergeSchedu
IndexSettings indexSettings,
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
) {
super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes);
super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes, () -> false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

public class ThreadPoolMergeScheduler extends MergeScheduler implements ElasticsearchMergeScheduler {
public static final Setting<Boolean> USE_THREAD_POOL_MERGE_SCHEDULER_SETTING = Setting.boolSetting(
Expand All @@ -66,12 +67,24 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
private final CountDownLatch closedWithNoRunningMerges = new CountDownLatch(1);
private volatile boolean closed = false;
private final MergeMemoryEstimateProvider mergeMemoryEstimateProvider;
// if true, scheduled merges will be aborted
private final BooleanSupplier shouldSkipMerge;

/**
* Creates a thread-pool-based merge scheduler that runs merges in a thread pool.
*
* @param shardId the shard id associated with this merge scheduler
* @param indexSettings used to obtain the {@link MergeSchedulerConfig}
* @param threadPoolMergeExecutorService the executor service used to execute merge tasks from this scheduler
* @param mergeMemoryEstimateProvider provides an estimate for how much memory a merge will take
* @param shouldSkipMerge if true, scheduled merges will be skipped i.e. aborted by the executor
*/
public ThreadPoolMergeScheduler(
ShardId shardId,
IndexSettings indexSettings,
ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
MergeMemoryEstimateProvider mergeMemoryEstimateProvider
MergeMemoryEstimateProvider mergeMemoryEstimateProvider,
BooleanSupplier shouldSkipMerge
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should add a method shouldSkipMerge returning false here that we then implement in a stateless specific subclass instead?

) {
this.shardId = shardId;
this.config = indexSettings.getMergeSchedulerConfig();
Expand All @@ -84,6 +97,7 @@ public ThreadPoolMergeScheduler(
);
this.threadPoolMergeExecutorService = threadPoolMergeExecutorService;
this.mergeMemoryEstimateProvider = mergeMemoryEstimateProvider;
this.shouldSkipMerge = shouldSkipMerge;
}

@Override
Expand Down Expand Up @@ -146,6 +160,16 @@ protected void beforeMerge(OnGoingMerge merge) {}
*/
protected void afterMerge(OnGoingMerge merge) {}

/**
* A callback allowing for custom logic when a merge is queued.
*/
protected void mergeQueued(OnGoingMerge merge) {}

/**
* A callback allowing for custom logic after a merge is executed or aborted.
*/
protected void mergeExecutedOrAborted(OnGoingMerge merge) {}

/**
* A callback that's invoked when indexing should throttle down indexing in order to let merging to catch up.
*/
Expand All @@ -168,6 +192,7 @@ protected void handleMergeException(Throwable t) {
boolean submitNewMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, MergeTrigger mergeTrigger) {
try {
MergeTask mergeTask = newMergeTask(mergeSource, merge, mergeTrigger);
mergeQueued(mergeTask.onGoingMerge);
return threadPoolMergeExecutorService.submitMergeTask(mergeTask);
} finally {
checkMergeTaskThrottling();
Expand Down Expand Up @@ -223,6 +248,11 @@ synchronized Schedule schedule(MergeTask mergeTask) {
if (closed) {
// do not run or backlog tasks when closing the merge scheduler, instead abort them
return Schedule.ABORT;
} else if (shouldSkipMerge.getAsBoolean()) {
if (verbose()) {
message(String.format(Locale.ROOT, "skipping merge task %s", mergeTask));
}
return Schedule.ABORT;
} else if (runningMergeTasks.size() < config.getMaxThreadCount()) {
boolean added = runningMergeTasks.put(mergeTask.onGoingMerge.getMerge(), mergeTask) == null;
assert added : "starting merge task [" + mergeTask + "] registered as already running";
Expand All @@ -243,8 +273,9 @@ synchronized void mergeTaskFinishedRunning(MergeTask mergeTask) {
maybeSignalAllMergesDoneAfterClose();
}

private void mergeTaskDone() {
private void mergeTaskDone(OnGoingMerge merge) {
doneMergeTaskCount.incrementAndGet();
mergeExecutedOrAborted(merge);
checkMergeTaskThrottling();
}

Expand Down Expand Up @@ -408,7 +439,7 @@ public void run() {
try {
mergeTaskFinishedRunning(this);
} finally {
mergeTaskDone();
mergeTaskDone(onGoingMerge);
}
try {
// kick-off any follow-up merge
Expand Down Expand Up @@ -452,7 +483,7 @@ void abort() {
if (verbose()) {
message(String.format(Locale.ROOT, "merge task %s end abort", this));
}
mergeTaskDone();
mergeTaskDone(onGoingMerge);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,26 @@ public void testStatelessFastRefreshDisableRefreshInterval() {
assertEquals(TimeValue.MINUS_ONE, settings.getRefreshInterval());
}

public void testStatelessMergeSchedulerConfig() {
IndexMetadata metadata = IndexMetadata.builder("index")
.system(true)
.settings(
indexSettings(IndexVersion.current(), 1, 1).put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())
// set auto throttling to true and set merge/thread count to a low number
.put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), true)
.put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), 2)
.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), 2)
.build()
)
.build();
IndexSettings settings = new IndexSettings(metadata, Settings.builder().put(STATELESS_ENABLED_SETTING_NAME, true).build());
// for stateless we want to ensure that auto throttling, merge count throttling, and thread throttling are all effectively
// disabled, ignoring the specified settings
assertFalse(settings.getMergeSchedulerConfig().isAutoThrottle());
assertEquals(Integer.MAX_VALUE, settings.getMergeSchedulerConfig().getMaxMergeCount());
assertEquals(Integer.MAX_VALUE, settings.getMergeSchedulerConfig().getMaxThreadCount());
}

private String getRandomTimeString() {
int refreshIntervalInt = randomFrom(-1, Math.abs(randomInt()));
String refreshInterval = Integer.toString(refreshIntervalInt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
Expand All @@ -62,7 +63,8 @@ public void testMergesExecuteInSizeOrder() throws IOException {
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", Settings.EMPTY),
threadPoolMergeExecutorService,
merge -> 0
merge -> 0,
() -> false
)
) {
List<OneMerge> executedMergesList = new ArrayList<>();
Expand Down Expand Up @@ -105,7 +107,8 @@ public void testSimpleMergeTaskBacklogging() {
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings),
threadPoolMergeExecutorService,
merge -> 0
merge -> 0,
() -> false
);
// more merge tasks than merge threads
int mergeCount = mergeExecutorThreadCount + randomIntBetween(1, 5);
Expand Down Expand Up @@ -139,7 +142,8 @@ public void testSimpleMergeTaskReEnqueueingBySize() {
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings),
threadPoolMergeExecutorService,
merge -> 0
merge -> 0,
() -> false
);
// sort backlogged merges by size
PriorityQueue<MergeTask> backloggedMergeTasks = new PriorityQueue<>(16, Comparator.comparingLong(MergeTask::estimatedMergeSize));
Expand Down Expand Up @@ -193,7 +197,8 @@ public void testIndexingThrottlingWhenSubmittingMerges() {
TestThreadPoolMergeScheduler threadPoolMergeScheduler = new TestThreadPoolMergeScheduler(
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings),
threadPoolMergeExecutorService
threadPoolMergeExecutorService,
() -> false
);
// make sure there are more merges submitted than the max merge count limit (which triggers IO throttling)
int excessMerges = randomIntBetween(1, 10);
Expand Down Expand Up @@ -256,7 +261,8 @@ public void testIndexingThrottlingWhileMergesAreRunning() {
TestThreadPoolMergeScheduler threadPoolMergeScheduler = new TestThreadPoolMergeScheduler(
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings),
threadPoolMergeExecutorService
threadPoolMergeExecutorService,
() -> false
);
int mergesToRun = randomIntBetween(0, 5);
// make sure there are more merges submitted and not run
Expand Down Expand Up @@ -351,7 +357,8 @@ public void testMergeSourceWithFollowUpMergesRunSequentially() throws Exception
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", settings),
threadPoolMergeExecutorService,
merge -> 0
merge -> 0,
() -> false
)
) {
MergeSource mergeSource = mock(MergeSource.class);
Expand Down Expand Up @@ -425,7 +432,8 @@ public void testMergesRunConcurrently() throws Exception {
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", settings),
threadPoolMergeExecutorService,
merge -> 0
merge -> 0,
() -> false
)
) {
// at least 1 extra merge than there are concurrently allowed
Expand Down Expand Up @@ -510,7 +518,8 @@ public void testSchedulerCloseWaitsForRunningMerge() throws Exception {
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", settings),
threadPoolMergeExecutorService,
merge -> 0
merge -> 0,
() -> false
)
) {
CountDownLatch mergeDoneLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -583,7 +592,8 @@ public void testAutoIOThrottleForMergeTasksWhenSchedulerDisablesIt() throws Exce
new ShardId("index", "_na_", 1),
indexSettings,
threadPoolMergeExecutorService,
merge -> 0
merge -> 0,
() -> false
)
) {
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
Expand Down Expand Up @@ -613,7 +623,8 @@ public void testAutoIOThrottleForMergeTasks() throws Exception {
new ShardId("index", "_na_", 1),
indexSettings,
threadPoolMergeExecutorService,
merge -> 0
merge -> 0,
() -> false
)
) {
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
Expand All @@ -630,7 +641,8 @@ public void testAutoIOThrottleForMergeTasks() throws Exception {
new ShardId("index", "_na_", 1),
indexSettings,
threadPoolMergeExecutorService,
merge -> 0
merge -> 0,
() -> false
)
) {
// merge submitted upon closing
Expand All @@ -647,7 +659,8 @@ public void testAutoIOThrottleForMergeTasks() throws Exception {
new ShardId("index", "_na_", 1),
indexSettings,
threadPoolMergeExecutorService,
merge -> 0
merge -> 0,
() -> false
)
) {
// merge submitted upon closing
Expand All @@ -662,6 +675,27 @@ public void testAutoIOThrottleForMergeTasks() throws Exception {
}
}

public void testMergeSchedulerAbortsMergeWhenShouldSkipMergeIsTrue() {
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class);
// build a scheduler that always returns true for shouldSkipMerge
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", Settings.builder().build()),
threadPoolMergeExecutorService,
merge -> 0,
() -> true
);
MergeSource mergeSource = mock(MergeSource.class);
OneMerge oneMerge = mock(OneMerge.class);
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress());
when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null);
MergeTask mergeTask = threadPoolMergeScheduler.newMergeTask(mergeSource, oneMerge, randomFrom(MergeTrigger.values()));
// verify that calling schedule on the merge task indicates the merge should be aborted
Schedule schedule = threadPoolMergeScheduler.schedule(mergeTask);
assertThat(schedule, is(Schedule.ABORT));
}

private static MergeInfo getNewMergeInfo(long estimatedMergeBytes) {
return getNewMergeInfo(estimatedMergeBytes, randomFrom(-1, randomNonNegativeInt()));
}
Expand All @@ -676,9 +710,10 @@ static class TestThreadPoolMergeScheduler extends ThreadPoolMergeScheduler {
TestThreadPoolMergeScheduler(
ShardId shardId,
IndexSettings indexSettings,
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
BooleanSupplier shouldSkipMerge
) {
super(shardId, indexSettings, threadPoolMergeExecutorService, merge -> 0);
super(shardId, indexSettings, threadPoolMergeExecutorService, merge -> 0, shouldSkipMerge);
}

@Override
Expand Down