Skip to content

Commit 083326e

Browse files
Threadpool merge executor does not block aborted merges (#129613)
This PR addresses a bug where aborted merges are blocked if there's insufficient disk space. Previously, the merge disk space estimation did not consider if the operation has been aborted when/while it was enqueued for execution. Consequently, aborted merges, for e.g. when closing a shard, were blocked if their disk space estimation was exceeding the available disk space threshold. In this case, the shard close operation would itself block. This fix estimates a disk space budget of `0` for aborted merges, and it periodically checks if any enqueued merge tasks have been aborted (more generally, it checks if the budget estimate for any merge tasks has changed, and reorders the queue if so). This way aborted merges are prioritized and are never blocked. Closes #129335
1 parent ee5d652 commit 083326e

File tree

6 files changed

+358
-33
lines changed

6 files changed

+358
-33
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -505,9 +505,6 @@ tests:
505505
- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeForkIT
506506
method: test {lookup-join.MultipleBatches*
507507
issue: https://github.com/elastic/elasticsearch/issues/129210
508-
- class: org.elasticsearch.xpack.autoscaling.storage.ReactiveStorageIT
509-
method: testScaleDuringSplitOrClone
510-
issue: https://github.com/elastic/elasticsearch/issues/129335
511508
- class: org.elasticsearch.entitlement.runtime.policy.FileAccessTreeTests
512509
method: testWindowsMixedCaseAccess
513510
issue: https://github.com/elastic/elasticsearch/issues/129167
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.engine;
11+
12+
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
13+
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
14+
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
15+
import org.elasticsearch.cluster.DiskUsageIntegTestCase;
16+
import org.elasticsearch.cluster.metadata.IndexMetadata;
17+
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
18+
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.common.util.concurrent.EsExecutors;
20+
import org.elasticsearch.index.IndexNotFoundException;
21+
import org.elasticsearch.indices.IndicesService;
22+
import org.elasticsearch.test.ESIntegTestCase;
23+
import org.elasticsearch.threadpool.ThreadPool;
24+
import org.junit.BeforeClass;
25+
26+
import java.util.Locale;
27+
import java.util.stream.IntStream;
28+
29+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
30+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
31+
import static org.hamcrest.Matchers.equalTo;
32+
import static org.hamcrest.Matchers.lessThan;
33+
34+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
35+
public class MergeWithLowDiskSpaceIT extends DiskUsageIntegTestCase {
36+
protected static long MERGE_DISK_HIGH_WATERMARK_BYTES;
37+
38+
@BeforeClass
39+
public static void setAvailableDiskSpaceBufferLimit() {
40+
// this has to be big in order to potentially accommodate the disk space for a few 100s of docs and a few merges,
41+
// because of the latency to process used disk space updates, and also because we cannot reliably separate indexing from merging
42+
// operations at this high abstraction level (merging is triggered more or less automatically in the background)
43+
MERGE_DISK_HIGH_WATERMARK_BYTES = randomLongBetween(1_000_000L, 2_000_000L);
44+
}
45+
46+
@Override
47+
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
48+
return Settings.builder()
49+
.put(super.nodeSettings(nodeOrdinal, otherSettings))
50+
// only the threadpool-based merge scheduler has the capability to block merges when disk space is insufficient
51+
.put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true)
52+
// the very short disk space polling interval ensures timely blocking of merges
53+
.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "10ms")
54+
// merges pile up more easily when there's only a few threads executing them
55+
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), randomIntBetween(1, 2))
56+
.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.getKey(), MERGE_DISK_HIGH_WATERMARK_BYTES + "b")
57+
// let's not worry about allocation watermarks (e.g. read-only shards) in this test suite
58+
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "0b")
59+
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "0b")
60+
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "0b")
61+
.build();
62+
}
63+
64+
public void testShardCloseWhenDiskSpaceInsufficient() throws Exception {
65+
String node = internalCluster().startNode();
66+
setTotalSpace(node, Long.MAX_VALUE);
67+
var indicesService = internalCluster().getInstance(IndicesService.class, node);
68+
ensureStableCluster(1);
69+
// create index
70+
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
71+
createIndex(
72+
indexName,
73+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build()
74+
);
75+
// do some indexing
76+
indexRandom(
77+
false,
78+
false,
79+
false,
80+
false,
81+
IntStream.range(1, randomIntBetween(2, 10))
82+
.mapToObj(i -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50)))
83+
.toList()
84+
);
85+
// get current disk space usage
86+
IndicesStatsResponse stats = indicesAdmin().prepareStats().clear().setStore(true).get();
87+
long usedDiskSpaceAfterIndexing = stats.getTotal().getStore().sizeInBytes();
88+
// restrict the total disk space such that the next merge does not have sufficient disk space
89+
long insufficientTotalDiskSpace = usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES - randomLongBetween(1L, 10L);
90+
setTotalSpace(node, insufficientTotalDiskSpace);
91+
// node stats' FS stats should report that there is insufficient disk space available
92+
assertBusy(() -> {
93+
NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setFs(true).get();
94+
assertThat(nodesStatsResponse.getNodes().size(), equalTo(1));
95+
NodeStats nodeStats = nodesStatsResponse.getNodes().get(0);
96+
assertThat(nodeStats.getFs().getTotal().getTotal().getBytes(), equalTo(insufficientTotalDiskSpace));
97+
assertThat(nodeStats.getFs().getTotal().getAvailable().getBytes(), lessThan(MERGE_DISK_HIGH_WATERMARK_BYTES));
98+
});
99+
while (true) {
100+
// maybe trigger a merge (this still depends on the merge policy, i.e. it is not 100% guaranteed)
101+
assertNoFailures(indicesAdmin().prepareForceMerge(indexName).get());
102+
// keep indexing and ask for merging until node stats' threadpool stats reports enqueued merges,
103+
// and the merge executor says they're blocked due to insufficient disk space if (nodesStatsResponse.getNodes()
104+
NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setThreadPool(true).get();
105+
if (nodesStatsResponse.getNodes()
106+
.getFirst()
107+
.getThreadPool()
108+
.stats()
109+
.stream()
110+
.filter(s -> ThreadPool.Names.MERGE.equals(s.name()))
111+
.findAny()
112+
.get()
113+
.queue() > 0
114+
&& indicesService.getThreadPoolMergeExecutorService().isMergingBlockedDueToInsufficientDiskSpace()) {
115+
break;
116+
}
117+
// more indexing
118+
indexRandom(
119+
false,
120+
false,
121+
false,
122+
false,
123+
IntStream.range(1, randomIntBetween(2, 10))
124+
.mapToObj(i -> prepareIndex(indexName).setSource("another_field", randomAlphaOfLength(50)))
125+
.toList()
126+
);
127+
}
128+
// now delete the index in this state, i.e. with merges enqueued and blocked
129+
assertAcked(indicesAdmin().prepareDelete(indexName).get());
130+
// index should now be gone
131+
assertBusy(() -> {
132+
expectThrows(
133+
IndexNotFoundException.class,
134+
() -> indicesAdmin().prepareGetIndex(TEST_REQUEST_TIMEOUT).setIndices(indexName).get()
135+
);
136+
});
137+
assertBusy(() -> {
138+
// merge thread pool should be done with the enqueue merge tasks
139+
NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setThreadPool(true).get();
140+
assertThat(
141+
nodesStatsResponse.getNodes()
142+
.getFirst()
143+
.getThreadPool()
144+
.stats()
145+
.stream()
146+
.filter(s -> ThreadPool.Names.MERGE.equals(s.name()))
147+
.findAny()
148+
.get()
149+
.queue(),
150+
equalTo(0)
151+
);
152+
// and the merge executor should also report that merging is done now
153+
assertFalse(indicesService.getThreadPoolMergeExecutorService().isMergingBlockedDueToInsufficientDiskSpace());
154+
assertTrue(indicesService.getThreadPoolMergeExecutorService().allDone());
155+
});
156+
}
157+
158+
public void setTotalSpace(String dataNodeName, long totalSpace) {
159+
getTestFileStore(dataNodeName).setTotalSpace(totalSpace);
160+
refreshClusterInfo();
161+
}
162+
}

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 50 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.core.Nullable;
2121
import org.elasticsearch.core.Releasable;
2222
import org.elasticsearch.core.TimeValue;
23+
import org.elasticsearch.core.Tuple;
2324
import org.elasticsearch.env.NodeEnvironment;
2425
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask;
2526
import org.elasticsearch.monitor.fs.FsInfo;
@@ -28,6 +29,7 @@
2829

2930
import java.io.Closeable;
3031
import java.io.IOException;
32+
import java.util.ArrayList;
3133
import java.util.Arrays;
3234
import java.util.Comparator;
3335
import java.util.IdentityHashMap;
@@ -59,10 +61,7 @@ public class ThreadPoolMergeExecutorService implements Closeable {
5961
/** How frequently we check disk usage (default: 5 seconds). */
6062
public static final Setting<TimeValue> INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING = Setting.positiveTimeSetting(
6163
"indices.merge.disk.check_interval",
62-
// disabled by default
63-
// there's currently a problem where (aborting) merges are blocked when shards are closed (because disk space is insufficient)
64-
// see: https://github.com/elastic/elasticsearch/issues/129335
65-
TimeValue.timeValueSeconds(0),
64+
TimeValue.timeValueSeconds(5),
6665
Property.Dynamic,
6766
Property.NodeScope
6867
);
@@ -294,6 +293,10 @@ public boolean allDone() {
294293
return queuedMergeTasks.isQueueEmpty() && runningMergeTasks.isEmpty() && ioThrottledMergeTasksCount.get() == 0L;
295294
}
296295

296+
public boolean isMergingBlockedDueToInsufficientDiskSpace() {
297+
return availableDiskSpacePeriodicMonitor.isScheduled() && queuedMergeTasks.queueHeadIsOverTheAvailableBudget();
298+
}
299+
297300
/**
298301
* Enqueues a runnable that executes exactly one merge task, the smallest that is runnable at some point in time.
299302
* A merge task is not runnable if its scheduler already reached the configured max-allowed concurrency level.
@@ -550,9 +553,8 @@ private static ByteSizeValue getFreeBytesThreshold(
550553

551554
static class MergeTaskPriorityBlockingQueue extends PriorityBlockingQueueWithBudget<MergeTask> {
552555
MergeTaskPriorityBlockingQueue() {
553-
// start with 0 budget (so takes on this queue will always block until {@link #updateBudget} is invoked)
554-
// use the estimated *remaining* merge size as the budget function so that the disk space budget of taken (in-use) elements is
555-
// updated according to the remaining disk space requirements of the currently running merge tasks
556+
// by default, start with 0 budget (so takes on this queue will always block until the first {@link #updateBudget} is invoked)
557+
// use the estimated *remaining* merge size as the budget function so that the disk space budget of elements is updated
556558
super(MergeTask::estimatedRemainingMergeSize, 0L);
557559
}
558560

@@ -563,7 +565,7 @@ long getAvailableBudget() {
563565

564566
// exposed for tests
565567
MergeTask peekQueue() {
566-
return enqueuedByBudget.peek();
568+
return enqueuedByBudget.peek().v1();
567569
}
568570
}
569571

@@ -573,15 +575,15 @@ MergeTask peekQueue() {
573575
*/
574576
static class PriorityBlockingQueueWithBudget<E> {
575577
private final ToLongFunction<? super E> budgetFunction;
576-
protected final PriorityQueue<E> enqueuedByBudget;
578+
protected final PriorityQueue<Tuple<E, Long>> enqueuedByBudget;
577579
private final IdentityHashMap<ElementWithReleasableBudget, Long> unreleasedBudgetPerElement;
578580
private final ReentrantLock lock;
579581
private final Condition elementAvailable;
580582
protected long availableBudget;
581583

582584
PriorityBlockingQueueWithBudget(ToLongFunction<? super E> budgetFunction, long initialAvailableBudget) {
583585
this.budgetFunction = budgetFunction;
584-
this.enqueuedByBudget = new PriorityQueue<>(64, Comparator.comparingLong(budgetFunction));
586+
this.enqueuedByBudget = new PriorityQueue<>(64, Comparator.comparingLong(Tuple::v2));
585587
this.unreleasedBudgetPerElement = new IdentityHashMap<>();
586588
this.lock = new ReentrantLock();
587589
this.elementAvailable = lock.newCondition();
@@ -592,7 +594,7 @@ boolean enqueue(E e) {
592594
final ReentrantLock lock = this.lock;
593595
lock.lock();
594596
try {
595-
enqueuedByBudget.offer(e);
597+
enqueuedByBudget.offer(new Tuple<>(e, budgetFunction.applyAsLong(e)));
596598
elementAvailable.signal();
597599
} finally {
598600
lock.unlock();
@@ -608,22 +610,22 @@ ElementWithReleasableBudget take() throws InterruptedException {
608610
final ReentrantLock lock = this.lock;
609611
lock.lockInterruptibly();
610612
try {
611-
E peek;
612-
long peekBudget;
613+
Tuple<E, Long> head;
613614
// blocks until the smallest budget element fits the currently available budget
614-
while ((peek = enqueuedByBudget.peek()) == null || (peekBudget = budgetFunction.applyAsLong(peek)) > availableBudget) {
615+
while ((head = enqueuedByBudget.peek()) == null || head.v2() > availableBudget) {
615616
elementAvailable.await();
616617
}
618+
head = enqueuedByBudget.poll();
617619
// deducts and holds up that element's budget from the available budget
618-
return newElementWithReleasableBudget(enqueuedByBudget.poll(), peekBudget);
620+
return newElementWithReleasableBudget(head.v1(), head.v2());
619621
} finally {
620622
lock.unlock();
621623
}
622624
}
623625

624626
/**
625627
* Updates the available budged given the passed-in argument, from which it deducts the budget hold up by taken elements
626-
* that are still in use. The budget of in-use elements is also updated (by re-applying the budget function).
628+
* that are still in use. The elements budget is also updated by re-applying the budget function.
627629
* The newly updated budget is used to potentially block {@link #take()} operations if the smallest-budget enqueued element
628630
* is over this newly computed available budget.
629631
*/
@@ -632,20 +634,50 @@ void updateBudget(long availableBudget) {
632634
lock.lock();
633635
try {
634636
this.availableBudget = availableBudget;
635-
// update the per-element budget (these are all the elements that are using any budget)
637+
// updates the budget of enqueued elements (and possibly reorders the priority queue)
638+
updateBudgetOfEnqueuedElementsAndReorderQueue();
639+
// update the budget of dequeued, but still in-use elements (these are the elements that are consuming budget)
636640
unreleasedBudgetPerElement.replaceAll((e, v) -> budgetFunction.applyAsLong(e.element()));
637-
// available budget is decreased by the used per-element budget (for all dequeued elements that are still in use)
641+
// the available budget is decreased by the budget of still in-use elements (dequeued elements that are still in-use)
638642
this.availableBudget -= unreleasedBudgetPerElement.values().stream().mapToLong(i -> i).sum();
639643
elementAvailable.signalAll();
640644
} finally {
641645
lock.unlock();
642646
}
643647
}
644648

649+
private void updateBudgetOfEnqueuedElementsAndReorderQueue() {
650+
assert this.lock.isHeldByCurrentThread();
651+
int queueSizeBefore = enqueuedByBudget.size();
652+
var it = enqueuedByBudget.iterator();
653+
List<Tuple<E, Long>> elementsToReorder = new ArrayList<>();
654+
while (it.hasNext()) {
655+
var elementWithBudget = it.next();
656+
Long previousBudget = elementWithBudget.v2();
657+
long latestBudget = budgetFunction.applyAsLong(elementWithBudget.v1());
658+
if (previousBudget.equals(latestBudget) == false) {
659+
// the budget (estimation) of an enqueued element has changed
660+
// this element will be reordered by removing and reinserting using the latest budget (estimation)
661+
it.remove();
662+
elementsToReorder.add(new Tuple<>(elementWithBudget.v1(), latestBudget));
663+
}
664+
}
665+
// reinsert elements based on the latest budget (estimation)
666+
for (var reorderedElement : elementsToReorder) {
667+
enqueuedByBudget.offer(reorderedElement);
668+
}
669+
assert queueSizeBefore == enqueuedByBudget.size();
670+
}
671+
645672
boolean isQueueEmpty() {
646673
return enqueuedByBudget.isEmpty();
647674
}
648675

676+
boolean queueHeadIsOverTheAvailableBudget() {
677+
var head = enqueuedByBudget.peek();
678+
return head != null && head.v2() > availableBudget;
679+
}
680+
649681
int queueSize() {
650682
return enqueuedByBudget.size();
651683
}

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -537,8 +537,13 @@ void abort() {
537537
long estimatedRemainingMergeSize() {
538538
// TODO is it possible that `estimatedMergeBytes` be `0` for correctly initialize merges,
539539
// or is it always the case that if `estimatedMergeBytes` is `0` that means that the merge has not yet been initialized?
540-
long estimatedMergeSize = onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
541-
return Math.max(0L, estimatedMergeSize - rateLimiter.getTotalBytesWritten());
540+
if (onGoingMerge.getMerge().isAborted()) {
541+
// if the merge is aborted the assumption is that merging will soon stop with negligible further writing
542+
return 0L;
543+
} else {
544+
long estimatedMergeSize = onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
545+
return Math.max(0L, estimatedMergeSize - rateLimiter.getTotalBytesWritten());
546+
}
542547
}
543548

544549
public long getMergeMemoryEstimateBytes() {

0 commit comments

Comments
 (0)