Skip to content

Commit 8cb540f

Browse files
authored
Abort drivers on node shutting down (#101701) (#101716)
The TransportResponseHandler can be notified while the Driver is still running (during node shutdown) or before the Driver has started (when the parent task is canceled). In such cases, we should abort the Driver and wait for it to finish; otherwise, multiple threads can access a Driver at the same time. Closes #101595
1 parent 7fb28b0 commit 8cb540f

File tree

4 files changed

+40
-15
lines changed

4 files changed

+40
-15
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Iterator;
2525
import java.util.List;
2626
import java.util.concurrent.Executor;
27+
import java.util.concurrent.atomic.AtomicBoolean;
2728
import java.util.concurrent.atomic.AtomicReference;
2829
import java.util.function.Supplier;
2930
import java.util.stream.Collectors;
@@ -56,6 +57,10 @@ public class Driver implements Releasable, Describable {
5657

5758
private final AtomicReference<String> cancelReason = new AtomicReference<>();
5859
private final AtomicReference<SubscribableListener<Void>> blocked = new AtomicReference<>();
60+
61+
private final AtomicBoolean started = new AtomicBoolean();
62+
private final SubscribableListener<Void> completionListener = new SubscribableListener<>();
63+
5964
/**
6065
* Status reported to the tasks API. We write the status at most once every
6166
* {@link #statusNanos}, as soon as loop has finished and after {@link #statusNanos}
@@ -149,7 +154,7 @@ private SubscribableListener<Void> run(TimeValue maxTime, int maxIterations) {
149154
if (isFinished()) {
150155
status.set(updateStatus(DriverStatus.Status.DONE));
151156
driverContext.finish();
152-
releasable.close();
157+
Releasables.close(releasable, driverContext.getSnapshot());
153158
} else {
154159
status.set(updateStatus(DriverStatus.Status.WAITING));
155160
}
@@ -159,7 +164,7 @@ private SubscribableListener<Void> run(TimeValue maxTime, int maxIterations) {
159164
/**
160165
* Whether the driver has run the chain of operators to completion.
161166
*/
162-
public boolean isFinished() {
167+
private boolean isFinished() {
163168
return activeOperators.isEmpty();
164169
}
165170

@@ -168,6 +173,19 @@ public void close() {
168173
drainAndCloseOperators(null);
169174
}
170175

176+
/**
177+
* Abort the driver and wait for it to finish
178+
*/
179+
public void abort(Exception reason, ActionListener<Void> listener) {
180+
completionListener.addListener(listener);
181+
if (started.compareAndSet(false, true)) {
182+
drainAndCloseOperators(reason);
183+
completionListener.onFailure(reason);
184+
} else {
185+
cancel(reason.getMessage());
186+
}
187+
}
188+
171189
private SubscribableListener<Void> runSingleLoopIteration() {
172190
ensureNotCancelled();
173191
boolean movedPage = false;
@@ -261,8 +279,11 @@ public static void start(
261279
int maxIterations,
262280
ActionListener<Void> listener
263281
) {
264-
driver.status.set(driver.updateStatus(DriverStatus.Status.STARTING));
265-
schedule(DEFAULT_TIME_BEFORE_YIELDING, maxIterations, threadContext, executor, driver, listener);
282+
driver.completionListener.addListener(listener);
283+
if (driver.started.compareAndSet(false, true)) {
284+
driver.status.set(driver.updateStatus(DriverStatus.Status.STARTING));
285+
schedule(DEFAULT_TIME_BEFORE_YIELDING, maxIterations, threadContext, executor, driver, driver.completionListener);
286+
}
266287
}
267288

268289
// Drains all active operators and closes them.
@@ -279,7 +300,7 @@ private void drainAndCloseOperators(@Nullable Exception e) {
279300
itr.remove();
280301
}
281302
driverContext.finish();
282-
Releasables.closeWhileHandlingException(releasable);
303+
Releasables.closeWhileHandlingException(releasable, driverContext.getSnapshot());
283304
}
284305

285306
private static void schedule(

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverContext.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.common.util.BigArrays;
1212
import org.elasticsearch.compute.data.BlockFactory;
1313
import org.elasticsearch.core.Releasable;
14+
import org.elasticsearch.core.Releasables;
1415

1516
import java.util.Collections;
1617
import java.util.IdentityHashMap;
@@ -69,7 +70,12 @@ public BlockFactory blockFactory() {
6970
}
7071

7172
/** A snapshot of the driver context. */
72-
public record Snapshot(Set<Releasable> releasables) {}
73+
public record Snapshot(Set<Releasable> releasables) implements Releasable {
74+
@Override
75+
public void close() {
76+
Releasables.close(releasables);
77+
}
78+
}
7379

7480
/**
7581
* Adds a releasable to this context. Releasables are identified by Object identity.

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverRunner.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.elasticsearch.common.util.concurrent.AtomicArray;
1313
import org.elasticsearch.common.util.concurrent.CountDown;
1414
import org.elasticsearch.common.util.concurrent.ThreadContext;
15-
import org.elasticsearch.core.Releasables;
1615
import org.elasticsearch.tasks.TaskCancelledException;
1716

1817
import java.util.HashMap;
@@ -84,13 +83,6 @@ private void done() {
8483
responseHeaders.setOnce(driverIndex, threadContext.getResponseHeaders());
8584
if (counter.countDown()) {
8685
mergeResponseHeaders(responseHeaders);
87-
for (Driver d : drivers) {
88-
if (d.status().status() == DriverStatus.Status.QUEUED) {
89-
d.close();
90-
} else {
91-
Releasables.close(d.driverContext().getSnapshot().releasables());
92-
}
93-
}
9486
Exception error = failure.get();
9587
if (error != null) {
9688
listener.onFailure(error);

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverTaskRunner.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,13 @@ protected void start(Driver driver, ActionListener<Void> driverListener) {
5252
new DriverRequest(driver, executor),
5353
parentTask,
5454
TransportRequestOptions.EMPTY,
55-
TransportResponseHandler.empty(executor, driverListener)
55+
TransportResponseHandler.empty(
56+
executor,
57+
// The TransportResponseHandler can be notified while the Driver is still running during node shutdown
58+
// or the Driver hasn't started when the parent task is canceled. In such cases, we should abort
59+
// the Driver and wait for it to finish.
60+
ActionListener.wrap(driverListener::onResponse, e -> driver.abort(e, driverListener))
61+
)
5662
);
5763
}
5864
};

0 commit comments

Comments
 (0)