@@ -316,22 +316,24 @@ private int closeEarlyFinishedOperators(ListIterator<Operator> operators) {
316316 while (iterator .hasPrevious ()) {
317317 if (iterator .previous ().isFinished ()) {
318318 var index = iterator .nextIndex ();
319- iterator .next ();
320319 /*
321- * Remove this operator and all unclosed source operators in the
322- * most paranoid possible way. Closing operators shouldn't throw,
323- * but if it does, this will make sure we don't try to close any
324- * that succeed twice.
325- */
326- while (iterator .hasPrevious ()) {
327- Operator op = iterator .previous ();
320+ * Remove this operator and all source operators in the
321+ * most paranoid possible way. Closing operators shouldn't throw,
322+ * but if it does, this will make sure we don't try to close any
323+ * that succeed twice.
324+ */
325+ Iterator <Operator > finishedOperators = this .activeOperators .subList (0 , index + 1 ).iterator ();
326+ while (finishedOperators .hasNext ()) {
327+ Operator op = finishedOperators .next ();
328328 statusOfCompletedOperators .add (new OperatorStatus (op .toString (), op .status ()));
329329 op .close ();
330- iterator .remove ();
330+ finishedOperators .remove ();
331331 }
332- // Finish the next operator.
333- if (iterator .hasNext ()) {
334- iterator .next ().finish ();
332+
333+ // Finish the next operator, which is now the first operator.
334+ if (activeOperators .isEmpty () == false ) {
335+ Operator newRootOperator = activeOperators .get (0 );
336+ newRootOperator .finish ();
335337 }
336338 return index ;
337339 }
0 commit comments