From ca095be6272e7439092716b22517f2bd565a0d65 Mon Sep 17 00:00:00 2001 From: Pooya Salehi Date: Thu, 10 Apr 2025 12:12:53 +0200 Subject: [PATCH 1/9] draft --- .../engine/ThreadPoolMergeExecutorService.java | 14 ++++++++++++++ .../index/engine/ThreadPoolMergeScheduler.java | 9 +++++++++ 2 files changed, 23 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java index 7e41fffdd5357..01e2203937f68 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask; +import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.threadpool.ThreadPool; import java.util.Comparator; @@ -73,6 +74,8 @@ public class ThreadPoolMergeExecutorService { private final int concurrentMergesFloorLimitForThrottling; private final int concurrentMergesCeilLimitForThrottling; + private volatile MergeEventConsumer mergeEventConsumer; + public static @Nullable ThreadPoolMergeExecutorService maybeCreateThreadPoolMergeExecutorService( ThreadPool threadPool, Settings settings @@ -278,6 +281,17 @@ public boolean usingMaxTargetIORateBytesPerSec() { return MAX_IO_RATE.getBytes() == targetIORateBytesPerSec.get(); } + public void registerMergeEventConsumer(MergeEventConsumer consumer) { + assert this.mergeEventConsumer == null; + this.mergeEventConsumer = consumer; + } + + public interface MergeEventConsumer { + void onMergeQueued(OnGoingMerge merge, long estimateMergeMemoryBytes); + + void onMergeCompleted(OnGoingMerge merge); + } + // exposed for tests Set getRunningMergeTasks() { return runningMergeTasks; diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java index f645edaff64a8..ec44a9f29ae01 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java @@ -65,6 +65,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics private final AtomicLong doneMergeTaskCount = new AtomicLong(); private final CountDownLatch closedWithNoRunningMerges = new CountDownLatch(1); private volatile boolean closed = false; + private final MergeMemoryEstimator mergeMemoryEstimator; public ThreadPoolMergeScheduler( ShardId shardId, @@ -449,6 +450,14 @@ long estimatedMergeSize() { return onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes(); } + public long getEstimateMergeMemoryBytes() { + return mergeMemoryEstimator.estimateMergeMemoryBytes(onGoingMerge.getMerge()); + } + + public OnGoingMerge getOnGoingMerge() { + return onGoingMerge; + } + @Override public String toString() { return name + (onGoingMerge.getMerge().isAborted() ? " (aborted)" : ""); From 643979c717934a16cef35aa0d2a4370667c985cc Mon Sep 17 00:00:00 2001 From: Pooya Salehi Date: Fri, 11 Apr 2025 11:07:57 +0200 Subject: [PATCH 2/9] Expose merge events and their memory usage estimate --- .../index/engine/InternalEngine.java | 2 +- .../index/engine/MergeEventListener.java | 21 ++++++++++++ .../engine/MergeMemoryEstimateProvider.java | 21 ++++++++++++ .../ThreadPoolMergeExecutorService.java | 25 ++++++++------- .../engine/ThreadPoolMergeScheduler.java | 24 ++++++++++---- .../ThreadPoolMergeExecutorServiceTests.java | 2 ++ .../engine/ThreadPoolMergeSchedulerTests.java | 32 ++++++++++++------- 7 files changed, 97 insertions(+), 30 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/engine/MergeEventListener.java create mode 100644 server/src/main/java/org/elasticsearch/index/engine/MergeMemoryEstimateProvider.java diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index acd9cec8b064d..3f35dde80d360 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2870,7 +2870,7 @@ private final class EngineThreadPoolMergeScheduler extends ThreadPoolMergeSchedu IndexSettings indexSettings, ThreadPoolMergeExecutorService threadPoolMergeExecutorService ) { - super(shardId, indexSettings, threadPoolMergeExecutorService); + super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/engine/MergeEventListener.java b/server/src/main/java/org/elasticsearch/index/engine/MergeEventListener.java new file mode 100644 index 0000000000000..f78f7a65a2111 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/MergeEventListener.java @@ -0,0 +1,21 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.elasticsearch.index.merge.OnGoingMerge; + +public interface MergeEventListener { + + void onMergeQueued(OnGoingMerge merge, long estimateMergeMemoryBytes); + + void onMergeCompleted(OnGoingMerge merge); + + void onMergeAborted(OnGoingMerge merge); +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/MergeMemoryEstimateProvider.java b/server/src/main/java/org/elasticsearch/index/engine/MergeMemoryEstimateProvider.java new file mode 100644 index 0000000000000..10d0aa38adf3a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/MergeMemoryEstimateProvider.java @@ -0,0 +1,21 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.MergePolicy; + +@FunctionalInterface +public interface MergeMemoryEstimateProvider { + + /** + * Returns an estimate of the memory needed to perform a merge + */ + long estimateMergeMemoryBytes(MergePolicy.OneMerge merge); +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java index 01e2203937f68..590c8151a3f7b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java @@ -14,11 +14,12 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask; -import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.threadpool.ThreadPool; import java.util.Comparator; +import java.util.List; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RejectedExecutionException; @@ -74,7 +75,7 @@ public class ThreadPoolMergeExecutorService { private final int concurrentMergesFloorLimitForThrottling; private final int concurrentMergesCeilLimitForThrottling; - private volatile MergeEventConsumer mergeEventConsumer; + private final List mergeEventListeners = new CopyOnWriteArrayList<>(); public static @Nullable ThreadPoolMergeExecutorService maybeCreateThreadPoolMergeExecutorService( ThreadPool threadPool, @@ -130,13 +131,18 @@ boolean submitMergeTask(MergeTask mergeTask) { ); } // then enqueue the merge task proper - queuedMergeTasks.add(mergeTask); + enqueueMergeTask(mergeTask); return true; } } void reEnqueueBackloggedMergeTask(MergeTask mergeTask) { + enqueueMergeTask(mergeTask); + } + + private void enqueueMergeTask(MergeTask mergeTask) { queuedMergeTasks.add(mergeTask); + mergeEventListeners.forEach(l -> l.onMergeQueued(mergeTask.getOnGoingMerge(), mergeTask.getEstimateMergeMemoryBytes())); } public boolean allDone() { @@ -204,6 +210,7 @@ private void runMergeTask(MergeTask mergeTask) { if (mergeTask.supportsIOThrottling()) { ioThrottledMergeTasksCount.decrementAndGet(); } + mergeEventListeners.forEach(l -> l.onMergeCompleted(mergeTask.getOnGoingMerge())); } } @@ -216,6 +223,7 @@ private void abortMergeTask(MergeTask mergeTask) { if (mergeTask.supportsIOThrottling()) { ioThrottledMergeTasksCount.decrementAndGet(); } + mergeEventListeners.forEach(l -> l.onMergeAborted(mergeTask.getOnGoingMerge())); } } @@ -281,15 +289,8 @@ public boolean usingMaxTargetIORateBytesPerSec() { return MAX_IO_RATE.getBytes() == targetIORateBytesPerSec.get(); } - public void registerMergeEventConsumer(MergeEventConsumer consumer) { - assert this.mergeEventConsumer == null; - this.mergeEventConsumer = consumer; - } - - public interface MergeEventConsumer { - void onMergeQueued(OnGoingMerge merge, long estimateMergeMemoryBytes); - - void onMergeCompleted(OnGoingMerge merge); + public void registerMergeEventListener(MergeEventListener consumer) { + mergeEventListeners.add(consumer); } // exposed for tests diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java index ec44a9f29ae01..d0c2b6a02a6d1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java @@ -65,12 +65,13 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics private final AtomicLong doneMergeTaskCount = new AtomicLong(); private final CountDownLatch closedWithNoRunningMerges = new CountDownLatch(1); private volatile boolean closed = false; - private final MergeMemoryEstimator mergeMemoryEstimator; + private final MergeMemoryEstimateProvider mergeMemoryEstimateProvider; public ThreadPoolMergeScheduler( ShardId shardId, IndexSettings indexSettings, - ThreadPoolMergeExecutorService threadPoolMergeExecutorService + ThreadPoolMergeExecutorService threadPoolMergeExecutorService, + MergeMemoryEstimateProvider mergeMemoryEstimateProvider ) { this.shardId = shardId; this.config = indexSettings.getMergeSchedulerConfig(); @@ -82,6 +83,7 @@ public ThreadPoolMergeScheduler( : Double.POSITIVE_INFINITY ); this.threadPoolMergeExecutorService = threadPoolMergeExecutorService; + this.mergeMemoryEstimateProvider = mergeMemoryEstimateProvider; } @Override @@ -177,11 +179,13 @@ MergeTask newMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, Merg // forced merges, as well as merges triggered when closing a shard, always run un-IO-throttled boolean isAutoThrottle = mergeTrigger != MergeTrigger.CLOSING && merge.getStoreMergeInfo().mergeMaxNumSegments() == -1; // IO throttling cannot be toggled for existing merge tasks, only new merge tasks pick up the updated IO throttling setting + long estimateMergeMemoryBytes = mergeMemoryEstimateProvider.estimateMergeMemoryBytes(merge); return new MergeTask( mergeSource, merge, isAutoThrottle && config.isAutoThrottle(), - "Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId + "Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId, + estimateMergeMemoryBytes ); } @@ -313,14 +317,22 @@ class MergeTask implements Runnable { private final OnGoingMerge onGoingMerge; private final MergeRateLimiter rateLimiter; private final boolean supportsIOThrottling; - - MergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, boolean supportsIOThrottling, String name) { + private final long estimateMergeMemoryBytes; + + MergeTask( + MergeSource mergeSource, + MergePolicy.OneMerge merge, + boolean supportsIOThrottling, + String name, + long estimateMergeMemoryBytes + ) { this.name = name; this.mergeStartTimeNS = new AtomicLong(); this.mergeSource = mergeSource; this.onGoingMerge = new OnGoingMerge(merge); this.rateLimiter = new MergeRateLimiter(merge.getMergeProgress()); this.supportsIOThrottling = supportsIOThrottling; + this.estimateMergeMemoryBytes = estimateMergeMemoryBytes; } Schedule schedule() { @@ -451,7 +463,7 @@ long estimatedMergeSize() { } public long getEstimateMergeMemoryBytes() { - return mergeMemoryEstimator.estimateMergeMemoryBytes(onGoingMerge.getMerge()); + return estimateMergeMemoryBytes; } public OnGoingMerge getOnGoingMerge() { diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java index 8ce1645148337..e422352ea0cdc 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java @@ -660,6 +660,8 @@ public void testMergeTasksExecuteInSizeOrder() { } } + // todo: testMergeEventListeners + static ThreadPoolMergeExecutorService getThreadPoolMergeExecutorService(ThreadPool threadPool) { ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService .maybeCreateThreadPoolMergeExecutorService( diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java index d407e865efbaf..6d39b72a107fe 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java @@ -61,7 +61,8 @@ public void testMergesExecuteInSizeOrder() throws IOException { ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", Settings.EMPTY), - threadPoolMergeExecutorService + threadPoolMergeExecutorService, + merge -> 0 ) ) { List executedMergesList = new ArrayList<>(); @@ -103,7 +104,8 @@ public void testSimpleMergeTaskBacklogging() { ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings), - threadPoolMergeExecutorService + threadPoolMergeExecutorService, + merge -> 0 ); // more merge tasks than merge threads int mergeCount = mergeExecutorThreadCount + randomIntBetween(1, 5); @@ -136,7 +138,8 @@ public void testSimpleMergeTaskReEnqueueingBySize() { ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings), - threadPoolMergeExecutorService + threadPoolMergeExecutorService, + merge -> 0 ); // sort backlogged merges by size PriorityQueue backloggedMergeTasks = new PriorityQueue<>(16, Comparator.comparingLong(MergeTask::estimatedMergeSize)); @@ -347,7 +350,8 @@ public void testMergeSourceWithFollowUpMergesRunSequentially() throws Exception ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", settings), - threadPoolMergeExecutorService + threadPoolMergeExecutorService, + merge -> 0 ) ) { MergeSource mergeSource = mock(MergeSource.class); @@ -420,7 +424,8 @@ public void testMergesRunConcurrently() throws Exception { ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", settings), - threadPoolMergeExecutorService + threadPoolMergeExecutorService, + merge -> 0 ) ) { // at least 1 extra merge than there are concurrently allowed @@ -504,7 +509,8 @@ public void testSchedulerCloseWaitsForRunningMerge() throws Exception { ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", settings), - threadPoolMergeExecutorService + threadPoolMergeExecutorService, + merge -> 0 ) ) { CountDownLatch mergeDoneLatch = new CountDownLatch(1); @@ -576,7 +582,8 @@ public void testAutoIOThrottleForMergeTasksWhenSchedulerDisablesIt() throws Exce ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( new ShardId("index", "_na_", 1), indexSettings, - threadPoolMergeExecutorService + threadPoolMergeExecutorService, + merge -> 0 ) ) { threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); @@ -605,7 +612,8 @@ public void testAutoIOThrottleForMergeTasks() throws Exception { ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( new ShardId("index", "_na_", 1), indexSettings, - threadPoolMergeExecutorService + threadPoolMergeExecutorService, + merge -> 0 ) ) { threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); @@ -621,7 +629,8 @@ public void testAutoIOThrottleForMergeTasks() throws Exception { ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( new ShardId("index", "_na_", 1), indexSettings, - threadPoolMergeExecutorService + threadPoolMergeExecutorService, + merge -> 0 ) ) { // merge submitted upon closing @@ -637,7 +646,8 @@ public void testAutoIOThrottleForMergeTasks() throws Exception { ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( new ShardId("index", "_na_", 1), indexSettings, - threadPoolMergeExecutorService + threadPoolMergeExecutorService, + merge -> 0 ) ) { // merge submitted upon closing @@ -668,7 +678,7 @@ static class TestThreadPoolMergeScheduler extends ThreadPoolMergeScheduler { IndexSettings indexSettings, ThreadPoolMergeExecutorService threadPoolMergeExecutorService ) { - super(shardId, indexSettings, threadPoolMergeExecutorService); + super(shardId, indexSettings, threadPoolMergeExecutorService, merge -> 0); } @Override From 74610adb5edcfb1de4ed5d9e9c8d4b64ca4df551 Mon Sep 17 00:00:00 2001 From: Pooya Salehi Date: Wed, 16 Apr 2025 13:18:40 +0200 Subject: [PATCH 3/9] tests --- .../ThreadPoolMergeExecutorServiceTests.java | 34 ++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java index e422352ea0cdc..696f5d8fdf564 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask; import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule; +import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -80,10 +81,14 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd .build(); TestThreadPool testThreadPool = new TestThreadPool("test", settings); ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool); + var countingListener = new CountingMergeEventListener(); + threadPoolMergeExecutorService.registerMergeEventListener(countingListener); assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount)); Semaphore runMergeSemaphore = new Semaphore(0); ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE); AtomicInteger doneMergesCount = new AtomicInteger(0); + AtomicInteger reEnqueuedBackloggedMergesCount = new AtomicInteger(); + AtomicInteger abortedMergesCount = new AtomicInteger(); // submit more merge tasks than there are threads so that some are enqueued for (int i = 0; i < mergesToSubmit; i++) { MergeTask mergeTask = mock(MergeTask.class); @@ -95,6 +100,7 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd if (schedule == BACKLOG) { // reenqueue backlogged merge task new Thread(() -> threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(mergeTask)).start(); + reEnqueuedBackloggedMergesCount.incrementAndGet(); } return schedule; }).when(mergeTask).schedule(); @@ -114,6 +120,7 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd } runMergeSemaphore.acquireUninterruptibly(); doneMergesCount.incrementAndGet(); + abortedMergesCount.incrementAndGet(); return null; }).when(mergeTask).abort(); threadPoolMergeExecutorService.submitMergeTask(mergeTask); @@ -125,6 +132,10 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd // with the other merge tasks enqueued assertThat(threadPoolExecutor.getQueue().size(), is(mergesToSubmit - mergeExecutorThreadCount)); }); + assertThat( + countingListener.queued.get(), + equalTo(threadPoolExecutor.getActiveCount() + threadPoolExecutor.getQueue().size() + reEnqueuedBackloggedMergesCount.get()) + ); // shutdown prevents new merge tasks to be enqueued but existing ones should be allowed to continue testThreadPool.shutdown(); // assert all executors, except the merge one, are terminated @@ -165,6 +176,8 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd assertTrue(threadPoolExecutor.isTerminated()); assertTrue(threadPoolMergeExecutorService.allDone()); }); + assertThat(countingListener.aborted.get() + countingListener.completed.get(), equalTo(doneMergesCount.get())); + assertThat(countingListener.aborted.get(), equalTo(abortedMergesCount.get())); } public void testTargetIORateChangesWhenSubmittingMergeTasks() throws Exception { @@ -660,7 +673,26 @@ public void testMergeTasksExecuteInSizeOrder() { } } - // todo: testMergeEventListeners + private static class CountingMergeEventListener implements MergeEventListener { + AtomicInteger queued = new AtomicInteger(); + AtomicInteger aborted = new AtomicInteger(); + AtomicInteger completed = new AtomicInteger(); + + @Override + public void onMergeQueued(OnGoingMerge merge, long estimateMergeMemoryBytes) { + queued.incrementAndGet(); + } + + @Override + public void onMergeCompleted(OnGoingMerge merge) { + completed.incrementAndGet(); + } + + @Override + public void onMergeAborted(OnGoingMerge merge) { + aborted.incrementAndGet(); + } + } static ThreadPoolMergeExecutorService getThreadPoolMergeExecutorService(ThreadPool threadPool) { ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService From 8fbb8b5104168f7365870f49706400356fccbe69 Mon Sep 17 00:00:00 2001 From: Pooya Salehi Date: Thu, 17 Apr 2025 14:18:47 +0200 Subject: [PATCH 4/9] tiny change --- .../index/engine/ThreadPoolMergeExecutorService.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java index 590c8151a3f7b..a56b99088820e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java @@ -141,8 +141,9 @@ void reEnqueueBackloggedMergeTask(MergeTask mergeTask) { } private void enqueueMergeTask(MergeTask mergeTask) { - queuedMergeTasks.add(mergeTask); - mergeEventListeners.forEach(l -> l.onMergeQueued(mergeTask.getOnGoingMerge(), mergeTask.getEstimateMergeMemoryBytes())); + if (queuedMergeTasks.add(mergeTask)) { + mergeEventListeners.forEach(l -> l.onMergeQueued(mergeTask.getOnGoingMerge(), mergeTask.getEstimateMergeMemoryBytes())); + } } public boolean allDone() { From f86c86b22be89d19d4438f37d1b18a21e77e8241 Mon Sep 17 00:00:00 2001 From: Pooya Salehi Date: Tue, 22 Apr 2025 11:12:57 +0200 Subject: [PATCH 5/9] comment --- .../org/elasticsearch/index/engine/MergeEventListener.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/index/engine/MergeEventListener.java b/server/src/main/java/org/elasticsearch/index/engine/MergeEventListener.java index f78f7a65a2111..f029820535dc1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/MergeEventListener.java +++ b/server/src/main/java/org/elasticsearch/index/engine/MergeEventListener.java @@ -13,6 +13,11 @@ public interface MergeEventListener { + /** + * + * @param merge + * @param estimateMergeMemoryBytes estimate of the memory needed to perform a merge + */ void onMergeQueued(OnGoingMerge merge, long estimateMergeMemoryBytes); void onMergeCompleted(OnGoingMerge merge); From 3724a21cd110f6f90eec7fc72383a83490184d50 Mon Sep 17 00:00:00 2001 From: Pooya Salehi Date: Wed, 23 Apr 2025 10:58:09 +0200 Subject: [PATCH 6/9] Ensure merge complete/abort listeners are not called before merge queued listeners --- .../ThreadPoolMergeExecutorService.java | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java index a56b99088820e..032b096b69968 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java @@ -76,6 +76,12 @@ public class ThreadPoolMergeExecutorService { private final int concurrentMergesCeilLimitForThrottling; private final List mergeEventListeners = new CopyOnWriteArrayList<>(); + /** + * To ensure that for a given merge {@link org.elasticsearch.index.engine.MergeEventListener#onMergeAborted} or + * {@link org.elasticsearch.index.engine.MergeEventListener#onMergeCompleted} is not called before + * {@link org.elasticsearch.index.engine.MergeEventListener#onMergeQueued}. + */ + private final Object mergeEventsMutex = new Object(); public static @Nullable ThreadPoolMergeExecutorService maybeCreateThreadPoolMergeExecutorService( ThreadPool threadPool, @@ -141,8 +147,10 @@ void reEnqueueBackloggedMergeTask(MergeTask mergeTask) { } private void enqueueMergeTask(MergeTask mergeTask) { - if (queuedMergeTasks.add(mergeTask)) { - mergeEventListeners.forEach(l -> l.onMergeQueued(mergeTask.getOnGoingMerge(), mergeTask.getEstimateMergeMemoryBytes())); + synchronized (mergeEventsMutex) { + if (queuedMergeTasks.add(mergeTask)) { + mergeEventListeners.forEach(l -> l.onMergeQueued(mergeTask.getOnGoingMerge(), mergeTask.getEstimateMergeMemoryBytes())); + } } } @@ -211,7 +219,9 @@ private void runMergeTask(MergeTask mergeTask) { if (mergeTask.supportsIOThrottling()) { ioThrottledMergeTasksCount.decrementAndGet(); } - mergeEventListeners.forEach(l -> l.onMergeCompleted(mergeTask.getOnGoingMerge())); + synchronized (mergeEventsMutex) { + mergeEventListeners.forEach(l -> l.onMergeCompleted(mergeTask.getOnGoingMerge())); + } } } @@ -224,7 +234,9 @@ private void abortMergeTask(MergeTask mergeTask) { if (mergeTask.supportsIOThrottling()) { ioThrottledMergeTasksCount.decrementAndGet(); } - mergeEventListeners.forEach(l -> l.onMergeAborted(mergeTask.getOnGoingMerge())); + synchronized (mergeEventsMutex) { + mergeEventListeners.forEach(l -> l.onMergeAborted(mergeTask.getOnGoingMerge())); + } } } From 74f3b6b1312ca1583efdf912aafbd81b680c38f6 Mon Sep 17 00:00:00 2001 From: Pooya Salehi Date: Wed, 23 Apr 2025 11:02:58 +0200 Subject: [PATCH 7/9] rename --- .../index/engine/ThreadPoolMergeExecutorService.java | 2 +- .../index/engine/ThreadPoolMergeScheduler.java | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java index 032b096b69968..6f1a1b876ce31 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java @@ -149,7 +149,7 @@ void reEnqueueBackloggedMergeTask(MergeTask mergeTask) { private void enqueueMergeTask(MergeTask mergeTask) { synchronized (mergeEventsMutex) { if (queuedMergeTasks.add(mergeTask)) { - mergeEventListeners.forEach(l -> l.onMergeQueued(mergeTask.getOnGoingMerge(), mergeTask.getEstimateMergeMemoryBytes())); + mergeEventListeners.forEach(l -> l.onMergeQueued(mergeTask.getOnGoingMerge(), mergeTask.getMergeMemoryEstimateBytes())); } } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java index d0c2b6a02a6d1..e54c8164c6ab5 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java @@ -317,14 +317,14 @@ class MergeTask implements Runnable { private final OnGoingMerge onGoingMerge; private final MergeRateLimiter rateLimiter; private final boolean supportsIOThrottling; - private final long estimateMergeMemoryBytes; + private final long mergeMemoryEstimateBytes; MergeTask( MergeSource mergeSource, MergePolicy.OneMerge merge, boolean supportsIOThrottling, String name, - long estimateMergeMemoryBytes + long mergeMemoryEstimateBytes ) { this.name = name; this.mergeStartTimeNS = new AtomicLong(); @@ -332,7 +332,7 @@ class MergeTask implements Runnable { this.onGoingMerge = new OnGoingMerge(merge); this.rateLimiter = new MergeRateLimiter(merge.getMergeProgress()); this.supportsIOThrottling = supportsIOThrottling; - this.estimateMergeMemoryBytes = estimateMergeMemoryBytes; + this.mergeMemoryEstimateBytes = mergeMemoryEstimateBytes; } Schedule schedule() { @@ -462,8 +462,8 @@ long estimatedMergeSize() { return onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes(); } - public long getEstimateMergeMemoryBytes() { - return estimateMergeMemoryBytes; + public long getMergeMemoryEstimateBytes() { + return mergeMemoryEstimateBytes; } public OnGoingMerge getOnGoingMerge() { From 5395c0a0a7a790f02465326a7f742a0fd19e2ac7 Mon Sep 17 00:00:00 2001 From: Pooya Salehi Date: Wed, 23 Apr 2025 11:48:57 +0200 Subject: [PATCH 8/9] fix test --- .../index/engine/ThreadPoolMergeExecutorServiceTests.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java index 696f5d8fdf564..bcc5250ea098d 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java @@ -132,9 +132,11 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd // with the other merge tasks enqueued assertThat(threadPoolExecutor.getQueue().size(), is(mergesToSubmit - mergeExecutorThreadCount)); }); - assertThat( - countingListener.queued.get(), - equalTo(threadPoolExecutor.getActiveCount() + threadPoolExecutor.getQueue().size() + reEnqueuedBackloggedMergesCount.get()) + assertBusy( + () -> assertThat( + countingListener.queued.get(), + equalTo(threadPoolExecutor.getActiveCount() + threadPoolExecutor.getQueue().size() + reEnqueuedBackloggedMergesCount.get()) + ) ); // shutdown prevents new merge tasks to be enqueued but existing ones should be allowed to continue testThreadPool.shutdown(); From ba7ebb55d1931834b7f21a7219c69f6678835298 Mon Sep 17 00:00:00 2001 From: Pooya Salehi Date: Wed, 23 Apr 2025 17:32:38 +0200 Subject: [PATCH 9/9] call listener before adding to queue --- .../ThreadPoolMergeExecutorService.java | 24 ++++++------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java index 6f1a1b876ce31..d3dff8c30449a 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java @@ -76,12 +76,6 @@ public class ThreadPoolMergeExecutorService { private final int concurrentMergesCeilLimitForThrottling; private final List mergeEventListeners = new CopyOnWriteArrayList<>(); - /** - * To ensure that for a given merge {@link org.elasticsearch.index.engine.MergeEventListener#onMergeAborted} or - * {@link org.elasticsearch.index.engine.MergeEventListener#onMergeCompleted} is not called before - * {@link org.elasticsearch.index.engine.MergeEventListener#onMergeQueued}. - */ - private final Object mergeEventsMutex = new Object(); public static @Nullable ThreadPoolMergeExecutorService maybeCreateThreadPoolMergeExecutorService( ThreadPool threadPool, @@ -147,11 +141,11 @@ void reEnqueueBackloggedMergeTask(MergeTask mergeTask) { } private void enqueueMergeTask(MergeTask mergeTask) { - synchronized (mergeEventsMutex) { - if (queuedMergeTasks.add(mergeTask)) { - mergeEventListeners.forEach(l -> l.onMergeQueued(mergeTask.getOnGoingMerge(), mergeTask.getMergeMemoryEstimateBytes())); - } - } + // To ensure that for a given merge onMergeQueued is called before onMergeAborted or onMergeCompleted, we call onMergeQueued + // before adding the merge task to the queue. Adding to the queue should not fail. + mergeEventListeners.forEach(l -> l.onMergeQueued(mergeTask.getOnGoingMerge(), mergeTask.getMergeMemoryEstimateBytes())); + boolean added = queuedMergeTasks.add(mergeTask); + assert added; } public boolean allDone() { @@ -219,9 +213,7 @@ private void runMergeTask(MergeTask mergeTask) { if (mergeTask.supportsIOThrottling()) { ioThrottledMergeTasksCount.decrementAndGet(); } - synchronized (mergeEventsMutex) { - mergeEventListeners.forEach(l -> l.onMergeCompleted(mergeTask.getOnGoingMerge())); - } + mergeEventListeners.forEach(l -> l.onMergeCompleted(mergeTask.getOnGoingMerge())); } } @@ -234,9 +226,7 @@ private void abortMergeTask(MergeTask mergeTask) { if (mergeTask.supportsIOThrottling()) { ioThrottledMergeTasksCount.decrementAndGet(); } - synchronized (mergeEventsMutex) { - mergeEventListeners.forEach(l -> l.onMergeAborted(mergeTask.getOnGoingMerge())); - } + mergeEventListeners.forEach(l -> l.onMergeAborted(mergeTask.getOnGoingMerge())); } }