diff --git a/docs/changelog/135734.yaml b/docs/changelog/135734.yaml new file mode 100644 index 0000000000000..b02abfa20e53f --- /dev/null +++ b/docs/changelog/135734.yaml @@ -0,0 +1,5 @@ +pr: 135734 +summary: Fill in topn values if competitive +area: ES|QL +type: enhancement +issues: [] diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java index ba522bf130f82..1bc6434786675 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java @@ -199,15 +199,7 @@ static final class RowFiller { } } - /** - * Fill a {@link Row} for {@code position}. - */ - void row(int position, Row destination) { - writeKey(position, destination); - writeValues(position, destination); - } - - private void writeKey(int position, Row row) { + void writeKey(int position, Row row) { int orderByCompositeKeyCurrentPosition = 0; for (int i = 0; i < keyFactories.length; i++) { int valueAsBytesSize = keyFactories[i].extractor.writeKey(row.keys, position); @@ -217,7 +209,7 @@ private void writeKey(int position, Row row) { } } - private void writeValues(int position, Row destination) { + void writeValues(int position, Row destination) { for (ValueExtractor e : valueExtractors) { var refCounted = e.getRefCountedForShard(position); if (refCounted != null) { @@ -416,15 +408,28 @@ public void addInput(Page page) { spare.values.clear(); spare.clearRefCounters(); } - rowFiller.row(i, spare); + rowFiller.writeKey(i, spare); // When rows are very long, appending the values one by one can lead to lots of allocations. // To avoid this, pre-allocate at least as much size as in the last seen row. // Let the pre-allocation size decay in case we only have 1 huge row and smaller rows otherwise. 
spareKeysPreAllocSize = Math.max(spare.keys.length(), spareKeysPreAllocSize / 2); - spareValuesPreAllocSize = Math.max(spare.values.length(), spareValuesPreAllocSize / 2); - spare = inputQueue.insertWithOverflow(spare); + // This is `inputQueue.insertWithOverflow` followed by filling in the value only if we inserted. + if (inputQueue.size() < inputQueue.topCount) { + // Heap not yet full, just add elements + rowFiller.writeValues(i, spare); + spareValuesPreAllocSize = Math.max(spare.values.length(), spareValuesPreAllocSize / 2); + inputQueue.add(spare); + spare = null; + } else if (inputQueue.lessThan(inputQueue.top(), spare)) { + // Heap full AND this node fits in it. + Row nextSpare = inputQueue.top(); + rowFiller.writeValues(i, spare); + spareValuesPreAllocSize = Math.max(spare.values.length(), spareValuesPreAllocSize / 2); + inputQueue.updateTop(spare); + spare = nextSpare; + } } } finally { page.releaseBlocks(); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java index 97f301e252fd2..7d48b79ab12c5 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java @@ -92,7 +92,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -470,7 +469,8 @@ private TopNOperator.Row row( page ); TopNOperator.Row row = new TopNOperator.Row(nonBreakingBigArrays().breakerService().getBreaker("request"), sortOrders, 0, 0); - rf.row(position, row); + rf.writeKey(position, row); + rf.writeValues(position, row); return 
row; } @@ -1468,6 +1468,7 @@ public void testRowResizes() { ); List types = Collections.nCopies(columns, INT); List encoders = Collections.nCopies(columns, DEFAULT_UNSORTABLE); + boolean asc = randomBoolean(); try ( TopNOperator op = new TopNOperator( driverContext().blockFactory(), @@ -1475,7 +1476,7 @@ public void testRowResizes() { 10, types, encoders, - List.of(new TopNOperator.SortOrder(0, randomBoolean(), randomBoolean())), + List.of(new TopNOperator.SortOrder(0, asc, randomBoolean())), randomPageSize() ) ) { @@ -1491,7 +1492,8 @@ public void testRowResizes() { // 105 are from the objects // 1 is for the min-heap itself - assertThat(breaker.getMemoryRequestCount(), is(106L)); + // -1 IF we're sorting ascending. We encode one less value. + assertThat(breaker.getMemoryRequestCount(), equalTo(asc ? 105L : 106L)); } }