Skip to content

Commit 7325c6c

Browse files
authored
[8.x] Resume Driver on cancelled or early finished (#120020) (#120208)
* Resume Driver on cancelled or early finished (#120020)

  Today, when a Driver is put to sleep or scheduled but the thread pool is busy with other tasks, canceling the Driver must wait until the Driver is awakened and scheduled for another loop, or until the thread pool completes the tasks ahead of it. A similar issue occurs with early finishes. This change enables preemptive early finishes or cancellations of the Driver without waiting for another run loop.

* compile
1 parent 459df33 commit 7325c6c

File tree

7 files changed

+159
-41
lines changed

7 files changed

+159
-41
lines changed

docs/changelog/120020.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 120020
2+
summary: Resume Driver on cancelled or early finished
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java

Lines changed: 40 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.common.util.concurrent.ThreadContext;
1515
import org.elasticsearch.compute.Describable;
1616
import org.elasticsearch.compute.data.Page;
17+
import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator;
1718
import org.elasticsearch.core.Nullable;
1819
import org.elasticsearch.core.Releasable;
1920
import org.elasticsearch.core.Releasables;
@@ -74,10 +75,9 @@ public class Driver implements Releasable, Describable {
7475
private final long statusNanos;
7576

7677
private final AtomicReference<String> cancelReason = new AtomicReference<>();
77-
private final AtomicReference<SubscribableListener<Void>> blocked = new AtomicReference<>();
78-
7978
private final AtomicBoolean started = new AtomicBoolean();
8079
private final SubscribableListener<Void> completionListener = new SubscribableListener<>();
80+
private final DriverScheduler scheduler = new DriverScheduler();
8181

8282
/**
8383
* Status reported to the tasks API. We write the status at most once every
@@ -245,31 +245,33 @@ private IsBlockedResult runSingleLoopIteration() {
245245
ensureNotCancelled();
246246
boolean movedPage = false;
247247

248-
for (int i = 0; i < activeOperators.size() - 1; i++) {
249-
Operator op = activeOperators.get(i);
250-
Operator nextOp = activeOperators.get(i + 1);
248+
if (activeOperators.isEmpty() == false && activeOperators.get(activeOperators.size() - 1).isFinished() == false) {
249+
for (int i = 0; i < activeOperators.size() - 1; i++) {
250+
Operator op = activeOperators.get(i);
251+
Operator nextOp = activeOperators.get(i + 1);
251252

252-
// skip blocked operator
253-
if (op.isBlocked().listener().isDone() == false) {
254-
continue;
255-
}
253+
// skip blocked operator
254+
if (op.isBlocked().listener().isDone() == false) {
255+
continue;
256+
}
256257

257-
if (op.isFinished() == false && nextOp.needsInput()) {
258-
Page page = op.getOutput();
259-
if (page == null) {
260-
// No result, just move to the next iteration
261-
} else if (page.getPositionCount() == 0) {
262-
// Empty result, release any memory it holds immediately and move to the next iteration
263-
page.releaseBlocks();
264-
} else {
265-
// Non-empty result from the previous operation, move it to the next operation
266-
nextOp.addInput(page);
267-
movedPage = true;
258+
if (op.isFinished() == false && nextOp.needsInput()) {
259+
Page page = op.getOutput();
260+
if (page == null) {
261+
// No result, just move to the next iteration
262+
} else if (page.getPositionCount() == 0) {
263+
// Empty result, release any memory it holds immediately and move to the next iteration
264+
page.releaseBlocks();
265+
} else {
266+
// Non-empty result from the previous operation, move it to the next operation
267+
nextOp.addInput(page);
268+
movedPage = true;
269+
}
268270
}
269-
}
270271

271-
if (op.isFinished()) {
272-
nextOp.finish();
272+
if (op.isFinished()) {
273+
nextOp.finish();
274+
}
273275
}
274276
}
275277

@@ -312,19 +314,10 @@ private IsBlockedResult runSingleLoopIteration() {
312314

313315
public void cancel(String reason) {
314316
if (cancelReason.compareAndSet(null, reason)) {
315-
synchronized (this) {
316-
SubscribableListener<Void> fut = this.blocked.get();
317-
if (fut != null) {
318-
fut.onFailure(new TaskCancelledException(reason));
319-
}
320-
}
317+
scheduler.runPendingTasks();
321318
}
322319
}
323320

324-
private boolean isCancelled() {
325-
return cancelReason.get() != null;
326-
}
327-
328321
private void ensureNotCancelled() {
329322
String reason = cancelReason.get();
330323
if (reason != null) {
@@ -342,6 +335,16 @@ public static void start(
342335
driver.completionListener.addListener(listener);
343336
if (driver.started.compareAndSet(false, true)) {
344337
driver.updateStatus(0, 0, DriverStatus.Status.STARTING, "driver starting");
338+
// Register a listener to an exchange sink to handle early completion scenarios:
339+
// 1. When the query accumulates sufficient data (e.g., reaching the LIMIT).
340+
// 2. When users abort the query but want to retain the current result.
341+
// This allows the Driver to finish early without waiting for the scheduled task.
342+
final List<Operator> operators = driver.activeOperators;
343+
if (operators.isEmpty() == false) {
344+
if (operators.get(operators.size() - 1) instanceof ExchangeSinkOperator sinkOperator) {
345+
sinkOperator.addCompletionListener(ActionListener.running(driver.scheduler::runPendingTasks));
346+
}
347+
}
345348
schedule(DEFAULT_TIME_BEFORE_YIELDING, maxIterations, threadContext, executor, driver, driver.completionListener);
346349
}
347350
}
@@ -371,7 +374,7 @@ private static void schedule(
371374
Driver driver,
372375
ActionListener<Void> listener
373376
) {
374-
executor.execute(new AbstractRunnable() {
377+
final var task = new AbstractRunnable() {
375378

376379
@Override
377380
protected void doRun() {
@@ -383,16 +386,12 @@ protected void doRun() {
383386
if (fut.isDone()) {
384387
schedule(maxTime, maxIterations, threadContext, executor, driver, listener);
385388
} else {
386-
synchronized (driver) {
387-
if (driver.isCancelled() == false) {
388-
driver.blocked.set(fut);
389-
}
390-
}
391389
ActionListener<Void> readyListener = ActionListener.wrap(
392390
ignored -> schedule(maxTime, maxIterations, threadContext, executor, driver, listener),
393391
this::onFailure
394392
);
395393
fut.addListener(ContextPreservingActionListener.wrapPreservingContext(readyListener, threadContext));
394+
driver.scheduler.addOrRunDelayedTask(() -> fut.onResponse(null));
396395
}
397396
}
398397

@@ -405,7 +404,8 @@ public void onFailure(Exception e) {
405404
void onComplete(ActionListener<Void> listener) {
406405
driver.driverContext.waitForAsyncActions(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext));
407406
}
408-
});
407+
};
408+
driver.scheduler.scheduleOrRunTask(executor, task);
409409
}
410410

411411
private static IsBlockedResult oneOf(List<IsBlockedResult> results) {
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverScheduler.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator;
9+
10+
import org.elasticsearch.common.util.concurrent.EsExecutors;
11+
12+
import java.util.List;
13+
import java.util.concurrent.Executor;
14+
import java.util.concurrent.atomic.AtomicBoolean;
15+
import java.util.concurrent.atomic.AtomicReference;
16+
17+
/**
18+
* A Driver can be put to sleep while its sink is full or its source is empty, or be rescheduled after running several iterations.
19+
* This scheduler tracks the delayed and scheduled tasks, allowing them to run without waking up the driver or waiting for
20+
* the thread pool to pick up the task. This enables fast cancellation or early finishing without discarding the current result.
21+
*/
22+
final class DriverScheduler {
23+
private final AtomicReference<Runnable> delayedTask = new AtomicReference<>();
24+
private final AtomicReference<Runnable> scheduledTask = new AtomicReference<>();
25+
private final AtomicBoolean completing = new AtomicBoolean();
26+
27+
void addOrRunDelayedTask(Runnable task) {
28+
delayedTask.set(task);
29+
if (completing.get()) {
30+
final Runnable toRun = delayedTask.getAndSet(null);
31+
if (toRun != null) {
32+
assert task == toRun;
33+
toRun.run();
34+
}
35+
}
36+
}
37+
38+
void scheduleOrRunTask(Executor executor, Runnable task) {
39+
final Runnable existing = scheduledTask.getAndSet(task);
40+
assert existing == null : existing;
41+
final Executor executorToUse = completing.get() ? EsExecutors.DIRECT_EXECUTOR_SERVICE : executor;
42+
executorToUse.execute(() -> {
43+
final Runnable next = scheduledTask.getAndSet(null);
44+
if (next != null) {
45+
assert next == task;
46+
next.run();
47+
}
48+
});
49+
}
50+
51+
void runPendingTasks() {
52+
completing.set(true);
53+
for (var taskHolder : List.of(delayedTask, scheduledTask)) {
54+
final Runnable task = taskHolder.getAndSet(null);
55+
if (task != null) {
56+
task.run();
57+
}
58+
}
59+
}
60+
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSink.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.compute.operator.exchange;
99

10+
import org.elasticsearch.action.ActionListener;
1011
import org.elasticsearch.compute.data.Page;
1112
import org.elasticsearch.compute.operator.IsBlockedResult;
1213

@@ -30,6 +31,11 @@ public interface ExchangeSink {
3031
*/
3132
boolean isFinished();
3233

34+
/**
35+
* Adds a listener that will be notified when this exchange sink is finished.
36+
*/
37+
void addCompletionListener(ActionListener<Void> listener);
38+
3339
/**
3440
* Whether the sink is blocked on adding more pages
3541
*/

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,11 @@ public ExchangeSinkHandler(BlockFactory blockFactory, int maxBufferSize, LongSup
5252

5353
private class ExchangeSinkImpl implements ExchangeSink {
5454
boolean finished;
55+
private final SubscribableListener<Void> onFinished = new SubscribableListener<>();
5556

5657
ExchangeSinkImpl() {
5758
onChanged();
59+
buffer.addCompletionListener(onFinished);
5860
outstandingSinks.incrementAndGet();
5961
}
6062

@@ -68,6 +70,7 @@ public void addPage(Page page) {
6870
public void finish() {
6971
if (finished == false) {
7072
finished = true;
73+
onFinished.onResponse(null);
7174
onChanged();
7275
if (outstandingSinks.decrementAndGet() == 0) {
7376
buffer.finish(false);
@@ -78,7 +81,12 @@ public void finish() {
7881

7982
@Override
8083
public boolean isFinished() {
81-
return finished || buffer.isFinished();
84+
return onFinished.isDone();
85+
}
86+
87+
@Override
88+
public void addCompletionListener(ActionListener<Void> listener) {
89+
onFinished.addListener(listener);
8290
}
8391

8492
@Override

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.elasticsearch.TransportVersion;
1111
import org.elasticsearch.TransportVersions;
12+
import org.elasticsearch.action.ActionListener;
1213
import org.elasticsearch.common.Strings;
1314
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1415
import org.elasticsearch.common.io.stream.StreamInput;
@@ -60,6 +61,10 @@ public boolean isFinished() {
6061
return sink.isFinished();
6162
}
6263

64+
public void addCompletionListener(ActionListener<Void> listener) {
65+
sink.addCompletionListener(listener);
66+
}
67+
6368
@Override
6469
public void finish() {
6570
sink.finish();

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.ActionRunnable;
12+
import org.elasticsearch.action.support.PlainActionFuture;
1213
import org.elasticsearch.common.breaker.CircuitBreaker;
1314
import org.elasticsearch.common.settings.Settings;
1415
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -21,6 +22,10 @@
2122
import org.elasticsearch.compute.data.BlockFactory;
2223
import org.elasticsearch.compute.data.ElementType;
2324
import org.elasticsearch.compute.data.Page;
25+
import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler;
26+
import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator;
27+
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
28+
import org.elasticsearch.compute.operator.exchange.ExchangeSourceOperator;
2429
import org.elasticsearch.core.TimeValue;
2530
import org.elasticsearch.test.ESTestCase;
2631
import org.elasticsearch.threadpool.FixedExecutorBuilder;
@@ -35,8 +40,10 @@
3540
import java.util.concurrent.CountDownLatch;
3641
import java.util.concurrent.CyclicBarrier;
3742
import java.util.concurrent.TimeUnit;
43+
import java.util.function.Function;
3844
import java.util.function.LongSupplier;
3945

46+
import static org.hamcrest.Matchers.either;
4047
import static org.hamcrest.Matchers.equalTo;
4148

4249
public class DriverTests extends ESTestCase {
@@ -273,6 +280,33 @@ public Page getOutput() {
273280
}
274281
}
275282

283+
public void testResumeOnEarlyFinish() throws Exception {
284+
DriverContext driverContext = driverContext();
285+
ThreadPool threadPool = threadPool();
286+
try {
287+
PlainActionFuture<Void> sourceFuture = new PlainActionFuture<>();
288+
var sourceHandler = new ExchangeSourceHandler(between(1, 5), threadPool.executor("esql"), sourceFuture);
289+
var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), between(1, 5), System::currentTimeMillis);
290+
var sourceOperator = new ExchangeSourceOperator(sourceHandler.createExchangeSource());
291+
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(), Function.identity());
292+
Driver driver = new Driver(driverContext, sourceOperator, List.of(), sinkOperator, () -> {});
293+
PlainActionFuture<Void> future = new PlainActionFuture<>();
294+
Driver.start(threadPool.getThreadContext(), threadPool.executor("esql"), driver, between(1, 1000), future);
295+
assertBusy(
296+
() -> assertThat(
297+
driver.status().status(),
298+
either(equalTo(DriverStatus.Status.ASYNC)).or(equalTo(DriverStatus.Status.STARTING))
299+
)
300+
);
301+
sinkHandler.fetchPageAsync(true, ActionListener.noop());
302+
future.actionGet(5, TimeUnit.SECONDS);
303+
assertThat(driver.status().status(), equalTo(DriverStatus.Status.DONE));
304+
sourceFuture.actionGet(5, TimeUnit.SECONDS);
305+
} finally {
306+
terminate(threadPool);
307+
}
308+
}
309+
276310
private static void assertRunningWithRegularUser(ThreadPool threadPool) {
277311
String user = threadPool.getThreadContext().getHeader("user");
278312
assertThat(user, equalTo("user1"));

0 commit comments

Comments
 (0)