diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java index de6f4c9a7dbdb..477bf110324e7 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java @@ -13,11 +13,11 @@ import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Weight; import org.elasticsearch.compute.data.Block; -import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BlockUtils; import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.SourceOperator; import org.elasticsearch.core.RefCounted; @@ -64,7 +64,7 @@ public Factory( @Override public SourceOperator get(DriverContext driverContext) { - return new LuceneCountOperator(shardRefCounters, driverContext.blockFactory(), sliceQueue, tagTypes, limit); + return new LuceneCountOperator(shardRefCounters, driverContext, sliceQueue, tagTypes, limit); } @Override @@ -76,17 +76,19 @@ public String describe() { private final List tagTypes; private final Map, PerTagsState> tagsToState = new HashMap<>(); private int remainingDocs; + private final DriverContext driverContext; public LuceneCountOperator( List shardRefCounters, - BlockFactory blockFactory, + DriverContext driverContext, LuceneSliceQueue sliceQueue, List tagTypes, int limit ) { - super(shardRefCounters, blockFactory, Integer.MAX_VALUE, sliceQueue); + super(shardRefCounters, driverContext.blockFactory(), Integer.MAX_VALUE, sliceQueue); this.tagTypes = tagTypes; this.remainingDocs = limit; + this.driverContext = driverContext; } @Override @@ -107,17 +109,27 @@ protected Page getCheckedOutput() throws IOException { } long start = System.nanoTime(); try { - final LuceneScorer scorer = getCurrentOrLoadNextScorer(); - if (scorer == null) { - remainingDocs = 0; - } else { - count(scorer); + while (remainingDocs > 0) { + final LuceneScorer scorer = getCurrentOrLoadNextScorer(); + if (scorer == null) { + remainingDocs = 0; + } else { + count(scorer); + } + + // Check if the query has been cancelled. + driverContext.checkForEarlyTermination(); + // Even if this should almost never happen, we want to update the driver status even when a query runs "forever". + if (System.nanoTime() - start > Driver.DEFAULT_STATUS_INTERVAL.getNanos()) { + break; + } } if (remainingDocs <= 0) { return buildResult(); + } else { + return null; } - return null; } finally { processingNanos += System.nanoTime() - start; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java index 5d12b5653c57e..1dd0f35548ec3 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java @@ -28,6 +28,7 @@ import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.SourceOperator; import org.elasticsearch.core.Releasables; @@ -40,7 +41,6 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; -import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -124,9 +124,6 @@ public String describe() { // We use the same value as the INITIAL_INTERVAL from CancellableBulkScorer private static final int NUM_DOCS_INTERVAL = 1 << 12; - // The max time we should be spending scoring docs in one getOutput call, before we return to update the driver status - private static final long MAX_EXEC_TIME = TimeUnit.SECONDS.toNanos(1); - private final DriverContext driverContext; private final List> sorts; private final long estimatedPerRowSortSize; @@ -236,7 +233,7 @@ private Page collect() throws IOException { // When it takes a long time to start emitting pages we need to return back to the driver so we can update its status. // Even if this should almost never happen, we want to update the driver status even when a query runs "forever". - if (System.nanoTime() - start > MAX_EXEC_TIME) { + if (System.nanoTime() - start > Driver.DEFAULT_STATUS_INTERVAL.getNanos()) { return null; } }