Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
838fae3
update budget of enqueued elements
albertzaharovits Jun 18, 2025
e222ab9
Test scaffold
albertzaharovits Jun 18, 2025
4baa30c
Unmute test
albertzaharovits Jun 18, 2025
c7530d2
Restore setting default
albertzaharovits Jun 18, 2025
1363bbb
Merge branch 'main' into consider-aborting-merges-while-enqueued
albertzaharovits Jun 18, 2025
8d7b18d
[CI] Auto commit changes from spotless
Jun 18, 2025
51b5d62
Update docs/changelog/129613.yaml
albertzaharovits Jun 18, 2025
73abc28
ES|QL: No plain strings in Literal (#129399)
luigidellaquila Jun 18, 2025
ea996c1
More test scaffolding
albertzaharovits Jun 18, 2025
31fa7ae
Merge branch 'main' into consider-aborting-merges-while-enqueued
albertzaharovits Jun 18, 2025
d870bfb
boolean test for queue head over the available budget
albertzaharovits Jun 18, 2025
96b6cb9
Even more test scaffolding
albertzaharovits Jun 19, 2025
9297945
Merge branch 'main' into consider-aborting-merges-while-enqueued
albertzaharovits Jun 19, 2025
8b0507c
[CI] Auto commit changes from spotless
Jun 19, 2025
b6aed72
IT done
albertzaharovits Jun 19, 2025
1fc3b64
Fix testShardCloseWhenDiskSpaceInsufficient
albertzaharovits Jun 19, 2025
12119c5
[CI] Auto commit changes from spotless
Jun 19, 2025
328ac82
do not force merge when indexing
albertzaharovits Jun 19, 2025
f47a419
testEnqueuedMergeTasksAreUnblockedWhenEstimatedMergeSizeChanges
albertzaharovits Jun 19, 2025
f2afd19
Merge branch 'main' into consider-aborting-merges-while-enqueued
albertzaharovits Jun 19, 2025
2527c37
Delete docs/changelog/129613.yaml
albertzaharovits Jun 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/129613.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 129613
summary: Update budget estimates for enqueued merge tasks
area: Engine
type: bug
issues:
- 129335
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -505,9 +505,6 @@ tests:
- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeForkIT
method: test {lookup-join.MultipleBatches*
issue: https://github.com/elastic/elasticsearch/issues/129210
- class: org.elasticsearch.xpack.autoscaling.storage.ReactiveStorageIT
method: testScaleDuringSplitOrClone
issue: https://github.com/elastic/elasticsearch/issues/129335
- class: org.elasticsearch.entitlement.runtime.policy.FileAccessTreeTests
method: testWindowsMixedCaseAccess
issue: https://github.com/elastic/elasticsearch/issues/129167
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.engine;

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.DiskUsageIntegTestCase;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.Locale;
import java.util.stream.IntStream;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class MergeWithLowDiskSpaceIT extends DiskUsageIntegTestCase {

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
// the default of "5s" slows down testing
.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "100ms")
.put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true)
.build();
}

public void testShardCloseWhenDiskSpaceInsufficient() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not clear to me what this verifies? AFAICS, there is no merge at the end of the test and thus it may not verify anything?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the test was not ready when you looked at it, it was still WIP, sorry for not being clear.

It is now ready and it tests that we can close a shard (an index) with enqueued merges that are blocked due to insufficient disk space. The merges will be aborted, which should unblock and prioritize them in the queue.

internalCluster().startNode();
ensureStableCluster(1);
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(
indexName,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build()
);
indexRandom(
true,
IntStream.range(1, 100)
.mapToObj(i -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50)))
.toArray(IndexRequestBuilder[]::new)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask;
import org.elasticsearch.monitor.fs.FsInfo;
Expand All @@ -28,6 +29,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.IdentityHashMap;
Expand Down Expand Up @@ -59,10 +61,7 @@ public class ThreadPoolMergeExecutorService implements Closeable {
/** How frequently we check disk usage (default: 5 seconds). */
public static final Setting<TimeValue> INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING = Setting.positiveTimeSetting(
"indices.merge.disk.check_interval",
// disabled by default
// there's currently a problem where (aborting) merges are blocked when shards are closed (because disk space is insufficient)
// see: https://github.com/elastic/elasticsearch/issues/129335
TimeValue.timeValueSeconds(0),
TimeValue.timeValueSeconds(5),
Property.Dynamic,
Property.NodeScope
);
Expand Down Expand Up @@ -550,9 +549,8 @@ private static ByteSizeValue getFreeBytesThreshold(

static class MergeTaskPriorityBlockingQueue extends PriorityBlockingQueueWithBudget<MergeTask> {
MergeTaskPriorityBlockingQueue() {
// start with 0 budget (so takes on this queue will always block until {@link #updateBudget} is invoked)
// use the estimated *remaining* merge size as the budget function so that the disk space budget of taken (in-use) elements is
// updated according to the remaining disk space requirements of the currently running merge tasks
// by default, start with 0 budget (so takes on this queue will always block until the first {@link #updateBudget} is invoked)
// use the estimated *remaining* merge size as the budget function so that the disk space budget of elements is updated
super(MergeTask::estimatedRemainingMergeSize, 0L);
}

Expand All @@ -563,7 +561,7 @@ long getAvailableBudget() {

// exposed for tests
MergeTask peekQueue() {
return enqueuedByBudget.peek();
return enqueuedByBudget.peek().v1();
}
}

Expand All @@ -573,15 +571,15 @@ MergeTask peekQueue() {
*/
static class PriorityBlockingQueueWithBudget<E> {
private final ToLongFunction<? super E> budgetFunction;
protected final PriorityQueue<E> enqueuedByBudget;
protected final PriorityQueue<Tuple<E, Long>> enqueuedByBudget;
private final IdentityHashMap<ElementWithReleasableBudget, Long> unreleasedBudgetPerElement;
private final ReentrantLock lock;
private final Condition elementAvailable;
protected long availableBudget;

PriorityBlockingQueueWithBudget(ToLongFunction<? super E> budgetFunction, long initialAvailableBudget) {
this.budgetFunction = budgetFunction;
this.enqueuedByBudget = new PriorityQueue<>(64, Comparator.comparingLong(budgetFunction));
this.enqueuedByBudget = new PriorityQueue<>(64, Comparator.comparingLong(Tuple::v2));
this.unreleasedBudgetPerElement = new IdentityHashMap<>();
this.lock = new ReentrantLock();
this.elementAvailable = lock.newCondition();
Expand All @@ -592,7 +590,7 @@ boolean enqueue(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
enqueuedByBudget.offer(e);
enqueuedByBudget.offer(new Tuple<>(e, budgetFunction.applyAsLong(e)));
elementAvailable.signal();
} finally {
lock.unlock();
Expand All @@ -608,22 +606,22 @@ ElementWithReleasableBudget take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
E peek;
long peekBudget;
Tuple<E, Long> head;
// blocks until the smallest budget element fits the currently available budget
while ((peek = enqueuedByBudget.peek()) == null || (peekBudget = budgetFunction.applyAsLong(peek)) > availableBudget) {
while ((head = enqueuedByBudget.peek()) == null || head.v2() > availableBudget) {
elementAvailable.await();
}
head = enqueuedByBudget.poll();
// deducts and holds up that element's budget from the available budget
return newElementWithReleasableBudget(enqueuedByBudget.poll(), peekBudget);
return newElementWithReleasableBudget(head.v1(), head.v2());
} finally {
lock.unlock();
}
}

/**
* Updates the available budged given the passed-in argument, from which it deducts the budget hold up by taken elements
* that are still in use. The budget of in-use elements is also updated (by re-applying the budget function).
* that are still in use. The elements budget is also updated by re-applying the budget function.
* The newly updated budget is used to potentially block {@link #take()} operations if the smallest-budget enqueued element
* is over this newly computed available budget.
*/
Expand All @@ -632,16 +630,41 @@ void updateBudget(long availableBudget) {
lock.lock();
try {
this.availableBudget = availableBudget;
// update the per-element budget (these are all the elements that are using any budget)
// updates the budget of enqueued elements (and possibly reorders the priority queue)
updateBudgetOfEnqueuedElementsAndReorderQueue();
// update the budget of dequeued, but still in-use elements (these are the elements that are consuming budget)
unreleasedBudgetPerElement.replaceAll((e, v) -> budgetFunction.applyAsLong(e.element()));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change will also adjust the budget of running merges that have been aborted to 0. That's a bit optimistic, but I find the alternative implementation convoluted, and it's probably counter-intuitive to estimate 0 for to-be-run merges but not for already-running ones.

// available budget is decreased by the used per-element budget (for all dequeued elements that are still in use)
// the available budget is decreased by the budget of still in-use elements (dequeued elements that are still in-use)
this.availableBudget -= unreleasedBudgetPerElement.values().stream().mapToLong(i -> i).sum();
elementAvailable.signalAll();
} finally {
lock.unlock();
}
}

private void updateBudgetOfEnqueuedElementsAndReorderQueue() {
assert this.lock.isHeldByCurrentThread();
int queueSizeBefore = enqueuedByBudget.size();
var it = enqueuedByBudget.iterator();
List<Tuple<E, Long>> elementsToReorder = new ArrayList<>();
while (it.hasNext()) {
var elementWithBudget = it.next();
Long previousBudget = elementWithBudget.v2();
long latestBudget = budgetFunction.applyAsLong(elementWithBudget.v1());
if (previousBudget.equals(latestBudget) == false) {
// the budget (estimation) of an enqueued element has changed
// this element will be reordered by removing and reinserting using the latest budget (estimation)
it.remove();
elementsToReorder.add(new Tuple<>(elementWithBudget.v1(), latestBudget));
}
}
// reinsert elements based on the latest budget (estimation)
for (var reorderedElement : elementsToReorder) {
enqueuedByBudget.offer(reorderedElement);
}
assert queueSizeBefore == enqueuedByBudget.size();
}

boolean isQueueEmpty() {
return enqueuedByBudget.isEmpty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,8 +525,13 @@ void abort() {
long estimatedRemainingMergeSize() {
// TODO is it possible that `estimatedMergeBytes` be `0` for correctly initialize merges,
// or is it always the case that if `estimatedMergeBytes` is `0` that means that the merge has not yet been initialized?
long estimatedMergeSize = onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
return Math.max(0L, estimatedMergeSize - rateLimiter.getTotalBytesWritten());
if (onGoingMerge.getMerge().isAborted()) {
// if the merge is aborted the assumption is that merging will soon stop with negligible further writing
return 0L;
} else {
long estimatedMergeSize = onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
return Math.max(0L, estimatedMergeSize - rateLimiter.getTotalBytesWritten());
}
}

public long getMergeMemoryEstimateBytes() {
Expand Down
Loading