From de1b9d556e8f6b91a07cc9885fdd76d0d20bcc45 Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Wed, 23 Jul 2025 18:17:20 +0300 Subject: [PATCH] Track & log when there is insufficient disk space available to execute merges (#131711) This goal of this PR is to record information about the available disk space at the time that merge tasks are scheduled (the info will show up in heap dumps). This should aid troubleshooting in cases where there are running merge tasks all while the available disk space on the node is below the watermark. In this case we should increase the threshold for scheduling merge tasks `indices.merge.disk.watermark.high`, and possibly consider implementing aborting already executing merges too. Relates https://github.com/elastic/elasticsearch/issues/88606#issuecomment-3027324084 --- docs/changelog/131711.yaml | 5 ++ .../ThreadPoolMergeExecutorService.java | 78 ++++++++++++++++++- 2 files changed, 79 insertions(+), 4 deletions(-) create mode 100644 docs/changelog/131711.yaml diff --git a/docs/changelog/131711.yaml b/docs/changelog/131711.yaml new file mode 100644 index 0000000000000..f10b3663c8e5f --- /dev/null +++ b/docs/changelog/131711.yaml @@ -0,0 +1,5 @@ +pr: 131711 +summary: Track & log when there is insufficient disk space available to execute merges +area: Engine +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java index f6e9257200002..9efb582007aaf 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java @@ -35,6 +35,7 @@ import java.util.IdentityHashMap; import java.util.Iterator; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.PriorityQueue; import java.util.Set; @@ -552,6 +553,8 @@ private static ByteSizeValue getFreeBytesThreshold( } static class MergeTaskPriorityBlockingQueue extends PriorityBlockingQueueWithBudget { + private static final Logger LOGGER = LogManager.getLogger(MergeTaskPriorityBlockingQueue.class); + MergeTaskPriorityBlockingQueue() { // by default, start with 0 budget (so takes on this queue will always block until the first {@link #updateBudget} is invoked) // use the estimated *remaining* merge size as the budget function so that the disk space budget of elements is updated @@ -567,6 +570,55 @@ long getAvailableBudget() { MergeTask peekQueue() { return enqueuedByBudget.peek().v1(); } + + @Override + void postBudgetUpdate() { + assert super.lock.isHeldByCurrentThread(); + Tuple head = enqueuedByBudget.peek(); + if (head != null && head.v2() > availableBudget) { + LOGGER.warn( + String.format( + Locale.ROOT, + "There are merge tasks enqueued but there's insufficient disk space available to execute them " + + "(the smallest merge task requires [%d] bytes, but the available disk space is only [%d] bytes)", + head.v2(), + availableBudget + ) + ); + if (LOGGER.isDebugEnabled()) { + if (unreleasedBudgetPerElement.isEmpty()) { + LOGGER.debug( + String.format( + Locale.ROOT, + "There are no merge tasks currently running, " + + "but there are [%d] enqueued ones that are blocked because of insufficient disk space " + + "(the smallest merge task requires [%d] bytes, but the available disk space is only [%d] bytes)", + enqueuedByBudget.size(), + head.v2(), + availableBudget + ) + ); + } else { + StringBuilder messageBuilder = new StringBuilder(); + messageBuilder.append("The following merge tasks are currently running ["); + for (var runningMergeTask : super.unreleasedBudgetPerElement.entrySet()) { + messageBuilder.append(runningMergeTask.getKey().element().toString()); + messageBuilder.append(" with disk space budgets in bytes ").append(runningMergeTask.getValue()).append(" , "); + } + messageBuilder.delete(messageBuilder.length() - 3, messageBuilder.length()); + messageBuilder.append("], and there are [") + .append(enqueuedByBudget.size()) + .append("] additional enqueued ones that are blocked because of insufficient disk space"); + messageBuilder.append(" (the smallest merge task requires [") + .append(head.v2()) + .append("] bytes, but the available disk space is only [") + .append(availableBudget) + .append("] bytes)"); + LOGGER.debug(messageBuilder.toString()); + } + } + } + } } /** @@ -576,7 +628,7 @@ MergeTask peekQueue() { static class PriorityBlockingQueueWithBudget { private final ToLongFunction budgetFunction; protected final PriorityQueue> enqueuedByBudget; - private final IdentityHashMap unreleasedBudgetPerElement; + protected final IdentityHashMap unreleasedBudgetPerElement; private final ReentrantLock lock; private final Condition elementAvailable; protected long availableBudget; @@ -637,15 +689,23 @@ void updateBudget(long availableBudget) { // updates the budget of enqueued elements (and possibly reorders the priority queue) updateBudgetOfEnqueuedElementsAndReorderQueue(); // update the budget of dequeued, but still in-use elements (these are the elements that are consuming budget) - unreleasedBudgetPerElement.replaceAll((e, v) -> budgetFunction.applyAsLong(e.element())); + unreleasedBudgetPerElement.replaceAll((e, v) -> v.updateBudgetEstimation(budgetFunction.applyAsLong(e.element()))); // the available budget is decreased by the budget of still in-use elements (dequeued elements that are still in-use) - this.availableBudget -= unreleasedBudgetPerElement.values().stream().mapToLong(i -> i).sum(); + this.availableBudget -= unreleasedBudgetPerElement.values() + .stream() + .mapToLong(i -> i.latestBudgetEstimationForElement) + .sum(); elementAvailable.signalAll(); + postBudgetUpdate(); } finally { lock.unlock(); } } + void postBudgetUpdate() { + assert lock.isHeldByCurrentThread(); + }; + private void updateBudgetOfEnqueuedElementsAndReorderQueue() { assert this.lock.isHeldByCurrentThread(); int queueSizeBefore = enqueuedByBudget.size(); @@ -686,7 +746,7 @@ private ElementWithReleasableBudget newElementWithReleasableBudget(E element, lo ElementWithReleasableBudget elementWithReleasableBudget = new ElementWithReleasableBudget(element); assert this.lock.isHeldByCurrentThread(); // the taken element holds up some budget - var prev = this.unreleasedBudgetPerElement.put(elementWithReleasableBudget, budget); + var prev = this.unreleasedBudgetPerElement.put(elementWithReleasableBudget, new Budgets(budget, budget, this.availableBudget)); assert prev == null; this.availableBudget -= budget; assert this.availableBudget >= 0L; @@ -736,6 +796,16 @@ E element() { return element; } } + + record Budgets(long initialBudgetEstimationForElement, long latestBudgetEstimationForElement, long initialTotalAvailableBudget) { + Budgets updateBudgetEstimation(long latestBudgetEstimationForElement) { + return new Budgets( + this.initialBudgetEstimationForElement, + latestBudgetEstimationForElement, + this.initialTotalAvailableBudget + ); + } + } } private static long newTargetIORateBytesPerSec(