Skip to content

Commit ea64bf4

Browse files
authored
ESQL: Fill in topn values if competitive (#135734)
Skip filling in the topn *values* only if the row is competitive. This cuts the runtime of topn pretty significantly. That's important when topn is dominating the runtime, like we see when querying many many indices at once. We can emulate that a little locally with something like: ``` rm /tmp/fields for field in {1..500}; do echo -n ',"f'$field'": "foo"' >> /tmp/fields done for idx in {1..100}; do curl -uelastic:password -XDELETE localhost:9200/test$idx echo '{ "settings": { "index.mapping.total_fields.limit": 10000 }, "mappings": { "properties": { "@timestamp": { "type": "date" } ' > /tmp/idx for field in {1..500}; do echo ',"f'$field'": { "type": "keyword" }' >> /tmp/idx done echo ' } } }' >> /tmp/idx curl -uelastic:password -XPUT -HContent-Type:application/json localhost:9200/test$idx --data @/tmp/idx rm /tmp/bulk for doc in {1..1000}; do echo '{"index":{}}' >> /tmp/bulk echo -n '{"@timestamp": '$(($idx * 10000 + $doc)) >> /tmp/bulk cat /tmp/fields >> /tmp/bulk echo '}' >> /tmp/bulk done echo curl -s -uelastic:password -XPOST -HContent-Type:application/json "localhost:9200/test$idx/_bulk?refresh&pretty" --data-binary @/tmp/bulk | tee /tmp/bulk_result | grep error echo done while true; do curl -s -uelastic:password -XPOST -HContent-Type:application/json 'localhost:9200/_query?pretty' -d'{ "query": "FROM *", "pragma": { "max_concurrent_shards_per_node": 100 } }' | jq .took curl -s -uelastic:password -XPOST -HContent-Type:application/json 'localhost:9200/_query?pretty' -d'{ "query": "FROM * | SORT @timestamp DESC", "pragma": { "max_concurrent_shards_per_node": 100 } }' | jq .took done ``` This only spends about 12.6% of it's time on topn and takes 2.7 seconds locally. If we apply this fix we spend 3.6% of our time on topn, taking 2.5 seconds. That's not a huge improvement. 7% is nothing to sneeze at, but it's not great. But the topn is dropping from 340 millis to 90 millis. 
But in some summary clusters I'm seeing 65% of time spent on topn for queries taking 3 seconds. My kind of bad math says this improvement should drop this query to 1.6 seconds. Let's hope! Hopefully our nightlies will see this and prove my math right.
1 parent 16617b0 commit ea64bf4

File tree

3 files changed

+29
-17
lines changed

3 files changed

+29
-17
lines changed

docs/changelog/135734.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 135734
2+
summary: Fill in topn values if competitive
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -199,15 +199,7 @@ static final class RowFiller {
199199
}
200200
}
201201

202-
/**
203-
* Fill a {@link Row} for {@code position}.
204-
*/
205-
void row(int position, Row destination) {
206-
writeKey(position, destination);
207-
writeValues(position, destination);
208-
}
209-
210-
private void writeKey(int position, Row row) {
202+
void writeKey(int position, Row row) {
211203
int orderByCompositeKeyCurrentPosition = 0;
212204
for (int i = 0; i < keyFactories.length; i++) {
213205
int valueAsBytesSize = keyFactories[i].extractor.writeKey(row.keys, position);
@@ -217,7 +209,7 @@ private void writeKey(int position, Row row) {
217209
}
218210
}
219211

220-
private void writeValues(int position, Row destination) {
212+
void writeValues(int position, Row destination) {
221213
for (ValueExtractor e : valueExtractors) {
222214
var refCounted = e.getRefCountedForShard(position);
223215
if (refCounted != null) {
@@ -416,15 +408,28 @@ public void addInput(Page page) {
416408
spare.values.clear();
417409
spare.clearRefCounters();
418410
}
419-
rowFiller.row(i, spare);
411+
rowFiller.writeKey(i, spare);
420412

421413
// When rows are very long, appending the values one by one can lead to lots of allocations.
422414
// To avoid this, pre-allocate at least as much size as in the last seen row.
423415
// Let the pre-allocation size decay in case we only have 1 huge row and smaller rows otherwise.
424416
spareKeysPreAllocSize = Math.max(spare.keys.length(), spareKeysPreAllocSize / 2);
425-
spareValuesPreAllocSize = Math.max(spare.values.length(), spareValuesPreAllocSize / 2);
426417

427-
spare = inputQueue.insertWithOverflow(spare);
418+
// This is `inputQueue.insertWithOverflow`, followed by filling in the values only if we inserted.
419+
if (inputQueue.size() < inputQueue.topCount) {
420+
// Heap not yet full, just add elements
421+
rowFiller.writeValues(i, spare);
422+
spareValuesPreAllocSize = Math.max(spare.values.length(), spareValuesPreAllocSize / 2);
423+
inputQueue.add(spare);
424+
spare = null;
425+
} else if (inputQueue.lessThan(inputQueue.top(), spare)) {
426+
// Heap full AND this node fits in it.
427+
Row nextSpare = inputQueue.top();
428+
rowFiller.writeValues(i, spare);
429+
spareValuesPreAllocSize = Math.max(spare.values.length(), spareValuesPreAllocSize / 2);
430+
inputQueue.updateTop(spare);
431+
spare = nextSpare;
432+
}
428433
}
429434
} finally {
430435
page.releaseBlocks();

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@
9292
import static org.hamcrest.Matchers.equalTo;
9393
import static org.hamcrest.Matchers.greaterThan;
9494
import static org.hamcrest.Matchers.hasSize;
95-
import static org.hamcrest.Matchers.is;
9695
import static org.hamcrest.Matchers.lessThan;
9796
import static org.hamcrest.Matchers.lessThanOrEqualTo;
9897

@@ -470,7 +469,8 @@ private TopNOperator.Row row(
470469
page
471470
);
472471
TopNOperator.Row row = new TopNOperator.Row(nonBreakingBigArrays().breakerService().getBreaker("request"), sortOrders, 0, 0);
473-
rf.row(position, row);
472+
rf.writeKey(position, row);
473+
rf.writeValues(position, row);
474474
return row;
475475
}
476476

@@ -1468,14 +1468,15 @@ public void testRowResizes() {
14681468
);
14691469
List<ElementType> types = Collections.nCopies(columns, INT);
14701470
List<TopNEncoder> encoders = Collections.nCopies(columns, DEFAULT_UNSORTABLE);
1471+
boolean asc = randomBoolean();
14711472
try (
14721473
TopNOperator op = new TopNOperator(
14731474
driverContext().blockFactory(),
14741475
breaker,
14751476
10,
14761477
types,
14771478
encoders,
1478-
List.of(new TopNOperator.SortOrder(0, randomBoolean(), randomBoolean())),
1479+
List.of(new TopNOperator.SortOrder(0, asc, randomBoolean())),
14791480
randomPageSize()
14801481
)
14811482
) {
@@ -1491,7 +1492,8 @@ public void testRowResizes() {
14911492

14921493
// 105 are from the objects
14931494
// 1 is for the min-heap itself
1494-
assertThat(breaker.getMemoryRequestCount(), is(106L));
1495+
// -1 IF we're sorting ascending. We encode one less value.
1496+
assertThat(breaker.getMemoryRequestCount(), equalTo(asc ? 105L : 106L));
14951497
}
14961498
}
14971499

0 commit comments

Comments
 (0)