Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/131711.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 131711
summary: Track & log when there is insufficient disk space available to execute merges
area: Engine
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
Expand Down Expand Up @@ -552,6 +553,7 @@ private static ByteSizeValue getFreeBytesThreshold(
}

static class MergeTaskPriorityBlockingQueue extends PriorityBlockingQueueWithBudget<MergeTask> {
private static final Logger LOGGER = LogManager.getLogger(MergeTaskPriorityBlockingQueue.class);
MergeTaskPriorityBlockingQueue() {
// by default, start with 0 budget (so takes on this queue will always block until the first {@link #updateBudget} is invoked)
// use the estimated *remaining* merge size as the budget function so that the disk space budget of elements is updated
Expand All @@ -567,6 +569,40 @@ long getAvailableBudget() {
MergeTask peekQueue() {
return enqueuedByBudget.peek().v1();
}

@Override
void postBudgetUpdate() {
assert super.lock.isHeldByCurrentThread();
Tuple<MergeTask, Long> head = enqueuedByBudget.peek();
if (head != null && head.v2() > availableBudget) {
LOGGER.warn("There are merge tasks enqueued but there's insufficient disk space available to execute them");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These log messages will be printed every indices.merge.disk.check_interval (5 sec). That sounds Ok to you, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be useful to include the availableBudget and head.v2() values in the log message too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, thanks for suggesting! Pushed: d334433

if (LOGGER.isDebugEnabled()) {
if (unreleasedBudgetPerElement.isEmpty()) {
LOGGER.debug(
String.format(
Locale.ROOT,
"There are no merge tasks currently running, "
+ "but there are [%d] enqueued ones that are blocked because of insufficient disk space",
enqueuedByBudget.size()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, would it be useful to have the availableBudget and head.v2() values in the log message here?

)
);
} else {
StringBuilder messageBuilder = new StringBuilder();
messageBuilder.append("The following merge tasks are currently running [");
for (var runningMergeTask : super.unreleasedBudgetPerElement.entrySet()) {
messageBuilder.append(runningMergeTask.getKey().element().toString());
messageBuilder.append(" with disk space budgets in bytes " + runningMergeTask.getValue() + " , ");
}
messageBuilder.delete(messageBuilder.length() - 3, messageBuilder.length());
messageBuilder.append(
"], and there are ["
+ enqueuedByBudget.size()
+ "] additional enqueued ones that are blocked because of insufficient disk space"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here too?

);
}
}
}
}
}

/**
Expand All @@ -576,7 +612,7 @@ MergeTask peekQueue() {
static class PriorityBlockingQueueWithBudget<E> {
private final ToLongFunction<? super E> budgetFunction;
protected final PriorityQueue<Tuple<E, Long>> enqueuedByBudget;
private final IdentityHashMap<ElementWithReleasableBudget, Long> unreleasedBudgetPerElement;
protected final IdentityHashMap<ElementWithReleasableBudget, Budgets> unreleasedBudgetPerElement;
private final ReentrantLock lock;
private final Condition elementAvailable;
protected long availableBudget;
Expand Down Expand Up @@ -637,15 +673,18 @@ void updateBudget(long availableBudget) {
// updates the budget of enqueued elements (and possibly reorders the priority queue)
updateBudgetOfEnqueuedElementsAndReorderQueue();
// update the budget of dequeued, but still in-use elements (these are the elements that are consuming budget)
unreleasedBudgetPerElement.replaceAll((e, v) -> budgetFunction.applyAsLong(e.element()));
unreleasedBudgetPerElement.replaceAll((e, v) -> v.updateBudgetEstimation(budgetFunction.applyAsLong(e.element())));
// the available budget is decreased by the budget of still in-use elements (dequeued elements that are still in-use)
this.availableBudget -= unreleasedBudgetPerElement.values().stream().mapToLong(i -> i).sum();
this.availableBudget -= unreleasedBudgetPerElement.values().stream().mapToLong(i -> i.latestBudgetEstimationForElement).sum();
elementAvailable.signalAll();
postBudgetUpdate();
} finally {
lock.unlock();
}
}

void postBudgetUpdate() {};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would assert super.lock.isHeldByCurrentThread(); here too


private void updateBudgetOfEnqueuedElementsAndReorderQueue() {
assert this.lock.isHeldByCurrentThread();
int queueSizeBefore = enqueuedByBudget.size();
Expand Down Expand Up @@ -686,7 +725,7 @@ private ElementWithReleasableBudget newElementWithReleasableBudget(E element, lo
ElementWithReleasableBudget elementWithReleasableBudget = new ElementWithReleasableBudget(element);
assert this.lock.isHeldByCurrentThread();
// the taken element holds up some budget
var prev = this.unreleasedBudgetPerElement.put(elementWithReleasableBudget, budget);
var prev = this.unreleasedBudgetPerElement.put(elementWithReleasableBudget, new Budgets(budget, budget, this.availableBudget));
assert prev == null;
this.availableBudget -= budget;
assert this.availableBudget >= 0L;
Expand Down Expand Up @@ -736,6 +775,16 @@ E element() {
return element;
}
}

record Budgets(long initialBudgetEstimationForElement, long latestBudgetEstimationForElement, long initialTotalAvailableBudget) {
Budgets updateBudgetEstimation(long latestBudgetEstimationForElement) {
return new Budgets(
this.initialBudgetEstimationForElement,
latestBudgetEstimationForElement,
this.initialTotalAvailableBudget
);
}
}
}

private static long newTargetIORateBytesPerSec(
Expand Down
Loading