diff --git a/docs/changelog/131711.yaml b/docs/changelog/131711.yaml new file mode 100644 index 0000000000000..f10b3663c8e5f --- /dev/null +++ b/docs/changelog/131711.yaml @@ -0,0 +1,5 @@ +pr: 131711 +summary: Track & log when there is insufficient disk space available to execute merges +area: Engine +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java index f6e9257200002..9efb582007aaf 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java @@ -35,6 +35,7 @@ import java.util.IdentityHashMap; import java.util.Iterator; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.PriorityQueue; import java.util.Set; @@ -552,6 +553,8 @@ private static ByteSizeValue getFreeBytesThreshold( } static class MergeTaskPriorityBlockingQueue extends PriorityBlockingQueueWithBudget { + private static final Logger LOGGER = LogManager.getLogger(MergeTaskPriorityBlockingQueue.class); + MergeTaskPriorityBlockingQueue() { // by default, start with 0 budget (so takes on this queue will always block until the first {@link #updateBudget} is invoked) // use the estimated *remaining* merge size as the budget function so that the disk space budget of elements is updated @@ -567,6 +570,55 @@ long getAvailableBudget() { MergeTask peekQueue() { return enqueuedByBudget.peek().v1(); } + + @Override + void postBudgetUpdate() { + assert super.lock.isHeldByCurrentThread(); + Tuple head = enqueuedByBudget.peek(); + if (head != null && head.v2() > availableBudget) { + LOGGER.warn( + String.format( + Locale.ROOT, + "There are merge tasks enqueued but there's insufficient disk space available to execute them " + + "(the smallest merge task requires [%d] bytes, but the available disk space is only [%d] bytes)", + head.v2(), + availableBudget + ) + ); + if (LOGGER.isDebugEnabled()) { + if (unreleasedBudgetPerElement.isEmpty()) { + LOGGER.debug( + String.format( + Locale.ROOT, + "There are no merge tasks currently running, " + + "but there are [%d] enqueued ones that are blocked because of insufficient disk space " + + "(the smallest merge task requires [%d] bytes, but the available disk space is only [%d] bytes)", + enqueuedByBudget.size(), + head.v2(), + availableBudget + ) + ); + } else { + StringBuilder messageBuilder = new StringBuilder(); + messageBuilder.append("The following merge tasks are currently running ["); + for (var runningMergeTask : super.unreleasedBudgetPerElement.entrySet()) { + messageBuilder.append(runningMergeTask.getKey().element().toString()); + messageBuilder.append(" with disk space budgets in bytes ").append(runningMergeTask.getValue()).append(" , "); + } + messageBuilder.delete(messageBuilder.length() - 3, messageBuilder.length()); + messageBuilder.append("], and there are [") + .append(enqueuedByBudget.size()) + .append("] additional enqueued ones that are blocked because of insufficient disk space"); + messageBuilder.append(" (the smallest merge task requires [") + .append(head.v2()) + .append("] bytes, but the available disk space is only [") + .append(availableBudget) + .append("] bytes)"); + LOGGER.debug(messageBuilder.toString()); + } + } + } + } } /** @@ -576,7 +628,7 @@ MergeTask peekQueue() { static class PriorityBlockingQueueWithBudget { private final ToLongFunction budgetFunction; protected final PriorityQueue> enqueuedByBudget; - private final IdentityHashMap unreleasedBudgetPerElement; + protected final IdentityHashMap unreleasedBudgetPerElement; private final ReentrantLock lock; private final Condition elementAvailable; protected long availableBudget; @@ -637,15 +689,23 @@ void updateBudget(long availableBudget) { // updates the budget of enqueued elements (and possibly reorders the priority queue) updateBudgetOfEnqueuedElementsAndReorderQueue(); // update the budget of dequeued, but still in-use elements (these are the elements that are consuming budget) - unreleasedBudgetPerElement.replaceAll((e, v) -> budgetFunction.applyAsLong(e.element())); + unreleasedBudgetPerElement.replaceAll((e, v) -> v.updateBudgetEstimation(budgetFunction.applyAsLong(e.element()))); // the available budget is decreased by the budget of still in-use elements (dequeued elements that are still in-use) - this.availableBudget -= unreleasedBudgetPerElement.values().stream().mapToLong(i -> i).sum(); + this.availableBudget -= unreleasedBudgetPerElement.values() + .stream() + .mapToLong(i -> i.latestBudgetEstimationForElement) + .sum(); elementAvailable.signalAll(); + postBudgetUpdate(); } finally { lock.unlock(); } } + void postBudgetUpdate() { + assert lock.isHeldByCurrentThread(); + }; + private void updateBudgetOfEnqueuedElementsAndReorderQueue() { assert this.lock.isHeldByCurrentThread(); int queueSizeBefore = enqueuedByBudget.size(); @@ -686,7 +746,7 @@ private ElementWithReleasableBudget newElementWithReleasableBudget(E element, lo ElementWithReleasableBudget elementWithReleasableBudget = new ElementWithReleasableBudget(element); assert this.lock.isHeldByCurrentThread(); // the taken element holds up some budget - var prev = this.unreleasedBudgetPerElement.put(elementWithReleasableBudget, budget); + var prev = this.unreleasedBudgetPerElement.put(elementWithReleasableBudget, new Budgets(budget, budget, this.availableBudget)); assert prev == null; this.availableBudget -= budget; assert this.availableBudget >= 0L; @@ -736,6 +796,16 @@ E element() { return element; } } + + record Budgets(long initialBudgetEstimationForElement, long latestBudgetEstimationForElement, long initialTotalAvailableBudget) { + Budgets updateBudgetEstimation(long latestBudgetEstimationForElement) { + return new Budgets( + this.initialBudgetEstimationForElement, + latestBudgetEstimationForElement, + this.initialTotalAvailableBudget + ); + } + } } private static long newTargetIORateBytesPerSec(