From fb951ebcee4cf2f8a8f466ced53dd9fa3a2d9c98 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 30 Sep 2025 16:02:24 -0400 Subject: [PATCH 1/4] ESQL: Fill in topn values if competitive Skip filling in the topn *values* only if the row is competitive. This cuts the runtime of topn pretty significantly. That's important when topn is dominating the runtime, like we see when querying many many indices at once. We can emulate that a little locally with something like: ``` rm /tmp/fields for field in {1..500}; do echo -n ',"f'$field'": "foo"' >> /tmp/fields done for idx in {1..100}; do curl -uelastic:password -XDELETE localhost:9200/test$idx echo '{ "settings": { "index.mapping.total_fields.limit": 10000 }, "mappings": { "properties": { "@timestamp": { "type": "date" } ' > /tmp/idx for field in {1..500}; do echo ',"f'$field'": { "type": "keyword" }' >> /tmp/idx done echo ' } } }' >> /tmp/idx curl -uelastic:password -XPUT -HContent-Type:application/json localhost:9200/test$idx --data @/tmp/idx rm /tmp/bulk for doc in {1..1000}; do echo '{"index":{}}' >> /tmp/bulk echo -n '{"@timestamp": '$(($idx * 10000 + $doc)) >> /tmp/bulk cat /tmp/fields >> /tmp/bulk echo '}' >> /tmp/bulk done echo curl -s -uelastic:password -XPOST -HContent-Type:application/json "localhost:9200/test$idx/_bulk?refresh&pretty" --data-binary @/tmp/bulk | tee /tmp/bulk_result | grep error echo done while true; do curl -s -uelastic:password -XPOST -HContent-Type:application/json 'localhost:9200/_query?pretty' -d'{ "query": "FROM *", "pragma": { "max_concurrent_shards_per_node": 100 } }' | jq .took curl -s -uelastic:password -XPOST -HContent-Type:application/json 'localhost:9200/_query?pretty' -d'{ "query": "FROM * | SORT @timestamp DESC", "pragma": { "max_concurrent_shards_per_node": 100 } }' | jq .took done ``` This only spends about 12.6% of it's time on topn and takes 2.7 seconds locally. If we apply this fix we spend 3.6% of our time on topn, taking 2.5 seconds. That's not a huge improvement. 7% is nothing to sneeze at, but it's not great. But the topn is dropping from 340 millis to 90 millis. But in some summary clusters I'm seeing 65% of time spent on topn for queries taking 3 seconds. My kind of bad math says this improvement should drop this query to 1.6 seconds. Let's hope! Hopefully our nightlies will see this and enjoy prove my math right. --- .../compute/operator/topn/TopNOperator.java | 28 ++++++++++--------- .../operator/topn/TopNOperatorTests.java | 7 +++-- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java index ba522bf130f82..50b8bfe68a7d8 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java @@ -199,15 +199,7 @@ static final class RowFiller { } } - /** - * Fill a {@link Row} for {@code position}. - */ - void row(int position, Row destination) { - writeKey(position, destination); - writeValues(position, destination); - } - - private void writeKey(int position, Row row) { + void writeKey(int position, Row row) { int orderByCompositeKeyCurrentPosition = 0; for (int i = 0; i < keyFactories.length; i++) { int valueAsBytesSize = keyFactories[i].extractor.writeKey(row.keys, position); @@ -217,7 +209,7 @@ private void writeKey(int position, Row row) { } } - private void writeValues(int position, Row destination) { + void writeValues(int position, Row destination) { for (ValueExtractor e : valueExtractors) { var refCounted = e.getRefCountedForShard(position); if (refCounted != null) { @@ -416,15 +408,25 @@ public void addInput(Page page) { spare.values.clear(); spare.clearRefCounters(); } - rowFiller.row(i, spare); + rowFiller.writeKey(i, spare); // When rows are very long, appending the values one by one can lead to lots of allocations. // To avoid this, pre-allocate at least as much size as in the last seen row. // Let the pre-allocation size decay in case we only have 1 huge row and smaller rows otherwise. spareKeysPreAllocSize = Math.max(spare.keys.length(), spareKeysPreAllocSize / 2); - spareValuesPreAllocSize = Math.max(spare.values.length(), spareValuesPreAllocSize / 2); - spare = inputQueue.insertWithOverflow(spare); + if (inputQueue.size() < inputQueue.topCount) { + rowFiller.writeValues(i, spare); + spareValuesPreAllocSize = Math.max(spare.values.length(), spareValuesPreAllocSize / 2); + inputQueue.add(spare); + spare = null; + } else if (inputQueue.lessThan(inputQueue.top(), spare)) { + Row nextSpare = inputQueue.top(); + rowFiller.writeValues(i, spare); + spareValuesPreAllocSize = Math.max(spare.values.length(), spareValuesPreAllocSize / 2); + inputQueue.updateTop(spare); + spare = nextSpare; + } } } finally { page.releaseBlocks(); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java index 97f301e252fd2..fe9743d029f3e 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java @@ -92,7 +92,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -470,7 +469,8 @@ private TopNOperator.Row row( page ); TopNOperator.Row row = new TopNOperator.Row(nonBreakingBigArrays().breakerService().getBreaker("request"), sortOrders, 0, 0); - rf.row(position, row); + rf.writeKey(position, row); + rf.writeValues(position, row); return row; } @@ -1491,7 +1491,8 @@ public void testRowResizes() { // 105 are from the objects // 1 is for the min-heap itself - assertThat(breaker.getMemoryRequestCount(), is(106L)); + // could be less than because we don't always insert + assertThat(breaker.getMemoryRequestCount(), lessThanOrEqualTo(106L)); } } From 83f9ed17d85506d6d7701042003519bd14d941a5 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 30 Sep 2025 17:02:34 -0400 Subject: [PATCH 2/4] comments --- .../org/elasticsearch/compute/operator/topn/TopNOperator.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java index 50b8bfe68a7d8..37aa9623ea7c6 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java @@ -416,11 +416,13 @@ public void addInput(Page page) { spareKeysPreAllocSize = Math.max(spare.keys.length(), spareKeysPreAllocSize / 2); if (inputQueue.size() < inputQueue.topCount) { + // Heap not yet full, just add elements rowFiller.writeValues(i, spare); spareValuesPreAllocSize = Math.max(spare.values.length(), spareValuesPreAllocSize / 2); inputQueue.add(spare); spare = null; } else if (inputQueue.lessThan(inputQueue.top(), spare)) { + // Heap full AND this node fit in it. Row nextSpare = inputQueue.top(); rowFiller.writeValues(i, spare); spareValuesPreAllocSize = Math.max(spare.values.length(), spareValuesPreAllocSize / 2); From 16cdf9cce22b8d9088d0ac0d8f15522ad0b1d918 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 30 Sep 2025 17:03:48 -0400 Subject: [PATCH 3/4] Update docs/changelog/135734.yaml --- docs/changelog/135734.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/135734.yaml diff --git a/docs/changelog/135734.yaml b/docs/changelog/135734.yaml new file mode 100644 index 0000000000000..b02abfa20e53f --- /dev/null +++ b/docs/changelog/135734.yaml @@ -0,0 +1,5 @@ +pr: 135734 +summary: Fill in topn values if competitive +area: ES|QL +type: enhancement +issues: [] From 04ac9dd557bb8786b82fe5d1d0ecbf56a94837b6 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 2 Oct 2025 14:34:17 -0400 Subject: [PATCH 4/4] Better lock --- .../elasticsearch/compute/operator/topn/TopNOperator.java | 1 + .../compute/operator/topn/TopNOperatorTests.java | 7 ++++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java index 37aa9623ea7c6..1bc6434786675 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java @@ -415,6 +415,7 @@ public void addInput(Page page) { // Let the pre-allocation size decay in case we only have 1 huge row and smaller rows otherwise. spareKeysPreAllocSize = Math.max(spare.keys.length(), spareKeysPreAllocSize / 2); + // This is `inputQueue.insertWithOverflow` with followed by filling in the value only if we inserted. if (inputQueue.size() < inputQueue.topCount) { // Heap not yet full, just add elements rowFiller.writeValues(i, spare); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java index fe9743d029f3e..7d48b79ab12c5 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java @@ -1468,6 +1468,7 @@ public void testRowResizes() { ); List types = Collections.nCopies(columns, INT); List encoders = Collections.nCopies(columns, DEFAULT_UNSORTABLE); + boolean asc = randomBoolean(); try ( TopNOperator op = new TopNOperator( driverContext().blockFactory(), @@ -1475,7 +1476,7 @@ public void testRowResizes() { 10, types, encoders, - List.of(new TopNOperator.SortOrder(0, randomBoolean(), randomBoolean())), + List.of(new TopNOperator.SortOrder(0, asc, randomBoolean())), randomPageSize() ) ) { @@ -1491,8 +1492,8 @@ public void testRowResizes() { // 105 are from the objects // 1 is for the min-heap itself - // could be less than because we don't always insert - assertThat(breaker.getMemoryRequestCount(), lessThanOrEqualTo(106L)); + // -1 IF we're sorting ascending. We encode one less value. + assertThat(breaker.getMemoryRequestCount(), equalTo(asc ? 105L : 106L)); } }