Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/135734.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 135734
summary: Fill in topn values if competitive
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -199,15 +199,7 @@ static final class RowFiller {
}
}

/**
* Fill a {@link Row} for {@code position}.
*/
void row(int position, Row destination) {
writeKey(position, destination);
writeValues(position, destination);
}

private void writeKey(int position, Row row) {
void writeKey(int position, Row row) {
int orderByCompositeKeyCurrentPosition = 0;
for (int i = 0; i < keyFactories.length; i++) {
int valueAsBytesSize = keyFactories[i].extractor.writeKey(row.keys, position);
Expand All @@ -217,7 +209,7 @@ private void writeKey(int position, Row row) {
}
}

private void writeValues(int position, Row destination) {
void writeValues(int position, Row destination) {
for (ValueExtractor e : valueExtractors) {
var refCounted = e.getRefCountedForShard(position);
if (refCounted != null) {
Expand Down Expand Up @@ -416,15 +408,28 @@ public void addInput(Page page) {
spare.values.clear();
spare.clearRefCounters();
}
rowFiller.row(i, spare);
rowFiller.writeKey(i, spare);

// When rows are very long, appending the values one by one can lead to lots of allocations.
// To avoid this, pre-allocate at least as much size as in the last seen row.
// Let the pre-allocation size decay in case we only have 1 huge row and smaller rows otherwise.
spareKeysPreAllocSize = Math.max(spare.keys.length(), spareKeysPreAllocSize / 2);
spareValuesPreAllocSize = Math.max(spare.values.length(), spareValuesPreAllocSize / 2);

spare = inputQueue.insertWithOverflow(spare);
// This is `inputQueue.insertWithOverflow` with followed by filling in the value only if we inserted.
if (inputQueue.size() < inputQueue.topCount) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Maybe worth commenting here that this is a insertWithOverflow() that skips some work if the value is not competitive

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

// Heap not yet full, just add elements
rowFiller.writeValues(i, spare);
spareValuesPreAllocSize = Math.max(spare.values.length(), spareValuesPreAllocSize / 2);
inputQueue.add(spare);
spare = null;
} else if (inputQueue.lessThan(inputQueue.top(), spare)) {
// Heap full AND this node fit in it.
Row nextSpare = inputQueue.top();
rowFiller.writeValues(i, spare);
spareValuesPreAllocSize = Math.max(spare.values.length(), spareValuesPreAllocSize / 2);
inputQueue.updateTop(spare);
spare = nextSpare;
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could have used the slightly shorter:

Row inserted = spare;
spare = inputQueue.insertWithOverflow(spare);
if (inserted != spare) {
  rowFiller.writeValues(i, spare);
  spareValuesPreAllocSize = Math.max(spare.values.length(), spareValuesPreAllocSize / 2);
}

but this feels less magic. And this is the hot path so I prefer seeing the guts a little bit.

Also, it cries out for a further optimization where we bail from the loop as soon as inputQueue.size() < inputQueue.topCount and then make another loop with inputQueue.lessThan(inputQueue.top(), spare).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the shorter one has the definite of making the common parts (the code inside the if) more obvious. Perhaps just extract Math.max(spare.values.length(), spareValuesPreAllocSize / 2); to a helper function?

}
} finally {
page.releaseBlocks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

Expand Down Expand Up @@ -470,7 +469,8 @@ private TopNOperator.Row row(
page
);
TopNOperator.Row row = new TopNOperator.Row(nonBreakingBigArrays().breakerService().getBreaker("request"), sortOrders, 0, 0);
rf.row(position, row);
rf.writeKey(position, row);
rf.writeValues(position, row);
return row;
}

Expand Down Expand Up @@ -1468,14 +1468,15 @@ public void testRowResizes() {
);
List<ElementType> types = Collections.nCopies(columns, INT);
List<TopNEncoder> encoders = Collections.nCopies(columns, DEFAULT_UNSORTABLE);
boolean asc = randomBoolean();
try (
TopNOperator op = new TopNOperator(
driverContext().blockFactory(),
breaker,
10,
types,
encoders,
List.of(new TopNOperator.SortOrder(0, randomBoolean(), randomBoolean())),
List.of(new TopNOperator.SortOrder(0, asc, randomBoolean())),
randomPageSize()
)
) {
Expand All @@ -1491,7 +1492,8 @@ public void testRowResizes() {

// 105 are from the objects
// 1 is for the min-heap itself
assertThat(breaker.getMemoryRequestCount(), is(106L));
// -1 IF we're sorting ascending. We encode one less value.
assertThat(breaker.getMemoryRequestCount(), equalTo(asc ? 105L : 106L));
}
}

Expand Down