Skip to content

Commit cdabfb7

Browse files
authored
Merge branch 'main' into sorting_on_range_fields_throws_400
2 parents 7388cab + 083326e commit cdabfb7

File tree

31 files changed

+2166
-1148
lines changed

31 files changed

+2166
-1148
lines changed

muted-tests.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -505,9 +505,6 @@ tests:
505505
- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeForkIT
506506
method: test {lookup-join.MultipleBatches*
507507
issue: https://github.com/elastic/elasticsearch/issues/129210
508-
- class: org.elasticsearch.xpack.autoscaling.storage.ReactiveStorageIT
509-
method: testScaleDuringSplitOrClone
510-
issue: https://github.com/elastic/elasticsearch/issues/129335
511508
- class: org.elasticsearch.entitlement.runtime.policy.FileAccessTreeTests
512509
method: testWindowsMixedCaseAccess
513510
issue: https://github.com/elastic/elasticsearch/issues/129167
@@ -574,6 +571,9 @@ tests:
574571
- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT
575572
method: test {scoring.TestMatchPhraseWithScoreBoost ASYNC}
576573
issue: https://github.com/elastic/elasticsearch/issues/129676
574+
- class: org.elasticsearch.search.query.RescoreKnnVectorQueryIT
575+
method: testKnnSearchRescore
576+
issue: https://github.com/elastic/elasticsearch/issues/129713
577577

578578
# Examples:
579579
#
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.engine;
11+
12+
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
13+
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
14+
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
15+
import org.elasticsearch.cluster.DiskUsageIntegTestCase;
16+
import org.elasticsearch.cluster.metadata.IndexMetadata;
17+
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
18+
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.common.util.concurrent.EsExecutors;
20+
import org.elasticsearch.index.IndexNotFoundException;
21+
import org.elasticsearch.indices.IndicesService;
22+
import org.elasticsearch.test.ESIntegTestCase;
23+
import org.elasticsearch.threadpool.ThreadPool;
24+
import org.junit.BeforeClass;
25+
26+
import java.util.Locale;
27+
import java.util.stream.IntStream;
28+
29+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
30+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
31+
import static org.hamcrest.Matchers.equalTo;
32+
import static org.hamcrest.Matchers.lessThan;
33+
34+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
35+
public class MergeWithLowDiskSpaceIT extends DiskUsageIntegTestCase {
36+
protected static long MERGE_DISK_HIGH_WATERMARK_BYTES;
37+
38+
@BeforeClass
39+
public static void setAvailableDiskSpaceBufferLimit() {
40+
// this has to be big in order to potentially accommodate the disk space for a few 100s of docs and a few merges,
41+
// because of the latency to process used disk space updates, and also because we cannot reliably separate indexing from merging
42+
// operations at this high abstraction level (merging is triggered more or less automatically in the background)
43+
MERGE_DISK_HIGH_WATERMARK_BYTES = randomLongBetween(1_000_000L, 2_000_000L);
44+
}
45+
46+
@Override
47+
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
48+
return Settings.builder()
49+
.put(super.nodeSettings(nodeOrdinal, otherSettings))
50+
// only the threadpool-based merge scheduler has the capability to block merges when disk space is insufficient
51+
.put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true)
52+
// the very short disk space polling interval ensures timely blocking of merges
53+
.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "10ms")
54+
// merges pile up more easily when there's only a few threads executing them
55+
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), randomIntBetween(1, 2))
56+
.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.getKey(), MERGE_DISK_HIGH_WATERMARK_BYTES + "b")
57+
// let's not worry about allocation watermarks (e.g. read-only shards) in this test suite
58+
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "0b")
59+
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "0b")
60+
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "0b")
61+
.build();
62+
}
63+
64+
public void testShardCloseWhenDiskSpaceInsufficient() throws Exception {
65+
String node = internalCluster().startNode();
66+
setTotalSpace(node, Long.MAX_VALUE);
67+
var indicesService = internalCluster().getInstance(IndicesService.class, node);
68+
ensureStableCluster(1);
69+
// create index
70+
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
71+
createIndex(
72+
indexName,
73+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build()
74+
);
75+
// do some indexing
76+
indexRandom(
77+
false,
78+
false,
79+
false,
80+
false,
81+
IntStream.range(1, randomIntBetween(2, 10))
82+
.mapToObj(i -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50)))
83+
.toList()
84+
);
85+
// get current disk space usage
86+
IndicesStatsResponse stats = indicesAdmin().prepareStats().clear().setStore(true).get();
87+
long usedDiskSpaceAfterIndexing = stats.getTotal().getStore().sizeInBytes();
88+
// restrict the total disk space such that the next merge does not have sufficient disk space
89+
long insufficientTotalDiskSpace = usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES - randomLongBetween(1L, 10L);
90+
setTotalSpace(node, insufficientTotalDiskSpace);
91+
// node stats' FS stats should report that there is insufficient disk space available
92+
assertBusy(() -> {
93+
NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setFs(true).get();
94+
assertThat(nodesStatsResponse.getNodes().size(), equalTo(1));
95+
NodeStats nodeStats = nodesStatsResponse.getNodes().get(0);
96+
assertThat(nodeStats.getFs().getTotal().getTotal().getBytes(), equalTo(insufficientTotalDiskSpace));
97+
assertThat(nodeStats.getFs().getTotal().getAvailable().getBytes(), lessThan(MERGE_DISK_HIGH_WATERMARK_BYTES));
98+
});
99+
while (true) {
100+
// maybe trigger a merge (this still depends on the merge policy, i.e. it is not 100% guaranteed)
101+
assertNoFailures(indicesAdmin().prepareForceMerge(indexName).get());
102+
// keep indexing and ask for merging until node stats' threadpool stats reports enqueued merges,
103+
// and the merge executor says they're blocked due to insufficient disk space
104+
NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setThreadPool(true).get();
105+
if (nodesStatsResponse.getNodes()
106+
.getFirst()
107+
.getThreadPool()
108+
.stats()
109+
.stream()
110+
.filter(s -> ThreadPool.Names.MERGE.equals(s.name()))
111+
.findAny()
112+
.get()
113+
.queue() > 0
114+
&& indicesService.getThreadPoolMergeExecutorService().isMergingBlockedDueToInsufficientDiskSpace()) {
115+
break;
116+
}
117+
// more indexing
118+
indexRandom(
119+
false,
120+
false,
121+
false,
122+
false,
123+
IntStream.range(1, randomIntBetween(2, 10))
124+
.mapToObj(i -> prepareIndex(indexName).setSource("another_field", randomAlphaOfLength(50)))
125+
.toList()
126+
);
127+
}
128+
// now delete the index in this state, i.e. with merges enqueued and blocked
129+
assertAcked(indicesAdmin().prepareDelete(indexName).get());
130+
// index should now be gone
131+
assertBusy(() -> {
132+
expectThrows(
133+
IndexNotFoundException.class,
134+
() -> indicesAdmin().prepareGetIndex(TEST_REQUEST_TIMEOUT).setIndices(indexName).get()
135+
);
136+
});
137+
assertBusy(() -> {
138+
// merge thread pool should be done with the enqueued merge tasks
139+
NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setThreadPool(true).get();
140+
assertThat(
141+
nodesStatsResponse.getNodes()
142+
.getFirst()
143+
.getThreadPool()
144+
.stats()
145+
.stream()
146+
.filter(s -> ThreadPool.Names.MERGE.equals(s.name()))
147+
.findAny()
148+
.get()
149+
.queue(),
150+
equalTo(0)
151+
);
152+
// and the merge executor should also report that merging is done now
153+
assertFalse(indicesService.getThreadPoolMergeExecutorService().isMergingBlockedDueToInsufficientDiskSpace());
154+
assertTrue(indicesService.getThreadPoolMergeExecutorService().allDone());
155+
});
156+
}
157+
158+
public void setTotalSpace(String dataNodeName, long totalSpace) {
159+
getTestFileStore(dataNodeName).setTotalSpace(totalSpace);
160+
refreshClusterInfo();
161+
}
162+
}

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 50 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.core.Nullable;
2121
import org.elasticsearch.core.Releasable;
2222
import org.elasticsearch.core.TimeValue;
23+
import org.elasticsearch.core.Tuple;
2324
import org.elasticsearch.env.NodeEnvironment;
2425
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask;
2526
import org.elasticsearch.monitor.fs.FsInfo;
@@ -28,6 +29,7 @@
2829

2930
import java.io.Closeable;
3031
import java.io.IOException;
32+
import java.util.ArrayList;
3133
import java.util.Arrays;
3234
import java.util.Comparator;
3335
import java.util.IdentityHashMap;
@@ -59,10 +61,7 @@ public class ThreadPoolMergeExecutorService implements Closeable {
5961
/** How frequently we check disk usage (default: 5 seconds). */
6062
public static final Setting<TimeValue> INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING = Setting.positiveTimeSetting(
6163
"indices.merge.disk.check_interval",
62-
// disabled by default
63-
// there's currently a problem where (aborting) merges are blocked when shards are closed (because disk space is insufficient)
64-
// see: https://github.com/elastic/elasticsearch/issues/129335
65-
TimeValue.timeValueSeconds(0),
64+
TimeValue.timeValueSeconds(5),
6665
Property.Dynamic,
6766
Property.NodeScope
6867
);
@@ -294,6 +293,10 @@ public boolean allDone() {
294293
return queuedMergeTasks.isQueueEmpty() && runningMergeTasks.isEmpty() && ioThrottledMergeTasksCount.get() == 0L;
295294
}
296295

296+
public boolean isMergingBlockedDueToInsufficientDiskSpace() {
297+
return availableDiskSpacePeriodicMonitor.isScheduled() && queuedMergeTasks.queueHeadIsOverTheAvailableBudget();
298+
}
299+
297300
/**
298301
* Enqueues a runnable that executes exactly one merge task, the smallest that is runnable at some point in time.
299302
* A merge task is not runnable if its scheduler already reached the configured max-allowed concurrency level.
@@ -550,9 +553,8 @@ private static ByteSizeValue getFreeBytesThreshold(
550553

551554
static class MergeTaskPriorityBlockingQueue extends PriorityBlockingQueueWithBudget<MergeTask> {
552555
MergeTaskPriorityBlockingQueue() {
553-
// start with 0 budget (so takes on this queue will always block until {@link #updateBudget} is invoked)
554-
// use the estimated *remaining* merge size as the budget function so that the disk space budget of taken (in-use) elements is
555-
// updated according to the remaining disk space requirements of the currently running merge tasks
556+
// by default, start with 0 budget (so takes on this queue will always block until the first {@link #updateBudget} is invoked)
557+
// use the estimated *remaining* merge size as the budget function so that the disk space budget of elements is updated
556558
super(MergeTask::estimatedRemainingMergeSize, 0L);
557559
}
558560

@@ -563,7 +565,7 @@ long getAvailableBudget() {
563565

564566
// exposed for tests
565567
MergeTask peekQueue() {
566-
return enqueuedByBudget.peek();
568+
return enqueuedByBudget.peek().v1();
567569
}
568570
}
569571

@@ -573,15 +575,15 @@ MergeTask peekQueue() {
573575
*/
574576
static class PriorityBlockingQueueWithBudget<E> {
575577
private final ToLongFunction<? super E> budgetFunction;
576-
protected final PriorityQueue<E> enqueuedByBudget;
578+
protected final PriorityQueue<Tuple<E, Long>> enqueuedByBudget;
577579
private final IdentityHashMap<ElementWithReleasableBudget, Long> unreleasedBudgetPerElement;
578580
private final ReentrantLock lock;
579581
private final Condition elementAvailable;
580582
protected long availableBudget;
581583

582584
PriorityBlockingQueueWithBudget(ToLongFunction<? super E> budgetFunction, long initialAvailableBudget) {
583585
this.budgetFunction = budgetFunction;
584-
this.enqueuedByBudget = new PriorityQueue<>(64, Comparator.comparingLong(budgetFunction));
586+
this.enqueuedByBudget = new PriorityQueue<>(64, Comparator.comparingLong(Tuple::v2));
585587
this.unreleasedBudgetPerElement = new IdentityHashMap<>();
586588
this.lock = new ReentrantLock();
587589
this.elementAvailable = lock.newCondition();
@@ -592,7 +594,7 @@ boolean enqueue(E e) {
592594
final ReentrantLock lock = this.lock;
593595
lock.lock();
594596
try {
595-
enqueuedByBudget.offer(e);
597+
enqueuedByBudget.offer(new Tuple<>(e, budgetFunction.applyAsLong(e)));
596598
elementAvailable.signal();
597599
} finally {
598600
lock.unlock();
@@ -608,22 +610,22 @@ ElementWithReleasableBudget take() throws InterruptedException {
608610
final ReentrantLock lock = this.lock;
609611
lock.lockInterruptibly();
610612
try {
611-
E peek;
612-
long peekBudget;
613+
Tuple<E, Long> head;
613614
// blocks until the smallest budget element fits the currently available budget
614-
while ((peek = enqueuedByBudget.peek()) == null || (peekBudget = budgetFunction.applyAsLong(peek)) > availableBudget) {
615+
while ((head = enqueuedByBudget.peek()) == null || head.v2() > availableBudget) {
615616
elementAvailable.await();
616617
}
618+
head = enqueuedByBudget.poll();
617619
// deducts and holds up that element's budget from the available budget
618-
return newElementWithReleasableBudget(enqueuedByBudget.poll(), peekBudget);
620+
return newElementWithReleasableBudget(head.v1(), head.v2());
619621
} finally {
620622
lock.unlock();
621623
}
622624
}
623625

624626
/**
625627
* Updates the available budget given the passed-in argument, from which it deducts the budget held up by taken elements
626-
* that are still in use. The budget of in-use elements is also updated (by re-applying the budget function).
628+
* that are still in use. The elements' budget is also updated by re-applying the budget function.
627629
* The newly updated budget is used to potentially block {@link #take()} operations if the smallest-budget enqueued element
628630
* is over this newly computed available budget.
629631
*/
@@ -632,20 +634,50 @@ void updateBudget(long availableBudget) {
632634
lock.lock();
633635
try {
634636
this.availableBudget = availableBudget;
635-
// update the per-element budget (these are all the elements that are using any budget)
637+
// updates the budget of enqueued elements (and possibly reorders the priority queue)
638+
updateBudgetOfEnqueuedElementsAndReorderQueue();
639+
// update the budget of dequeued, but still in-use elements (these are the elements that are consuming budget)
636640
unreleasedBudgetPerElement.replaceAll((e, v) -> budgetFunction.applyAsLong(e.element()));
637-
// available budget is decreased by the used per-element budget (for all dequeued elements that are still in use)
641+
// the available budget is decreased by the budget of still in-use elements (dequeued elements that are still in-use)
638642
this.availableBudget -= unreleasedBudgetPerElement.values().stream().mapToLong(i -> i).sum();
639643
elementAvailable.signalAll();
640644
} finally {
641645
lock.unlock();
642646
}
643647
}
644648

649+
private void updateBudgetOfEnqueuedElementsAndReorderQueue() {
650+
assert this.lock.isHeldByCurrentThread();
651+
int queueSizeBefore = enqueuedByBudget.size();
652+
var it = enqueuedByBudget.iterator();
653+
List<Tuple<E, Long>> elementsToReorder = new ArrayList<>();
654+
while (it.hasNext()) {
655+
var elementWithBudget = it.next();
656+
Long previousBudget = elementWithBudget.v2();
657+
long latestBudget = budgetFunction.applyAsLong(elementWithBudget.v1());
658+
if (previousBudget.equals(latestBudget) == false) {
659+
// the budget (estimation) of an enqueued element has changed
660+
// this element will be reordered by removing and reinserting using the latest budget (estimation)
661+
it.remove();
662+
elementsToReorder.add(new Tuple<>(elementWithBudget.v1(), latestBudget));
663+
}
664+
}
665+
// reinsert elements based on the latest budget (estimation)
666+
for (var reorderedElement : elementsToReorder) {
667+
enqueuedByBudget.offer(reorderedElement);
668+
}
669+
assert queueSizeBefore == enqueuedByBudget.size();
670+
}
671+
645672
boolean isQueueEmpty() {
646673
return enqueuedByBudget.isEmpty();
647674
}
648675

676+
boolean queueHeadIsOverTheAvailableBudget() {
677+
var head = enqueuedByBudget.peek();
678+
return head != null && head.v2() > availableBudget;
679+
}
680+
649681
int queueSize() {
650682
return enqueuedByBudget.size();
651683
}

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -537,8 +537,13 @@ void abort() {
537537
long estimatedRemainingMergeSize() {
538538
// TODO is it possible for `estimatedMergeBytes` to be `0` for correctly initialized merges,
539539
// or is it always the case that if `estimatedMergeBytes` is `0` that means that the merge has not yet been initialized?
540-
long estimatedMergeSize = onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
541-
return Math.max(0L, estimatedMergeSize - rateLimiter.getTotalBytesWritten());
540+
if (onGoingMerge.getMerge().isAborted()) {
541+
// if the merge is aborted the assumption is that merging will soon stop with negligible further writing
542+
return 0L;
543+
} else {
544+
long estimatedMergeSize = onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
545+
return Math.max(0L, estimatedMergeSize - rateLimiter.getTotalBytesWritten());
546+
}
542547
}
543548

544549
public long getMergeMemoryEstimateBytes() {

0 commit comments

Comments
 (0)