diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java
index bf357035905a1..5d12b5653c57e 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java
@@ -21,8 +21,6 @@
 import org.apache.lucene.search.TopScoreDocCollectorManager;
 import org.apache.lucene.util.RamUsageEstimator;
 import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.breaker.CircuitBreaker;
-import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.DocBlock;
 import org.elasticsearch.compute.data.DocVector;
 import org.elasticsearch.compute.data.DoubleBlock;
@@ -42,6 +40,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -91,8 +90,7 @@ public Factory(
         public SourceOperator get(DriverContext driverContext) {
             return new LuceneTopNSourceOperator(
                 contexts,
-                driverContext.breaker(),
-                driverContext.blockFactory(),
+                driverContext,
                 maxPageSize,
                 sorts,
                 estimatedPerRowSortSize,
@@ -126,7 +124,10 @@ public String describe() {
     // We use the same value as the INITIAL_INTERVAL from CancellableBulkScorer
    private static final int NUM_DOCS_INTERVAL = 1 << 12;
 
-    private final CircuitBreaker breaker;
+    // The max time we should be spending scoring docs in one getOutput call, before we return to update the driver status
+    private static final long MAX_EXEC_TIME = TimeUnit.SECONDS.toNanos(1);
+
+    private final DriverContext driverContext;
     private final List<SortBuilder<?>> sorts;
     private final long estimatedPerRowSortSize;
     private final int limit;
@@ -151,8 +152,7 @@ public String describe() {
 
     public LuceneTopNSourceOperator(
         List<? extends ShardContext> contexts,
-        CircuitBreaker breaker,
-        BlockFactory blockFactory,
+        DriverContext driverContext,
         int maxPageSize,
         List<SortBuilder<?>> sorts,
         long estimatedPerRowSortSize,
@@ -160,13 +160,13 @@ public LuceneTopNSourceOperator(
         LuceneSliceQueue sliceQueue,
         boolean needsScore
     ) {
-        super(contexts, blockFactory, maxPageSize, sliceQueue);
-        this.breaker = breaker;
+        super(contexts, driverContext.blockFactory(), maxPageSize, sliceQueue);
+        this.driverContext = driverContext;
         this.sorts = sorts;
         this.estimatedPerRowSortSize = estimatedPerRowSortSize;
         this.limit = limit;
         this.needsScore = needsScore;
-        breaker.addEstimateBytesAndMaybeBreak(reserveSize(), "esql lucene topn");
+        driverContext.breaker().addEstimateBytesAndMaybeBreak(reserveSize(), "esql lucene topn");
     }
 
     @Override
@@ -201,34 +201,49 @@ public Page getCheckedOutput() throws IOException {
 
     private Page collect() throws IOException {
         assert doneCollecting == false;
+        long start = System.nanoTime();
+
         var scorer = getCurrentOrLoadNextScorer();
-        if (scorer == null) {
-            doneCollecting = true;
-            startEmitting();
-            return emit();
-        }
-        try {
+
+        while (scorer != null) {
             if (scorer.tags().isEmpty() == false) {
                 throw new UnsupportedOperationException("tags not supported by " + getClass());
             }
-            if (perShardCollector == null || perShardCollector.shardContext.index() != scorer.shardContext().index()) {
-                // TODO: share the bottom between shardCollectors
-                perShardCollector = newPerShardCollector(scorer.shardContext(), sorts, needsScore, limit);
+
+            try {
+                if (perShardCollector == null || perShardCollector.shardContext.index() != scorer.shardContext().index()) {
+                    // TODO: share the bottom between shardCollectors
+                    perShardCollector = newPerShardCollector(scorer.shardContext(), sorts, needsScore, limit);
+                }
+                var leafCollector = perShardCollector.getLeafCollector(scorer.leafReaderContext());
+                scorer.scoreNextRange(leafCollector, scorer.leafReaderContext().reader().getLiveDocs(), NUM_DOCS_INTERVAL);
+            } catch (CollectionTerminatedException cte) {
+                // Lucene terminated early the collection (doing topN for an index that's sorted and the topN uses the same sorting)
+                scorer.markAsDone();
+            }
+
+            // check if the query has been cancelled.
+            driverContext.checkForEarlyTermination();
+
+            if (scorer.isDone()) {
+                var nextScorer = getCurrentOrLoadNextScorer();
+                if (nextScorer != null && nextScorer.shardContext().index() != scorer.shardContext().index()) {
+                    startEmitting();
+                    return emit();
+                }
+                scorer = nextScorer;
             }
-            var leafCollector = perShardCollector.getLeafCollector(scorer.leafReaderContext());
-            scorer.scoreNextRange(leafCollector, scorer.leafReaderContext().reader().getLiveDocs(), NUM_DOCS_INTERVAL);
-        } catch (CollectionTerminatedException cte) {
-            // Lucene terminated early the collection (doing topN for an index that's sorted and the topN uses the same sorting)
-            scorer.markAsDone();
-        }
-        if (scorer.isDone()) {
-            var nextScorer = getCurrentOrLoadNextScorer();
-            if (nextScorer == null || nextScorer.shardContext().index() != scorer.shardContext().index()) {
-                startEmitting();
-                return emit();
+
+            // When it takes a long time to start emitting pages we need to return back to the driver so we can update its status.
+            // Even if this should almost never happen, we want to update the driver status even when a query runs "forever".
+            if (System.nanoTime() - start > MAX_EXEC_TIME) {
+                return null;
             }
         }
-        return null;
+
+        doneCollecting = true;
+        startEmitting();
+        return emit();
     }
 
     private boolean isEmitting() {
@@ -348,7 +363,7 @@ protected void describe(StringBuilder sb) {
 
     @Override
     protected void additionalClose() {
-        Releasables.close(() -> breaker.addWithoutBreaking(-reserveSize()));
+        Releasables.close(() -> driverContext.breaker().addWithoutBreaking(-reserveSize()));
     }
 
     private long reserveSize() {
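The behavioral core of the change is in collect(): it now scores ranges of documents in a loop, calls driverContext.checkForEarlyTermination() between ranges so a cancelled query stops promptly, and yields back to the driver once roughly MAX_EXEC_TIME (one second) of work has been done, so the driver status stays current even for queries that take a long time to produce their first page. Below is a minimal standalone sketch of that yield-and-resume pattern under those assumptions; every name in it (TimeBudgetedCollector, CancellationCheck, runSlice) is a hypothetical illustration, not an Elasticsearch or Lucene API.

// Standalone sketch of a time-budgeted, cancellable collection loop.
// All names here are hypothetical; this is not the Elasticsearch implementation.
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class TimeBudgetedCollector {
    // Mirrors the idea of MAX_EXEC_TIME: stop and yield after ~1s of work.
    private static final long MAX_EXEC_TIME_NANOS = TimeUnit.SECONDS.toNanos(1);

    interface CancellationCheck {
        void checkForEarlyTermination(); // expected to throw if the task was cancelled
    }

    private final Iterator<Runnable> work;
    private final CancellationCheck cancellation;
    private boolean done;

    TimeBudgetedCollector(List<Runnable> workUnits, CancellationCheck cancellation) {
        this.work = workUnits.iterator();
        this.cancellation = cancellation;
    }

    /**
     * Runs work units until either all work is finished (returns true) or the
     * time budget is exhausted (returns false so the caller can update its
     * status and call again later).
     */
    boolean runSlice() {
        long start = System.nanoTime();
        while (work.hasNext()) {
            work.next().run();                       // one bounded chunk of scoring
            cancellation.checkForEarlyTermination(); // honour cancellation between chunks
            if (System.nanoTime() - start > MAX_EXEC_TIME_NANOS) {
                return false; // yield: not finished, but give control back to the driver
            }
        }
        done = true;
        return true;
    }

    boolean isDone() {
        return done;
    }

    public static void main(String[] args) {
        var collector = new TimeBudgetedCollector(
            List.of(() -> {}, () -> {}),
            () -> { /* nothing is cancelled in this example */ }
        );
        while (collector.runSlice() == false) {
            // a real driver would record its status here before resuming
        }
        System.out.println("done = " + collector.isDone());
    }
}

Yielding instead of blocking is what lets the driver record its status between slices; the real operator achieves the same effect by returning null from collect() and being asked for output again on the next getOutput call.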