-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Track & log when there is insufficient disk space available to execute merges #131711
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
5b72bb0
d0cac29
7fc3e4d
d941f2c
88d5f0d
d334433
7d33a29
91901dc
5352c0a
595ad45
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| pr: 131711 | ||
| summary: Track & log when there is insufficient disk space available to execute merges | ||
| area: Engine | ||
| type: enhancement | ||
| issues: [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,6 +35,7 @@ | |
| import java.util.IdentityHashMap; | ||
| import java.util.Iterator; | ||
| import java.util.List; | ||
| import java.util.Locale; | ||
| import java.util.Map; | ||
| import java.util.PriorityQueue; | ||
| import java.util.Set; | ||
|
|
@@ -552,6 +553,8 @@ private static ByteSizeValue getFreeBytesThreshold( | |
| } | ||
|
|
||
| static class MergeTaskPriorityBlockingQueue extends PriorityBlockingQueueWithBudget<MergeTask> { | ||
| private static final Logger LOGGER = LogManager.getLogger(MergeTaskPriorityBlockingQueue.class); | ||
|
|
||
| MergeTaskPriorityBlockingQueue() { | ||
| // by default, start with 0 budget (so takes on this queue will always block until the first {@link #updateBudget} is invoked) | ||
| // use the estimated *remaining* merge size as the budget function so that the disk space budget of elements is updated | ||
|
|
@@ -567,6 +570,40 @@ long getAvailableBudget() { | |
| /** Returns the first component ({@code v1()}) of the tuple at the head of {@code enqueuedByBudget}, without removing it. */ MergeTask peekQueue() { | ||
| return enqueuedByBudget.peek().v1(); /* NOTE(review): PriorityQueue#peek returns null when the queue is empty, so this would throw a NullPointerException then - confirm callers only use it on a non-empty queue */ | ||
| } | ||
|
|
||
| /** | ||
| * Invoked by {@code updateBudget} after the available budget has been recomputed. Logs a warning when the task at the | ||
| * head of the queue requires more budget than is currently available and, at debug level, additionally reports the | ||
| * currently running merge tasks (if any) and the number of enqueued ones. Must be called with the queue lock held. | ||
| */ | ||
| @Override | ||
| void postBudgetUpdate() { | ||
| assert super.lock.isHeldByCurrentThread(); | ||
| Tuple<MergeTask, Long> head = enqueuedByBudget.peek(); | ||
| if (head == null || head.v2() <= availableBudget) { | ||
| // nothing is enqueued, or the head of the queue fits within the available budget | ||
| return; | ||
| } | ||
| LOGGER.warn("There are merge tasks enqueued but there's insufficient disk space available to execute them"); | ||
| if (LOGGER.isDebugEnabled() == false) { | ||
| return; | ||
| } | ||
| if (unreleasedBudgetPerElement.isEmpty()) { | ||
| LOGGER.debug( | ||
| String.format( | ||
| Locale.ROOT, | ||
| "There are no merge tasks currently running, " | ||
| + "but there are [%d] enqueued ones that are blocked because of insufficient disk space", | ||
| enqueuedByBudget.size() | ||
| ) | ||
| ); | ||
| return; | ||
| } | ||
| StringBuilder messageBuilder = new StringBuilder("The following merge tasks are currently running ["); | ||
| for (var runningMergeTask : unreleasedBudgetPerElement.entrySet()) { | ||
| messageBuilder.append(runningMergeTask.getKey().element().toString()); | ||
| messageBuilder.append(" with disk space budgets in bytes ").append(runningMergeTask.getValue()).append(" , "); | ||
| } | ||
| // drop the trailing " , " separator; the map is known to be non-empty on this path | ||
| messageBuilder.setLength(messageBuilder.length() - 3); | ||
| messageBuilder.append("], and there are [") | ||
| .append(enqueuedByBudget.size()) | ||
| .append("] additional enqueued ones that are blocked because of insufficient disk space"); | ||
| // previously the message was assembled but never handed to the logger | ||
| LOGGER.debug(messageBuilder.toString()); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -576,7 +613,7 @@ MergeTask peekQueue() { | |
| static class PriorityBlockingQueueWithBudget<E> { | ||
| private final ToLongFunction<? super E> budgetFunction; | ||
| protected final PriorityQueue<Tuple<E, Long>> enqueuedByBudget; | ||
| private final IdentityHashMap<ElementWithReleasableBudget, Long> unreleasedBudgetPerElement; | ||
| protected final IdentityHashMap<ElementWithReleasableBudget, Budgets> unreleasedBudgetPerElement; | ||
| private final ReentrantLock lock; | ||
| private final Condition elementAvailable; | ||
| protected long availableBudget; | ||
|
|
@@ -637,15 +674,21 @@ void updateBudget(long availableBudget) { | |
| // updates the budget of enqueued elements (and possibly reorders the priority queue) | ||
| updateBudgetOfEnqueuedElementsAndReorderQueue(); | ||
| // update the budget of dequeued, but still in-use elements (these are the elements that are consuming budget) | ||
| unreleasedBudgetPerElement.replaceAll((e, v) -> budgetFunction.applyAsLong(e.element())); | ||
| unreleasedBudgetPerElement.replaceAll((e, v) -> v.updateBudgetEstimation(budgetFunction.applyAsLong(e.element()))); | ||
| // the available budget is decreased by the budget of still in-use elements (dequeued elements that are still in-use) | ||
| this.availableBudget -= unreleasedBudgetPerElement.values().stream().mapToLong(i -> i).sum(); | ||
| this.availableBudget -= unreleasedBudgetPerElement.values() | ||
| .stream() | ||
| .mapToLong(i -> i.latestBudgetEstimationForElement) | ||
| .sum(); | ||
| elementAvailable.signalAll(); | ||
| postBudgetUpdate(); | ||
| } finally { | ||
| lock.unlock(); | ||
| } | ||
| } | ||
|
|
||
| void postBudgetUpdate() {}; | ||
|
||
|
|
||
| private void updateBudgetOfEnqueuedElementsAndReorderQueue() { | ||
| assert this.lock.isHeldByCurrentThread(); | ||
| int queueSizeBefore = enqueuedByBudget.size(); | ||
|
|
@@ -686,7 +729,7 @@ private ElementWithReleasableBudget newElementWithReleasableBudget(E element, lo | |
| ElementWithReleasableBudget elementWithReleasableBudget = new ElementWithReleasableBudget(element); | ||
| assert this.lock.isHeldByCurrentThread(); | ||
| // the taken element holds up some budget | ||
| var prev = this.unreleasedBudgetPerElement.put(elementWithReleasableBudget, budget); | ||
| var prev = this.unreleasedBudgetPerElement.put(elementWithReleasableBudget, new Budgets(budget, budget, this.availableBudget)); | ||
| assert prev == null; | ||
| this.availableBudget -= budget; | ||
| assert this.availableBudget >= 0L; | ||
|
|
@@ -736,6 +779,16 @@ E element() { | |
| return element; | ||
| } | ||
| } | ||
|
|
||
| /** Budget bookkeeping for a dequeued, still in-use element: its budget estimation and the queue's available budget as recorded when the entry was created, plus the most recent budget estimation. */ record Budgets(long initialBudgetEstimationForElement, long latestBudgetEstimationForElement, long initialTotalAvailableBudget) { | ||
| /** Returns a new {@code Budgets} in which only {@code latestBudgetEstimationForElement} is replaced by the given value. */ Budgets updateBudgetEstimation(long latestBudgetEstimationForElement) { | ||
| return new Budgets( | ||
| this.initialBudgetEstimationForElement, | ||
| latestBudgetEstimationForElement, | ||
| this.initialTotalAvailableBudget | ||
| ); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private static long newTargetIORateBytesPerSec( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These log messages will be printed every `indices.merge.disk.check_interval` (5 sec). That sounds OK to you, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be useful to include the `availableBudget` and `head.v2()` values in the log message too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, thanks for suggesting! Pushed: d334433