From 4e20250ef9994b1f94d8f63d374cd1421db59eb8 Mon Sep 17 00:00:00 2001 From: Ioana Tagirta Date: Wed, 8 Oct 2025 10:42:01 +0200 Subject: [PATCH 1/3] Score all docs in a single getOutput call in LuceneTopNSourceOperator --- .../lucene/LuceneTopNSourceOperator.java | 67 ++++++++++--------- 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java index bf357035905a1..d3af34b9dd27c 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java @@ -21,8 +21,6 @@ import org.apache.lucene.search.TopScoreDocCollectorManager; import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.breaker.CircuitBreaker; -import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.DocBlock; import org.elasticsearch.compute.data.DocVector; import org.elasticsearch.compute.data.DoubleBlock; @@ -91,8 +89,7 @@ public Factory( public SourceOperator get(DriverContext driverContext) { return new LuceneTopNSourceOperator( contexts, - driverContext.breaker(), - driverContext.blockFactory(), + driverContext, maxPageSize, sorts, estimatedPerRowSortSize, @@ -126,7 +123,7 @@ public String describe() { // We use the same value as the INITIAL_INTERVAL from CancellableBulkScorer private static final int NUM_DOCS_INTERVAL = 1 << 12; - private final CircuitBreaker breaker; + private final DriverContext driverContext; private final List> sorts; private final long estimatedPerRowSortSize; private final int limit; @@ -151,8 +148,7 @@ public String describe() { public LuceneTopNSourceOperator( List contexts, - CircuitBreaker breaker, - 
BlockFactory blockFactory, + DriverContext driverContext, int maxPageSize, List> sorts, long estimatedPerRowSortSize, @@ -160,13 +156,13 @@ public LuceneTopNSourceOperator( LuceneSliceQueue sliceQueue, boolean needsScore ) { - super(contexts, blockFactory, maxPageSize, sliceQueue); - this.breaker = breaker; + super(contexts, driverContext.blockFactory(), maxPageSize, sliceQueue); + this.driverContext = driverContext; this.sorts = sorts; this.estimatedPerRowSortSize = estimatedPerRowSortSize; this.limit = limit; this.needsScore = needsScore; - breaker.addEstimateBytesAndMaybeBreak(reserveSize(), "esql lucene topn"); + driverContext.breaker().addEstimateBytesAndMaybeBreak(reserveSize(), "esql lucene topn"); } @Override @@ -202,33 +198,40 @@ public Page getCheckedOutput() throws IOException { private Page collect() throws IOException { assert doneCollecting == false; var scorer = getCurrentOrLoadNextScorer(); - if (scorer == null) { - doneCollecting = true; - startEmitting(); - return emit(); - } - try { + + while (scorer != null) { if (scorer.tags().isEmpty() == false) { throw new UnsupportedOperationException("tags not supported by " + getClass()); } - if (perShardCollector == null || perShardCollector.shardContext.index() != scorer.shardContext().index()) { - // TODO: share the bottom between shardCollectors - perShardCollector = newPerShardCollector(scorer.shardContext(), sorts, needsScore, limit); + + try { + if (perShardCollector == null || perShardCollector.shardContext.index() != scorer.shardContext().index()) { + // TODO: share the bottom between shardCollectors + perShardCollector = newPerShardCollector(scorer.shardContext(), sorts, needsScore, limit); + } + var leafCollector = perShardCollector.getLeafCollector(scorer.leafReaderContext()); + scorer.scoreNextRange(leafCollector, scorer.leafReaderContext().reader().getLiveDocs(), NUM_DOCS_INTERVAL); + } catch (CollectionTerminatedException cte) { + // Lucene terminated early the collection (doing topN for an 
index that's sorted and the topN uses the same sorting) + scorer.markAsDone(); } - var leafCollector = perShardCollector.getLeafCollector(scorer.leafReaderContext()); - scorer.scoreNextRange(leafCollector, scorer.leafReaderContext().reader().getLiveDocs(), NUM_DOCS_INTERVAL); - } catch (CollectionTerminatedException cte) { - // Lucene terminated early the collection (doing topN for an index that's sorted and the topN uses the same sorting) - scorer.markAsDone(); - } - if (scorer.isDone()) { - var nextScorer = getCurrentOrLoadNextScorer(); - if (nextScorer == null || nextScorer.shardContext().index() != scorer.shardContext().index()) { - startEmitting(); - return emit(); + + // check if the query has been cancelled. + driverContext.checkForEarlyTermination(); + + if (scorer.isDone()) { + var nextScorer = getCurrentOrLoadNextScorer(); + if (nextScorer != null && nextScorer.shardContext().index() != scorer.shardContext().index()) { + startEmitting(); + return emit(); + } + scorer = nextScorer; } } - return null; + + doneCollecting = true; + startEmitting(); + return emit(); } private boolean isEmitting() { @@ -348,7 +351,7 @@ protected void describe(StringBuilder sb) { @Override protected void additionalClose() { - Releasables.close(() -> breaker.addWithoutBreaking(-reserveSize())); + Releasables.close(() -> driverContext.breaker().addWithoutBreaking(-reserveSize())); } private long reserveSize() { From d9db0f35204357544a6e0d2bf5cc697fe3dd72a9 Mon Sep 17 00:00:00 2001 From: Ioana Tagirta Date: Wed, 8 Oct 2025 16:13:45 +0200 Subject: [PATCH 2/3] Make sure we return from getOutput to update driver status --- .../compute/lucene/LuceneTopNSourceOperator.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java index d3af34b9dd27c..f822123d2ed3c 100644 --- 
a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java @@ -40,6 +40,7 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -197,6 +198,8 @@ public Page getCheckedOutput() throws IOException { private Page collect() throws IOException { assert doneCollecting == false; + long start = System.nanoTime(); + var scorer = getCurrentOrLoadNextScorer(); while (scorer != null) { @@ -227,6 +230,12 @@ private Page collect() throws IOException { } scorer = nextScorer; } + + // If we stayed longer than 1 second to execute getOutput, we should return back to the driver, so we can update its status. + // Even if this should almost never happen, we want to update the driver status even when a query runs "forever". 
+ if (TimeUnit.SECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS) >= 1) { + return null; + } } doneCollecting = true; From ca898464ab7ea032e32e1b7a625a483bf78ee50f Mon Sep 17 00:00:00 2001 From: Ioana Tagirta Date: Wed, 8 Oct 2025 16:30:34 +0200 Subject: [PATCH 3/3] Make it a constant --- .../compute/lucene/LuceneTopNSourceOperator.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java index f822123d2ed3c..5d12b5653c57e 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java @@ -124,6 +124,9 @@ public String describe() { // We use the same value as the INITIAL_INTERVAL from CancellableBulkScorer private static final int NUM_DOCS_INTERVAL = 1 << 12; + // The max time we should be spending scoring docs in one getOutput call, before we return to update the driver status + private static final long MAX_EXEC_TIME = TimeUnit.SECONDS.toNanos(1); + private final DriverContext driverContext; private final List> sorts; private final long estimatedPerRowSortSize; @@ -231,9 +234,9 @@ private Page collect() throws IOException { scorer = nextScorer; } - // If we stayed longer than 1 second to execute getOutput, we should return back to the driver, so we can update its status. + // When it takes a long time to start emitting pages, we need to return to the driver so we can update its status. // Even if this should almost never happen, we want to update the driver status even when a query runs "forever". - if (TimeUnit.SECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS) >= 1) { + if (System.nanoTime() - start > MAX_EXEC_TIME) { return null; } }