Skip to content

Commit abde0fe

Browse files
committed
Handle exceptions when register compute group
1 parent c92ce0a commit abde0fe

File tree

3 files changed

+19
-6
lines changed

3 files changed

+19
-6
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,13 @@ void startComputeOnRemoteCluster(
109109
groupTask = rootTask;
110110
onGroupFailure = cancelQueryOnFailure;
111111
} else {
112-
groupTask = computeService.createGroupTask(rootTask, () -> "compute group: cluster [" + clusterAlias + "]");
112+
try {
113+
groupTask = computeService.createGroupTask(rootTask, () -> "compute group: cluster [" + clusterAlias + "]");
114+
} catch (Exception e) {
115+
assert e instanceof TaskCancelledException : new AssertionError(e);
116+
l.onFailure(e);
117+
return;
118+
}
113119
onGroupFailure = computeService.cancelQueryOnFailure(groupTask);
114120
l = ActionListener.runAfter(l, () -> transportService.getTaskManager().unregister(groupTask));
115121
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.search.lookup.SourceProvider;
3434
import org.elasticsearch.tasks.CancellableTask;
3535
import org.elasticsearch.tasks.Task;
36+
import org.elasticsearch.tasks.TaskCancelledException;
3637
import org.elasticsearch.tasks.TaskId;
3738
import org.elasticsearch.tasks.TaskManager;
3839
import org.elasticsearch.threadpool.ThreadPool;
@@ -431,7 +432,7 @@ Runnable cancelQueryOnFailure(CancellableTask task) {
431432
});
432433
}
433434

434-
CancellableTask createGroupTask(Task parentTask, Supplier<String> description) {
435+
CancellableTask createGroupTask(Task parentTask, Supplier<String> description) throws TaskCancelledException {
435436
final TaskManager taskManager = transportService.getTaskManager();
436437
return (CancellableTask) taskManager.register(
437438
"transport",

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,16 @@ protected void sendRequest(
130130
final Runnable onGroupFailure;
131131
final CancellableTask groupTask;
132132
if (allowPartialResults) {
133-
groupTask = computeService.createGroupTask(
134-
parentTask,
135-
() -> "compute group: data-node [" + node.getName() + "], " + shardIds + " [" + shardIds + "]"
136-
);
133+
try {
134+
groupTask = computeService.createGroupTask(
135+
parentTask,
136+
() -> "compute group: data-node [" + node.getName() + "], " + shardIds + " [" + shardIds + "]"
137+
);
138+
} catch (Exception e) {
139+
assert e instanceof TaskCancelledException : new AssertionError(e);
140+
l.onFailure(e);
141+
return;
142+
}
137143
onGroupFailure = computeService.cancelQueryOnFailure(groupTask);
138144
l = ActionListener.runAfter(l, () -> transportService.getTaskManager().unregister(groupTask));
139145
} else {

0 commit comments

Comments
 (0)