Skip to content

Commit 60a71b8

Browse files
WIP
1 parent 57c2a5c commit 60a71b8

File tree

14 files changed

+249
-141
lines changed

14 files changed

+249
-141
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,7 @@ public static final IndexShard newIndexShard(
625625
indexService.getIndexEventListener(),
626626
wrapper,
627627
indexService.getThreadPool(),
628+
indexService.getThreadPoolMergeExecutor(),
628629
indexService.getBigArrays(),
629630
null,
630631
Collections.emptyList(),

server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
5959
return new EngineConfig(
6060
config.getShardId(),
6161
config.getThreadPool(),
62+
config.getThreadPoolMergeExecutor(),
6263
indexSettings,
6364
config.getWarmer(),
6465
config.getStore(),

server/src/main/java/org/elasticsearch/index/IndexService.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
import org.elasticsearch.index.cache.query.QueryCache;
5050
import org.elasticsearch.index.engine.Engine;
5151
import org.elasticsearch.index.engine.EngineFactory;
52+
import org.elasticsearch.index.engine.ThreadPoolMergeExecutor;
53+
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler;
5254
import org.elasticsearch.index.fielddata.FieldDataContext;
5355
import org.elasticsearch.index.fielddata.IndexFieldData;
5456
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
@@ -154,6 +156,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
154156

155157
private final AsyncTrimTranslogTask trimTranslogTask;
156158
private final ThreadPool threadPool;
159+
private final ThreadPoolMergeExecutor threadPoolMergeExecutor;
157160
private final BigArrays bigArrays;
158161
private final ScriptService scriptService;
159162
private final ClusterService clusterService;
@@ -260,6 +263,7 @@ public IndexService(
260263
this.indexFoldersDeletionListener = indexFoldersDeletionListener;
261264
this.bigArrays = bigArrays;
262265
this.threadPool = threadPool;
266+
this.threadPoolMergeExecutor = new ThreadPoolMergeExecutor(threadPool);
263267
this.scriptService = scriptService;
264268
this.clusterService = clusterService;
265269
this.client = client;
@@ -555,6 +559,7 @@ public synchronized IndexShard createShard(
555559
eventListener,
556560
readerWrapper,
557561
threadPool,
562+
threadPoolMergeExecutor,
558563
bigArrays,
559564
engineWarmer,
560565
searchOperationListeners,
@@ -818,6 +823,10 @@ public ThreadPool getThreadPool() {
818823
return threadPool;
819824
}
820825

826+
public ThreadPoolMergeExecutor getThreadPoolMergeExecutor() {
827+
return threadPoolMergeExecutor;
828+
}
829+
821830
/**
822831
* The {@link BigArrays} to use for this index.
823832
*/

server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public final class EngineConfig {
5858
private final MapperService mapperService;
5959
private final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier;
6060
private final ThreadPool threadPool;
61+
private final ThreadPoolMergeExecutor threadPoolMergeExecutor;
6162
private final Engine.Warmer warmer;
6263
private final Store store;
6364
private final MergePolicy mergePolicy;
@@ -150,6 +151,7 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {
150151
public EngineConfig(
151152
ShardId shardId,
152153
ThreadPool threadPool,
154+
ThreadPoolMergeExecutor threadPoolMergeExecutor,
153155
IndexSettings indexSettings,
154156
Engine.Warmer warmer,
155157
Store store,
@@ -179,6 +181,7 @@ public EngineConfig(
179181
this.shardId = shardId;
180182
this.indexSettings = indexSettings;
181183
this.threadPool = threadPool;
184+
this.threadPoolMergeExecutor = threadPoolMergeExecutor;
182185
this.warmer = warmer == null ? (a) -> {} : warmer;
183186
this.store = store;
184187
this.mergePolicy = mergePolicy;
@@ -287,6 +290,10 @@ public ThreadPool getThreadPool() {
287290
return threadPool;
288291
}
289292

293+
public ThreadPoolMergeExecutor getThreadPoolMergeExecutor() {
294+
return threadPoolMergeExecutor;
295+
}
296+
290297
/**
291298
* Returns an {@link org.elasticsearch.index.engine.Engine.Warmer} used to warm new searchers before they are used for searching.
292299
*/

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ public InternalEngine(EngineConfig engineConfig) {
253253
boolean success = false;
254254
try {
255255
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
256-
mergeScheduler = createMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
256+
mergeScheduler = createMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings(), engineConfig.getThreadPoolMergeExecutor());
257257
scheduler = mergeScheduler.getMergeScheduler();
258258
throttle = new IndexThrottle();
259259
try {
@@ -2823,9 +2823,13 @@ LiveIndexWriterConfig getCurrentIndexWriterConfig() {
28232823
return indexWriter.getConfig();
28242824
}
28252825

2826-
protected ElasticsearchMergeScheduler createMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
2826+
protected ElasticsearchMergeScheduler createMergeScheduler(
2827+
ShardId shardId,
2828+
IndexSettings indexSettings,
2829+
ThreadPoolMergeExecutor threadPoolMergeExecutor
2830+
) {
28272831
// return new EngineMergeScheduler(shardId, indexSettings);
2828-
return new ThreadPoolMergeScheduler(shardId, indexSettings, engineConfig.getThreadPool()) {
2832+
return new ThreadPoolMergeScheduler(shardId, indexSettings, threadPoolMergeExecutor) {
28292833

28302834
@Override
28312835
protected synchronized void activateThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) {
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.engine;
11+
12+
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask;
13+
import org.elasticsearch.threadpool.ThreadPool;
14+
15+
import java.util.Comparator;
16+
import java.util.SortedSet;
17+
import java.util.TreeSet;
18+
import java.util.concurrent.ExecutorService;
19+
import java.util.function.Consumer;
20+
21+
public class ThreadPoolMergeExecutor {
22+
/**
23+
* Floor for IO write rate limit (we will never go any lower than this)
24+
*/
25+
private static final double MIN_MERGE_MB_PER_SEC = 5.0;
26+
/**
27+
* Ceiling for IO write rate limit (we will never go any higher than this)
28+
*/
29+
private static final double MAX_MERGE_MB_PER_SEC = 10240.0;
30+
/**
31+
* Initial value for IO write rate limit when doAutoIOThrottle is true
32+
*/
33+
private static final double START_MB_PER_SEC = 20.0;
34+
/**
35+
* Current IO write throttle rate, for all merge, across all merge schedulers (shards) on the node
36+
*/
37+
private double targetMBPerSec = START_MB_PER_SEC;
38+
private final SortedSet<ThreadPoolMergeScheduler> registeredMergeSchedulers = new TreeSet<>(new Comparator<ThreadPoolMergeScheduler>() {
39+
@Override
40+
public int compare(ThreadPoolMergeScheduler tpms1, ThreadPoolMergeScheduler tpms2) {
41+
MergeTask mergeTask1 = tpms1.peekMergeTaskToExecute();
42+
MergeTask mergeTask2 = tpms2.peekMergeTaskToExecute();
43+
if (mergeTask1 == null && mergeTask2 == null) {
44+
// arbitrary order between schedulers that cannot run any merge right now
45+
return System.identityHashCode(mergeTask1) - System.identityHashCode(mergeTask2);
46+
} else if (mergeTask1 == null) {
47+
// "merge task 2" can run because "merge scheduler 1" cannot run any merges
48+
return 1;
49+
} else if (mergeTask2 == null) {
50+
// "merge task 1" can run because "merge scheduler 2" cannot run any merges
51+
return -1;
52+
} else {
53+
// run smaller merge task first
54+
return mergeTask1.compareTo(mergeTask2);
55+
}
56+
}
57+
});
58+
private final ExecutorService executorService;
59+
private final int maxConcurrentMerges;
60+
private int currentlyExecutingMergesCount;
61+
62+
public ThreadPoolMergeExecutor(ThreadPool threadPool) {
63+
this.executorService = threadPool.executor(ThreadPool.Names.MERGE);
64+
this.maxConcurrentMerges = threadPool.info(ThreadPool.Names.MERGE).getMax();
65+
}
66+
67+
public double getTargetMBPerSec() {
68+
return targetMBPerSec;
69+
}
70+
71+
public synchronized void updateMergeScheduler(ThreadPoolMergeScheduler threadPoolMergeScheduler,
72+
Consumer<ThreadPoolMergeScheduler> updater) {
73+
registeredMergeSchedulers.remove(threadPoolMergeScheduler);
74+
currentlyExecutingMergesCount -= threadPoolMergeScheduler.getCurrentlyRunningMergeTasks().size();
75+
updater.accept(threadPoolMergeScheduler);
76+
registeredMergeSchedulers.add(threadPoolMergeScheduler);
77+
currentlyExecutingMergesCount += threadPoolMergeScheduler.getCurrentlyRunningMergeTasks().size();
78+
maybeExecuteNextMerges();
79+
}
80+
81+
public synchronized void maybeExecuteNextMerges() {
82+
while (true) {
83+
if (currentlyExecutingMergesCount >= maxConcurrentMerges) {
84+
// all merge threads are busy
85+
return;
86+
}
87+
if (registeredMergeSchedulers.first().peekMergeTaskToExecute() == null) {
88+
// no merges available to run
89+
return;
90+
}
91+
ThreadPoolMergeScheduler threadPoolMergeScheduler = registeredMergeSchedulers.removeFirst();
92+
currentlyExecutingMergesCount -= threadPoolMergeScheduler.getCurrentlyRunningMergeTasks().size();
93+
MergeTask mergeTask = threadPoolMergeScheduler.executeNextMergeTask();
94+
assert mergeTask != null;
95+
executorService.execute(mergeTask);
96+
registeredMergeSchedulers.add(threadPoolMergeScheduler);
97+
currentlyExecutingMergesCount += threadPoolMergeScheduler.getCurrentlyRunningMergeTasks().size();
98+
}
99+
}
100+
}

0 commit comments

Comments
 (0)