Skip to content

Commit f599fe3

Browse files
authored
Expose merge events and their memory usage estimate (#126667)
Relates ES-10961
1 parent a684e10 commit f599fe3

File tree

7 files changed

+150
-18
lines changed

7 files changed

+150
-18
lines changed

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2909,7 +2909,7 @@ private final class EngineThreadPoolMergeScheduler extends ThreadPoolMergeSchedu
29092909
IndexSettings indexSettings,
29102910
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
29112911
) {
2912-
super(shardId, indexSettings, threadPoolMergeExecutorService);
2912+
super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes);
29132913
}
29142914

29152915
@Override
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.engine;
11+
12+
import org.elasticsearch.index.merge.OnGoingMerge;
13+
14+
public interface MergeEventListener {
15+
16+
/**
17+
*
18+
* @param merge
19+
* @param estimateMergeMemoryBytes estimate of the memory needed to perform a merge
20+
*/
21+
void onMergeQueued(OnGoingMerge merge, long estimateMergeMemoryBytes);
22+
23+
void onMergeCompleted(OnGoingMerge merge);
24+
25+
void onMergeAborted(OnGoingMerge merge);
26+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.engine;
11+
12+
import org.apache.lucene.index.MergePolicy;
13+
14+
@FunctionalInterface
15+
public interface MergeMemoryEstimateProvider {
16+
17+
/**
18+
* Returns an estimate of the memory needed to perform a merge
19+
*/
20+
long estimateMergeMemoryBytes(MergePolicy.OneMerge merge);
21+
}

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
import org.elasticsearch.threadpool.ThreadPool;
1818

1919
import java.util.Comparator;
20+
import java.util.List;
2021
import java.util.Set;
22+
import java.util.concurrent.CopyOnWriteArrayList;
2123
import java.util.concurrent.ExecutorService;
2224
import java.util.concurrent.PriorityBlockingQueue;
2325
import java.util.concurrent.RejectedExecutionException;
@@ -73,6 +75,8 @@ public class ThreadPoolMergeExecutorService {
7375
private final int concurrentMergesFloorLimitForThrottling;
7476
private final int concurrentMergesCeilLimitForThrottling;
7577

78+
private final List<MergeEventListener> mergeEventListeners = new CopyOnWriteArrayList<>();
79+
7680
public static @Nullable ThreadPoolMergeExecutorService maybeCreateThreadPoolMergeExecutorService(
7781
ThreadPool threadPool,
7882
Settings settings
@@ -127,13 +131,21 @@ boolean submitMergeTask(MergeTask mergeTask) {
127131
);
128132
}
129133
// then enqueue the merge task proper
130-
queuedMergeTasks.add(mergeTask);
134+
enqueueMergeTask(mergeTask);
131135
return true;
132136
}
133137
}
134138

135139
void reEnqueueBackloggedMergeTask(MergeTask mergeTask) {
136-
queuedMergeTasks.add(mergeTask);
140+
enqueueMergeTask(mergeTask);
141+
}
142+
143+
private void enqueueMergeTask(MergeTask mergeTask) {
144+
// To ensure that for a given merge onMergeQueued is called before onMergeAborted or onMergeCompleted, we call onMergeQueued
145+
// before adding the merge task to the queue. Adding to the queue should not fail.
146+
mergeEventListeners.forEach(l -> l.onMergeQueued(mergeTask.getOnGoingMerge(), mergeTask.getMergeMemoryEstimateBytes()));
147+
boolean added = queuedMergeTasks.add(mergeTask);
148+
assert added;
137149
}
138150

139151
public boolean allDone() {
@@ -201,6 +213,7 @@ private void runMergeTask(MergeTask mergeTask) {
201213
if (mergeTask.supportsIOThrottling()) {
202214
ioThrottledMergeTasksCount.decrementAndGet();
203215
}
216+
mergeEventListeners.forEach(l -> l.onMergeCompleted(mergeTask.getOnGoingMerge()));
204217
}
205218
}
206219

@@ -213,6 +226,7 @@ private void abortMergeTask(MergeTask mergeTask) {
213226
if (mergeTask.supportsIOThrottling()) {
214227
ioThrottledMergeTasksCount.decrementAndGet();
215228
}
229+
mergeEventListeners.forEach(l -> l.onMergeAborted(mergeTask.getOnGoingMerge()));
216230
}
217231
}
218232

@@ -278,6 +292,10 @@ public boolean usingMaxTargetIORateBytesPerSec() {
278292
return MAX_IO_RATE.getBytes() == targetIORateBytesPerSec.get();
279293
}
280294

295+
public void registerMergeEventListener(MergeEventListener consumer) {
296+
mergeEventListeners.add(consumer);
297+
}
298+
281299
// exposed for tests
282300
Set<MergeTask> getRunningMergeTasks() {
283301
return runningMergeTasks;

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,13 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
6565
private final AtomicLong doneMergeTaskCount = new AtomicLong();
6666
private final CountDownLatch closedWithNoRunningMerges = new CountDownLatch(1);
6767
private volatile boolean closed = false;
68+
private final MergeMemoryEstimateProvider mergeMemoryEstimateProvider;
6869

6970
public ThreadPoolMergeScheduler(
7071
ShardId shardId,
7172
IndexSettings indexSettings,
72-
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
73+
ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
74+
MergeMemoryEstimateProvider mergeMemoryEstimateProvider
7375
) {
7476
this.shardId = shardId;
7577
this.config = indexSettings.getMergeSchedulerConfig();
@@ -81,6 +83,7 @@ public ThreadPoolMergeScheduler(
8183
: Double.POSITIVE_INFINITY
8284
);
8385
this.threadPoolMergeExecutorService = threadPoolMergeExecutorService;
86+
this.mergeMemoryEstimateProvider = mergeMemoryEstimateProvider;
8487
}
8588

8689
@Override
@@ -176,11 +179,13 @@ MergeTask newMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, Merg
176179
// forced merges, as well as merges triggered when closing a shard, always run un-IO-throttled
177180
boolean isAutoThrottle = mergeTrigger != MergeTrigger.CLOSING && merge.getStoreMergeInfo().mergeMaxNumSegments() == -1;
178181
// IO throttling cannot be toggled for existing merge tasks, only new merge tasks pick up the updated IO throttling setting
182+
long estimateMergeMemoryBytes = mergeMemoryEstimateProvider.estimateMergeMemoryBytes(merge);
179183
return new MergeTask(
180184
mergeSource,
181185
merge,
182186
isAutoThrottle && config.isAutoThrottle(),
183-
"Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId
187+
"Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId,
188+
estimateMergeMemoryBytes
184189
);
185190
}
186191

@@ -312,14 +317,22 @@ class MergeTask implements Runnable {
312317
private final OnGoingMerge onGoingMerge;
313318
private final MergeRateLimiter rateLimiter;
314319
private final boolean supportsIOThrottling;
315-
316-
MergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, boolean supportsIOThrottling, String name) {
320+
private final long mergeMemoryEstimateBytes;
321+
322+
MergeTask(
323+
MergeSource mergeSource,
324+
MergePolicy.OneMerge merge,
325+
boolean supportsIOThrottling,
326+
String name,
327+
long mergeMemoryEstimateBytes
328+
) {
317329
this.name = name;
318330
this.mergeStartTimeNS = new AtomicLong();
319331
this.mergeSource = mergeSource;
320332
this.onGoingMerge = new OnGoingMerge(merge);
321333
this.rateLimiter = new MergeRateLimiter(merge.getMergeProgress());
322334
this.supportsIOThrottling = supportsIOThrottling;
335+
this.mergeMemoryEstimateBytes = mergeMemoryEstimateBytes;
323336
}
324337

325338
Schedule schedule() {
@@ -449,6 +462,14 @@ long estimatedMergeSize() {
449462
return onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
450463
}
451464

465+
public long getMergeMemoryEstimateBytes() {
466+
return mergeMemoryEstimateBytes;
467+
}
468+
469+
public OnGoingMerge getOnGoingMerge() {
470+
return onGoingMerge;
471+
}
472+
452473
@Override
453474
public String toString() {
454475
return name + (onGoingMerge.getMerge().isAborted() ? " (aborted)" : "");

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.common.util.concurrent.EsExecutors;
1616
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask;
1717
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule;
18+
import org.elasticsearch.index.merge.OnGoingMerge;
1819
import org.elasticsearch.test.ESTestCase;
1920
import org.elasticsearch.threadpool.TestThreadPool;
2021
import org.elasticsearch.threadpool.ThreadPool;
@@ -80,10 +81,14 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
8081
.build();
8182
TestThreadPool testThreadPool = new TestThreadPool("test", settings);
8283
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool);
84+
var countingListener = new CountingMergeEventListener();
85+
threadPoolMergeExecutorService.registerMergeEventListener(countingListener);
8386
assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount));
8487
Semaphore runMergeSemaphore = new Semaphore(0);
8588
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE);
8689
AtomicInteger doneMergesCount = new AtomicInteger(0);
90+
AtomicInteger reEnqueuedBackloggedMergesCount = new AtomicInteger();
91+
AtomicInteger abortedMergesCount = new AtomicInteger();
8792
// submit more merge tasks than there are threads so that some are enqueued
8893
for (int i = 0; i < mergesToSubmit; i++) {
8994
MergeTask mergeTask = mock(MergeTask.class);
@@ -95,6 +100,7 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
95100
if (schedule == BACKLOG) {
96101
// reenqueue backlogged merge task
97102
new Thread(() -> threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(mergeTask)).start();
103+
reEnqueuedBackloggedMergesCount.incrementAndGet();
98104
}
99105
return schedule;
100106
}).when(mergeTask).schedule();
@@ -114,6 +120,7 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
114120
}
115121
runMergeSemaphore.acquireUninterruptibly();
116122
doneMergesCount.incrementAndGet();
123+
abortedMergesCount.incrementAndGet();
117124
return null;
118125
}).when(mergeTask).abort();
119126
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
@@ -125,6 +132,12 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
125132
// with the other merge tasks enqueued
126133
assertThat(threadPoolExecutor.getQueue().size(), is(mergesToSubmit - mergeExecutorThreadCount));
127134
});
135+
assertBusy(
136+
() -> assertThat(
137+
countingListener.queued.get(),
138+
equalTo(threadPoolExecutor.getActiveCount() + threadPoolExecutor.getQueue().size() + reEnqueuedBackloggedMergesCount.get())
139+
)
140+
);
128141
// shutdown prevents new merge tasks to be enqueued but existing ones should be allowed to continue
129142
testThreadPool.shutdown();
130143
// assert all executors, except the merge one, are terminated
@@ -165,6 +178,8 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
165178
assertTrue(threadPoolExecutor.isTerminated());
166179
assertTrue(threadPoolMergeExecutorService.allDone());
167180
});
181+
assertThat(countingListener.aborted.get() + countingListener.completed.get(), equalTo(doneMergesCount.get()));
182+
assertThat(countingListener.aborted.get(), equalTo(abortedMergesCount.get()));
168183
}
169184

170185
public void testTargetIORateChangesWhenSubmittingMergeTasks() throws Exception {
@@ -660,6 +675,27 @@ public void testMergeTasksExecuteInSizeOrder() {
660675
}
661676
}
662677

678+
private static class CountingMergeEventListener implements MergeEventListener {
679+
AtomicInteger queued = new AtomicInteger();
680+
AtomicInteger aborted = new AtomicInteger();
681+
AtomicInteger completed = new AtomicInteger();
682+
683+
@Override
684+
public void onMergeQueued(OnGoingMerge merge, long estimateMergeMemoryBytes) {
685+
queued.incrementAndGet();
686+
}
687+
688+
@Override
689+
public void onMergeCompleted(OnGoingMerge merge) {
690+
completed.incrementAndGet();
691+
}
692+
693+
@Override
694+
public void onMergeAborted(OnGoingMerge merge) {
695+
aborted.incrementAndGet();
696+
}
697+
}
698+
663699
static ThreadPoolMergeExecutorService getThreadPoolMergeExecutorService(ThreadPool threadPool) {
664700
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
665701
.maybeCreateThreadPoolMergeExecutorService(

0 commit comments

Comments
 (0)