Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/131711.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 131711
summary: Track & log when there is insufficient disk space available to execute merges
area: Engine
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
Expand Down Expand Up @@ -552,6 +553,8 @@ private static ByteSizeValue getFreeBytesThreshold(
}

static class MergeTaskPriorityBlockingQueue extends PriorityBlockingQueueWithBudget<MergeTask> {
private static final Logger LOGGER = LogManager.getLogger(MergeTaskPriorityBlockingQueue.class);

MergeTaskPriorityBlockingQueue() {
// by default, start with 0 budget (so takes on this queue will always block until the first {@link #updateBudget} is invoked)
// use the estimated *remaining* merge size as the budget function so that the disk space budget of elements is updated
Expand All @@ -567,6 +570,55 @@ long getAvailableBudget() {
MergeTask peekQueue() {
return enqueuedByBudget.peek().v1();
}

@Override
void postBudgetUpdate() {
assert super.lock.isHeldByCurrentThread();
Tuple<MergeTask, Long> head = enqueuedByBudget.peek();
if (head != null && head.v2() > availableBudget) {
LOGGER.warn(
String.format(
Locale.ROOT,
"There are merge tasks enqueued but there's insufficient disk space available to execute them "
+ "(the smallest merge task requires [%d] bytes, but the available disk space is only [%d] bytes)",
head.v2(),
availableBudget
)
);
if (LOGGER.isDebugEnabled()) {
if (unreleasedBudgetPerElement.isEmpty()) {
LOGGER.debug(
String.format(
Locale.ROOT,
"There are no merge tasks currently running, "
+ "but there are [%d] enqueued ones that are blocked because of insufficient disk space "
+ "(the smallest merge task requires [%d] bytes, but the available disk space is only [%d] bytes)",
enqueuedByBudget.size(),
head.v2(),
availableBudget
)
);
} else {
StringBuilder messageBuilder = new StringBuilder();
messageBuilder.append("The following merge tasks are currently running [");
for (var runningMergeTask : super.unreleasedBudgetPerElement.entrySet()) {
messageBuilder.append(runningMergeTask.getKey().element().toString());
messageBuilder.append(" with disk space budgets in bytes ").append(runningMergeTask.getValue()).append(" , ");
}
messageBuilder.delete(messageBuilder.length() - 3, messageBuilder.length());
messageBuilder.append("], and there are [")
.append(enqueuedByBudget.size())
.append("] additional enqueued ones that are blocked because of insufficient disk space");
messageBuilder.append(" (the smallest merge task requires [")
.append(head.v2())
.append("] bytes, but the available disk space is only [")
.append(availableBudget)
.append("] bytes)");
LOGGER.debug(messageBuilder.toString());
}
}
}
}
}

/**
Expand All @@ -576,7 +628,7 @@ MergeTask peekQueue() {
static class PriorityBlockingQueueWithBudget<E> {
private final ToLongFunction<? super E> budgetFunction;
protected final PriorityQueue<Tuple<E, Long>> enqueuedByBudget;
private final IdentityHashMap<ElementWithReleasableBudget, Long> unreleasedBudgetPerElement;
protected final IdentityHashMap<ElementWithReleasableBudget, Budgets> unreleasedBudgetPerElement;
private final ReentrantLock lock;
private final Condition elementAvailable;
protected long availableBudget;
Expand Down Expand Up @@ -637,15 +689,23 @@ void updateBudget(long availableBudget) {
// updates the budget of enqueued elements (and possibly reorders the priority queue)
updateBudgetOfEnqueuedElementsAndReorderQueue();
// update the budget of dequeued, but still in-use elements (these are the elements that are consuming budget)
unreleasedBudgetPerElement.replaceAll((e, v) -> budgetFunction.applyAsLong(e.element()));
unreleasedBudgetPerElement.replaceAll((e, v) -> v.updateBudgetEstimation(budgetFunction.applyAsLong(e.element())));
// the available budget is decreased by the budget of still in-use elements (dequeued elements that are still in-use)
this.availableBudget -= unreleasedBudgetPerElement.values().stream().mapToLong(i -> i).sum();
this.availableBudget -= unreleasedBudgetPerElement.values()
.stream()
.mapToLong(i -> i.latestBudgetEstimationForElement)
.sum();
elementAvailable.signalAll();
postBudgetUpdate();
} finally {
lock.unlock();
}
}

void postBudgetUpdate() {
assert lock.isHeldByCurrentThread();
};

private void updateBudgetOfEnqueuedElementsAndReorderQueue() {
assert this.lock.isHeldByCurrentThread();
int queueSizeBefore = enqueuedByBudget.size();
Expand Down Expand Up @@ -686,7 +746,7 @@ private ElementWithReleasableBudget newElementWithReleasableBudget(E element, lo
ElementWithReleasableBudget elementWithReleasableBudget = new ElementWithReleasableBudget(element);
assert this.lock.isHeldByCurrentThread();
// the taken element holds up some budget
var prev = this.unreleasedBudgetPerElement.put(elementWithReleasableBudget, budget);
var prev = this.unreleasedBudgetPerElement.put(elementWithReleasableBudget, new Budgets(budget, budget, this.availableBudget));
assert prev == null;
this.availableBudget -= budget;
assert this.availableBudget >= 0L;
Expand Down Expand Up @@ -736,6 +796,16 @@ E element() {
return element;
}
}

record Budgets(long initialBudgetEstimationForElement, long latestBudgetEstimationForElement, long initialTotalAvailableBudget) {
Budgets updateBudgetEstimation(long latestBudgetEstimationForElement) {
return new Budgets(
this.initialBudgetEstimationForElement,
latestBudgetEstimationForElement,
this.initialTotalAvailableBudget
);
}
}
}

private static long newTargetIORateBytesPerSec(
Expand Down
Loading