Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
private volatile boolean closed = false;
private final MergeMemoryEstimateProvider mergeMemoryEstimateProvider;

/**
* Creates a thread-pool-based merge scheduler that runs merges in a thread pool.
*
* @param shardId the shard id associated with this merge scheduler
* @param indexSettings used to obtain the {@link MergeSchedulerConfig}
* @param threadPoolMergeExecutorService the executor service used to execute merge tasks from this scheduler
* @param mergeMemoryEstimateProvider provides an estimate for how much memory a merge will take
*/
public ThreadPoolMergeScheduler(
ShardId shardId,
IndexSettings indexSettings,
Expand Down Expand Up @@ -146,6 +154,16 @@ protected void beforeMerge(OnGoingMerge merge) {}
*/
protected void afterMerge(OnGoingMerge merge) {}

/**
* A callback allowing for custom logic when a merge is queued.
*/
protected void mergeQueued(OnGoingMerge merge) {}

/**
* A callback allowing for custom logic after a merge is executed or aborted.
*/
protected void mergeExecutedOrAborted(OnGoingMerge merge) {}

/**
* A callback that's invoked when indexing should throttle down indexing in order to let merging to catch up.
*/
Expand All @@ -157,6 +175,34 @@ protected void enableIndexingThrottling(int numRunningMerges, int numQueuedMerge
*/
protected void disableIndexingThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) {}

/**
* Returns true if scheduled merges should be skipped (aborted)
*/
protected boolean shouldSkipMerge() {
return false;
}

/**
* Returns true if IO-throttling is enabled
*/
protected boolean isAutoThrottle() {
return config.isAutoThrottle();
}

/**
* Returns the maximum number of active merges before being throttled
*/
protected int getMaxMergeCount() {
return config.getMaxMergeCount();
}

/**
* Returns the maximum number of threads running merges before being throttled
*/
protected int getMaxThreadCount() {
return config.getMaxThreadCount();
}

/**
* A callback for exceptions thrown while merging.
*/
Expand All @@ -168,6 +214,7 @@ protected void handleMergeException(Throwable t) {
boolean submitNewMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, MergeTrigger mergeTrigger) {
try {
MergeTask mergeTask = newMergeTask(mergeSource, merge, mergeTrigger);
mergeQueued(mergeTask.onGoingMerge);
return threadPoolMergeExecutorService.submitMergeTask(mergeTask);
} finally {
checkMergeTaskThrottling();
Expand All @@ -183,7 +230,7 @@ MergeTask newMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, Merg
return new MergeTask(
mergeSource,
merge,
isAutoThrottle && config.isAutoThrottle(),
isAutoThrottle && isAutoThrottle(),
"Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId,
estimateMergeMemoryBytes
);
Expand All @@ -193,7 +240,7 @@ private void checkMergeTaskThrottling() {
long submittedMergesCount = submittedMergeTaskCount.get();
long doneMergesCount = doneMergeTaskCount.get();
int runningMergesCount = runningMergeTasks.size();
int configuredMaxMergeCount = config.getMaxMergeCount();
int configuredMaxMergeCount = getMaxMergeCount();
// both currently running and enqueued merge tasks are considered "active" for throttling purposes
int activeMerges = (int) (submittedMergesCount - doneMergesCount);
if (activeMerges > configuredMaxMergeCount
Expand Down Expand Up @@ -223,7 +270,12 @@ synchronized Schedule schedule(MergeTask mergeTask) {
if (closed) {
// do not run or backlog tasks when closing the merge scheduler, instead abort them
return Schedule.ABORT;
} else if (runningMergeTasks.size() < config.getMaxThreadCount()) {
} else if (shouldSkipMerge()) {
if (verbose()) {
message(String.format(Locale.ROOT, "skipping merge task %s", mergeTask));
}
return Schedule.ABORT;
} else if (runningMergeTasks.size() < getMaxThreadCount()) {
boolean added = runningMergeTasks.put(mergeTask.onGoingMerge.getMerge(), mergeTask) == null;
assert added : "starting merge task [" + mergeTask + "] registered as already running";
return Schedule.RUN;
Expand All @@ -243,8 +295,9 @@ synchronized void mergeTaskFinishedRunning(MergeTask mergeTask) {
maybeSignalAllMergesDoneAfterClose();
}

private void mergeTaskDone() {
private void mergeTaskDone(OnGoingMerge merge) {
doneMergeTaskCount.incrementAndGet();
mergeExecutedOrAborted(merge);
checkMergeTaskThrottling();
}

Expand All @@ -255,7 +308,7 @@ private synchronized void maybeSignalAllMergesDoneAfterClose() {
}

private synchronized void enqueueBackloggedTasks() {
int maxBackloggedTasksToEnqueue = config.getMaxThreadCount() - runningMergeTasks.size();
int maxBackloggedTasksToEnqueue = getMaxThreadCount() - runningMergeTasks.size();
// enqueue all backlogged tasks when closing, as the queue expects all backlogged tasks to always be enqueued back
while (closed || maxBackloggedTasksToEnqueue-- > 0) {
MergeTask backloggedMergeTask = backloggedMergeTasks.poll();
Expand Down Expand Up @@ -408,7 +461,7 @@ public void run() {
try {
mergeTaskFinishedRunning(this);
} finally {
mergeTaskDone();
mergeTaskDone(onGoingMerge);
}
try {
// kick-off any follow-up merge
Expand Down Expand Up @@ -452,7 +505,7 @@ void abort() {
if (verbose()) {
message(String.format(Locale.ROOT, "merge task %s end abort", this));
}
mergeTaskDone();
mergeTaskDone(onGoingMerge);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,31 @@ public void testAutoIOThrottleForMergeTasks() throws Exception {
}
}

public void testMergeSchedulerAbortsMergeWhenShouldSkipMergeIsTrue() {
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class);
// build a scheduler that always returns true for shouldSkipMerge
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", Settings.builder().build()),
threadPoolMergeExecutorService,
merge -> 0
) {
@Override
protected boolean shouldSkipMerge() {
return true;
}
};
MergeSource mergeSource = mock(MergeSource.class);
OneMerge oneMerge = mock(OneMerge.class);
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress());
when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null);
MergeTask mergeTask = threadPoolMergeScheduler.newMergeTask(mergeSource, oneMerge, randomFrom(MergeTrigger.values()));
// verify that calling schedule on the merge task indicates the merge should be aborted
Schedule schedule = threadPoolMergeScheduler.schedule(mergeTask);
assertThat(schedule, is(Schedule.ABORT));
}

private static MergeInfo getNewMergeInfo(long estimatedMergeBytes) {
return getNewMergeInfo(estimatedMergeBytes, randomFrom(-1, randomNonNegativeInt()));
}
Expand Down