Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2909,7 +2909,7 @@ private final class EngineThreadPoolMergeScheduler extends ThreadPoolMergeSchedu
IndexSettings indexSettings,
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
) {
super(shardId, indexSettings, threadPoolMergeExecutorService);
super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.engine;

import org.elasticsearch.index.merge.OnGoingMerge;

public interface MergeEventListener {

/**
*
* @param merge
* @param estimateMergeMemoryBytes estimate of the memory needed to perform a merge
*/
void onMergeQueued(OnGoingMerge merge, long estimateMergeMemoryBytes);

void onMergeCompleted(OnGoingMerge merge);

void onMergeAborted(OnGoingMerge merge);
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved this out of stateless.

Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.index.MergePolicy;

@FunctionalInterface
public interface MergeMemoryEstimateProvider {

/**
* Returns an estimate of the memory needed to perform a merge
*/
long estimateMergeMemoryBytes(MergePolicy.OneMerge merge);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
Expand Down Expand Up @@ -73,6 +75,14 @@ public class ThreadPoolMergeExecutorService {
private final int concurrentMergesFloorLimitForThrottling;
private final int concurrentMergesCeilLimitForThrottling;

private final List<MergeEventListener> mergeEventListeners = new CopyOnWriteArrayList<>();
/**
* To ensure that for a given merge {@link org.elasticsearch.index.engine.MergeEventListener#onMergeAborted} or
* {@link org.elasticsearch.index.engine.MergeEventListener#onMergeCompleted} is not called before
* {@link org.elasticsearch.index.engine.MergeEventListener#onMergeQueued}.
*/
private final Object mergeEventsMutex = new Object();

public static @Nullable ThreadPoolMergeExecutorService maybeCreateThreadPoolMergeExecutorService(
ThreadPool threadPool,
Settings settings
Expand Down Expand Up @@ -127,13 +137,21 @@ boolean submitMergeTask(MergeTask mergeTask) {
);
}
// then enqueue the merge task proper
queuedMergeTasks.add(mergeTask);
enqueueMergeTask(mergeTask);
return true;
}
}

void reEnqueueBackloggedMergeTask(MergeTask mergeTask) {
queuedMergeTasks.add(mergeTask);
enqueueMergeTask(mergeTask);
}

private void enqueueMergeTask(MergeTask mergeTask) {
synchronized (mergeEventsMutex) {
if (queuedMergeTasks.add(mergeTask)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can assert that it always returns true instead, thus avoid the if-statement, invoke the listener before adding and thus avoid the synchronized?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. See ba7ebb5. This doesn't change anything in the stateless PR though.

mergeEventListeners.forEach(l -> l.onMergeQueued(mergeTask.getOnGoingMerge(), mergeTask.getMergeMemoryEstimateBytes()));
}
}
}

public boolean allDone() {
Expand Down Expand Up @@ -201,6 +219,9 @@ private void runMergeTask(MergeTask mergeTask) {
if (mergeTask.supportsIOThrottling()) {
ioThrottledMergeTasksCount.decrementAndGet();
}
synchronized (mergeEventsMutex) {
mergeEventListeners.forEach(l -> l.onMergeCompleted(mergeTask.getOnGoingMerge()));
}
}
}

Expand All @@ -213,6 +234,9 @@ private void abortMergeTask(MergeTask mergeTask) {
if (mergeTask.supportsIOThrottling()) {
ioThrottledMergeTasksCount.decrementAndGet();
}
synchronized (mergeEventsMutex) {
mergeEventListeners.forEach(l -> l.onMergeAborted(mergeTask.getOnGoingMerge()));
}
}
}

Expand Down Expand Up @@ -278,6 +302,10 @@ public boolean usingMaxTargetIORateBytesPerSec() {
return MAX_IO_RATE.getBytes() == targetIORateBytesPerSec.get();
}

public void registerMergeEventListener(MergeEventListener consumer) {
mergeEventListeners.add(consumer);
}

// exposed for tests
Set<MergeTask> getRunningMergeTasks() {
return runningMergeTasks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,13 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
private final AtomicLong doneMergeTaskCount = new AtomicLong();
private final CountDownLatch closedWithNoRunningMerges = new CountDownLatch(1);
private volatile boolean closed = false;
private final MergeMemoryEstimateProvider mergeMemoryEstimateProvider;

public ThreadPoolMergeScheduler(
ShardId shardId,
IndexSettings indexSettings,
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
MergeMemoryEstimateProvider mergeMemoryEstimateProvider
) {
this.shardId = shardId;
this.config = indexSettings.getMergeSchedulerConfig();
Expand All @@ -81,6 +83,7 @@ public ThreadPoolMergeScheduler(
: Double.POSITIVE_INFINITY
);
this.threadPoolMergeExecutorService = threadPoolMergeExecutorService;
this.mergeMemoryEstimateProvider = mergeMemoryEstimateProvider;
}

@Override
Expand Down Expand Up @@ -176,11 +179,13 @@ MergeTask newMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, Merg
// forced merges, as well as merges triggered when closing a shard, always run un-IO-throttled
boolean isAutoThrottle = mergeTrigger != MergeTrigger.CLOSING && merge.getStoreMergeInfo().mergeMaxNumSegments() == -1;
// IO throttling cannot be toggled for existing merge tasks, only new merge tasks pick up the updated IO throttling setting
long estimateMergeMemoryBytes = mergeMemoryEstimateProvider.estimateMergeMemoryBytes(merge);
return new MergeTask(
mergeSource,
merge,
isAutoThrottle && config.isAutoThrottle(),
"Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId
"Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId,
estimateMergeMemoryBytes
);
}

Expand Down Expand Up @@ -312,14 +317,22 @@ class MergeTask implements Runnable {
private final OnGoingMerge onGoingMerge;
private final MergeRateLimiter rateLimiter;
private final boolean supportsIOThrottling;

MergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, boolean supportsIOThrottling, String name) {
private final long mergeMemoryEstimateBytes;

MergeTask(
MergeSource mergeSource,
MergePolicy.OneMerge merge,
boolean supportsIOThrottling,
String name,
long mergeMemoryEstimateBytes
) {
this.name = name;
this.mergeStartTimeNS = new AtomicLong();
this.mergeSource = mergeSource;
this.onGoingMerge = new OnGoingMerge(merge);
this.rateLimiter = new MergeRateLimiter(merge.getMergeProgress());
this.supportsIOThrottling = supportsIOThrottling;
this.mergeMemoryEstimateBytes = mergeMemoryEstimateBytes;
}

Schedule schedule() {
Expand Down Expand Up @@ -449,6 +462,14 @@ long estimatedMergeSize() {
return onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
}

public long getMergeMemoryEstimateBytes() {
return mergeMemoryEstimateBytes;
}

public OnGoingMerge getOnGoingMerge() {
return onGoingMerge;
}

@Override
public String toString() {
return name + (onGoingMerge.getMerge().isAborted() ? " (aborted)" : "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask;
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -80,10 +81,14 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
.build();
TestThreadPool testThreadPool = new TestThreadPool("test", settings);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool);
var countingListener = new CountingMergeEventListener();
threadPoolMergeExecutorService.registerMergeEventListener(countingListener);
assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount));
Semaphore runMergeSemaphore = new Semaphore(0);
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE);
AtomicInteger doneMergesCount = new AtomicInteger(0);
AtomicInteger reEnqueuedBackloggedMergesCount = new AtomicInteger();
AtomicInteger abortedMergesCount = new AtomicInteger();
// submit more merge tasks than there are threads so that some are enqueued
for (int i = 0; i < mergesToSubmit; i++) {
MergeTask mergeTask = mock(MergeTask.class);
Expand All @@ -95,6 +100,7 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
if (schedule == BACKLOG) {
// reenqueue backlogged merge task
new Thread(() -> threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(mergeTask)).start();
reEnqueuedBackloggedMergesCount.incrementAndGet();
}
return schedule;
}).when(mergeTask).schedule();
Expand All @@ -114,6 +120,7 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
}
runMergeSemaphore.acquireUninterruptibly();
doneMergesCount.incrementAndGet();
abortedMergesCount.incrementAndGet();
return null;
}).when(mergeTask).abort();
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
Expand All @@ -125,6 +132,12 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
// with the other merge tasks enqueued
assertThat(threadPoolExecutor.getQueue().size(), is(mergesToSubmit - mergeExecutorThreadCount));
});
assertBusy(
() -> assertThat(
countingListener.queued.get(),
equalTo(threadPoolExecutor.getActiveCount() + threadPoolExecutor.getQueue().size() + reEnqueuedBackloggedMergesCount.get())
)
);
// shutdown prevents new merge tasks to be enqueued but existing ones should be allowed to continue
testThreadPool.shutdown();
// assert all executors, except the merge one, are terminated
Expand Down Expand Up @@ -165,6 +178,8 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
assertTrue(threadPoolExecutor.isTerminated());
assertTrue(threadPoolMergeExecutorService.allDone());
});
assertThat(countingListener.aborted.get() + countingListener.completed.get(), equalTo(doneMergesCount.get()));
assertThat(countingListener.aborted.get(), equalTo(abortedMergesCount.get()));
}

public void testTargetIORateChangesWhenSubmittingMergeTasks() throws Exception {
Expand Down Expand Up @@ -660,6 +675,27 @@ public void testMergeTasksExecuteInSizeOrder() {
}
}

private static class CountingMergeEventListener implements MergeEventListener {
AtomicInteger queued = new AtomicInteger();
AtomicInteger aborted = new AtomicInteger();
AtomicInteger completed = new AtomicInteger();

@Override
public void onMergeQueued(OnGoingMerge merge, long estimateMergeMemoryBytes) {
queued.incrementAndGet();
}

@Override
public void onMergeCompleted(OnGoingMerge merge) {
completed.incrementAndGet();
}

@Override
public void onMergeAborted(OnGoingMerge merge) {
aborted.incrementAndGet();
}
}

static ThreadPoolMergeExecutorService getThreadPoolMergeExecutorService(ThreadPool threadPool) {
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
.maybeCreateThreadPoolMergeExecutorService(
Expand Down
Loading