2121import org .apache .lucene .search .TopScoreDocCollectorManager ;
2222import org .apache .lucene .util .RamUsageEstimator ;
2323import org .elasticsearch .common .Strings ;
24- import org .elasticsearch .common .breaker .CircuitBreaker ;
25- import org .elasticsearch .compute .data .BlockFactory ;
2624import org .elasticsearch .compute .data .DocBlock ;
2725import org .elasticsearch .compute .data .DocVector ;
2826import org .elasticsearch .compute .data .DoubleBlock ;
4240import java .util .Arrays ;
4341import java .util .List ;
4442import java .util .Optional ;
43+ import java .util .concurrent .TimeUnit ;
4544import java .util .function .Function ;
4645import java .util .stream .Collectors ;
4746
@@ -91,8 +90,7 @@ public Factory(
9190 public SourceOperator get (DriverContext driverContext ) {
9291 return new LuceneTopNSourceOperator (
9392 contexts ,
94- driverContext .breaker (),
95- driverContext .blockFactory (),
93+ driverContext ,
9694 maxPageSize ,
9795 sorts ,
9896 estimatedPerRowSortSize ,
@@ -126,7 +124,10 @@ public String describe() {
126124 // We use the same value as the INITIAL_INTERVAL from CancellableBulkScorer
127125 private static final int NUM_DOCS_INTERVAL = 1 << 12 ;
128126
129- private final CircuitBreaker breaker ;
127+ // The max time we should be spending scoring docs in one getOutput call, before we return to update the driver status
// Held in nanoseconds (one second) so it can be compared directly with System.nanoTime() differences in collect().
128+ private static final long MAX_EXEC_TIME = TimeUnit .SECONDS .toNanos (1 );
129+
130+ private final DriverContext driverContext ;
130131 private final List <SortBuilder <?>> sorts ;
131132 private final long estimatedPerRowSortSize ;
132133 private final int limit ;
@@ -151,22 +152,21 @@ public String describe() {
151152
// Constructor, shown as a diff: the separate CircuitBreaker and BlockFactory parameters ('-' lines) are replaced by a
// single DriverContext ('+' lines). The block factory passed to super(...) and the breaker are now both read from it.
152153 public LuceneTopNSourceOperator (
153154 List <? extends ShardContext > contexts ,
154- CircuitBreaker breaker ,
155- BlockFactory blockFactory ,
155+ DriverContext driverContext ,
156156 int maxPageSize ,
157157 List <SortBuilder <?>> sorts ,
158158 long estimatedPerRowSortSize ,
159159 int limit ,
160160 LuceneSliceQueue sliceQueue ,
161161 boolean needsScore
162162 ) {
163- super (contexts , blockFactory , maxPageSize , sliceQueue );
164- this .breaker = breaker ;
163+ super (contexts , driverContext . blockFactory () , maxPageSize , sliceQueue );
164+ this .driverContext = driverContext ;
165165 this .sorts = sorts ;
166166 this .estimatedPerRowSortSize = estimatedPerRowSortSize ;
167167 this .limit = limit ;
168168 this .needsScore = needsScore ;
// Adds reserveSize() bytes to the breaker at construction time; this call may trip the breaker ("MaybeBreak").
// additionalClose() subtracts the same reserveSize() amount again.
169- breaker .addEstimateBytesAndMaybeBreak (reserveSize (), "esql lucene topn" );
169+ driverContext . breaker () .addEstimateBytesAndMaybeBreak (reserveSize (), "esql lucene topn" );
170170 }
171171
172172 @ Override
@@ -201,34 +201,49 @@ public Page getCheckedOutput() throws IOException {
201201
// Diff view of collect(): '-' lines are the previous single-step body, '+' lines are the new looping body.
// The new body keeps calling scoreNextRange (passing NUM_DOCS_INTERVAL) until one of the following happens:
//  (a) no scorer is left: doneCollecting is set, emission starts and the first emitted page is returned;
//  (b) the current scorer is done and the next scorer has a different shard index: emission starts (doneCollecting is not set);
//  (c) more than MAX_EXEC_TIME nanoseconds have passed since this call started: null is returned.
202202 private Page collect () throws IOException {
203203 assert doneCollecting == false ;
204+ long start = System .nanoTime ();
205+
204206 var scorer = getCurrentOrLoadNextScorer ();
205- if (scorer == null ) {
206- doneCollecting = true ;
207- startEmitting ();
208- return emit ();
209- }
210- try {
207+
208+ while (scorer != null ) {
211209 if (scorer .tags ().isEmpty () == false ) {
212210 throw new UnsupportedOperationException ("tags not supported by " + getClass ());
213211 }
214- if (perShardCollector == null || perShardCollector .shardContext .index () != scorer .shardContext ().index ()) {
215- // TODO: share the bottom between shardCollectors
216- perShardCollector = newPerShardCollector (scorer .shardContext (), sorts , needsScore , limit );
212+
213+ try {
// A new per-shard collector is created whenever there is none yet or the scorer's shard index differs from the collector's.
214+ if (perShardCollector == null || perShardCollector .shardContext .index () != scorer .shardContext ().index ()) {
215+ // TODO: share the bottom between shardCollectors
216+ perShardCollector = newPerShardCollector (scorer .shardContext (), sorts , needsScore , limit );
217+ }
218+ var leafCollector = perShardCollector .getLeafCollector (scorer .leafReaderContext ());
219+ scorer .scoreNextRange (leafCollector , scorer .leafReaderContext ().reader ().getLiveDocs (), NUM_DOCS_INTERVAL );
220+ } catch (CollectionTerminatedException cte ) {
221+ // Lucene terminated the collection early (doing topN for an index that's sorted and the topN uses the same sorting)
222+ scorer .markAsDone ();
223+ }
224+
225+ // check if the query has been cancelled.
226+ driverContext .checkForEarlyTermination ();
227+
228+ if (scorer .isDone ()) {
229+ var nextScorer = getCurrentOrLoadNextScorer ();
// Unlike the removed code further down (nextScorer == null || ...), a null nextScorer no longer starts emitting here:
// it ends the while loop and falls through to the final doneCollecting/startEmitting/emit sequence.
230+ if (nextScorer != null && nextScorer .shardContext ().index () != scorer .shardContext ().index ()) {
231+ startEmitting ();
232+ return emit ();
233+ }
234+ scorer = nextScorer ;
217235 }
218- var leafCollector = perShardCollector .getLeafCollector (scorer .leafReaderContext ());
219- scorer .scoreNextRange (leafCollector , scorer .leafReaderContext ().reader ().getLiveDocs (), NUM_DOCS_INTERVAL );
220- } catch (CollectionTerminatedException cte ) {
221- // Lucene terminated the collection early (doing topN for an index that's sorted and the topN uses the same sorting)
222- scorer .markAsDone ();
223- }
224- if (scorer .isDone ()) {
225- var nextScorer = getCurrentOrLoadNextScorer ();
226- if (nextScorer == null || nextScorer .shardContext ().index () != scorer .shardContext ().index ()) {
227- startEmitting ();
228- return emit ();
236+
237+ // When it takes a long time to start emitting pages we need to return to the driver so it can update its status.
238+ // Even if this should almost never happen, we want to update the driver status even when a query runs "forever".
239+ if (System .nanoTime () - start > MAX_EXEC_TIME ) {
240+ return null ;
229241 }
230242 }
231- return null ;
243+
// Reached only when the loop ends because no scorer is left.
244+ doneCollecting = true ;
245+ startEmitting ();
246+ return emit ();
232247 }
233248
234249 private boolean isEmitting () {
@@ -348,7 +363,7 @@ protected void describe(StringBuilder sb) {
348363
// Subtracts reserveSize() bytes from the breaker without a limit check, undoing the reservation made in the constructor.
// The only change in this hunk is that the breaker is now obtained through driverContext.
349364 @ Override
350365 protected void additionalClose () {
351- Releasables .close (() -> breaker .addWithoutBreaking (-reserveSize ()));
366+ Releasables .close (() -> driverContext . breaker () .addWithoutBreaking (-reserveSize ()));
352367 }
353368
354369 private long reserveSize () {
0 commit comments