@@ -186,7 +186,13 @@ SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplie
186186 long nextStatus = startTime + statusNanos ;
187187 int iter = 0 ;
188188 while (true ) {
189- IsBlockedResult isBlocked = runSingleLoopIteration ();
189+ IsBlockedResult isBlocked = Operator .NOT_BLOCKED ;
190+ try {
191+ isBlocked = runSingleLoopIteration ();
192+ } catch (DriverEarlyTerminationException unused ) {
193+ closeEarlyFinishedOperators ();
194+ assert isFinished () : "not finished after early termination" ;
195+ }
190196 iter ++;
191197 if (isBlocked .listener ().isDone () == false ) {
192198 updateStatus (nowSupplier .getAsLong () - startTime , iter , DriverStatus .Status .ASYNC , isBlocked .reason ());
@@ -242,39 +248,59 @@ public void abort(Exception reason, ActionListener<Void> listener) {
242248 }
243249
244250 private IsBlockedResult runSingleLoopIteration () {
245- ensureNotCancelled ();
251+ driverContext . checkForEarlyTermination ();
246252 boolean movedPage = false ;
247253
248- if (activeOperators .isEmpty () == false && activeOperators .getLast ().isFinished () == false ) {
249- for (int i = 0 ; i < activeOperators .size () - 1 ; i ++) {
250- Operator op = activeOperators .get (i );
251- Operator nextOp = activeOperators .get (i + 1 );
254+ for (int i = 0 ; i < activeOperators .size () - 1 ; i ++) {
255+ Operator op = activeOperators .get (i );
256+ Operator nextOp = activeOperators .get (i + 1 );
252257
253- // skip blocked operator
254- if (op .isBlocked ().listener ().isDone () == false ) {
255- continue ;
256- }
258+ // skip blocked operator
259+ if (op .isBlocked ().listener ().isDone () == false ) {
260+ continue ;
261+ }
257262
258- if (op .isFinished () == false && nextOp .needsInput ()) {
259- Page page = op .getOutput ();
260- if (page == null ) {
261- // No result, just move to the next iteration
262- } else if (page .getPositionCount () == 0 ) {
263- // Empty result, release any memory it holds immediately and move to the next iteration
263+ if (op .isFinished () == false && nextOp .needsInput ()) {
264+ driverContext .checkForEarlyTermination ();
265+ Page page = op .getOutput ();
266+ if (page == null ) {
267+ // No result, just move to the next iteration
268+ } else if (page .getPositionCount () == 0 ) {
269+ // Empty result, release any memory it holds immediately and move to the next iteration
270+ page .releaseBlocks ();
271+ } else {
272+ // Non-empty result from the previous operation, move it to the next operation
273+ try {
274+ driverContext .checkForEarlyTermination ();
275+ } catch (DriverEarlyTerminationException | TaskCancelledException e ) {
264276 page .releaseBlocks ();
265- } else {
266- // Non-empty result from the previous operation, move it to the next operation
267- nextOp .addInput (page );
268- movedPage = true ;
277+ throw e ;
269278 }
279+ nextOp .addInput (page );
280+ movedPage = true ;
270281 }
282+ }
271283
272- if (op .isFinished ()) {
273- nextOp . finish ();
274- }
284+ if (op .isFinished ()) {
285+ driverContext . checkForEarlyTermination ();
286+ nextOp . finish ();
275287 }
276288 }
277289
290+ closeEarlyFinishedOperators ();
291+
292+ if (movedPage == false ) {
293+ return oneOf (
294+ activeOperators .stream ()
295+ .map (Operator ::isBlocked )
296+ .filter (laf -> laf .listener ().isDone () == false )
297+ .collect (Collectors .toList ())
298+ );
299+ }
300+ return Operator .NOT_BLOCKED ;
301+ }
302+
303+ private void closeEarlyFinishedOperators () {
278304 for (int index = activeOperators .size () - 1 ; index >= 0 ; index --) {
279305 if (activeOperators .get (index ).isFinished ()) {
280306 /*
@@ -300,16 +326,6 @@ private IsBlockedResult runSingleLoopIteration() {
300326 break ;
301327 }
302328 }
303-
304- if (movedPage == false ) {
305- return oneOf (
306- activeOperators .stream ()
307- .map (Operator ::isBlocked )
308- .filter (laf -> laf .listener ().isDone () == false )
309- .collect (Collectors .toList ())
310- );
311- }
312- return Operator .NOT_BLOCKED ;
313329 }
314330
315331 public void cancel (String reason ) {
@@ -318,13 +334,6 @@ public void cancel(String reason) {
318334 }
319335 }
320336
321- private void ensureNotCancelled () {
322- String reason = cancelReason .get ();
323- if (reason != null ) {
324- throw new TaskCancelledException (reason );
325- }
326- }
327-
328337 public static void start (
329338 ThreadContext threadContext ,
330339 Executor executor ,
@@ -335,19 +344,36 @@ public static void start(
335344 driver .completionListener .addListener (listener );
336345 if (driver .started .compareAndSet (false , true )) {
337346 driver .updateStatus (0 , 0 , DriverStatus .Status .STARTING , "driver starting" );
338- // Register a listener to an exchange sink to handle early completion scenarios:
339- // 1. When the query accumulates sufficient data (e.g., reaching the LIMIT).
340- // 2. When users abort the query but want to retain the current result.
341- // This allows the Driver to finish early without waiting for the scheduled task.
342- if (driver .activeOperators .isEmpty () == false ) {
343- if (driver .activeOperators .getLast () instanceof ExchangeSinkOperator sinkOperator ) {
344- sinkOperator .addCompletionListener (ActionListener .running (driver .scheduler ::runPendingTasks ));
345- }
346- }
347+ initializeEarlyTerminationChecker (driver );
347348 schedule (DEFAULT_TIME_BEFORE_YIELDING , maxIterations , threadContext , executor , driver , driver .completionListener );
348349 }
349350 }
350351
352+ private static void initializeEarlyTerminationChecker (Driver driver ) {
353+ // Register a listener to an exchange sink to handle early completion scenarios:
354+ // 1. When the query accumulates sufficient data (e.g., reaching the LIMIT).
355+ // 2. When users abort the query but want to retain the current result.
356+ // This allows the Driver to finish early without waiting for the scheduled task.
357+ final AtomicBoolean earlyFinished = new AtomicBoolean ();
358+ driver .driverContext .initializeEarlyTerminationChecker (() -> {
359+ final String reason = driver .cancelReason .get ();
360+ if (reason != null ) {
361+ throw new TaskCancelledException (reason );
362+ }
363+ if (earlyFinished .get ()) {
364+ throw new DriverEarlyTerminationException ("Exchange sink is closed" );
365+ }
366+ });
367+ if (driver .activeOperators .isEmpty () == false ) {
368+ if (driver .activeOperators .getLast () instanceof ExchangeSinkOperator sinkOperator ) {
369+ sinkOperator .addCompletionListener (ActionListener .running (() -> {
370+ earlyFinished .set (true );
371+ driver .scheduler .runPendingTasks ();
372+ }));
373+ }
374+ }
375+ }
376+
351377 // Drains all active operators and closes them.
352378 private void drainAndCloseOperators (@ Nullable Exception e ) {
353379 Iterator <Operator > itr = activeOperators .iterator ();
0 commit comments