diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index df350f7bc5ed2..86fb88c40252d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2909,7 +2909,7 @@ private final class EngineThreadPoolMergeScheduler extends ThreadPoolMergeSchedu IndexSettings indexSettings, ThreadPoolMergeExecutorService threadPoolMergeExecutorService ) { - super(shardId, indexSettings, threadPoolMergeExecutorService); + super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/engine/MergeEventListener.java b/server/src/main/java/org/elasticsearch/index/engine/MergeEventListener.java new file mode 100644 index 0000000000000..f029820535dc1 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/MergeEventListener.java @@ -0,0 +1,26 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.elasticsearch.index.merge.OnGoingMerge; + +public interface MergeEventListener { + + /** + * + * @param merge + * @param estimateMergeMemoryBytes estimate of the memory needed to perform a merge + */ + void onMergeQueued(OnGoingMerge merge, long estimateMergeMemoryBytes); + + void onMergeCompleted(OnGoingMerge merge); + + void onMergeAborted(OnGoingMerge merge); +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/MergeMemoryEstimateProvider.java b/server/src/main/java/org/elasticsearch/index/engine/MergeMemoryEstimateProvider.java new file mode 100644 index 0000000000000..10d0aa38adf3a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/MergeMemoryEstimateProvider.java @@ -0,0 +1,21 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.MergePolicy; + +@FunctionalInterface +public interface MergeMemoryEstimateProvider { + + /** + * Returns an estimate of the memory needed to perform a merge + */ + long estimateMergeMemoryBytes(MergePolicy.OneMerge merge); +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java index 7e41fffdd5357..d3dff8c30449a 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java @@ -17,7 +17,9 @@ import org.elasticsearch.threadpool.ThreadPool; import java.util.Comparator; +import java.util.List; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RejectedExecutionException; @@ -73,6 +75,8 @@ public class ThreadPoolMergeExecutorService { private final int concurrentMergesFloorLimitForThrottling; private final int concurrentMergesCeilLimitForThrottling; + private final List mergeEventListeners = new CopyOnWriteArrayList<>(); + public static @Nullable ThreadPoolMergeExecutorService maybeCreateThreadPoolMergeExecutorService( ThreadPool threadPool, Settings settings @@ -127,13 +131,21 @@ boolean submitMergeTask(MergeTask mergeTask) { ); } // then enqueue the merge task proper - queuedMergeTasks.add(mergeTask); + enqueueMergeTask(mergeTask); return true; } } void reEnqueueBackloggedMergeTask(MergeTask mergeTask) { - queuedMergeTasks.add(mergeTask); + enqueueMergeTask(mergeTask); + } + + private void enqueueMergeTask(MergeTask mergeTask) { + // To ensure that for a given merge onMergeQueued is called before onMergeAborted or onMergeCompleted, we call onMergeQueued + // before adding the merge task to the queue. Adding to the queue should not fail. + mergeEventListeners.forEach(l -> l.onMergeQueued(mergeTask.getOnGoingMerge(), mergeTask.getMergeMemoryEstimateBytes())); + boolean added = queuedMergeTasks.add(mergeTask); + assert added; } public boolean allDone() { @@ -201,6 +213,7 @@ private void runMergeTask(MergeTask mergeTask) { if (mergeTask.supportsIOThrottling()) { ioThrottledMergeTasksCount.decrementAndGet(); } + mergeEventListeners.forEach(l -> l.onMergeCompleted(mergeTask.getOnGoingMerge())); } } @@ -213,6 +226,7 @@ private void abortMergeTask(MergeTask mergeTask) { if (mergeTask.supportsIOThrottling()) { ioThrottledMergeTasksCount.decrementAndGet(); } + mergeEventListeners.forEach(l -> l.onMergeAborted(mergeTask.getOnGoingMerge())); } } @@ -278,6 +292,10 @@ public boolean usingMaxTargetIORateBytesPerSec() { return MAX_IO_RATE.getBytes() == targetIORateBytesPerSec.get(); } + public void registerMergeEventListener(MergeEventListener consumer) { + mergeEventListeners.add(consumer); + } + // exposed for tests Set getRunningMergeTasks() { return runningMergeTasks; diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java index f645edaff64a8..e54c8164c6ab5 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java @@ -65,11 +65,13 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics private final AtomicLong doneMergeTaskCount = new AtomicLong(); private final CountDownLatch closedWithNoRunningMerges = new CountDownLatch(1); private volatile boolean closed = false; + private final MergeMemoryEstimateProvider mergeMemoryEstimateProvider; public ThreadPoolMergeScheduler( ShardId shardId, IndexSettings indexSettings, - ThreadPoolMergeExecutorService threadPoolMergeExecutorService + ThreadPoolMergeExecutorService threadPoolMergeExecutorService, + MergeMemoryEstimateProvider mergeMemoryEstimateProvider ) { this.shardId = shardId; this.config = indexSettings.getMergeSchedulerConfig(); @@ -81,6 +83,7 @@ public ThreadPoolMergeScheduler( : Double.POSITIVE_INFINITY ); this.threadPoolMergeExecutorService = threadPoolMergeExecutorService; + this.mergeMemoryEstimateProvider = mergeMemoryEstimateProvider; } @Override @@ -176,11 +179,13 @@ MergeTask newMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, Merg // forced merges, as well as merges triggered when closing a shard, always run un-IO-throttled boolean isAutoThrottle = mergeTrigger != MergeTrigger.CLOSING && merge.getStoreMergeInfo().mergeMaxNumSegments() == -1; // IO throttling cannot be toggled for existing merge tasks, only new merge tasks pick up the updated IO throttling setting + long estimateMergeMemoryBytes = mergeMemoryEstimateProvider.estimateMergeMemoryBytes(merge); return new MergeTask( mergeSource, merge, isAutoThrottle && config.isAutoThrottle(), - "Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId + "Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId, + estimateMergeMemoryBytes ); } @@ -312,14 +317,22 @@ class MergeTask implements Runnable { private final OnGoingMerge onGoingMerge; private final MergeRateLimiter rateLimiter; private final boolean supportsIOThrottling; - - MergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, boolean supportsIOThrottling, String name) { + private final long mergeMemoryEstimateBytes; + + MergeTask( + MergeSource mergeSource, + MergePolicy.OneMerge merge, + boolean supportsIOThrottling, + String name, + long mergeMemoryEstimateBytes + ) { this.name = name; this.mergeStartTimeNS = new AtomicLong(); this.mergeSource = mergeSource; this.onGoingMerge = new OnGoingMerge(merge); this.rateLimiter = new MergeRateLimiter(merge.getMergeProgress()); this.supportsIOThrottling = supportsIOThrottling; + this.mergeMemoryEstimateBytes = mergeMemoryEstimateBytes; } Schedule schedule() { @@ -449,6 +462,14 @@ long estimatedMergeSize() { return onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes(); } + public long getMergeMemoryEstimateBytes() { + return mergeMemoryEstimateBytes; + } + + public OnGoingMerge getOnGoingMerge() { + return onGoingMerge; + } + @Override public String toString() { return name + (onGoingMerge.getMerge().isAborted() ? " (aborted)" : ""); diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java index 8ce1645148337..bcc5250ea098d 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask; import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule; +import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -80,10 +81,14 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd .build(); TestThreadPool testThreadPool = new TestThreadPool("test", settings); ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool); + var countingListener = new CountingMergeEventListener(); + threadPoolMergeExecutorService.registerMergeEventListener(countingListener); assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount)); Semaphore runMergeSemaphore = new Semaphore(0); ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE); AtomicInteger doneMergesCount = new AtomicInteger(0); + AtomicInteger reEnqueuedBackloggedMergesCount = new AtomicInteger(); + AtomicInteger abortedMergesCount = new AtomicInteger(); // submit more merge tasks than there are threads so that some are enqueued for (int i = 0; i < mergesToSubmit; i++) { MergeTask mergeTask = mock(MergeTask.class); @@ -95,6 +100,7 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd if (schedule == BACKLOG) { // reenqueue backlogged merge task new Thread(() -> threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(mergeTask)).start(); + reEnqueuedBackloggedMergesCount.incrementAndGet(); } return schedule; }).when(mergeTask).schedule(); @@ -114,6 +120,7 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd } runMergeSemaphore.acquireUninterruptibly(); doneMergesCount.incrementAndGet(); + abortedMergesCount.incrementAndGet(); return null; }).when(mergeTask).abort(); threadPoolMergeExecutorService.submitMergeTask(mergeTask); @@ -125,6 +132,12 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd // with the other merge tasks enqueued assertThat(threadPoolExecutor.getQueue().size(), is(mergesToSubmit - mergeExecutorThreadCount)); }); + assertBusy( + () -> assertThat( + countingListener.queued.get(), + equalTo(threadPoolExecutor.getActiveCount() + threadPoolExecutor.getQueue().size() + reEnqueuedBackloggedMergesCount.get()) + ) + ); // shutdown prevents new merge tasks to be enqueued but existing ones should be allowed to continue testThreadPool.shutdown(); // assert all executors, except the merge one, are terminated @@ -165,6 +178,8 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd assertTrue(threadPoolExecutor.isTerminated()); assertTrue(threadPoolMergeExecutorService.allDone()); }); + assertThat(countingListener.aborted.get() + countingListener.completed.get(), equalTo(doneMergesCount.get())); + assertThat(countingListener.aborted.get(), equalTo(abortedMergesCount.get())); } public void testTargetIORateChangesWhenSubmittingMergeTasks() throws Exception { @@ -660,6 +675,27 @@ public void testMergeTasksExecuteInSizeOrder() { } } + private static class CountingMergeEventListener implements MergeEventListener { + AtomicInteger queued = new AtomicInteger(); + AtomicInteger aborted = new AtomicInteger(); + AtomicInteger completed = new AtomicInteger(); + + @Override + public void onMergeQueued(OnGoingMerge merge, long estimateMergeMemoryBytes) { + queued.incrementAndGet(); + } + + @Override + public void onMergeCompleted(OnGoingMerge merge) { + completed.incrementAndGet(); + } + + @Override + public void onMergeAborted(OnGoingMerge merge) { + aborted.incrementAndGet(); + } + } + static ThreadPoolMergeExecutorService getThreadPoolMergeExecutorService(ThreadPool threadPool) { ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService .maybeCreateThreadPoolMergeExecutorService( diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java index d407e865efbaf..6d39b72a107fe 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java @@ -61,7 +61,8 @@ public void testMergesExecuteInSizeOrder() throws IOException { ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", Settings.EMPTY), - threadPoolMergeExecutorService + threadPoolMergeExecutorService, + merge -> 0 ) ) { List executedMergesList = new ArrayList<>(); @@ -103,7 +104,8 @@ public void testSimpleMergeTaskBacklogging() { ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings), - threadPoolMergeExecutorService + threadPoolMergeExecutorService, + merge -> 0 ); // more merge tasks than merge threads int mergeCount = mergeExecutorThreadCount + randomIntBetween(1, 5); @@ -136,7 +138,8 @@ public void testSimpleMergeTaskReEnqueueingBySize() { ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings), - threadPoolMergeExecutorService + threadPoolMergeExecutorService, + merge -> 0 ); // sort backlogged merges by size PriorityQueue backloggedMergeTasks = new PriorityQueue<>(16, Comparator.comparingLong(MergeTask::estimatedMergeSize)); @@ -347,7 +350,8 @@ public void testMergeSourceWithFollowUpMergesRunSequentially() throws Exception ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", settings), - threadPoolMergeExecutorService + threadPoolMergeExecutorService, + merge -> 0 ) ) { MergeSource mergeSource = mock(MergeSource.class); @@ -420,7 +424,8 @@ public void testMergesRunConcurrently() throws Exception { ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", settings), - threadPoolMergeExecutorService + threadPoolMergeExecutorService, + merge -> 0 ) ) { // at least 1 extra merge than there are concurrently allowed @@ -504,7 +509,8 @@ public void testSchedulerCloseWaitsForRunningMerge() throws Exception { ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", settings), - threadPoolMergeExecutorService + threadPoolMergeExecutorService, + merge -> 0 ) ) { CountDownLatch mergeDoneLatch = new CountDownLatch(1); @@ -576,7 +582,8 @@ public void testAutoIOThrottleForMergeTasksWhenSchedulerDisablesIt() throws Exce ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( new ShardId("index", "_na_", 1), indexSettings, - threadPoolMergeExecutorService + threadPoolMergeExecutorService, + merge -> 0 ) ) { threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); @@ -605,7 +612,8 @@ public void testAutoIOThrottleForMergeTasks() throws Exception { ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( new ShardId("index", "_na_", 1), indexSettings, - threadPoolMergeExecutorService + threadPoolMergeExecutorService, + merge -> 0 ) ) { threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); @@ -621,7 +629,8 @@ public void testAutoIOThrottleForMergeTasks() throws Exception { ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( new ShardId("index", "_na_", 1), indexSettings, - threadPoolMergeExecutorService + threadPoolMergeExecutorService, + merge -> 0 ) ) { // merge submitted upon closing @@ -637,7 +646,8 @@ public void testAutoIOThrottleForMergeTasks() throws Exception { ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( new ShardId("index", "_na_", 1), indexSettings, - threadPoolMergeExecutorService + threadPoolMergeExecutorService, + merge -> 0 ) ) { // merge submitted upon closing @@ -668,7 +678,7 @@ static class TestThreadPoolMergeScheduler extends ThreadPoolMergeScheduler { IndexSettings indexSettings, ThreadPoolMergeExecutorService threadPoolMergeExecutorService ) { - super(shardId, indexSettings, threadPoolMergeExecutorService); + super(shardId, indexSettings, threadPoolMergeExecutorService, merge -> 0); } @Override