From b2c6195262f7c9755ca30dec9f352e123109bf44 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 8 Sep 2025 11:35:45 -0400 Subject: [PATCH] ESQL: Reserve memory TopN (#134235) Tracks more of the memory that's involved in topn. Lucene doesn't track memory usage for TopN and can use a fair bit of it. Try this query: ``` FROM big_table | SORT a, b, c, d, e | LIMIT 1000000 | STATS MAX(a) ``` We attempt to return all million documents from lucene. If we did this with the compute engine we'd track all of the memory usage. With lucene we have to reserve it. In the case of the query above the sort keys weigh 8 bytes each. 40 bytes total. Plus another 72 for Lucene's `FieldDoc`. And another 40 at least for copying the values to `FieldDoc`. That totals something like 152 bytes a piece. That's 145mb. Worth tracking! ## Esql Engine TopN Esql *does* track memory for topn, but it doesn't track the memory used by the min heap itself. It's just a big array of pointers. But it can get very big! --- docs/changelog/134235.yaml | 5 + .../xpack/esql/heap_attack/HeapAttackIT.java | 85 ++++++++++--- .../lucene/LuceneTopNSourceOperator.java | 113 +++++++++++++----- .../compute/operator/topn/TopNOperator.java | 82 +++++++++---- .../LuceneTopNSourceOperatorScoringTests.java | 2 + .../lucene/LuceneTopNSourceOperatorTests.java | 2 + .../operator/topn/TopNOperatorTests.java | 4 +- .../xpack/esql/plan/physical/EsQueryExec.java | 21 ++++ .../planner/EsPhysicalOperationProviders.java | 10 ++ 9 files changed, 255 insertions(+), 69 deletions(-) create mode 100644 docs/changelog/134235.yaml diff --git a/docs/changelog/134235.yaml b/docs/changelog/134235.yaml new file mode 100644 index 0000000000000..b2e1ca6c9499c --- /dev/null +++ b/docs/changelog/134235.yaml @@ -0,0 +1,5 @@ +pr: 134235 +summary: Reserve memory for Lucene's TopN +area: ES|QL +type: bug +issues: [] diff --git a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java 
b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java index 8ae9db4c904c9..1a5fb1c352abb 100644 --- a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java +++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java @@ -90,7 +90,7 @@ public void skipOnAborted() { * This used to fail, but we've since compacted top n so it actually succeeds now. */ public void testSortByManyLongsSuccess() throws IOException { - initManyLongs(); + initManyLongs(10); Map response = sortByManyLongs(500); ListMatcher columns = matchesList().item(matchesMap().entry("name", "a").entry("type", "long")) .item(matchesMap().entry("name", "b").entry("type", "long")); @@ -107,7 +107,7 @@ public void testSortByManyLongsSuccess() throws IOException { * This used to crash the node with an out of memory, but now it just trips a circuit breaker. */ public void testSortByManyLongsTooMuchMemory() throws IOException { - initManyLongs(); + initManyLongs(10); // 5000 is plenty to break on most nodes assertCircuitBreaks(attempt -> sortByManyLongs(attempt * 5000)); } @@ -116,7 +116,7 @@ public void testSortByManyLongsTooMuchMemory() throws IOException { * This should record an async response with a {@link CircuitBreakingException}. 
*/ public void testSortByManyLongsTooMuchMemoryAsync() throws IOException { - initManyLongs(); + initManyLongs(10); Request request = new Request("POST", "/_query/async"); request.addParameter("error_trace", ""); request.setJsonEntity(makeSortByManyLongs(5000).toString().replace("\n", "\\n")); @@ -193,6 +193,29 @@ public void testSortByManyLongsTooMuchMemoryAsync() throws IOException { ); } + public void testSortByManyLongsGiantTopN() throws IOException { + initManyLongs(10); + assertMap( + sortBySomeLongsLimit(100000), + matchesMap().entry("took", greaterThan(0)) + .entry("is_partial", false) + .entry("columns", List.of(Map.of("name", "MAX(a)", "type", "long"))) + .entry("values", List.of(List.of(9))) + .entry("documents_found", greaterThan(0)) + .entry("values_loaded", greaterThan(0)) + ); + } + + public void testSortByManyLongsGiantTopNTooMuchMemory() throws IOException { + initManyLongs(20); + assertCircuitBreaks(attempt -> sortBySomeLongsLimit(attempt * 500000)); + } + + public void testStupidTopN() throws IOException { + initManyLongs(1); // Doesn't actually matter how much data there is. + assertCircuitBreaks(attempt -> sortBySomeLongsLimit(2147483630)); + } + private static final int MAX_ATTEMPTS = 5; interface TryCircuitBreaking { @@ -251,11 +274,25 @@ private StringBuilder makeSortByManyLongs(int count) { return query; } + private Map sortBySomeLongsLimit(int count) throws IOException { + logger.info("sorting by 5 longs, keeping {}", count); + return responseAsMap(query(makeSortBySomeLongsLimit(count), null)); + } + + private String makeSortBySomeLongsLimit(int count) { + StringBuilder query = new StringBuilder("{\"query\": \"FROM manylongs\n"); + query.append("| SORT a, b, c, d, e\n"); + query.append("| LIMIT ").append(count).append("\n"); + query.append("| STATS MAX(a)\n"); + query.append("\"}"); + return query.toString(); + } + /** * This groups on about 200 columns which is a lot but has never caused us trouble. 
*/ public void testGroupOnSomeLongs() throws IOException { - initManyLongs(); + initManyLongs(10); Response resp = groupOnManyLongs(200); Map map = responseAsMap(resp); ListMatcher columns = matchesList().item(matchesMap().entry("name", "MAX(a)").entry("type", "long")); @@ -267,7 +304,7 @@ public void testGroupOnSomeLongs() throws IOException { * This groups on 5000 columns which used to throw a {@link StackOverflowError}. */ public void testGroupOnManyLongs() throws IOException { - initManyLongs(); + initManyLongs(10); Response resp = groupOnManyLongs(5000); Map map = responseAsMap(resp); ListMatcher columns = matchesList().item(matchesMap().entry("name", "MAX(a)").entry("type", "long")); @@ -335,7 +372,7 @@ private Response concat(int evals) throws IOException { */ public void testManyConcat() throws IOException { int strings = 300; - initManyLongs(); + initManyLongs(10); assertManyStrings(manyConcat("FROM manylongs", strings), strings); } @@ -343,7 +380,7 @@ public void testManyConcat() throws IOException { * Hits a circuit breaker by building many moderately long strings. */ public void testHugeManyConcat() throws IOException { - initManyLongs(); + initManyLongs(10); // 2000 is plenty to break on most nodes assertCircuitBreaks(attempt -> manyConcat("FROM manylongs", attempt * 2000)); } @@ -414,7 +451,7 @@ private Map manyConcat(String init, int strings) throws IOExcept */ public void testManyRepeat() throws IOException { int strings = 30; - initManyLongs(); + initManyLongs(10); assertManyStrings(manyRepeat("FROM manylongs", strings), 30); } @@ -422,7 +459,7 @@ public void testManyRepeat() throws IOException { * Hits a circuit breaker by building many moderately long strings. 
*/ public void testHugeManyRepeat() throws IOException { - initManyLongs(); + initManyLongs(10); // 75 is plenty to break on most nodes assertCircuitBreaks(attempt -> manyRepeat("FROM manylongs", attempt * 75)); } @@ -480,7 +517,7 @@ private void assertManyStrings(Map resp, int strings) throws IOE } public void testManyEval() throws IOException { - initManyLongs(); + initManyLongs(10); Map response = manyEval(1); ListMatcher columns = matchesList(); columns = columns.item(matchesMap().entry("name", "a").entry("type", "long")); @@ -495,7 +532,7 @@ public void testManyEval() throws IOException { } public void testTooManyEval() throws IOException { - initManyLongs(); + initManyLongs(10); // 490 is plenty to fail on most nodes assertCircuitBreaks(attempt -> manyEval(attempt * 490)); } @@ -810,24 +847,34 @@ private Map enrichExplosion(int sensorDataCount, int lookupEntri } } - private void initManyLongs() throws IOException { + private void initManyLongs(int countPerLong) throws IOException { logger.info("loading many documents with longs"); StringBuilder bulk = new StringBuilder(); - for (int a = 0; a < 10; a++) { - for (int b = 0; b < 10; b++) { - for (int c = 0; c < 10; c++) { - for (int d = 0; d < 10; d++) { - for (int e = 0; e < 10; e++) { + int flush = 0; + for (int a = 0; a < countPerLong; a++) { + for (int b = 0; b < countPerLong; b++) { + for (int c = 0; c < countPerLong; c++) { + for (int d = 0; d < countPerLong; d++) { + for (int e = 0; e < countPerLong; e++) { bulk.append(String.format(Locale.ROOT, """ {"create":{}} {"a":%d,"b":%d,"c":%d,"d":%d,"e":%d} """, a, b, c, d, e)); + flush++; + if (flush % 10_000 == 0) { + bulk("manylongs", bulk.toString()); + bulk.setLength(0); + logger.info( + "flushing {}/{} to manylongs", + flush, + countPerLong * countPerLong * countPerLong * countPerLong * countPerLong + ); + + } } } } } - bulk("manylongs", bulk.toString()); - bulk.setLength(0); } initIndex("manylongs", bulk.toString()); } diff --git 
a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java index 553b4319f22e9..9d5b0a7ae64b8 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java @@ -19,7 +19,9 @@ import org.apache.lucene.search.TopDocsCollector; import org.apache.lucene.search.TopFieldCollectorManager; import org.apache.lucene.search.TopScoreDocCollectorManager; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.DocBlock; import org.elasticsearch.compute.data.DocVector; @@ -44,7 +46,11 @@ import java.util.stream.Collectors; /** - * Source operator that builds Pages out of the output of a TopFieldCollector (aka TopN) + * Source operator that builds Pages out of the output of a TopFieldCollector (aka TopN). + *

+ * Makes {@link Page}s of the shape {@code (docBlock)} or {@code (docBlock, score)}. + * Lucene loads the sort keys, but we don't read them from lucene. Yet. We should. + *

*/ public final class LuceneTopNSourceOperator extends LuceneOperator { @@ -52,6 +58,7 @@ public static class Factory extends LuceneOperator.Factory { private final List contexts; private final int maxPageSize; private final List> sorts; + private final long estimatedPerRowSortSize; public Factory( List contexts, @@ -61,6 +68,7 @@ public Factory( int maxPageSize, int limit, List> sorts, + long estimatedPerRowSortSize, boolean needsScore ) { super( @@ -76,11 +84,22 @@ public Factory( this.contexts = contexts; this.maxPageSize = maxPageSize; this.sorts = sorts; + this.estimatedPerRowSortSize = estimatedPerRowSortSize; } @Override public SourceOperator get(DriverContext driverContext) { - return new LuceneTopNSourceOperator(contexts, driverContext.blockFactory(), maxPageSize, sorts, limit, sliceQueue, needsScore); + return new LuceneTopNSourceOperator( + driverContext.breaker(), + contexts, + driverContext.blockFactory(), + maxPageSize, + sorts, + estimatedPerRowSortSize, + limit, + sliceQueue, + needsScore + ); } public int maxPageSize() { @@ -104,35 +123,44 @@ public String describe() { } } + private final List contexts; + private final CircuitBreaker breaker; + private final List> sorts; + private final long estimatedPerRowSortSize; + private final int limit; + private final boolean needsScore; + /** - * Collected docs. {@code null} until we're {@link #emit(boolean)}. + * Collected docs. {@code null} until we're ready to {@link #emit()}. */ - private ScoreDoc[] scoreDocs; + private ScoreDoc[] topDocs; + /** - * The offset in {@link #scoreDocs} of the next page. + * The offset in {@link #topDocs} of the next page. 
*/ private int offset = 0; private PerShardCollector perShardCollector; - private final List contexts; - private final List> sorts; - private final int limit; - private final boolean needsScore; public LuceneTopNSourceOperator( + CircuitBreaker breaker, List contexts, BlockFactory blockFactory, int maxPageSize, List> sorts, + long estimatedPerRowSortSize, int limit, LuceneSliceQueue sliceQueue, boolean needsScore ) { super(contexts, blockFactory, maxPageSize, sliceQueue); + this.breaker = breaker; this.contexts = contexts; this.sorts = sorts; + this.estimatedPerRowSortSize = estimatedPerRowSortSize; this.limit = limit; this.needsScore = needsScore; + breaker.addEstimateBytesAndMaybeBreak(reserveSize(), "esql lucene topn"); } @Override @@ -143,7 +171,7 @@ public boolean isFinished() { @Override public void finish() { doneCollecting = true; - scoreDocs = null; + topDocs = null; assert isFinished(); } @@ -155,7 +183,7 @@ public Page getCheckedOutput() throws IOException { long start = System.nanoTime(); try { if (isEmitting()) { - return emit(false); + return emit(); } else { return collect(); } @@ -169,7 +197,8 @@ private Page collect() throws IOException { var scorer = getCurrentOrLoadNextScorer(); if (scorer == null) { doneCollecting = true; - return emit(true); + startEmitting(); + return emit(); } try { if (scorer.tags().isEmpty() == false) { @@ -188,30 +217,45 @@ private Page collect() throws IOException { if (scorer.isDone()) { var nextScorer = getCurrentOrLoadNextScorer(); if (nextScorer == null || nextScorer.shardContext().index() != scorer.shardContext().index()) { - return emit(true); + startEmitting(); + return emit(); } } return null; } private boolean isEmitting() { - return scoreDocs != null && offset < scoreDocs.length; + return topDocs != null; } - private Page emit(boolean startEmitting) { - if (startEmitting) { - assert isEmitting() == false : "offset=" + offset + " score_docs=" + Arrays.toString(scoreDocs); - offset = 0; - if (perShardCollector != 
null) { - scoreDocs = perShardCollector.collector.topDocs().scoreDocs; - } else { - scoreDocs = new ScoreDoc[0]; - } + private void startEmitting() { + assert isEmitting() == false : "offset=" + offset + " score_docs=" + Arrays.toString(topDocs); + offset = 0; + if (perShardCollector != null) { + /* + * Important note for anyone who looks at this and has bright ideas: + * There *is* a method in lucene to return topDocs with an offset + * and a limit. So you'd *think* you can scroll the top docs there. + * But you can't. It's expressly forbidden to call any of the `topDocs` + * methods more than once. You *must* call `topDocs` once and use the + * array. + */ + topDocs = perShardCollector.collector.topDocs().scoreDocs; + } else { + topDocs = new ScoreDoc[0]; } - if (offset >= scoreDocs.length) { + } + + private void stopEmitting() { + topDocs = null; + } + + private Page emit() { + if (offset >= topDocs.length) { + stopEmitting(); return null; } - int size = Math.min(maxPageSize, scoreDocs.length - offset); + int size = Math.min(maxPageSize, topDocs.length - offset); IntBlock shard = null; IntVector segments = null; IntVector docs = null; @@ -227,14 +271,16 @@ private Page emit(boolean startEmitting) { offset += size; List leafContexts = perShardCollector.shardContext.searcher().getLeafContexts(); for (int i = start; i < offset; i++) { - int doc = scoreDocs[i].doc; + int doc = topDocs[i].doc; int segment = ReaderUtil.subIndex(doc, leafContexts); currentSegmentBuilder.appendInt(segment); currentDocsBuilder.appendInt(doc - leafContexts.get(segment).docBase); // the offset inside the segment if (currentScoresBuilder != null) { - float score = getScore(scoreDocs[i]); + float score = getScore(topDocs[i]); currentScoresBuilder.appendDouble(score); } + // Null the top doc so it can be GCed early, just in case. 
+ topDocs[i] = null; } int shardId = perShardCollector.shardContext.index(); @@ -292,9 +338,20 @@ protected void describe(StringBuilder sb) { sb.append(", sorts = [").append(notPrettySorts).append("]"); } + @Override + protected void additionalClose() { + Releasables.close(() -> breaker.addWithoutBreaking(-reserveSize())); + } + + private long reserveSize() { + long perRowSize = FIELD_DOC_SIZE + estimatedPerRowSortSize; + return limit * perRowSize; + } + abstract static class PerShardCollector { private final ShardContext shardContext; private final TopDocsCollector collector; + private int leafIndex; private LeafCollector leafCollector; private Thread currentThread; @@ -360,4 +417,6 @@ private static PerShardCollector newPerShardCollector(ShardContext context, List sort = new Sort(l.toArray(SortField[]::new)); return new ScoringPerShardCollector(context, new TopFieldCollectorManager(sort, limit, null, 0).newCollector()); } + + private static final int FIELD_DOC_SIZE = Math.toIntExact(RamUsageEstimator.shallowSizeOf(FieldDoc.class)); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java index fde51d4642ae0..328f2ec8ff413 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java @@ -294,7 +294,6 @@ public String describe() { private final BlockFactory blockFactory; private final CircuitBreaker breaker; - private final Queue inputQueue; private final int maxPageSize; @@ -302,6 +301,7 @@ public String describe() { private final List encoders; private final List sortOrders; + private Queue inputQueue; private Row spare; private int spareValuesPreAllocSize = 0; private int spareKeysPreAllocSize = 0; @@ -343,7 +343,7 @@ public TopNOperator( 
this.elementTypes = elementTypes; this.encoders = encoders; this.sortOrders = sortOrders; - this.inputQueue = new Queue(topCount); + this.inputQueue = Queue.build(breaker, topCount); } static int compareRows(Row r1, Row r2) { @@ -450,6 +450,8 @@ private Iterator toPages() { list.add(inputQueue.pop()); } Collections.reverse(list); + inputQueue.close(); + inputQueue = null; int p = 0; int size = 0; @@ -556,19 +558,27 @@ public Page getOutput() { @Override public void close() { - /* - * If we close before calling finish then spare and inputQueue will be live rows - * that need closing. If we close after calling finish then the output iterator - * will contain pages of results that have yet to be returned. - */ Releasables.closeExpectNoException( + /* + * The spare is used during most collections. It's cleared when this Operator + * is finish()ed. So it could be null here. + */ spare, - inputQueue == null ? null : Releasables.wrap(inputQueue), + /* + * The inputQueue is a min heap of all live rows. Closing it will close all + * the rows it contains and also decrement the breaker for the size of + * the heap itself. + */ + inputQueue, + /* + * If we're in the process of outputting pages then output will contain all + * allocated but un-emitted pages. + */ output == null ? 
null : Releasables.wrap(() -> Iterators.map(output, p -> p::releaseBlocks)) ); } - private static long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(TopNOperator.class) + RamUsageEstimator + private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(TopNOperator.class) + RamUsageEstimator .shallowSizeOfInstance(List.class) * 3; @Override @@ -582,13 +592,22 @@ public long ramBytesUsed() { size += RamUsageEstimator.alignObjectSize(arrHeader + ref * encoders.size()); size += RamUsageEstimator.alignObjectSize(arrHeader + ref * sortOrders.size()); size += sortOrders.size() * SortOrder.SHALLOW_SIZE; - size += inputQueue.ramBytesUsed(); + if (inputQueue != null) { + size += inputQueue.ramBytesUsed(); + } return size; } @Override public Status status() { - return new TopNOperatorStatus(inputQueue.size(), ramBytesUsed(), pagesReceived, pagesEmitted, rowsReceived, rowsEmitted); + return new TopNOperatorStatus( + inputQueue != null ? inputQueue.size() : 0, + ramBytesUsed(), + pagesReceived, + pagesEmitted, + rowsReceived, + rowsEmitted + ); } @Override @@ -604,17 +623,23 @@ public String toString() { + "]"; } - CircuitBreaker breaker() { - return breaker; - } - - private static class Queue extends PriorityQueue implements Accountable { + private static class Queue extends PriorityQueue implements Accountable, Releasable { private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(Queue.class); - private final int maxSize; + private final CircuitBreaker breaker; + private final int topCount; + + /** + * Track memory usage in the breaker then build the {@link Queue}. 
+ */ + static Queue build(CircuitBreaker breaker, int topCount) { + breaker.addEstimateBytesAndMaybeBreak(Queue.sizeOf(topCount), "esql engine topn"); + return new Queue(breaker, topCount); + } - Queue(int maxSize) { - super(maxSize); - this.maxSize = maxSize; + private Queue(CircuitBreaker breaker, int topCount) { + super(topCount); + this.breaker = breaker; + this.topCount = topCount; } @Override @@ -624,19 +649,32 @@ protected boolean lessThan(Row r1, Row r2) { @Override public String toString() { - return size() + "/" + maxSize; + return size() + "/" + topCount; } @Override public long ramBytesUsed() { long total = SHALLOW_SIZE; total += RamUsageEstimator.alignObjectSize( - RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + RamUsageEstimator.NUM_BYTES_OBJECT_REF * (maxSize + 1) + RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + RamUsageEstimator.NUM_BYTES_OBJECT_REF * ((long) topCount + 1) ); for (Row r : this) { total += r == null ? 0 : r.ramBytesUsed(); } return total; } + + @Override + public void close() { + Releasables.close(Releasables.wrap(this), () -> breaker.addWithoutBreaking(-Queue.sizeOf(topCount))); + } + + public static long sizeOf(int topCount) { + long total = SHALLOW_SIZE; + total += RamUsageEstimator.alignObjectSize( + RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + RamUsageEstimator.NUM_BYTES_OBJECT_REF * ((long) topCount + 1) + ); + return total; + } } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java index 7ba1f9790ecbe..306c91c0b742f 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java @@ -97,6 +97,7 @@ public Optional buildSort(List> sorts) { int taskConcurrency = 0; 
int maxPageSize = between(10, Math.max(10, size)); List> sorts = List.of(new FieldSortBuilder("s")); + long estimatedPerRowSortSize = 16; return new LuceneTopNSourceOperator.Factory( List.of(ctx), queryFunction, @@ -105,6 +106,7 @@ public Optional buildSort(List> sorts) { maxPageSize, limit, sorts, + estimatedPerRowSortSize, true // scoring ); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java index 26540caee0b1f..30da9a753ddb4 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java @@ -102,6 +102,7 @@ public Optional buildSort(List> sorts) { int taskConcurrency = 0; int maxPageSize = between(10, Math.max(10, size)); List> sorts = List.of(new FieldSortBuilder("s")); + long estimatedPerRowSortSize = 16; return new LuceneTopNSourceOperator.Factory( List.of(ctx), queryFunction, @@ -110,6 +111,7 @@ public Optional buildSort(List> sorts) { maxPageSize, limit, sorts, + estimatedPerRowSortSize, scoring ); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java index 1180cdca64569..03669bdd7c641 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java @@ -1489,7 +1489,9 @@ public void testRowResizes() { block.decRef(); op.addInput(new Page(blocks)); - assertThat(breaker.getMemoryRequestCount(), is(94L)); + // 94 are from the collection process + // 1 is for the min-heap itself + 
assertThat(breaker.getMemoryRequestCount(), is(95L)); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java index 2e74c7153f77e..4ad428fbea12c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java @@ -69,6 +69,12 @@ public interface Sort { Order.OrderDirection direction(); FieldAttribute field(); + + /** + * Type of the result of the sort. For example, + * geo distance will be {@link DataType#DOUBLE}. + */ + DataType resulType(); } public record FieldSort(FieldAttribute field, Order.OrderDirection direction, Order.NullsPosition nulls) implements Sort { @@ -88,6 +94,11 @@ private static FieldSort readFrom(StreamInput in) throws IOException { in.readEnum(Order.NullsPosition.class) ); } + + @Override + public DataType resulType() { + return field.dataType(); + } } public record GeoDistanceSort(FieldAttribute field, Order.OrderDirection direction, double lat, double lon) implements Sort { @@ -97,6 +108,11 @@ public SortBuilder sortBuilder() { builder.order(Direction.from(direction).asOrder()); return builder; } + + @Override + public DataType resulType() { + return DataType.DOUBLE; + } } public record ScoreSort(Order.OrderDirection direction) implements Sort { @@ -110,6 +126,11 @@ public FieldAttribute field() { // TODO: refactor this: not all Sorts are backed by FieldAttributes return null; } + + @Override + public DataType resulType() { + return DataType.DOUBLE; + } } public EsQueryExec( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index ac63c644281dd..1f5308c28718b 100644 --- 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -69,6 +69,7 @@ import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec.Sort; +import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize; import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec; import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec; import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesFieldExtractExec; @@ -277,9 +278,17 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, boolean scoring = esQueryExec.hasScoring(); if ((sorts != null && sorts.isEmpty() == false)) { List> sortBuilders = new ArrayList<>(sorts.size()); + long estimatedPerRowSortSize = 0; for (Sort sort : sorts) { sortBuilders.add(sort.sortBuilder()); + estimatedPerRowSortSize += EstimatesRowSize.estimateSize(sort.resulType()); } + /* + * In the worst case Lucene's TopN keeps each value in memory twice. Once + * for the actual sort and once for the top doc. In the best case they share + * references to the same underlying data, but we're being a bit paranoid here. + */ + estimatedPerRowSortSize *= 2; luceneFactory = new LuceneTopNSourceOperator.Factory( shardContexts, querySupplier(esQueryExec.query()), @@ -288,6 +297,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, context.pageSize(rowEstimatedSize), limit, sortBuilders, + estimatedPerRowSortSize, scoring ); } else {