Skip to content

Commit 771c294

Browse files
pxsalehialbertzaharovits
authored andcommitted
Expose merge events and their memory usage estimate (elastic#126667)
Relates ES-10961
1 parent 55a477a commit 771c294

File tree

7 files changed

+117
-17
lines changed

7 files changed

+117
-17
lines changed

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2870,7 +2870,7 @@ private final class EngineThreadPoolMergeScheduler extends ThreadPoolMergeSchedu
28702870
IndexSettings indexSettings,
28712871
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
28722872
) {
2873-
super(shardId, indexSettings, threadPoolMergeExecutorService);
2873+
super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes);
28742874
}
28752875

28762876
@Override
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.engine;
11+
12+
import org.elasticsearch.index.merge.OnGoingMerge;
13+
14+
public interface MergeEventListener {
15+
16+
/**
17+
*
18+
* @param merge
19+
* @param estimateMergeMemoryBytes estimate of the memory needed to perform a merge
20+
*/
21+
void onMergeQueued(OnGoingMerge merge, long estimateMergeMemoryBytes);
22+
23+
void onMergeCompleted(OnGoingMerge merge);
24+
25+
void onMergeAborted(OnGoingMerge merge);
26+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.engine;
11+
12+
import org.apache.lucene.index.MergePolicy;
13+
14+
@FunctionalInterface
15+
public interface MergeMemoryEstimateProvider {
16+
17+
/**
18+
* Returns an estimate of the memory needed to perform a merge
19+
*/
20+
long estimateMergeMemoryBytes(MergePolicy.OneMerge merge);
21+
}

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.Map;
3737
import java.util.PriorityQueue;
3838
import java.util.Set;
39+
import java.util.concurrent.CopyOnWriteArrayList;
3940
import java.util.concurrent.ExecutorService;
4041
import java.util.concurrent.RejectedExecutionException;
4142
import java.util.concurrent.atomic.AtomicInteger;
@@ -199,6 +200,8 @@ public Iterator<Setting<?>> settings() {
199200
private final int concurrentMergesCeilLimitForThrottling;
200201
private final AvailableDiskSpacePeriodicMonitor availableDiskSpacePeriodicMonitor;
201202

203+
private final List<MergeEventListener> mergeEventListeners = new CopyOnWriteArrayList<>();
204+
202205
public static @Nullable ThreadPoolMergeExecutorService maybeCreateThreadPoolMergeExecutorService(
203206
ThreadPool threadPool,
204207
ClusterSettings clusterSettings,
@@ -266,7 +269,7 @@ boolean submitMergeTask(MergeTask mergeTask) {
266269
);
267270
}
268271
// then enqueue the merge task proper
269-
queuedMergeTasks.add(mergeTask);
272+
enqueueMergeTask(mergeTask);
270273
return true;
271274
}
272275
}
@@ -358,6 +361,7 @@ private void runMergeTask(MergeTask mergeTask) {
358361
if (mergeTask.supportsIOThrottling()) {
359362
ioThrottledMergeTasksCount.decrementAndGet();
360363
}
364+
mergeEventListeners.forEach(l -> l.onMergeCompleted(mergeTask.getOnGoingMerge()));
361365
}
362366
}
363367

@@ -370,6 +374,7 @@ private void abortMergeTask(MergeTask mergeTask) {
370374
if (mergeTask.supportsIOThrottling()) {
371375
ioThrottledMergeTasksCount.decrementAndGet();
372376
}
377+
mergeEventListeners.forEach(l -> l.onMergeAborted(mergeTask.getOnGoingMerge()));
373378
}
374379
}
375380

@@ -760,6 +765,10 @@ public boolean usingMaxTargetIORateBytesPerSec() {
760765
return MAX_IO_RATE.getBytes() == targetIORateBytesPerSec.get();
761766
}
762767

768+
public void registerMergeEventListener(MergeEventListener consumer) {
769+
mergeEventListeners.add(consumer);
770+
}
771+
763772
// exposed for tests
764773
Set<MergeTask> getRunningMergeTasks() {
765774
return runningMergeTasks;

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,13 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
6565
private final AtomicLong doneMergeTaskCount = new AtomicLong();
6666
private final CountDownLatch closedWithNoRunningMerges = new CountDownLatch(1);
6767
private volatile boolean closed = false;
68+
private final MergeMemoryEstimateProvider mergeMemoryEstimateProvider;
6869

6970
public ThreadPoolMergeScheduler(
7071
ShardId shardId,
7172
IndexSettings indexSettings,
72-
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
73+
ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
74+
MergeMemoryEstimateProvider mergeMemoryEstimateProvider
7375
) {
7476
this.shardId = shardId;
7577
this.config = indexSettings.getMergeSchedulerConfig();
@@ -81,6 +83,7 @@ public ThreadPoolMergeScheduler(
8183
: Double.POSITIVE_INFINITY
8284
);
8385
this.threadPoolMergeExecutorService = threadPoolMergeExecutorService;
86+
this.mergeMemoryEstimateProvider = mergeMemoryEstimateProvider;
8487
}
8588

8689
@Override
@@ -176,11 +179,13 @@ MergeTask newMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, Merg
176179
// forced merges, as well as merges triggered when closing a shard, always run un-IO-throttled
177180
boolean isAutoThrottle = mergeTrigger != MergeTrigger.CLOSING && merge.getStoreMergeInfo().mergeMaxNumSegments() == -1;
178181
// IO throttling cannot be toggled for existing merge tasks, only new merge tasks pick up the updated IO throttling setting
182+
long estimateMergeMemoryBytes = mergeMemoryEstimateProvider.estimateMergeMemoryBytes(merge);
179183
return new MergeTask(
180184
mergeSource,
181185
merge,
182186
isAutoThrottle && config.isAutoThrottle(),
183-
"Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId
187+
"Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId,
188+
estimateMergeMemoryBytes
184189
);
185190
}
186191

@@ -313,14 +318,22 @@ class MergeTask implements Runnable {
313318
private final OnGoingMerge onGoingMerge;
314319
private final MergeRateLimiter rateLimiter;
315320
private final boolean supportsIOThrottling;
316-
317-
MergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, boolean supportsIOThrottling, String name) {
321+
private final long mergeMemoryEstimateBytes;
322+
323+
MergeTask(
324+
MergeSource mergeSource,
325+
MergePolicy.OneMerge merge,
326+
boolean supportsIOThrottling,
327+
String name,
328+
long mergeMemoryEstimateBytes
329+
) {
318330
this.name = name;
319331
this.mergeStartTimeNS = new AtomicLong();
320332
this.mergeSource = mergeSource;
321333
this.onGoingMerge = new OnGoingMerge(merge);
322334
this.rateLimiter = new MergeRateLimiter(merge.getMergeProgress());
323335
this.supportsIOThrottling = supportsIOThrottling;
336+
this.mergeMemoryEstimateBytes = mergeMemoryEstimateBytes;
324337
}
325338

326339
Schedule schedule() {
@@ -463,6 +476,14 @@ long estimatedRemainingMergeSize() {
463476
return Math.max(0L, estimatedMergeSize - rateLimiter.getTotalBytesWritten());
464477
}
465478

479+
public long getMergeMemoryEstimateBytes() {
480+
return mergeMemoryEstimateBytes;
481+
}
482+
483+
public OnGoingMerge getOnGoingMerge() {
484+
return onGoingMerge;
485+
}
486+
466487
@Override
467488
public String toString() {
468489
return name + (onGoingMerge.getMerge().isAborted() ? " (aborted)" : "");

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService.PriorityBlockingQueueWithBudget;
2020
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask;
2121
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule;
22+
import org.elasticsearch.index.merge.OnGoingMerge;
2223
import org.elasticsearch.test.ESTestCase;
2324
import org.elasticsearch.threadpool.TestThreadPool;
2425
import org.elasticsearch.threadpool.ThreadPool;
@@ -116,6 +117,8 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
116117
Semaphore runMergeSemaphore = new Semaphore(0);
117118
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE);
118119
AtomicInteger doneMergesCount = new AtomicInteger(0);
120+
AtomicInteger reEnqueuedBackloggedMergesCount = new AtomicInteger();
121+
AtomicInteger abortedMergesCount = new AtomicInteger();
119122
// submit more merge tasks than there are threads so that some are enqueued
120123
for (int i = 0; i < mergesToSubmit; i++) {
121124
MergeTask mergeTask = mock(MergeTask.class);
@@ -127,6 +130,7 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
127130
if (schedule == BACKLOG) {
128131
// reenqueue backlogged merge task
129132
new Thread(() -> threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(mergeTask)).start();
133+
reEnqueuedBackloggedMergesCount.incrementAndGet();
130134
}
131135
return schedule;
132136
}).when(mergeTask).schedule();
@@ -146,6 +150,7 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
146150
}
147151
runMergeSemaphore.acquireUninterruptibly();
148152
doneMergesCount.incrementAndGet();
153+
abortedMergesCount.incrementAndGet();
149154
return null;
150155
}).when(mergeTask).abort();
151156
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
@@ -157,6 +162,12 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
157162
// with the other merge tasks enqueued
158163
assertThat(threadPoolExecutor.getQueue().size(), is(mergesToSubmit - mergeExecutorThreadCount));
159164
});
165+
assertBusy(
166+
() -> assertThat(
167+
countingListener.queued.get(),
168+
equalTo(threadPoolExecutor.getActiveCount() + threadPoolExecutor.getQueue().size() + reEnqueuedBackloggedMergesCount.get())
169+
)
170+
);
160171
// shutdown prevents new merge tasks to be enqueued but existing ones should be allowed to continue
161172
testThreadPool.shutdown();
162173
// assert all executors, except the merge one, are terminated
@@ -197,6 +208,8 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
197208
assertTrue(threadPoolExecutor.isTerminated());
198209
assertTrue(threadPoolMergeExecutorService.allDone());
199210
});
211+
assertThat(countingListener.aborted.get() + countingListener.completed.get(), equalTo(doneMergesCount.get()));
212+
assertThat(countingListener.aborted.get(), equalTo(abortedMergesCount.get()));
200213
}
201214

202215
public void testTargetIORateChangesWhenSubmittingMergeTasks() throws Exception {

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ public void testMergesExecuteInSizeOrder() throws IOException {
7878
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
7979
new ShardId("index", "_na_", 1),
8080
IndexSettingsModule.newIndexSettings("index", Settings.EMPTY),
81-
threadPoolMergeExecutorService
81+
threadPoolMergeExecutorService,
82+
merge -> 0
8283
)
8384
) {
8485
List<OneMerge> executedMergesList = new ArrayList<>();
@@ -120,7 +121,8 @@ public void testSimpleMergeTaskBacklogging() {
120121
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
121122
new ShardId("index", "_na_", 1),
122123
IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings),
123-
threadPoolMergeExecutorService
124+
threadPoolMergeExecutorService,
125+
merge -> 0
124126
);
125127
// more merge tasks than merge threads
126128
int mergeCount = mergeExecutorThreadCount + randomIntBetween(1, 5);
@@ -153,7 +155,8 @@ public void testSimpleMergeTaskReEnqueueingBySize() {
153155
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
154156
new ShardId("index", "_na_", 1),
155157
IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings),
156-
threadPoolMergeExecutorService
158+
threadPoolMergeExecutorService,
159+
merge -> 0
157160
);
158161
// sort backlogged merges by size
159162
PriorityQueue<MergeTask> backloggedMergeTasks = new PriorityQueue<>(
@@ -370,7 +373,8 @@ public void testMergeSourceWithFollowUpMergesRunSequentially() throws Exception
370373
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
371374
new ShardId("index", "_na_", 1),
372375
IndexSettingsModule.newIndexSettings("index", settings),
373-
threadPoolMergeExecutorService
376+
threadPoolMergeExecutorService,
377+
merge -> 0
374378
)
375379
) {
376380
MergeSource mergeSource = mock(MergeSource.class);
@@ -446,7 +450,8 @@ public void testMergesRunConcurrently() throws Exception {
446450
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
447451
new ShardId("index", "_na_", 1),
448452
IndexSettingsModule.newIndexSettings("index", settings),
449-
threadPoolMergeExecutorService
453+
threadPoolMergeExecutorService,
454+
merge -> 0
450455
)
451456
) {
452457
// at least 1 extra merge than there are concurrently allowed
@@ -533,7 +538,8 @@ public void testSchedulerCloseWaitsForRunningMerge() throws Exception {
533538
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
534539
new ShardId("index", "_na_", 1),
535540
IndexSettingsModule.newIndexSettings("index", settings),
536-
threadPoolMergeExecutorService
541+
threadPoolMergeExecutorService,
542+
merge -> 0
537543
)
538544
) {
539545
CountDownLatch mergeDoneLatch = new CountDownLatch(1);
@@ -605,7 +611,8 @@ public void testAutoIOThrottleForMergeTasksWhenSchedulerDisablesIt() throws Exce
605611
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
606612
new ShardId("index", "_na_", 1),
607613
indexSettings,
608-
threadPoolMergeExecutorService
614+
threadPoolMergeExecutorService,
615+
merge -> 0
609616
)
610617
) {
611618
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
@@ -634,7 +641,8 @@ public void testAutoIOThrottleForMergeTasks() throws Exception {
634641
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
635642
new ShardId("index", "_na_", 1),
636643
indexSettings,
637-
threadPoolMergeExecutorService
644+
threadPoolMergeExecutorService,
645+
merge -> 0
638646
)
639647
) {
640648
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
@@ -650,7 +658,8 @@ public void testAutoIOThrottleForMergeTasks() throws Exception {
650658
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
651659
new ShardId("index", "_na_", 1),
652660
indexSettings,
653-
threadPoolMergeExecutorService
661+
threadPoolMergeExecutorService,
662+
merge -> 0
654663
)
655664
) {
656665
// merge submitted upon closing
@@ -666,7 +675,8 @@ public void testAutoIOThrottleForMergeTasks() throws Exception {
666675
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
667676
new ShardId("index", "_na_", 1),
668677
indexSettings,
669-
threadPoolMergeExecutorService
678+
threadPoolMergeExecutorService,
679+
merge -> 0
670680
)
671681
) {
672682
// merge submitted upon closing
@@ -697,7 +707,7 @@ static class TestThreadPoolMergeScheduler extends ThreadPoolMergeScheduler {
697707
IndexSettings indexSettings,
698708
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
699709
) {
700-
super(shardId, indexSettings, threadPoolMergeExecutorService);
710+
super(shardId, indexSettings, threadPoolMergeExecutorService, merge -> 0);
701711
}
702712

703713
@Override

0 commit comments

Comments
 (0)