Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/135734.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 135734
summary: Fill in topn values if competitive
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -199,15 +199,7 @@ static final class RowFiller {
}
}

/**
* Fill a {@link Row} for {@code position}.
*/
void row(int position, Row destination) {
writeKey(position, destination);
writeValues(position, destination);
}

private void writeKey(int position, Row row) {
void writeKey(int position, Row row) {
int orderByCompositeKeyCurrentPosition = 0;
for (int i = 0; i < keyFactories.length; i++) {
int valueAsBytesSize = keyFactories[i].extractor.writeKey(row.keys, position);
Expand All @@ -217,7 +209,7 @@ private void writeKey(int position, Row row) {
}
}

private void writeValues(int position, Row destination) {
void writeValues(int position, Row destination) {
for (ValueExtractor e : valueExtractors) {
var refCounted = e.getRefCountedForShard(position);
if (refCounted != null) {
Expand Down Expand Up @@ -416,15 +408,27 @@ public void addInput(Page page) {
spare.values.clear();
spare.clearRefCounters();
}
rowFiller.row(i, spare);
rowFiller.writeKey(i, spare);

// When rows are very long, appending the values one by one can lead to lots of allocations.
// To avoid this, pre-allocate at least as much size as in the last seen row.
// Let the pre-allocation size decay in case we only have 1 huge row and smaller rows otherwise.
spareKeysPreAllocSize = Math.max(spare.keys.length(), spareKeysPreAllocSize / 2);
spareValuesPreAllocSize = Math.max(spare.values.length(), spareValuesPreAllocSize / 2);

spare = inputQueue.insertWithOverflow(spare);
if (inputQueue.size() < inputQueue.topCount) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Maybe worth commenting here that this is a insertWithOverflow() that skips some work if the value is not competitive

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

// Heap not yet full, just add elements
rowFiller.writeValues(i, spare);
spareValuesPreAllocSize = Math.max(spare.values.length(), spareValuesPreAllocSize / 2);
inputQueue.add(spare);
spare = null;
} else if (inputQueue.lessThan(inputQueue.top(), spare)) {
// Heap full AND this node fit in it.
Row nextSpare = inputQueue.top();
rowFiller.writeValues(i, spare);
spareValuesPreAllocSize = Math.max(spare.values.length(), spareValuesPreAllocSize / 2);
inputQueue.updateTop(spare);
spare = nextSpare;
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could have used the slightly shorter:

Row inserted = spare;
spare = inputQueue.insertWithOverflow(spare);
if (inserted != spare) {
  rowFiller.writeValues(i, spare);
  spareValuesPreAllocSize = Math.max(spare.values.length(), spareValuesPreAllocSize / 2);
}

but this feels less magic. And this is the hot path so I prefer seeing the guts a little bit.

Also, it cries out for a further optimization where we bail from the loop as soon as inputQueue.size() < inputQueue.topCount and then make another loop with inputQueue.lessThan(inputQueue.top(), spare).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the shorter one has the definite of making the common parts (the code inside the if) more obvious. Perhaps just extract Math.max(spare.values.length(), spareValuesPreAllocSize / 2); to a helper function?

}
} finally {
page.releaseBlocks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

Expand Down Expand Up @@ -470,7 +469,8 @@ private TopNOperator.Row row(
page
);
TopNOperator.Row row = new TopNOperator.Row(nonBreakingBigArrays().breakerService().getBreaker("request"), sortOrders, 0, 0);
rf.row(position, row);
rf.writeKey(position, row);
rf.writeValues(position, row);
return row;
}

Expand Down Expand Up @@ -1491,7 +1491,8 @@ public void testRowResizes() {

// 105 are from the objects
// 1 is for the min-heap itself
assertThat(breaker.getMemoryRequestCount(), is(106L));
// could be less than because we don't always insert
assertThat(breaker.getMemoryRequestCount(), lessThanOrEqualTo(106L));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're making a performance improvement, and at the same time making the tests more lax, with no extra cases (No functional change + less/laxer testing == ⚠️).
Should we add a more specific case for the expected usage? Maybe something less randomized (Or not randomized at all). Or try to calculate the usage in this test (which feels a bit too intrincated).
Just a gut feeling, consider it a nitpick

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let me double check. I was getting 105 sometimes and 106. which, maybe I should just make it either 105, 106 in that case. I'd assumed it was because we don't insert every time. But the input isn't randomized so I'm not entirely sure why. checking.

}
}

Expand Down