From b05ba31dc57773f856a1771639d4aa35399bfe68 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Fri, 5 Sep 2025 12:57:30 -0400 Subject: [PATCH 1/7] ESQL: Reserve memory for Lucene's TopN Lucene doesn't track memory usage for TopN and can use a fair bit of it. Try this query: ``` FROM big_table | SORT a, b, c, d, e | LIMIT 1000000 | STATS MAX(a) ``` We attempt to return all million documents from lucene. If we did this with the compute engine we'd track all of the memory usage. With lucene we have to reserve it. In the case of the query above the sort keys weigh 8 bytes each. 40 bytes total. Plus another 72 for Lucene's `FieldDoc`. And another 40 at least for copying the values to `FieldDoc`. That totals something like 152 bytes a piece. That's 145mb. Worth tracking! --- .../xpack/esql/heap_attack/HeapAttackIT.java | 80 ++++-- .../lucene/LuceneTopNSourceOperator.java | 262 ++++++++++++++---- 2 files changed, 273 insertions(+), 69 deletions(-) diff --git a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java index 2f7b21fa3564b..2696d41aa47f0 100644 --- a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java +++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java @@ -91,7 +91,7 @@ public void skipOnAborted() { * This used to fail, but we've since compacted top n so it actually succeeds now. 
*/ public void testSortByManyLongsSuccess() throws IOException { - initManyLongs(); + initManyLongs(10); Map response = sortByManyLongs(500); ListMatcher columns = matchesList().item(matchesMap().entry("name", "a").entry("type", "long")) .item(matchesMap().entry("name", "b").entry("type", "long")); @@ -108,7 +108,7 @@ public void testSortByManyLongsSuccess() throws IOException { * This used to crash the node with an out of memory, but now it just trips a circuit breaker. */ public void testSortByManyLongsTooMuchMemory() throws IOException { - initManyLongs(); + initManyLongs(10); // 5000 is plenty to break on most nodes assertCircuitBreaks(attempt -> sortByManyLongs(attempt * 5000)); } @@ -117,7 +117,7 @@ public void testSortByManyLongsTooMuchMemory() throws IOException { * This should record an async response with a {@link CircuitBreakingException}. */ public void testSortByManyLongsTooMuchMemoryAsync() throws IOException { - initManyLongs(); + initManyLongs(10); Request request = new Request("POST", "/_query/async"); request.addParameter("error_trace", ""); request.setJsonEntity(makeSortByManyLongs(5000).toString().replace("\n", "\\n")); @@ -194,6 +194,24 @@ public void testSortByManyLongsTooMuchMemoryAsync() throws IOException { ); } + public void testSortByManyLongsGiantTopN() throws IOException { + initManyLongs(10); + assertMap(sortBySomeLongsLimit(100000), + matchesMap() + .entry("took", greaterThan(0)) + .entry("is_partial", false) + .entry("columns", List.of(Map.of("name", "MAX(a)", "type", "long"))) + .entry("values", List.of(List.of(9))) + .entry("documents_found", greaterThan(0)) + .entry("values_loaded", greaterThan(0)) + ); + } + + public void testSortByManyLongsGiantTopNTooMuchMemory() throws IOException { + initManyLongs(20); + assertCircuitBreaks(attempt -> sortBySomeLongsLimit(attempt * 500000)); + } + private static final int MAX_ATTEMPTS = 5; interface TryCircuitBreaking { @@ -252,11 +270,25 @@ private StringBuilder makeSortByManyLongs(int 
count) { return query; } + private Map sortBySomeLongsLimit(int count) throws IOException { + logger.info("sorting by 5 longs, keeping {}", count); + return responseAsMap(query(makeSortBySomeLongsLimit(count), null)); + } + + private String makeSortBySomeLongsLimit(int count) { + StringBuilder query = new StringBuilder("{\"query\": \"FROM manylongs\n"); + query.append("| SORT a, b, c, d, e\n"); + query.append("| LIMIT ").append(count).append("\n"); + query.append("| STATS MAX(a)\n"); + query.append("\"}"); + return query.toString(); + } + /** * This groups on about 200 columns which is a lot but has never caused us trouble. */ public void testGroupOnSomeLongs() throws IOException { - initManyLongs(); + initManyLongs(10); Response resp = groupOnManyLongs(200); Map map = responseAsMap(resp); ListMatcher columns = matchesList().item(matchesMap().entry("name", "MAX(a)").entry("type", "long")); @@ -268,7 +300,7 @@ public void testGroupOnSomeLongs() throws IOException { * This groups on 5000 columns which used to throw a {@link StackOverflowError}. */ public void testGroupOnManyLongs() throws IOException { - initManyLongs(); + initManyLongs(10); Response resp = groupOnManyLongs(5000); Map map = responseAsMap(resp); ListMatcher columns = matchesList().item(matchesMap().entry("name", "MAX(a)").entry("type", "long")); @@ -336,7 +368,7 @@ private Response concat(int evals) throws IOException { */ public void testManyConcat() throws IOException { int strings = 300; - initManyLongs(); + initManyLongs(10); assertManyStrings(manyConcat("FROM manylongs", strings), strings); } @@ -344,7 +376,7 @@ public void testManyConcat() throws IOException { * Hits a circuit breaker by building many moderately long strings. 
*/ public void testHugeManyConcat() throws IOException { - initManyLongs(); + initManyLongs(10); // 2000 is plenty to break on most nodes assertCircuitBreaks(attempt -> manyConcat("FROM manylongs", attempt * 2000)); } @@ -415,7 +447,7 @@ private Map manyConcat(String init, int strings) throws IOExcept */ public void testManyRepeat() throws IOException { int strings = 30; - initManyLongs(); + initManyLongs(10); assertManyStrings(manyRepeat("FROM manylongs", strings), 30); } @@ -423,7 +455,7 @@ public void testManyRepeat() throws IOException { * Hits a circuit breaker by building many moderately long strings. */ public void testHugeManyRepeat() throws IOException { - initManyLongs(); + initManyLongs(10); // 75 is plenty to break on most nodes assertCircuitBreaks(attempt -> manyRepeat("FROM manylongs", attempt * 75)); } @@ -481,7 +513,7 @@ private void assertManyStrings(Map resp, int strings) throws IOE } public void testManyEval() throws IOException { - initManyLongs(); + initManyLongs(10); Map response = manyEval(1); ListMatcher columns = matchesList(); columns = columns.item(matchesMap().entry("name", "a").entry("type", "long")); @@ -496,7 +528,7 @@ public void testManyEval() throws IOException { } public void testTooManyEval() throws IOException { - initManyLongs(); + initManyLongs(10); // 490 is plenty to fail on most nodes assertCircuitBreaks(attempt -> manyEval(attempt * 490)); } @@ -855,24 +887,34 @@ private Map enrichExplosion(int sensorDataCount, int lookupEntri } } - private void initManyLongs() throws IOException { + private void initManyLongs(int countPerLong) throws IOException { logger.info("loading many documents with longs"); StringBuilder bulk = new StringBuilder(); - for (int a = 0; a < 10; a++) { - for (int b = 0; b < 10; b++) { - for (int c = 0; c < 10; c++) { - for (int d = 0; d < 10; d++) { - for (int e = 0; e < 10; e++) { + int flush = 0; + for (int a = 0; a < countPerLong; a++) { + for (int b = 0; b < countPerLong; b++) { + for (int c = 0; c < 
countPerLong; c++) { + for (int d = 0; d < countPerLong; d++) { + for (int e = 0; e < countPerLong; e++) { bulk.append(String.format(Locale.ROOT, """ {"create":{}} {"a":%d,"b":%d,"c":%d,"d":%d,"e":%d} """, a, b, c, d, e)); + flush++; + if (flush % 10_000 == 0) { + bulk("manylongs", bulk.toString()); + bulk.setLength(0); + logger.info( + "flushing {}/{} to manylongs", + flush, + countPerLong * countPerLong * countPerLong * countPerLong * countPerLong + ); + + } } } } } - bulk("manylongs", bulk.toString()); - bulk.setLength(0); } initIndex("manylongs", bulk.toString()); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java index 22c438f9a45e3..f53a0df744624 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java @@ -16,10 +16,15 @@ import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortField; +import org.apache.lucene.search.SortedNumericSortField; +import org.apache.lucene.search.SortedSetSortField; import org.apache.lucene.search.TopDocsCollector; import org.apache.lucene.search.TopFieldCollectorManager; import org.apache.lucene.search.TopScoreDocCollectorManager; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.DocBlock; import org.elasticsearch.compute.data.DocVector; @@ -30,13 +35,13 @@ import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.DriverContext; import 
org.elasticsearch.compute.operator.SourceOperator; +import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.search.sort.SortAndFormats; import org.elasticsearch.search.sort.SortBuilder; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -44,7 +49,11 @@ import java.util.stream.Collectors; /** - * Source operator that builds Pages out of the output of a TopFieldCollector (aka TopN) + * Source operator that builds Pages out of the output of a TopFieldCollector (aka TopN). + *

+ * Makes {@link Page}s of the shape {@code (docBlock)} or {@code (docBlock, score)}. + * Lucene loads the sort keys, but we don't read them from lucene. Yet. We should. + *

*/ public final class LuceneTopNSourceOperator extends LuceneOperator { @@ -80,7 +89,16 @@ public Factory( @Override public SourceOperator get(DriverContext driverContext) { - return new LuceneTopNSourceOperator(contexts, driverContext.blockFactory(), maxPageSize, sorts, limit, sliceQueue, needsScore); + return new LuceneTopNSourceOperator( + contexts, + driverContext.breaker(), + driverContext.blockFactory(), + maxPageSize, + sorts, + limit, + sliceQueue, + needsScore + ); } public int maxPageSize() { @@ -104,10 +122,15 @@ public String describe() { } } + private final CircuitBreaker breaker; + private final List> sorts; + private final int limit; + private final boolean needsScore; + /** - * Collected docs. {@code null} until we're {@link #emit(boolean)}. + * Collected docs. {@code null} until we're ready to {@link #emit()}. */ - private ScoreDoc[] scoreDocs; + private ScoreDoc[] topDocs; /** * {@link ShardRefCounted} for collected docs. @@ -115,17 +138,15 @@ public String describe() { private ShardRefCounted shardRefCounted; /** - * The offset in {@link #scoreDocs} of the next page. + * The offset in {@link #topDocs} of the next page. 
*/ private int offset = 0; private PerShardCollector perShardCollector; - private final List> sorts; - private final int limit; - private final boolean needsScore; public LuceneTopNSourceOperator( List contexts, + CircuitBreaker breaker, BlockFactory blockFactory, int maxPageSize, List> sorts, @@ -134,6 +155,7 @@ public LuceneTopNSourceOperator( boolean needsScore ) { super(contexts, blockFactory, maxPageSize, sliceQueue); + this.breaker = breaker; this.sorts = sorts; this.limit = limit; this.needsScore = needsScore; @@ -147,7 +169,7 @@ public boolean isFinished() { @Override public void finish() { doneCollecting = true; - scoreDocs = null; + topDocs = null; shardRefCounted = null; assert isFinished(); } @@ -160,7 +182,7 @@ public Page getCheckedOutput() throws IOException { long start = System.nanoTime(); try { if (isEmitting()) { - return emit(false); + return emit(); } else { return collect(); } @@ -174,15 +196,19 @@ private Page collect() throws IOException { var scorer = getCurrentOrLoadNextScorer(); if (scorer == null) { doneCollecting = true; - return emit(true); + startEmitting(); + return emit(); } try { if (scorer.tags().isEmpty() == false) { throw new UnsupportedOperationException("tags not supported by " + getClass()); } - if (perShardCollector == null || perShardCollector.shardContext.index() != scorer.shardContext().index()) { + if (perShardCollector == null) { + perShardCollector = newPerShardCollector(breaker, scorer.shardContext(), sorts, needsScore, limit); + } else if (perShardCollector.shardContext.index() != scorer.shardContext().index()) { // TODO: share the bottom between shardCollectors - perShardCollector = newPerShardCollector(scorer.shardContext(), sorts, needsScore, limit); + Releasables.close(perShardCollector); + perShardCollector = newPerShardCollector(breaker, scorer.shardContext(), sorts, needsScore, limit); } var leafCollector = perShardCollector.getLeafCollector(scorer.leafReaderContext()); scorer.scoreNextRange(leafCollector, 
scorer.leafReaderContext().reader().getLiveDocs(), maxPageSize); @@ -193,32 +219,47 @@ private Page collect() throws IOException { if (scorer.isDone()) { var nextScorer = getCurrentOrLoadNextScorer(); if (nextScorer == null || nextScorer.shardContext().index() != scorer.shardContext().index()) { - return emit(true); + startEmitting(); + return emit(); } } return null; } private boolean isEmitting() { - return scoreDocs != null && offset < scoreDocs.length; + return topDocs != null; } - private Page emit(boolean startEmitting) { - if (startEmitting) { - assert isEmitting() == false : "offset=" + offset + " score_docs=" + Arrays.toString(scoreDocs); - offset = 0; - if (perShardCollector != null) { - scoreDocs = perShardCollector.collector.topDocs().scoreDocs; - int shardId = perShardCollector.shardContext.index(); - shardRefCounted = new ShardRefCounted.Single(shardId, shardContextCounters.get(shardId)); - } else { - scoreDocs = new ScoreDoc[0]; - } + private void startEmitting() { + assert isEmitting() == false : "offset=" + offset + " score_docs=" + Arrays.toString(topDocs); + offset = 0; + if (perShardCollector != null) { + /* + * Important note for anyone who looks at this and has bright ideas: + * There *is* a method in lucene to return topDocs with an offset + * and a limit. So you'd *think* you can scroll the top docs there. + * But you can't. It's expressly forbidden to call any of the `topDocs` + * methods more than once. You *must* call `topDocs` once and use the + * array. 
+ */ + topDocs = perShardCollector.collector.topDocs().scoreDocs; + int shardId = perShardCollector.shardContext.index(); + shardRefCounted = new ShardRefCounted.Single(shardId, shardContextCounters.get(shardId)); + } else { + topDocs = new ScoreDoc[0]; } - if (offset >= scoreDocs.length) { + } + + private void stopEmitting() { + topDocs = null; + } + + private Page emit() { + if (offset >= topDocs.length) { + stopEmitting(); return null; } - int size = Math.min(maxPageSize, scoreDocs.length - offset); + int size = Math.min(maxPageSize, topDocs.length - offset); IntBlock shard = null; IntVector segments = null; IntVector docs = null; @@ -234,14 +275,16 @@ private Page emit(boolean startEmitting) { offset += size; List leafContexts = perShardCollector.shardContext.searcher().getLeafContexts(); for (int i = start; i < offset; i++) { - int doc = scoreDocs[i].doc; + int doc = topDocs[i].doc; int segment = ReaderUtil.subIndex(doc, leafContexts); currentSegmentBuilder.appendInt(segment); currentDocsBuilder.appendInt(doc - leafContexts.get(segment).docBase); // the offset inside the segment if (currentScoresBuilder != null) { - float score = getScore(scoreDocs[i]); + float score = getScore(topDocs[i]); currentScoresBuilder.appendDouble(score); } + // Null the top doc so it can be GCed early, just in case. 
+ topDocs[i] = null; } int shardId = perShardCollector.shardContext.index(); @@ -298,14 +341,32 @@ protected void describe(StringBuilder sb) { sb.append(", sorts = [").append(notPrettySorts).append("]"); } - abstract static class PerShardCollector { + @Override + protected void additionalClose() { + Releasables.close(perShardCollector); + } + + abstract static class PerShardCollector implements Releasable { + private final CircuitBreaker breaker; private final ShardContext shardContext; private final TopDocsCollector collector; + private final long ramBytesUsed; + private int leafIndex; private LeafCollector leafCollector; private Thread currentThread; - PerShardCollector(ShardContext shardContext, TopDocsCollector collector) { + PerShardCollector( + CircuitBreaker breaker, + ShardContext shardContext, + TopDocsCollector collector, + int perDocMemoryUsage, + int limit + ) { + this.ramBytesUsed = ((long) perDocMemoryUsage) * limit; + breaker.addEstimateBytesAndMaybeBreak(ramBytesUsed, "esql topn"); + + this.breaker = breaker; this.shardContext = shardContext; this.collector = collector; } @@ -318,18 +379,29 @@ LeafCollector getLeafCollector(LeafReaderContext leafReaderContext) throws IOExc } return leafCollector; } + + @Override + public void close() { + breaker.addWithoutBreaking(-ramBytesUsed); + } } static final class NonScoringPerShardCollector extends PerShardCollector { - NonScoringPerShardCollector(ShardContext shardContext, Sort sort, int limit) { + NonScoringPerShardCollector(CircuitBreaker breaker, ShardContext shardContext, Sort sort, int limit, int perDocMemoryUsage) { // We don't use CollectorManager here as we don't retrieve the total hits and sort by score. 
- super(shardContext, new TopFieldCollectorManager(sort, limit, null, 0).newCollector()); + super(breaker, shardContext, new TopFieldCollectorManager(sort, limit, null, 0).newCollector(), limit, perDocMemoryUsage); } } static final class ScoringPerShardCollector extends PerShardCollector { - ScoringPerShardCollector(ShardContext shardContext, TopDocsCollector topDocsCollector) { - super(shardContext, topDocsCollector); + ScoringPerShardCollector( + CircuitBreaker breaker, + ShardContext shardContext, + TopDocsCollector topDocsCollector, + int limit, + int perDocMemoryUsage + ) { + super(breaker, shardContext, topDocsCollector, limit, perDocMemoryUsage); } } @@ -337,33 +409,123 @@ private static Function scoreModeFunction(List { try { // we create a collector with a limit of 1 to determine the appropriate score mode to use. - return newPerShardCollector(ctx, sorts, needsScore, 1).collector.scoreMode(); + return newPerShardCollector(new NoopCircuitBreaker("sniff mode"), ctx, sorts, needsScore, 1).collector.scoreMode(); } catch (IOException e) { throw new UncheckedIOException(e); } }; } - private static PerShardCollector newPerShardCollector(ShardContext context, List> sorts, boolean needsScore, int limit) - throws IOException { + private static PerShardCollector newPerShardCollector( + CircuitBreaker breaker, + ShardContext context, + List> sorts, + boolean needsScore, + int limit + ) throws IOException { Optional sortAndFormats = context.buildSort(sorts); if (sortAndFormats.isEmpty()) { throw new IllegalStateException("sorts must not be disabled in TopN"); } - if (needsScore == false) { - return new NonScoringPerShardCollector(context, sortAndFormats.get().sort, limit); - } Sort sort = sortAndFormats.get().sort; - if (Sort.RELEVANCE.equals(sort)) { + if (needsScore && Sort.RELEVANCE.equals(sort)) { // SORT _score DESC - return new ScoringPerShardCollector(context, new TopScoreDocCollectorManager(limit, null, 0).newCollector()); + return new ScoringPerShardCollector( 
+ breaker, + context, + new TopScoreDocCollectorManager(limit, null, 0).newCollector(), + limit, + SCORE_DOC_SIZE + ); } + if (needsScore == false) { + SortField[] sortFields = new SortField[sort.getSort().length]; + for (int s = 0; s < sort.getSort().length; s++) { + sortFields[s] = sort.getSort()[s].rewrite(context.searcher()); + } + sort = new Sort(sortFields); + return new NonScoringPerShardCollector(breaker, context, sort, limit, perDocMemoryUsage(sortFields)); + } // SORT ..., _score, ... - var l = new ArrayList<>(Arrays.asList(sort.getSort())); - l.add(SortField.FIELD_DOC); - l.add(SortField.FIELD_SCORE); - sort = new Sort(l.toArray(SortField[]::new)); - return new ScoringPerShardCollector(context, new TopFieldCollectorManager(sort, limit, null, 0).newCollector()); + SortField[] sortFields = new SortField[sort.getSort().length + 2]; + for (int s = 0; s < sort.getSort().length; s++) { + sortFields[s] = sort.getSort()[s].rewrite(context.searcher()); + } + sortFields[sort.getSort().length] = SortField.FIELD_DOC; + sortFields[sort.getSort().length + 1] = SortField.FIELD_SCORE; + sort = new Sort(sortFields); + return new ScoringPerShardCollector( + breaker, + context, + new TopFieldCollectorManager(sort, limit, null, 0).newCollector(), + limit, + perDocMemoryUsage(sort.getSort()) + ); } + + private static int perDocMemoryUsage(SortField[] sorts) { + int usage = FIELD_DOC_SIZE; + for (SortField sort : sorts) { + usage += perDocMemoryUsage(sort); + } + return usage; + } + + private static int perDocMemoryUsage(SortField sort) { + if (sort.getType() == SortField.Type.CUSTOM) { + return perDocMemoryUsageForCustom(sort); + } + return perDocMemoryUsageByType(sort, sort.getType()); + + } + + private static int perDocMemoryUsageByType(SortField sort, SortField.Type type) { + return switch (type) { + case SCORE, DOC -> + /* SCORE and DOC are always part of ScoreDoc/FieldDoc + * So they are in FIELD_DOC_SIZE already. + * And they can't be removed. 
*/ + 0; + case DOUBLE, LONG -> + // 8 for the long, 8 for the long copied to the topDoc. + 16; + case INT, FLOAT -> + // 4 for the int, 8 boxed object copied to topDoc. + 12; + case STRING -> + /* `keyword`-like fields. Compares ordinals when possible, otherwise + * the strings. Does a bunch of deduplication, but in the worst + * case we end up with the string itself, plus two BytesRefs. Let's + * presume short-ish strings. */ + 1024; + case STRING_VAL -> + /* Other string fields. Compares the string itself. Let's assume two + * 2kb per string because they tend to be bigger than the keyword + * versions. */ + 2048; + case CUSTOM -> throw new IllegalArgumentException("unsupported type " + sort.getClass() + ": " + sort); + case REWRITEABLE -> { + assert false : "rewriteable " + sort.getClass() + ": " + sort; + yield 2048; + } + }; + } + + private static int perDocMemoryUsageForCustom(SortField sort) { + return switch (sort) { + case SortedNumericSortField f -> perDocMemoryUsageByType(f, f.getNumericType()); + case SortedSetSortField f -> perDocMemoryUsageByType(f, SortField.Type.STRING); + default -> { + if (sort.getClass().getName().equals("org.apache.lucene.document.LatLonPointSortField")) { + yield perDocMemoryUsageByType(sort, SortField.Type.DOUBLE); + } + assert false : "unknown type " + sort.getClass() + ": " + sort; + yield 2048; + } + }; + } + + private static final int SCORE_DOC_SIZE = Math.toIntExact(RamUsageEstimator.shallowSizeOf(ScoreDoc.class)); + private static final int FIELD_DOC_SIZE = Math.toIntExact(RamUsageEstimator.shallowSizeOf(FieldDoc.class)); } From bc5de0a66a6d2558ccd622abf0a09bc081b84820 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Fri, 5 Sep 2025 13:14:32 -0400 Subject: [PATCH 2/7] Update docs/changelog/134235.yaml --- docs/changelog/134235.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/134235.yaml diff --git a/docs/changelog/134235.yaml b/docs/changelog/134235.yaml new file mode 100644 index 
0000000000000..b2e1ca6c9499c --- /dev/null +++ b/docs/changelog/134235.yaml @@ -0,0 +1,5 @@ +pr: 134235 +summary: Reserve memory for Lucene's TopN +area: ES|QL +type: bug +issues: [] From 673bdcfa71be991fa8f56f6bac17f450a43d456a Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Fri, 5 Sep 2025 17:21:15 +0000 Subject: [PATCH 3/7] [CI] Auto commit changes from spotless --- .../elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java index 2696d41aa47f0..3dc947c6be4ba 100644 --- a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java +++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java @@ -196,9 +196,9 @@ public void testSortByManyLongsTooMuchMemoryAsync() throws IOException { public void testSortByManyLongsGiantTopN() throws IOException { initManyLongs(10); - assertMap(sortBySomeLongsLimit(100000), - matchesMap() - .entry("took", greaterThan(0)) + assertMap( + sortBySomeLongsLimit(100000), + matchesMap().entry("took", greaterThan(0)) .entry("is_partial", false) .entry("columns", List.of(Map.of("name", "MAX(a)", "type", "long"))) .entry("values", List.of(List.of(9))) From 64fac59f8f7a1dcec65475328be08aaf68b0f0ed Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Fri, 5 Sep 2025 13:40:47 -0400 Subject: [PATCH 4/7] Format --- .../elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java 
b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java index 2696d41aa47f0..3dc947c6be4ba 100644 --- a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java +++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java @@ -196,9 +196,9 @@ public void testSortByManyLongsTooMuchMemoryAsync() throws IOException { public void testSortByManyLongsGiantTopN() throws IOException { initManyLongs(10); - assertMap(sortBySomeLongsLimit(100000), - matchesMap() - .entry("took", greaterThan(0)) + assertMap( + sortBySomeLongsLimit(100000), + matchesMap().entry("took", greaterThan(0)) .entry("is_partial", false) .entry("columns", List.of(Map.of("name", "MAX(a)", "type", "long"))) .entry("values", List.of(List.of(9))) From 1920982b9a4e28a7944c79c53271feb1c442f8f2 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Fri, 5 Sep 2025 15:28:17 -0400 Subject: [PATCH 5/7] Switch to row size estimates - worse, but easier --- .../lucene/LuceneTopNSourceOperator.java | 111 ++++++------------ .../LuceneTopNSourceOperatorScoringTests.java | 2 + .../lucene/LuceneTopNSourceOperatorTests.java | 2 + .../planner/EsPhysicalOperationProviders.java | 10 ++ 4 files changed, 51 insertions(+), 74 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java index f53a0df744624..8be3cf2b6e711 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java @@ -30,6 +30,7 @@ import org.elasticsearch.compute.data.DocVector; import org.elasticsearch.compute.data.DoubleBlock; import 
org.elasticsearch.compute.data.DoubleVector; +import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.Page; @@ -42,6 +43,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -61,6 +63,7 @@ public static class Factory extends LuceneOperator.Factory { private final List contexts; private final int maxPageSize; private final List> sorts; + private final long estimatedPerRowSortSize; public Factory( List contexts, @@ -70,6 +73,7 @@ public Factory( int maxPageSize, int limit, List> sorts, + long estimatedPerRowSortSize, boolean needsScore ) { super( @@ -85,6 +89,7 @@ public Factory( this.contexts = contexts; this.maxPageSize = maxPageSize; this.sorts = sorts; + this.estimatedPerRowSortSize = estimatedPerRowSortSize; } @Override @@ -95,6 +100,7 @@ public SourceOperator get(DriverContext driverContext) { driverContext.blockFactory(), maxPageSize, sorts, + estimatedPerRowSortSize, limit, sliceQueue, needsScore @@ -124,6 +130,7 @@ public String describe() { private final CircuitBreaker breaker; private final List> sorts; + private final long estimatedPerRowSortSize; private final int limit; private final boolean needsScore; @@ -150,6 +157,7 @@ public LuceneTopNSourceOperator( BlockFactory blockFactory, int maxPageSize, List> sorts, + long estimatedPerRowSortSize, int limit, LuceneSliceQueue sliceQueue, boolean needsScore @@ -157,8 +165,10 @@ public LuceneTopNSourceOperator( super(contexts, blockFactory, maxPageSize, sliceQueue); this.breaker = breaker; this.sorts = sorts; + this.estimatedPerRowSortSize = estimatedPerRowSortSize; this.limit = limit; this.needsScore = needsScore; + breaker.addEstimateBytesAndMaybeBreak(reserveSize(), "esql topn"); } @Override @@ -203,12 +213,9 @@ private Page collect() throws IOException { if 
(scorer.tags().isEmpty() == false) { throw new UnsupportedOperationException("tags not supported by " + getClass()); } - if (perShardCollector == null) { - perShardCollector = newPerShardCollector(breaker, scorer.shardContext(), sorts, needsScore, limit); - } else if (perShardCollector.shardContext.index() != scorer.shardContext().index()) { + if (perShardCollector == null || perShardCollector.shardContext.index() != scorer.shardContext().index()) { // TODO: share the bottom between shardCollectors - Releasables.close(perShardCollector); - perShardCollector = newPerShardCollector(breaker, scorer.shardContext(), sorts, needsScore, limit); + perShardCollector = newPerShardCollector(scorer.shardContext(), sorts, needsScore, limit); } var leafCollector = perShardCollector.getLeafCollector(scorer.leafReaderContext()); scorer.scoreNextRange(leafCollector, scorer.leafReaderContext().reader().getLiveDocs(), maxPageSize); @@ -343,30 +350,23 @@ protected void describe(StringBuilder sb) { @Override protected void additionalClose() { - Releasables.close(perShardCollector); + Releasables.close(() -> breaker.addWithoutBreaking(-reserveSize())); } - abstract static class PerShardCollector implements Releasable { - private final CircuitBreaker breaker; + private long reserveSize() { + long perRowSize = FIELD_DOC_SIZE + estimatedPerRowSortSize; + return limit * perRowSize; + } + + abstract static class PerShardCollector { private final ShardContext shardContext; private final TopDocsCollector collector; - private final long ramBytesUsed; private int leafIndex; private LeafCollector leafCollector; private Thread currentThread; - PerShardCollector( - CircuitBreaker breaker, - ShardContext shardContext, - TopDocsCollector collector, - int perDocMemoryUsage, - int limit - ) { - this.ramBytesUsed = ((long) perDocMemoryUsage) * limit; - breaker.addEstimateBytesAndMaybeBreak(ramBytesUsed, "esql topn"); - - this.breaker = breaker; + PerShardCollector(ShardContext shardContext, 
TopDocsCollector collector) { this.shardContext = shardContext; this.collector = collector; } @@ -379,29 +379,18 @@ LeafCollector getLeafCollector(LeafReaderContext leafReaderContext) throws IOExc } return leafCollector; } - - @Override - public void close() { - breaker.addWithoutBreaking(-ramBytesUsed); - } } static final class NonScoringPerShardCollector extends PerShardCollector { - NonScoringPerShardCollector(CircuitBreaker breaker, ShardContext shardContext, Sort sort, int limit, int perDocMemoryUsage) { + NonScoringPerShardCollector(ShardContext shardContext, Sort sort, int limit) { // We don't use CollectorManager here as we don't retrieve the total hits and sort by score. - super(breaker, shardContext, new TopFieldCollectorManager(sort, limit, null, 0).newCollector(), limit, perDocMemoryUsage); + super(shardContext, new TopFieldCollectorManager(sort, limit, null, 0).newCollector()); } } static final class ScoringPerShardCollector extends PerShardCollector { - ScoringPerShardCollector( - CircuitBreaker breaker, - ShardContext shardContext, - TopDocsCollector topDocsCollector, - int limit, - int perDocMemoryUsage - ) { - super(breaker, shardContext, topDocsCollector, limit, perDocMemoryUsage); + ScoringPerShardCollector(ShardContext shardContext, TopDocsCollector topDocsCollector) { + super(shardContext, topDocsCollector); } } @@ -409,59 +398,34 @@ private static Function scoreModeFunction(List { try { // we create a collector with a limit of 1 to determine the appropriate score mode to use. 
- return newPerShardCollector(new NoopCircuitBreaker("sniff mode"), ctx, sorts, needsScore, 1).collector.scoreMode(); + return newPerShardCollector(ctx, sorts, needsScore, 1).collector.scoreMode(); } catch (IOException e) { throw new UncheckedIOException(e); } }; } - private static PerShardCollector newPerShardCollector( - CircuitBreaker breaker, - ShardContext context, - List> sorts, - boolean needsScore, - int limit - ) throws IOException { + private static PerShardCollector newPerShardCollector(ShardContext context, List> sorts, boolean needsScore, int limit) + throws IOException { Optional sortAndFormats = context.buildSort(sorts); if (sortAndFormats.isEmpty()) { throw new IllegalStateException("sorts must not be disabled in TopN"); } + if (needsScore == false) { + return new NonScoringPerShardCollector(context, sortAndFormats.get().sort, limit); + } Sort sort = sortAndFormats.get().sort; - if (needsScore && Sort.RELEVANCE.equals(sort)) { + if (Sort.RELEVANCE.equals(sort)) { // SORT _score DESC - return new ScoringPerShardCollector( - breaker, - context, - new TopScoreDocCollectorManager(limit, null, 0).newCollector(), - limit, - SCORE_DOC_SIZE - ); + return new ScoringPerShardCollector(context, new TopScoreDocCollectorManager(limit, null, 0).newCollector()); } - if (needsScore == false) { - SortField[] sortFields = new SortField[sort.getSort().length]; - for (int s = 0; s < sort.getSort().length; s++) { - sortFields[s] = sort.getSort()[s].rewrite(context.searcher()); - } - sort = new Sort(sortFields); - return new NonScoringPerShardCollector(breaker, context, sort, limit, perDocMemoryUsage(sortFields)); - } // SORT ..., _score, ... 
- SortField[] sortFields = new SortField[sort.getSort().length + 2]; - for (int s = 0; s < sort.getSort().length; s++) { - sortFields[s] = sort.getSort()[s].rewrite(context.searcher()); - } - sortFields[sort.getSort().length] = SortField.FIELD_DOC; - sortFields[sort.getSort().length + 1] = SortField.FIELD_SCORE; - sort = new Sort(sortFields); - return new ScoringPerShardCollector( - breaker, - context, - new TopFieldCollectorManager(sort, limit, null, 0).newCollector(), - limit, - perDocMemoryUsage(sort.getSort()) - ); + var l = new ArrayList<>(Arrays.asList(sort.getSort())); + l.add(SortField.FIELD_DOC); + l.add(SortField.FIELD_SCORE); + sort = new Sort(l.toArray(SortField[]::new)); + return new ScoringPerShardCollector(context, new TopFieldCollectorManager(sort, limit, null, 0).newCollector()); } private static int perDocMemoryUsage(SortField[] sorts) { @@ -526,6 +490,5 @@ private static int perDocMemoryUsageForCustom(SortField sort) { }; } - private static final int SCORE_DOC_SIZE = Math.toIntExact(RamUsageEstimator.shallowSizeOf(ScoreDoc.class)); private static final int FIELD_DOC_SIZE = Math.toIntExact(RamUsageEstimator.shallowSizeOf(FieldDoc.class)); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java index 7ba1f9790ecbe..306c91c0b742f 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java @@ -97,6 +97,7 @@ public Optional buildSort(List> sorts) { int taskConcurrency = 0; int maxPageSize = between(10, Math.max(10, size)); List> sorts = List.of(new FieldSortBuilder("s")); + long estimatedPerRowSortSize = 16; return new LuceneTopNSourceOperator.Factory( List.of(ctx), 
queryFunction, @@ -105,6 +106,7 @@ public Optional buildSort(List> sorts) { maxPageSize, limit, sorts, + estimatedPerRowSortSize, true // scoring ); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java index bb1aa0cf487dd..033c1af39680b 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java @@ -103,6 +103,7 @@ public Optional buildSort(List> sorts) { int taskConcurrency = 0; int maxPageSize = between(10, Math.max(10, size)); List> sorts = List.of(new FieldSortBuilder("s")); + long estimatedPerRowSortSize = 16; return new LuceneTopNSourceOperator.Factory( List.of(ctx), queryFunction, @@ -111,6 +112,7 @@ public Optional buildSort(List> sorts) { maxPageSize, limit, sorts, + estimatedPerRowSortSize, scoring ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index 093dbca0ae51f..5a04366206452 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -73,6 +73,7 @@ import org.elasticsearch.xpack.esql.expression.function.scalar.EsqlScalarFunction; import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec.Sort; +import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize; import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec; import 
org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec; import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesFieldExtractExec; @@ -294,9 +295,17 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, boolean scoring = esQueryExec.hasScoring(); if ((sorts != null && sorts.isEmpty() == false)) { List> sortBuilders = new ArrayList<>(sorts.size()); + long estimatedPerRowSortSize = 0; for (Sort sort : sorts) { sortBuilders.add(sort.sortBuilder()); + estimatedPerRowSortSize += EstimatesRowSize.estimateSize(sort.field().dataType()); } + /* + * In the worst case Lucene's TopN keeps each value in memory twice. Once + * for the actual sort and once for the top doc. In the best case they share + * references to the same underlying data, but we're being a bit paranoid here. + */ + estimatedPerRowSortSize *= 2; // LuceneTopNSourceOperator does not support QueryAndTags, if there are multiple queries or if the single query has tags, // UnsupportedOperationException will be thrown by esQueryExec.query() luceneFactory = new LuceneTopNSourceOperator.Factory( @@ -307,6 +316,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, context.pageSize(rowEstimatedSize), limit, sortBuilders, + estimatedPerRowSortSize, scoring ); } else { From 2d39c1237c34efaeffbc13e505ceaba31b592492 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Fri, 5 Sep 2025 17:44:55 -0400 Subject: [PATCH 6/7] More --- .../xpack/esql/heap_attack/HeapAttackIT.java | 5 ++ .../lucene/LuceneTopNSourceOperator.java | 5 +- .../compute/operator/topn/TopNOperator.java | 59 +++++++++++++------ .../operator/topn/TopNOperatorTests.java | 4 +- .../xpack/esql/plan/physical/EsQueryExec.java | 21 +++++++ .../planner/EsPhysicalOperationProviders.java | 2 +- 6 files changed, 73 insertions(+), 23 deletions(-) diff --git a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java 
b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java index 3dc947c6be4ba..d106f7bf4d740 100644 --- a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java +++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java @@ -212,6 +212,11 @@ public void testSortByManyLongsGiantTopNTooMuchMemory() throws IOException { assertCircuitBreaks(attempt -> sortBySomeLongsLimit(attempt * 500000)); } + public void testStupidTopN() throws IOException { + initManyLongs(1); // Doesn't actually matter how much data there is. + assertCircuitBreaks(attempt -> sortBySomeLongsLimit(2147483630)); + } + private static final int MAX_ATTEMPTS = 5; interface TryCircuitBreaking { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java index 8be3cf2b6e711..1b16610b26e9b 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java @@ -24,19 +24,16 @@ import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.CircuitBreaker; -import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.DocBlock; import org.elasticsearch.compute.data.DocVector; import org.elasticsearch.compute.data.DoubleBlock; import org.elasticsearch.compute.data.DoubleVector; -import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.IntVector; import 
org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.SourceOperator; -import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.search.sort.SortAndFormats; import org.elasticsearch.search.sort.SortBuilder; @@ -168,7 +165,7 @@ public LuceneTopNSourceOperator( this.estimatedPerRowSortSize = estimatedPerRowSortSize; this.limit = limit; this.needsScore = needsScore; - breaker.addEstimateBytesAndMaybeBreak(reserveSize(), "esql topn"); + breaker.addEstimateBytesAndMaybeBreak(reserveSize(), "esql lucene topn"); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java index 3313f6efb132b..dc62b91ca8c1e 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java @@ -294,7 +294,6 @@ public String describe() { private final BlockFactory blockFactory; private final CircuitBreaker breaker; - private final Queue inputQueue; private final int maxPageSize; @@ -302,6 +301,7 @@ public String describe() { private final List encoders; private final List sortOrders; + private Queue inputQueue; private Row spare; private int spareValuesPreAllocSize = 0; private int spareKeysPreAllocSize = 0; @@ -346,7 +346,8 @@ public TopNOperator( this.elementTypes = elementTypes; this.encoders = encoders; this.sortOrders = sortOrders; - this.inputQueue = new Queue(topCount); + breaker.addEstimateBytesAndMaybeBreak(Queue.sizeOf(topCount), "esql engine topn"); + this.inputQueue = new Queue(breaker, topCount); } static int compareRows(Row r1, Row r2) { @@ -457,6 +458,8 @@ private Iterator toPages() { list.add(inputQueue.pop()); } 
Collections.reverse(list); + inputQueue.close(); + inputQueue = null; int p = 0; int size = 0; @@ -563,19 +566,27 @@ public Page getOutput() { @Override public void close() { - /* - * If we close before calling finish then spare and inputQueue will be live rows - * that need closing. If we close after calling finish then the output iterator - * will contain pages of results that have yet to be returned. - */ Releasables.closeExpectNoException( + /* + * The spare is used during most collections. It's cleared when this Operator + * is finish()ed. So it could be null here. + */ spare, - inputQueue == null ? null : Releasables.wrap(inputQueue), + /* + * The inputQueue is a min heap of all live rows. Closing it will close all + * the rows it contains and also decrement the breaker for the size of + * the heap itself. + */ inputQueue, + /* + * If we're in the process of outputting pages then output will contain all + * allocated but un-emitted pages. + */ output == null ? null : Releasables.wrap(() -> Iterators.map(output, p -> p::releaseBlocks)) ); } - private static long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(TopNOperator.class) + RamUsageEstimator + private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(TopNOperator.class) + RamUsageEstimator .shallowSizeOfInstance(List.class) * 3; @Override @@ -589,7 +600,9 @@ public long ramBytesUsed() { size += RamUsageEstimator.alignObjectSize(arrHeader + ref * encoders.size()); size += RamUsageEstimator.alignObjectSize(arrHeader + ref * sortOrders.size()); size += sortOrders.size() * SortOrder.SHALLOW_SIZE; - size += inputQueue.ramBytesUsed(); + if (inputQueue != null) { + size += inputQueue.ramBytesUsed(); + } return size; } @@ -598,7 +611,7 @@ public Status status() { return new TopNOperatorStatus( receiveNanos, emitNanos, - inputQueue.size(), + inputQueue != null ?
inputQueue.size() : 0, ramBytesUsed(), pagesReceived, pagesEmitted, @@ -620,16 +633,14 @@ public String toString() { + "]"; } - CircuitBreaker breaker() { - return breaker; - } - - private static class Queue extends PriorityQueue implements Accountable { + private static class Queue extends PriorityQueue implements Accountable, Releasable { private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(Queue.class); + private final CircuitBreaker breaker; private final int maxSize; - Queue(int maxSize) { + Queue(CircuitBreaker breaker, int maxSize) { super(maxSize); + this.breaker = breaker; this.maxSize = maxSize; } @@ -654,5 +665,19 @@ public long ramBytesUsed() { } return total; } + + @Override + public void close() { + Releasables.close(Releasables.wrap(this), () -> breaker.addWithoutBreaking(-Queue.sizeOf(maxSize))); + + } + + public static long sizeOf(int topCount) { + long total = SHALLOW_SIZE; + total += RamUsageEstimator.alignObjectSize( + RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + RamUsageEstimator.NUM_BYTES_OBJECT_REF * ((long) topCount + 1) + ); + return total; + } } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java index 2c322e5d9edaf..80f92fbcf91a5 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java @@ -1489,7 +1489,9 @@ public void testRowResizes() { block.decRef(); op.addInput(new Page(blocks)); - assertThat(breaker.getMemoryRequestCount(), is(94L)); + // 94 are from the collection process + // 1 is for the min-heap itself + assertThat(breaker.getMemoryRequestCount(), is(95L)); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java 
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java index 36886b0e58d9b..cec91aadcc3f2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java @@ -69,6 +69,12 @@ public interface Sort { Order.OrderDirection direction(); FieldAttribute field(); + + /** + * Type of the result of the sort. For example, + * geo distance will be {@link DataType#DOUBLE}. + */ + DataType resulType(); } public record FieldSort(FieldAttribute field, Order.OrderDirection direction, Order.NullsPosition nulls) implements Sort { @@ -80,6 +86,11 @@ public SortBuilder sortBuilder() { builder.unmappedType(field.dataType().esType()); return builder; } + + @Override + public DataType resulType() { + return field.dataType(); + } } public record GeoDistanceSort(FieldAttribute field, Order.OrderDirection direction, double lat, double lon) implements Sort { @@ -89,6 +100,11 @@ public SortBuilder sortBuilder() { builder.order(Direction.from(direction).asOrder()); return builder; } + + @Override + public DataType resulType() { + return DataType.DOUBLE; + } } public record ScoreSort(Order.OrderDirection direction) implements Sort { @@ -102,6 +118,11 @@ public FieldAttribute field() { // TODO: refactor this: not all Sorts are backed by FieldAttributes return null; } + + @Override + public DataType resulType() { + return DataType.DOUBLE; + } } public record QueryBuilderAndTags(QueryBuilder query, List tags) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index 5a04366206452..9b9712289b9d3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ 
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -298,7 +298,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, long estimatedPerRowSortSize = 0; for (Sort sort : sorts) { sortBuilders.add(sort.sortBuilder()); - estimatedPerRowSortSize += EstimatesRowSize.estimateSize(sort.field().dataType()); + estimatedPerRowSortSize += EstimatesRowSize.estimateSize(sort.resulType()); } /* * In the worst case Lucene's TopN keeps each value in memory twice. Once From 0409da170cf23d5c32cd90d95971156f8c87772b Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Fri, 5 Sep 2025 18:47:48 -0400 Subject: [PATCH 7/7] Move, explain, rename --- .../compute/operator/topn/TopNOperator.java | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java index dc62b91ca8c1e..86032780bec20 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java @@ -346,8 +346,7 @@ public TopNOperator( this.elementTypes = elementTypes; this.encoders = encoders; this.sortOrders = sortOrders; - breaker.addEstimateBytesAndMaybeBreak(Queue.sizeOf(topCount), "esql engine topn"); - this.inputQueue = new Queue(breaker, topCount); + this.inputQueue = Queue.build(breaker, topCount); } static int compareRows(Row r1, Row r2) { @@ -636,12 +635,20 @@ public String toString() { private static class Queue extends PriorityQueue implements Accountable, Releasable { private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(Queue.class); private final CircuitBreaker breaker; - private final int maxSize; + private final int topCount; - 
Queue(CircuitBreaker breaker, int maxSize) { - super(maxSize); + /** + * Track memory usage in the breaker then build the {@link Queue}. + */ + static Queue build(CircuitBreaker breaker, int topCount) { + breaker.addEstimateBytesAndMaybeBreak(Queue.sizeOf(topCount), "esql engine topn"); + return new Queue(breaker, topCount); + } + + private Queue(CircuitBreaker breaker, int topCount) { + super(topCount); this.breaker = breaker; - this.maxSize = maxSize; + this.topCount = topCount; } @Override @@ -651,14 +658,14 @@ protected boolean lessThan(Row r1, Row r2) { @Override public String toString() { - return size() + "/" + maxSize; + return size() + "/" + topCount; } @Override public long ramBytesUsed() { long total = SHALLOW_SIZE; total += RamUsageEstimator.alignObjectSize( - RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + RamUsageEstimator.NUM_BYTES_OBJECT_REF * (maxSize + 1) + RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + RamUsageEstimator.NUM_BYTES_OBJECT_REF * ((long) topCount + 1) ); for (Row r : this) { total += r == null ? 0 : r.ramBytesUsed(); @@ -668,7 +675,7 @@ public long ramBytesUsed() { @Override public void close() { - Releasables.close(Releasables.wrap(this), () -> breaker.addWithoutBreaking(-Queue.sizeOf(maxSize))); + Releasables.close(Releasables.wrap(this), () -> breaker.addWithoutBreaking(-Queue.sizeOf(topCount))); }