@@ -186,7 +186,14 @@ SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplie
186186 long nextStatus = startTime + statusNanos ;
187187 int iter = 0 ;
188188 while (true ) {
189- IsBlockedResult isBlocked = runSingleLoopIteration ();
189+ IsBlockedResult isBlocked ;
190+ try {
191+ isBlocked = runSingleLoopIteration ();
192+ } catch (DriverEarlyTerminationException e ) {
193+ drainAndCloseOperators (null );
194+ updateStatus (finishNanos - startTime , iter , DriverStatus .Status .DONE , "early termination" );
195+ return Operator .NOT_BLOCKED .listener ();
196+ }
190197 iter ++;
191198 if (isBlocked .listener ().isDone () == false ) {
192199 updateStatus (nowSupplier .getAsLong () - startTime , iter , DriverStatus .Status .ASYNC , isBlocked .reason ());
@@ -227,6 +234,17 @@ public void close() {
227234 drainAndCloseOperators (null );
228235 }
229236
237+ private void checkForEarlyTermination () throws DriverEarlyTerminationException {
238+ if (activeOperators .size () >= 2 && activeOperators .getLast ().isFinished ()) {
239+ for (int i = activeOperators .size () - 2 ; i >= 0 ; i --) {
240+ Operator op = activeOperators .get (i );
241+ if (op .isFinished () == false ) {
242+ throw new DriverEarlyTerminationException ();
243+ }
244+ }
245+ }
246+ }
247+
230248 /**
231249 * Abort the driver and wait for it to finish
232250 */
@@ -253,6 +271,7 @@ private IsBlockedResult runSingleLoopIteration() {
253271 if (op .isBlocked ().listener ().isDone () == false ) {
254272 continue ;
255273 }
274+ checkForEarlyTermination ();
256275
257276 if (op .isFinished () == false && nextOp .needsInput ()) {
258277 Page page = op .getOutput ();
@@ -263,6 +282,15 @@ private IsBlockedResult runSingleLoopIteration() {
263282 page .releaseBlocks ();
264283 } else {
265284 // Non-empty result from the previous operation, move it to the next operation
285+ boolean terminated = true ;
286+ try {
287+ checkForEarlyTermination ();
288+ terminated = false ;
289+ } finally {
290+ if (terminated ) {
291+ page .releaseBlocks ();
292+ }
293+ }
266294 nextOp .addInput (page );
267295 movedPage = true ;
268296 }
@@ -290,6 +318,7 @@ private IsBlockedResult runSingleLoopIteration() {
290318 itr .remove ();
291319 }
292320
321+ checkForEarlyTermination ();
293322 // Finish the next operator, which is now the first operator.
294323 if (activeOperators .isEmpty () == false ) {
295324 Operator newRootOperator = activeOperators .get (0 );
0 commit comments