Skip to content

Commit 55a477a

Browse files
Threadpool merge executor is aware of available disk space (#127613)
This PR introduces 3 new settings: indices.merge.disk.check_interval, indices.merge.disk.watermark.high, and indices.merge.disk.watermark.high.max_headroom, which control whether the threadpool merge executor starts executing new merges when disk space is getting low. The intent of this change is to avoid the situation where in-progress merges exhaust the available disk space on the node's local filesystem. To this end, the thread pool merge executor periodically monitors the available disk space, as well as the current disk space estimates required by all in-progress (currently running) merges on the node, and will NOT schedule any new merges if disk space is getting low (by default, below the 5% limit of the total disk space, or 100 GB, whichever is smaller — the same as the disk allocation flood stage level).
1 parent 646069d commit 55a477a

File tree

13 files changed

+1924
-97
lines changed

13 files changed

+1924
-97
lines changed

docs/changelog/127613.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 127613
2+
summary: Threadpool merge executor is aware of available disk space
3+
area: Engine
4+
type: feature
5+
issues: []

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
import org.elasticsearch.index.IndexSettings;
8888
import org.elasticsearch.index.IndexingPressure;
8989
import org.elasticsearch.index.MergePolicyConfig;
90+
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
9091
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler;
9192
import org.elasticsearch.indices.IndexingMemoryController;
9293
import org.elasticsearch.indices.IndicesQueryCache;
@@ -621,6 +622,9 @@ public void apply(Settings value, Settings current, Settings previous) {
621622
MergePolicyConfig.DEFAULT_MAX_MERGED_SEGMENT_SETTING,
622623
MergePolicyConfig.DEFAULT_MAX_TIME_BASED_MERGED_SEGMENT_SETTING,
623624
ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING,
625+
ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING,
626+
ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING,
627+
ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING,
624628
TransportService.ENABLE_STACK_OVERFLOW_AVOIDANCE,
625629
DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING,
626630
DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING,

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 525 additions & 33 deletions
Large diffs are not rendered by default.

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
5555
private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService;
5656
private final PriorityQueue<MergeTask> backloggedMergeTasks = new PriorityQueue<>(
5757
16,
58-
Comparator.comparingLong(MergeTask::estimatedMergeSize)
58+
Comparator.comparingLong(MergeTask::estimatedRemainingMergeSize)
5959
);
6060
private final Map<MergePolicy.OneMerge, MergeTask> runningMergeTasks = new HashMap<>();
6161
// set when incoming merges should be throttled (i.e. restrict the indexing rate)
@@ -214,7 +214,7 @@ private void checkMergeTaskThrottling() {
214214
// exposed for tests
215215
// synchronized so that {@code #closed}, {@code #runningMergeTasks} and {@code #backloggedMergeTasks} are modified atomically
216216
synchronized Schedule schedule(MergeTask mergeTask) {
217-
assert mergeTask.isRunning() == false;
217+
assert mergeTask.hasStartedRunning() == false;
218218
if (closed) {
219219
// do not run or backlog tasks when closing the merge scheduler, instead abort them
220220
return Schedule.ABORT;
@@ -223,6 +223,7 @@ synchronized Schedule schedule(MergeTask mergeTask) {
223223
assert added : "starting merge task [" + mergeTask + "] registered as already running";
224224
return Schedule.RUN;
225225
} else {
226+
assert mergeTask.hasStartedRunning() == false;
226227
backloggedMergeTasks.add(mergeTask);
227228
return Schedule.BACKLOG;
228229
}
@@ -337,8 +338,14 @@ public void setIORateLimit(long ioRateLimitBytesPerSec) {
337338
this.rateLimiter.setMBPerSec(ByteSizeValue.ofBytes(ioRateLimitBytesPerSec).getMbFrac());
338339
}
339340

340-
public boolean isRunning() {
341-
return mergeStartTimeNS.get() > 0L;
341+
/**
342+
* Returns {@code true} if this task is currently running, or was run in the past.
343+
* An aborted task (see {@link #abort()}) is considered as NOT run.
344+
*/
345+
public boolean hasStartedRunning() {
346+
boolean isRunning = mergeStartTimeNS.get() > 0L;
347+
assert isRunning != false || rateLimiter.getTotalBytesWritten() == 0L;
348+
return isRunning;
342349
}
343350

344351
/**
@@ -349,7 +356,7 @@ public boolean isRunning() {
349356
*/
350357
@Override
351358
public void run() {
352-
assert isRunning() == false;
359+
assert hasStartedRunning() == false;
353360
assert ThreadPoolMergeScheduler.this.runningMergeTasks.containsKey(onGoingMerge.getMerge())
354361
: "runNowOrBacklog must be invoked before actually running the merge task";
355362
try {
@@ -414,7 +421,7 @@ public void run() {
414421
* (by the {@link org.apache.lucene.index.IndexWriter}) to any subsequent merges.
415422
*/
416423
void abort() {
417-
assert isRunning() == false;
424+
assert hasStartedRunning() == false;
418425
assert ThreadPoolMergeScheduler.this.runningMergeTasks.containsKey(onGoingMerge.getMerge()) == false
419426
: "cannot abort a merge task that's already running";
420427
if (verbose()) {
@@ -443,10 +450,17 @@ void abort() {
443450
}
444451
}
445452

446-
long estimatedMergeSize() {
453+
/**
454+
* Before the merge task started running, this returns the estimated required disk space for the merge to complete
455+
* (i.e. the estimated disk space size of the resulting segment following the merge).
456+
* While the merge is running, the returned estimation is updated to take into account the data that's already been written.
457+
* After the merge completes, the estimation returned here should ideally be close to "0".
458+
*/
459+
long estimatedRemainingMergeSize() {
447460
// TODO is it possible that `estimatedMergeBytes` be `0` for correctly initialize merges,
448461
// or is it always the case that if `estimatedMergeBytes` is `0` that means that the merge has not yet been initialized?
449-
return onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
462+
long estimatedMergeSize = onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
463+
return Math.max(0L, estimatedMergeSize - rateLimiter.getTotalBytesWritten());
450464
}
451465

452466
@Override

server/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -289,10 +289,6 @@ protected void doStart() {
289289
IndicesService(IndicesServiceBuilder builder) {
290290
this.settings = builder.settings;
291291
this.threadPool = builder.threadPool;
292-
this.threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(
293-
threadPool,
294-
settings
295-
);
296292
this.pluginsService = builder.pluginsService;
297293
this.nodeEnv = builder.nodeEnv;
298294
this.parserConfig = XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE)
@@ -315,6 +311,12 @@ protected void doStart() {
315311
this.bigArrays = builder.bigArrays;
316312
this.scriptService = builder.scriptService;
317313
this.clusterService = builder.clusterService;
314+
this.threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(
315+
threadPool,
316+
clusterService.getClusterSettings(),
317+
nodeEnv
318+
);
319+
this.projectResolver = builder.projectResolver;
318320
this.client = builder.client;
319321
this.featureService = builder.featureService;
320322
this.idFieldDataEnabled = INDICES_ID_FIELD_DATA_ENABLED_SETTING.get(clusterService.getSettings());
@@ -362,7 +364,8 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
362364
indicesFieldDataCache,
363365
cacheCleaner,
364366
indicesRequestCache,
365-
indicesQueryCache
367+
indicesQueryCache,
368+
threadPoolMergeExecutorService
366369
);
367370
} catch (IOException e) {
368371
throw new UncheckedIOException(e);

server/src/test/java/org/elasticsearch/index/IndexModuleTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,13 +192,17 @@ public void setUp() throws Exception {
192192
emptyMap()
193193
);
194194
threadPool = new TestThreadPool("test");
195-
threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(threadPool, settings);
196195
circuitBreakerService = new NoneCircuitBreakerService();
197196
PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
198197
bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST);
199198
scriptService = new ScriptService(settings, Collections.emptyMap(), Collections.emptyMap(), () -> 1L);
200-
clusterService = ClusterServiceUtils.createClusterService(threadPool);
199+
clusterService = ClusterServiceUtils.createClusterService(threadPool, ClusterSettings.createBuiltInClusterSettings(settings));
201200
nodeEnvironment = new NodeEnvironment(settings, environment);
201+
threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(
202+
threadPool,
203+
clusterService.getClusterSettings(),
204+
nodeEnvironment
205+
);
202206
mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry();
203207
indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance(threadPool.getThreadContext());
204208
}

0 commit comments

Comments (0)