Skip to content

Commit e65e865

Browse files
authored
Handle rejection in DriverScheduler (#122105) (#122118)
When a node is shutting down, scheduling tasks for the Driver can result in a rejection exception. In this case, we drain and close all operators. However, we don't clear the pending tasks in the scheduler, which can lead to a pending task being triggered unexpectedly, causing a ConcurrentModificationException.
1 parent 5df9d4c commit e65e865

File tree

2 files changed

+82
-9
lines changed

2 files changed

+82
-9
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverScheduler.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77

88
package org.elasticsearch.compute.operator;
99

10+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
1011
import org.elasticsearch.common.util.concurrent.EsExecutors;
12+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
1113

1214
import java.util.List;
1315
import java.util.concurrent.Executor;
@@ -21,7 +23,7 @@
2123
*/
2224
final class DriverScheduler {
2325
private final AtomicReference<Runnable> delayedTask = new AtomicReference<>();
24-
private final AtomicReference<Runnable> scheduledTask = new AtomicReference<>();
26+
private final AtomicReference<AbstractRunnable> scheduledTask = new AtomicReference<>();
2527
private final AtomicBoolean completing = new AtomicBoolean();
2628

2729
void addOrRunDelayedTask(Runnable task) {
@@ -35,22 +37,32 @@ void addOrRunDelayedTask(Runnable task) {
3537
}
3638
}
3739

38-
void scheduleOrRunTask(Executor executor, Runnable task) {
39-
final Runnable existing = scheduledTask.getAndSet(task);
40+
void scheduleOrRunTask(Executor executor, AbstractRunnable task) {
41+
final AbstractRunnable existing = scheduledTask.getAndSet(task);
4042
assert existing == null : existing;
4143
final Executor executorToUse = completing.get() ? EsExecutors.DIRECT_EXECUTOR_SERVICE : executor;
42-
executorToUse.execute(() -> {
43-
final Runnable next = scheduledTask.getAndSet(null);
44-
if (next != null) {
45-
assert next == task;
46-
next.run();
44+
executorToUse.execute(new AbstractRunnable() {
45+
@Override
46+
public void onFailure(Exception e) {
47+
assert e instanceof EsRejectedExecutionException : new AssertionError(e);
48+
if (scheduledTask.getAndUpdate(t -> t == task ? null : t) == task) {
49+
task.onFailure(e);
50+
}
51+
}
52+
53+
@Override
54+
protected void doRun() {
55+
AbstractRunnable toRun = scheduledTask.getAndSet(null);
56+
if (toRun == task) {
57+
task.run();
58+
}
4759
}
4860
});
4961
}
5062

5163
void runPendingTasks() {
5264
completing.set(true);
53-
for (var taskHolder : List.of(delayedTask, scheduledTask)) {
65+
for (var taskHolder : List.of(scheduledTask, delayedTask)) {
5466
final Runnable task = taskHolder.getAndSet(null);
5567
if (task != null) {
5668
task.run();
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator;
9+
10+
import org.elasticsearch.common.settings.Settings;
11+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
12+
import org.elasticsearch.common.util.concurrent.EsExecutors;
13+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
14+
import org.elasticsearch.test.ESTestCase;
15+
import org.elasticsearch.threadpool.FixedExecutorBuilder;
16+
import org.elasticsearch.threadpool.TestThreadPool;
17+
18+
import java.util.concurrent.CountDownLatch;
19+
import java.util.concurrent.Executor;
20+
import java.util.concurrent.atomic.AtomicInteger;
21+
22+
import static org.hamcrest.Matchers.equalTo;
23+
24+
public class DriverSchedulerTests extends ESTestCase {
25+
26+
public void testClearPendingTaskOnRejection() {
27+
DriverScheduler scheduler = new DriverScheduler();
28+
AtomicInteger counter = new AtomicInteger();
29+
var threadPool = new TestThreadPool(
30+
"test",
31+
new FixedExecutorBuilder(Settings.EMPTY, "test", 1, 2, "test", EsExecutors.TaskTrackingConfig.DEFAULT)
32+
);
33+
CountDownLatch latch = new CountDownLatch(1);
34+
Executor executor = threadPool.executor("test");
35+
try {
36+
for (int i = 0; i < 10; i++) {
37+
try {
38+
executor.execute(() -> safeAwait(latch));
39+
} catch (EsRejectedExecutionException e) {
40+
break;
41+
}
42+
}
43+
scheduler.scheduleOrRunTask(executor, new AbstractRunnable() {
44+
@Override
45+
public void onFailure(Exception e) {
46+
counter.incrementAndGet();
47+
}
48+
49+
@Override
50+
protected void doRun() {
51+
counter.incrementAndGet();
52+
}
53+
});
54+
scheduler.runPendingTasks();
55+
assertThat(counter.get(), equalTo(1));
56+
} finally {
57+
latch.countDown();
58+
terminate(threadPool);
59+
}
60+
}
61+
}

0 commit comments

Comments
 (0)