Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2909,7 +2909,7 @@ private final class EngineThreadPoolMergeScheduler extends ThreadPoolMergeSchedu
IndexSettings indexSettings,
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
) {
super(shardId, indexSettings, threadPoolMergeExecutorService);
super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.engine;

import org.elasticsearch.index.merge.OnGoingMerge;

public interface MergeEventListener {

/**
*
* @param merge
* @param estimateMergeMemoryBytes estimate of the memory needed to perform a merge
*/
void onMergeQueued(OnGoingMerge merge, long estimateMergeMemoryBytes);

void onMergeCompleted(OnGoingMerge merge);

void onMergeAborted(OnGoingMerge merge);
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved this out of stateless.

Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.index.MergePolicy;

@FunctionalInterface
public interface MergeMemoryEstimateProvider {

/**
* Returns an estimate of the memory needed to perform a merge
*/
long estimateMergeMemoryBytes(MergePolicy.OneMerge merge);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
Expand Down Expand Up @@ -73,6 +75,8 @@ public class ThreadPoolMergeExecutorService {
private final int concurrentMergesFloorLimitForThrottling;
private final int concurrentMergesCeilLimitForThrottling;

private final List<MergeEventListener> mergeEventListeners = new CopyOnWriteArrayList<>();

public static @Nullable ThreadPoolMergeExecutorService maybeCreateThreadPoolMergeExecutorService(
ThreadPool threadPool,
Settings settings
Expand Down Expand Up @@ -127,13 +131,19 @@ boolean submitMergeTask(MergeTask mergeTask) {
);
}
// then enqueue the merge task proper
queuedMergeTasks.add(mergeTask);
enqueueMergeTask(mergeTask);
return true;
}
}

void reEnqueueBackloggedMergeTask(MergeTask mergeTask) {
queuedMergeTasks.add(mergeTask);
enqueueMergeTask(mergeTask);
}

private void enqueueMergeTask(MergeTask mergeTask) {
if (queuedMergeTasks.add(mergeTask)) {
mergeEventListeners.forEach(l -> l.onMergeQueued(mergeTask.getOnGoingMerge(), mergeTask.getEstimateMergeMemoryBytes()));
}
}

public boolean allDone() {
Expand Down Expand Up @@ -201,6 +211,7 @@ private void runMergeTask(MergeTask mergeTask) {
if (mergeTask.supportsIOThrottling()) {
ioThrottledMergeTasksCount.decrementAndGet();
}
mergeEventListeners.forEach(l -> l.onMergeCompleted(mergeTask.getOnGoingMerge()));
}
}

Expand All @@ -213,6 +224,7 @@ private void abortMergeTask(MergeTask mergeTask) {
if (mergeTask.supportsIOThrottling()) {
ioThrottledMergeTasksCount.decrementAndGet();
}
mergeEventListeners.forEach(l -> l.onMergeAborted(mergeTask.getOnGoingMerge()));
}
}

Expand Down Expand Up @@ -278,6 +290,10 @@ public boolean usingMaxTargetIORateBytesPerSec() {
return MAX_IO_RATE.getBytes() == targetIORateBytesPerSec.get();
}

public void registerMergeEventListener(MergeEventListener consumer) {
mergeEventListeners.add(consumer);
}

// exposed for tests
Set<MergeTask> getRunningMergeTasks() {
return runningMergeTasks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,13 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
private final AtomicLong doneMergeTaskCount = new AtomicLong();
private final CountDownLatch closedWithNoRunningMerges = new CountDownLatch(1);
private volatile boolean closed = false;
private final MergeMemoryEstimateProvider mergeMemoryEstimateProvider;

public ThreadPoolMergeScheduler(
ShardId shardId,
IndexSettings indexSettings,
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
MergeMemoryEstimateProvider mergeMemoryEstimateProvider
) {
this.shardId = shardId;
this.config = indexSettings.getMergeSchedulerConfig();
Expand All @@ -81,6 +83,7 @@ public ThreadPoolMergeScheduler(
: Double.POSITIVE_INFINITY
);
this.threadPoolMergeExecutorService = threadPoolMergeExecutorService;
this.mergeMemoryEstimateProvider = mergeMemoryEstimateProvider;
}

@Override
Expand Down Expand Up @@ -176,11 +179,13 @@ MergeTask newMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, Merg
// forced merges, as well as merges triggered when closing a shard, always run un-IO-throttled
boolean isAutoThrottle = mergeTrigger != MergeTrigger.CLOSING && merge.getStoreMergeInfo().mergeMaxNumSegments() == -1;
// IO throttling cannot be toggled for existing merge tasks, only new merge tasks pick up the updated IO throttling setting
long estimateMergeMemoryBytes = mergeMemoryEstimateProvider.estimateMergeMemoryBytes(merge);
return new MergeTask(
mergeSource,
merge,
isAutoThrottle && config.isAutoThrottle(),
"Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId
"Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId,
estimateMergeMemoryBytes
);
}

Expand Down Expand Up @@ -312,14 +317,22 @@ class MergeTask implements Runnable {
private final OnGoingMerge onGoingMerge;
private final MergeRateLimiter rateLimiter;
private final boolean supportsIOThrottling;

MergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, boolean supportsIOThrottling, String name) {
private final long estimateMergeMemoryBytes;

MergeTask(
MergeSource mergeSource,
MergePolicy.OneMerge merge,
boolean supportsIOThrottling,
String name,
long estimateMergeMemoryBytes
) {
this.name = name;
this.mergeStartTimeNS = new AtomicLong();
this.mergeSource = mergeSource;
this.onGoingMerge = new OnGoingMerge(merge);
this.rateLimiter = new MergeRateLimiter(merge.getMergeProgress());
this.supportsIOThrottling = supportsIOThrottling;
this.estimateMergeMemoryBytes = estimateMergeMemoryBytes;
}

Schedule schedule() {
Expand Down Expand Up @@ -449,6 +462,14 @@ long estimatedMergeSize() {
return onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
}

public long getEstimateMergeMemoryBytes() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'd prefer to call this getMergeMemoryEstimateBytes(), similar to the provider (also rename various variales).

return estimateMergeMemoryBytes;
}

public OnGoingMerge getOnGoingMerge() {
return onGoingMerge;
}

@Override
public String toString() {
return name + (onGoingMerge.getMerge().isAborted() ? " (aborted)" : "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask;
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -80,10 +81,14 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
.build();
TestThreadPool testThreadPool = new TestThreadPool("test", settings);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool);
var countingListener = new CountingMergeEventListener();
threadPoolMergeExecutorService.registerMergeEventListener(countingListener);
assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount));
Semaphore runMergeSemaphore = new Semaphore(0);
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE);
AtomicInteger doneMergesCount = new AtomicInteger(0);
AtomicInteger reEnqueuedBackloggedMergesCount = new AtomicInteger();
AtomicInteger abortedMergesCount = new AtomicInteger();
// submit more merge tasks than there are threads so that some are enqueued
for (int i = 0; i < mergesToSubmit; i++) {
MergeTask mergeTask = mock(MergeTask.class);
Expand All @@ -95,6 +100,7 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
if (schedule == BACKLOG) {
// reenqueue backlogged merge task
new Thread(() -> threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(mergeTask)).start();
reEnqueuedBackloggedMergesCount.incrementAndGet();
}
return schedule;
}).when(mergeTask).schedule();
Expand All @@ -114,6 +120,7 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
}
runMergeSemaphore.acquireUninterruptibly();
doneMergesCount.incrementAndGet();
abortedMergesCount.incrementAndGet();
return null;
}).when(mergeTask).abort();
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
Expand All @@ -125,6 +132,10 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
// with the other merge tasks enqueued
assertThat(threadPoolExecutor.getQueue().size(), is(mergesToSubmit - mergeExecutorThreadCount));
});
assertThat(
countingListener.queued.get(),
equalTo(threadPoolExecutor.getActiveCount() + threadPoolExecutor.getQueue().size() + reEnqueuedBackloggedMergesCount.get())
);
// shutdown prevents new merge tasks to be enqueued but existing ones should be allowed to continue
testThreadPool.shutdown();
// assert all executors, except the merge one, are terminated
Expand Down Expand Up @@ -165,6 +176,8 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
assertTrue(threadPoolExecutor.isTerminated());
assertTrue(threadPoolMergeExecutorService.allDone());
});
assertThat(countingListener.aborted.get() + countingListener.completed.get(), equalTo(doneMergesCount.get()));
assertThat(countingListener.aborted.get(), equalTo(abortedMergesCount.get()));
}

public void testTargetIORateChangesWhenSubmittingMergeTasks() throws Exception {
Expand Down Expand Up @@ -660,6 +673,27 @@ public void testMergeTasksExecuteInSizeOrder() {
}
}

private static class CountingMergeEventListener implements MergeEventListener {
AtomicInteger queued = new AtomicInteger();
AtomicInteger aborted = new AtomicInteger();
AtomicInteger completed = new AtomicInteger();

@Override
public void onMergeQueued(OnGoingMerge merge, long estimateMergeMemoryBytes) {
queued.incrementAndGet();
}

@Override
public void onMergeCompleted(OnGoingMerge merge) {
completed.incrementAndGet();
}

@Override
public void onMergeAborted(OnGoingMerge merge) {
aborted.incrementAndGet();
}
}

static ThreadPoolMergeExecutorService getThreadPoolMergeExecutorService(ThreadPool threadPool) {
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
.maybeCreateThreadPoolMergeExecutorService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public void testMergesExecuteInSizeOrder() throws IOException {
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", Settings.EMPTY),
threadPoolMergeExecutorService
threadPoolMergeExecutorService,
merge -> 0
)
) {
List<OneMerge> executedMergesList = new ArrayList<>();
Expand Down Expand Up @@ -103,7 +104,8 @@ public void testSimpleMergeTaskBacklogging() {
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings),
threadPoolMergeExecutorService
threadPoolMergeExecutorService,
merge -> 0
);
// more merge tasks than merge threads
int mergeCount = mergeExecutorThreadCount + randomIntBetween(1, 5);
Expand Down Expand Up @@ -136,7 +138,8 @@ public void testSimpleMergeTaskReEnqueueingBySize() {
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings),
threadPoolMergeExecutorService
threadPoolMergeExecutorService,
merge -> 0
);
// sort backlogged merges by size
PriorityQueue<MergeTask> backloggedMergeTasks = new PriorityQueue<>(16, Comparator.comparingLong(MergeTask::estimatedMergeSize));
Expand Down Expand Up @@ -347,7 +350,8 @@ public void testMergeSourceWithFollowUpMergesRunSequentially() throws Exception
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", settings),
threadPoolMergeExecutorService
threadPoolMergeExecutorService,
merge -> 0
)
) {
MergeSource mergeSource = mock(MergeSource.class);
Expand Down Expand Up @@ -420,7 +424,8 @@ public void testMergesRunConcurrently() throws Exception {
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", settings),
threadPoolMergeExecutorService
threadPoolMergeExecutorService,
merge -> 0
)
) {
// at least 1 extra merge than there are concurrently allowed
Expand Down Expand Up @@ -504,7 +509,8 @@ public void testSchedulerCloseWaitsForRunningMerge() throws Exception {
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", settings),
threadPoolMergeExecutorService
threadPoolMergeExecutorService,
merge -> 0
)
) {
CountDownLatch mergeDoneLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -576,7 +582,8 @@ public void testAutoIOThrottleForMergeTasksWhenSchedulerDisablesIt() throws Exce
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
new ShardId("index", "_na_", 1),
indexSettings,
threadPoolMergeExecutorService
threadPoolMergeExecutorService,
merge -> 0
)
) {
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
Expand Down Expand Up @@ -605,7 +612,8 @@ public void testAutoIOThrottleForMergeTasks() throws Exception {
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
new ShardId("index", "_na_", 1),
indexSettings,
threadPoolMergeExecutorService
threadPoolMergeExecutorService,
merge -> 0
)
) {
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
Expand All @@ -621,7 +629,8 @@ public void testAutoIOThrottleForMergeTasks() throws Exception {
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
new ShardId("index", "_na_", 1),
indexSettings,
threadPoolMergeExecutorService
threadPoolMergeExecutorService,
merge -> 0
)
) {
// merge submitted upon closing
Expand All @@ -637,7 +646,8 @@ public void testAutoIOThrottleForMergeTasks() throws Exception {
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
new ShardId("index", "_na_", 1),
indexSettings,
threadPoolMergeExecutorService
threadPoolMergeExecutorService,
merge -> 0
)
) {
// merge submitted upon closing
Expand Down Expand Up @@ -668,7 +678,7 @@ static class TestThreadPoolMergeScheduler extends ThreadPoolMergeScheduler {
IndexSettings indexSettings,
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
) {
super(shardId, indexSettings, threadPoolMergeExecutorService);
super(shardId, indexSettings, threadPoolMergeExecutorService, merge -> 0);
}

@Override
Expand Down