Skip to content

Commit 3023232

Browse files
authored
Merge branch 'main' into spatialcard
2 parents 077958e + 53f3ab2 commit 3023232

File tree

13 files changed

+1891
-97
lines changed

13 files changed

+1891
-97
lines changed

docs/changelog/127613.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 127613
2+
summary: Threadpool merge executor is aware of available disk space
3+
area: Engine
4+
type: feature
5+
issues: []

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
import org.elasticsearch.index.IndexSettings;
8989
import org.elasticsearch.index.IndexingPressure;
9090
import org.elasticsearch.index.MergePolicyConfig;
91+
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
9192
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler;
9293
import org.elasticsearch.index.shard.IndexingStatsSettings;
9394
import org.elasticsearch.indices.IndexingMemoryController;
@@ -629,6 +630,9 @@ public void apply(Settings value, Settings current, Settings previous) {
629630
MergePolicyConfig.DEFAULT_MAX_MERGED_SEGMENT_SETTING,
630631
MergePolicyConfig.DEFAULT_MAX_TIME_BASED_MERGED_SEGMENT_SETTING,
631632
ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING,
633+
ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING,
634+
ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING,
635+
ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING,
632636
TransportService.ENABLE_STACK_OVERFLOW_AVOIDANCE,
633637
DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING,
634638
DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING,

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 516 additions & 33 deletions
Large diffs are not rendered by default.

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
5555
private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService;
5656
private final PriorityQueue<MergeTask> backloggedMergeTasks = new PriorityQueue<>(
5757
16,
58-
Comparator.comparingLong(MergeTask::estimatedMergeSize)
58+
Comparator.comparingLong(MergeTask::estimatedRemainingMergeSize)
5959
);
6060
private final Map<MergePolicy.OneMerge, MergeTask> runningMergeTasks = new HashMap<>();
6161
// set when incoming merges should be throttled (i.e. restrict the indexing rate)
@@ -266,7 +266,7 @@ private void checkMergeTaskThrottling() {
266266
// exposed for tests
267267
// synchronized so that {@code #closed}, {@code #runningMergeTasks} and {@code #backloggedMergeTasks} are modified atomically
268268
synchronized Schedule schedule(MergeTask mergeTask) {
269-
assert mergeTask.isRunning() == false;
269+
assert mergeTask.hasStartedRunning() == false;
270270
if (closed) {
271271
// do not run or backlog tasks when closing the merge scheduler, instead abort them
272272
return Schedule.ABORT;
@@ -280,6 +280,7 @@ synchronized Schedule schedule(MergeTask mergeTask) {
280280
assert added : "starting merge task [" + mergeTask + "] registered as already running";
281281
return Schedule.RUN;
282282
} else {
283+
assert mergeTask.hasStartedRunning() == false;
283284
backloggedMergeTasks.add(mergeTask);
284285
return Schedule.BACKLOG;
285286
}
@@ -403,8 +404,14 @@ public void setIORateLimit(long ioRateLimitBytesPerSec) {
403404
this.rateLimiter.setMBPerSec(ByteSizeValue.ofBytes(ioRateLimitBytesPerSec).getMbFrac());
404405
}
405406

406-
public boolean isRunning() {
407-
return mergeStartTimeNS.get() > 0L;
407+
/**
408+
* Returns {@code true} if this task is currently running, or was run in the past.
409+
* An aborted task (see {@link #abort()}) is considered as NOT run.
410+
*/
411+
public boolean hasStartedRunning() {
412+
boolean isRunning = mergeStartTimeNS.get() > 0L;
413+
assert isRunning != false || rateLimiter.getTotalBytesWritten() == 0L;
414+
return isRunning;
408415
}
409416

410417
/**
@@ -415,7 +422,7 @@ public boolean isRunning() {
415422
*/
416423
@Override
417424
public void run() {
418-
assert isRunning() == false;
425+
assert hasStartedRunning() == false;
419426
assert ThreadPoolMergeScheduler.this.runningMergeTasks.containsKey(onGoingMerge.getMerge())
420427
: "runNowOrBacklog must be invoked before actually running the merge task";
421428
try {
@@ -480,7 +487,7 @@ public void run() {
480487
* (by the {@link org.apache.lucene.index.IndexWriter}) to any subsequent merges.
481488
*/
482489
void abort() {
483-
assert isRunning() == false;
490+
assert hasStartedRunning() == false;
484491
assert ThreadPoolMergeScheduler.this.runningMergeTasks.containsKey(onGoingMerge.getMerge()) == false
485492
: "cannot abort a merge task that's already running";
486493
if (verbose()) {
@@ -509,10 +516,17 @@ void abort() {
509516
}
510517
}
511518

512-
long estimatedMergeSize() {
519+
/**
520+
* Before the merge task started running, this returns the estimated required disk space for the merge to complete
521+
* (i.e. the estimated disk space size of the resulting segment following the merge).
522+
* While the merge is running, the returned estimation is updated to take into account the data that's already been written.
523+
* After the merge completes, the estimation returned here should ideally be close to "0".
524+
*/
525+
long estimatedRemainingMergeSize() {
513526
// TODO is it possible for `estimatedMergeBytes` to be `0` for correctly initialized merges,
514527
// or is it always the case that if `estimatedMergeBytes` is `0` that means that the merge has not yet been initialized?
515-
return onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
528+
long estimatedMergeSize = onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
529+
return Math.max(0L, estimatedMergeSize - rateLimiter.getTotalBytesWritten());
516530
}
517531

518532
public long getMergeMemoryEstimateBytes() {

server/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -295,10 +295,6 @@ protected void doStart() {
295295
IndicesService(IndicesServiceBuilder builder) {
296296
this.settings = builder.settings;
297297
this.threadPool = builder.threadPool;
298-
this.threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(
299-
threadPool,
300-
settings
301-
);
302298
this.pluginsService = builder.pluginsService;
303299
this.nodeEnv = builder.nodeEnv;
304300
this.parserConfig = XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE)
@@ -321,6 +317,11 @@ protected void doStart() {
321317
this.bigArrays = builder.bigArrays;
322318
this.scriptService = builder.scriptService;
323319
this.clusterService = builder.clusterService;
320+
this.threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(
321+
threadPool,
322+
clusterService.getClusterSettings(),
323+
nodeEnv
324+
);
324325
this.projectResolver = builder.projectResolver;
325326
this.client = builder.client;
326327
this.idFieldDataEnabled = INDICES_ID_FIELD_DATA_ENABLED_SETTING.get(clusterService.getSettings());
@@ -368,7 +369,8 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
368369
indicesFieldDataCache,
369370
cacheCleaner,
370371
indicesRequestCache,
371-
indicesQueryCache
372+
indicesQueryCache,
373+
threadPoolMergeExecutorService
372374
);
373375
} catch (IOException e) {
374376
throw new UncheckedIOException(e);

server/src/test/java/org/elasticsearch/index/IndexModuleTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,13 +194,17 @@ public void setUp() throws Exception {
194194
emptyMap()
195195
);
196196
threadPool = new TestThreadPool("test");
197-
threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(threadPool, settings);
198197
circuitBreakerService = new NoneCircuitBreakerService();
199198
PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
200199
bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST);
201200
scriptService = new ScriptService(settings, Collections.emptyMap(), Collections.emptyMap(), () -> 1L);
202-
clusterService = ClusterServiceUtils.createClusterService(threadPool);
201+
clusterService = ClusterServiceUtils.createClusterService(threadPool, ClusterSettings.createBuiltInClusterSettings(settings));
203202
nodeEnvironment = new NodeEnvironment(settings, environment);
203+
threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(
204+
threadPool,
205+
clusterService.getClusterSettings(),
206+
nodeEnvironment
207+
);
204208
mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry();
205209
indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance(threadPool.getThreadContext());
206210
}

0 commit comments

Comments
 (0)