Skip to content

Commit 0210097

Browse files
committed
Fix bug caused by mishandling of errors during driver iteration
1 parent 25fd514 commit 0210097

File tree

1 file changed

+34
-30
lines changed
  • x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator

1 file changed

+34
-30
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java

Lines changed: 34 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.ArrayList;
2525
import java.util.Iterator;
2626
import java.util.List;
27+
import java.util.ListIterator;
2728
import java.util.concurrent.Executor;
2829
import java.util.concurrent.atomic.AtomicBoolean;
2930
import java.util.concurrent.atomic.AtomicReference;
@@ -184,8 +185,7 @@ SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplie
184185
assert driverContext.assertBeginRunLoop();
185186
isBlocked = runSingleLoopIteration();
186187
} catch (DriverEarlyTerminationException unused) {
187-
int lastFinished = closeEarlyFinishedOperators(0, activeOperators.size() - 1);
188-
activeOperators = new ArrayList<>(activeOperators.subList(lastFinished + 1, activeOperators.size()));
188+
closeEarlyFinishedOperators(activeOperators.listIterator(activeOperators.size()));
189189
assert isFinished() : "not finished after early termination";
190190
} finally {
191191
assert driverContext.assertEndRunLoop();
@@ -252,10 +252,13 @@ private IsBlockedResult runSingleLoopIteration() {
252252
driverContext.checkForEarlyTermination();
253253
boolean movedPage = false;
254254

255-
int lastClosed = -1;
256-
for (int i = 0; i < activeOperators.size() - 1; i++) {
257-
Operator op = activeOperators.get(i);
258-
Operator nextOp = activeOperators.get(i + 1);
255+
ListIterator<Operator> iterator = activeOperators.listIterator();
256+
while (iterator.hasNext()) {
257+
Operator op = iterator.next();
258+
if (iterator.hasNext() == false) {
259+
break;
260+
}
261+
Operator nextOp = activeOperators.get(iterator.nextIndex());
259262

260263
// skip blocked operator
261264
if (op.isBlocked().listener().isDone() == false) {
@@ -264,6 +267,7 @@ private IsBlockedResult runSingleLoopIteration() {
264267

265268
if (op.isFinished() == false && nextOp.needsInput()) {
266269
driverContext.checkForEarlyTermination();
270+
assert nextOp.isFinished() == false : "next operator should not be finished yet: " + nextOp;
267271
Page page = op.getOutput();
268272
if (page == null) {
269273
// No result, just move to the next iteration
@@ -285,15 +289,15 @@ private IsBlockedResult runSingleLoopIteration() {
285289

286290
if (op.isFinished()) {
287291
driverContext.checkForEarlyTermination();
288-
closeEarlyFinishedOperators(lastClosed + 1, i);
289-
lastClosed = i;
292+
var originalIndex = iterator.previousIndex();
293+
var index = closeEarlyFinishedOperators(iterator);
294+
if (index >= 0) {
295+
iterator = new ArrayList<>(activeOperators).listIterator(originalIndex - index);
296+
}
290297
}
291298
}
292299

293-
lastClosed = closeEarlyFinishedOperators(lastClosed + 1, activeOperators.size() - 1);
294-
if (lastClosed >= 0) {
295-
activeOperators = new ArrayList<>(activeOperators.subList(lastClosed + 1, activeOperators.size()));
296-
}
300+
closeEarlyFinishedOperators(activeOperators.listIterator(activeOperators.size()));
297301

298302
if (movedPage == false) {
299303
return oneOf(
@@ -306,33 +310,33 @@ private IsBlockedResult runSingleLoopIteration() {
306310
return Operator.NOT_BLOCKED;
307311
}
308312

309-
// Returns the index of the last operator that was closed, or minIndex - 1 if no operators were closed.
310-
private int closeEarlyFinishedOperators(int minIndex, int maxIndex) {
311-
for (int index = maxIndex; index >= minIndex; index--) {
312-
if (activeOperators.get(index).isFinished()) {
313+
// Returns the index of the last operator that was closed, -1 if no operator was closed.
314+
private int closeEarlyFinishedOperators(ListIterator<Operator> operators) {
315+
var iterator = activeOperators.listIterator(operators.nextIndex());
316+
while (iterator.hasPrevious()) {
317+
if (iterator.previous().isFinished()) {
318+
var index = iterator.nextIndex();
319+
iterator.next();
313320
/*
314-
* Remove this operator and all unclosed source operators in the
315-
* most paranoid possible way. Closing operators shouldn't throw,
316-
* but if it does, this will make sure we don't try to close any
317-
* that succeed twice.
318-
*/
319-
List<Operator> operatorsToClose = this.activeOperators.subList(minIndex, index + 1);
320-
Iterator<Operator> itr = operatorsToClose.iterator();
321-
while (itr.hasNext()) {
322-
Operator op = itr.next();
321+
* Remove this operator and all unclosed source operators in the
322+
* most paranoid possible way. Closing operators shouldn't throw,
323+
* but if it does, this will make sure we don't try to close any
324+
* that succeed twice.
325+
*/
326+
while (iterator.hasPrevious()) {
327+
Operator op = iterator.previous();
323328
statusOfCompletedOperators.add(new OperatorStatus(op.toString(), op.status()));
324329
op.close();
330+
iterator.remove();
325331
}
326-
327332
// Finish the next operator.
328-
if (index + 1 < activeOperators.size()) {
329-
Operator newRootOperator = activeOperators.get(index + 1);
330-
newRootOperator.finish();
333+
if (iterator.hasNext()) {
334+
iterator.next().finish();
331335
}
332336
return index;
333337
}
334338
}
335-
return minIndex - 1;
339+
return -1;
336340
}
337341

338342
public void cancel(String reason) {

0 commit comments

Comments
 (0)