Skip to content

Commit 22b5597

Browse files
authored
Handle exceptions when register compute group (#123438) (#123897)
If the parent task is canceled, registering the compute group task might throw a TaskCancelledException, so the listener won't be notified. Closes #123216 Closes #123101 Closes #123451
1 parent ba715c0 commit 22b5597

File tree

3 files changed

+17
-6
lines changed

3 files changed

+17
-6
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,12 @@ void startComputeOnRemoteCluster(
109109
groupTask = rootTask;
110110
onGroupFailure = cancelQueryOnFailure;
111111
} else {
112-
groupTask = computeService.createGroupTask(rootTask, () -> "compute group: cluster [" + clusterAlias + "]");
112+
try {
113+
groupTask = computeService.createGroupTask(rootTask, () -> "compute group: cluster [" + clusterAlias + "]");
114+
} catch (TaskCancelledException e) {
115+
l.onFailure(e);
116+
return;
117+
}
113118
onGroupFailure = computeService.cancelQueryOnFailure(groupTask);
114119
l = ActionListener.runAfter(l, () -> transportService.getTaskManager().unregister(groupTask));
115120
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.search.lookup.SourceProvider;
3434
import org.elasticsearch.tasks.CancellableTask;
3535
import org.elasticsearch.tasks.Task;
36+
import org.elasticsearch.tasks.TaskCancelledException;
3637
import org.elasticsearch.tasks.TaskId;
3738
import org.elasticsearch.tasks.TaskManager;
3839
import org.elasticsearch.threadpool.ThreadPool;
@@ -429,7 +430,7 @@ Runnable cancelQueryOnFailure(CancellableTask task) {
429430
});
430431
}
431432

432-
CancellableTask createGroupTask(Task parentTask, Supplier<String> description) {
433+
CancellableTask createGroupTask(Task parentTask, Supplier<String> description) throws TaskCancelledException {
433434
final TaskManager taskManager = transportService.getTaskManager();
434435
return (CancellableTask) taskManager.register(
435436
"transport",

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,15 @@ protected void sendRequest(
130130
final Runnable onGroupFailure;
131131
final CancellableTask groupTask;
132132
if (allowPartialResults) {
133-
groupTask = computeService.createGroupTask(
134-
parentTask,
135-
() -> "compute group: data-node [" + node.getName() + "], " + shardIds + " [" + shardIds + "]"
136-
);
133+
try {
134+
groupTask = computeService.createGroupTask(
135+
parentTask,
136+
() -> "compute group: data-node [" + node.getName() + "], " + shardIds + " [" + shardIds + "]"
137+
);
138+
} catch (TaskCancelledException e) {
139+
l.onFailure(e);
140+
return;
141+
}
137142
onGroupFailure = computeService.cancelQueryOnFailure(groupTask);
138143
l = ActionListener.runAfter(l, () -> transportService.getTaskManager().unregister(groupTask));
139144
} else {

0 commit comments

Comments
 (0)