Skip to content

Commit e5111e3

Browse files
authored
Make list-tasks API cancellable (#96283)
In a busy cluster the list-tasks API may retain information about a very large number of tasks while waiting for all nodes to respond. This commit makes the API cancellable so that unnecessary partial results can be released earlier. Relates #96279, which implements the early-release functionality.
1 parent 651ec81 commit e5111e3

File tree

6 files changed

+108
-13
lines changed

6 files changed

+108
-13
lines changed
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.action.support.tasks;
10+
11+
import org.apache.http.client.methods.HttpGet;
12+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
13+
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
14+
import org.elasticsearch.action.support.PlainActionFuture;
15+
import org.elasticsearch.client.Cancellable;
16+
import org.elasticsearch.client.Request;
17+
import org.elasticsearch.client.Response;
18+
import org.elasticsearch.http.HttpSmokeTestCase;
19+
import org.elasticsearch.tasks.TaskManager;
20+
import org.elasticsearch.transport.TransportService;
21+
22+
import java.util.ArrayList;
23+
import java.util.concurrent.CancellationException;
24+
import java.util.concurrent.TimeUnit;
25+
26+
import static org.elasticsearch.action.support.ActionTestUtils.wrapAsRestResponseListener;
27+
import static org.elasticsearch.test.TaskAssertions.awaitTaskWithPrefix;
28+
29+
/**
 * REST-level test checking that cancelling an in-flight {@code GET /_tasks} request removes the
 * list-tasks tasks from every node's task manager and fails the client-side future with a
 * {@link CancellationException}.
 */
public class RestListTasksCancellationIT extends HttpSmokeTestCase {
30+
31+
/**
 * Starts a cluster-state request that is set up to wait (up to 1h) for metadata version
 * {@code Long.MAX_VALUE}, then starts a list-tasks request that waits for that action's tasks to
 * complete, cancels the list-tasks request, and asserts that no list-tasks task is left behind.
 */
public void testListTasksCancellation() throws Exception {
32+
// A cluster-state request asked to wait for metadata version Long.MAX_VALUE with a 1h timeout;
// presumably this is never satisfied during the test, giving list-tasks something to wait on.
final Request clusterStateRequest = new Request(HttpGet.METHOD_NAME, "/_cluster/state");
33+
clusterStateRequest.addParameter("wait_for_metadata_version", Long.toString(Long.MAX_VALUE));
34+
clusterStateRequest.addParameter("wait_for_timeout", "1h");
35+
36+
final PlainActionFuture<Response> clusterStateFuture = new PlainActionFuture<>();
37+
final Cancellable clusterStateCancellable = getRestClient().performRequestAsync(
38+
clusterStateRequest,
39+
wrapAsRestResponseListener(clusterStateFuture)
40+
);
41+
42+
// Do not proceed until the cluster-state task is observable (helper semantics assumed from its name).
awaitTaskWithPrefix(ClusterStateAction.NAME);
43+
44+
// List-tasks request filtered to the cluster-state action, asked to wait for completion for up to 1h.
final Request tasksRequest = new Request(HttpGet.METHOD_NAME, "/_tasks");
45+
tasksRequest.addParameter("actions", ClusterStateAction.NAME);
46+
tasksRequest.addParameter("wait_for_completion", Boolean.toString(true));
47+
tasksRequest.addParameter("timeout", "1h");
48+
49+
final PlainActionFuture<Response> tasksFuture = new PlainActionFuture<>();
50+
final Cancellable tasksCancellable = getRestClient().performRequestAsync(tasksRequest, wrapAsRestResponseListener(tasksFuture));
51+
52+
// NOTE(review): the "[n]" suffix presumably names the node-level child action -- confirm.
awaitTaskWithPrefix(ListTasksAction.NAME + "[n]");
53+
54+
// Cancel the list-tasks HTTP request from the client side.
tasksCancellable.cancel();
55+
56+
// Gather the TaskManager of every TransportService instance in the internal test cluster.
final var taskManagers = new ArrayList<TaskManager>(internalCluster().getNodeNames().length);
57+
for (final var transportService : internalCluster().getInstances(TransportService.class)) {
58+
taskManagers.add(transportService.getTaskManager());
59+
}
60+
// Retry until no cancellable task on any node has an action starting with the list-tasks action name.
assertBusy(
61+
() -> assertFalse(
62+
taskManagers.stream()
63+
.flatMap(taskManager -> taskManager.getCancellableTasks().values().stream())
64+
.anyMatch(t -> t.getAction().startsWith(ListTasksAction.NAME))
65+
)
66+
);
67+
68+
// The client-side future for the cancelled request must fail with CancellationException within 10 seconds.
expectThrows(CancellationException.class, () -> tasksFuture.actionGet(10, TimeUnit.SECONDS));
69+
// Cancel the still-pending cluster-state request as well so it does not outlive the test.
clusterStateCancellable.cancel();
70+
}
71+
72+
}

server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksRequest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,12 @@
1414
import org.elasticsearch.common.Strings;
1515
import org.elasticsearch.common.io.stream.StreamInput;
1616
import org.elasticsearch.common.io.stream.StreamOutput;
17+
import org.elasticsearch.tasks.CancellableTask;
1718
import org.elasticsearch.tasks.Task;
19+
import org.elasticsearch.tasks.TaskId;
1820

1921
import java.io.IOException;
22+
import java.util.Map;
2023

2124
import static org.elasticsearch.action.ValidateActions.addValidationError;
2225
import static org.elasticsearch.common.regex.Regex.simpleMatch;
@@ -119,4 +122,8 @@ public ListTasksRequest setDescriptions(String... descriptions) {
119122
return this;
120123
}
121124

125+
/**
 * Creates the task for this request as a {@link CancellableTask} rather than a plain task.
 * The fourth constructor argument (presumably the task description -- confirm against
 * {@code CancellableTask}) is the empty string.
 */
@Override
126+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
127+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
128+
}
122129
}

server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TransportListTasksAction.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.tasks.CancellableTask;
2525
import org.elasticsearch.tasks.RemovedTaskListener;
2626
import org.elasticsearch.tasks.Task;
27+
import org.elasticsearch.tasks.TaskCancelledException;
2728
import org.elasticsearch.tasks.TaskInfo;
2829
import org.elasticsearch.threadpool.ThreadPool;
2930
import org.elasticsearch.transport.TransportService;
@@ -76,7 +77,13 @@ protected void taskOperation(CancellableTask actionTask, ListTasksRequest reques
7677
}
7778

7879
@Override
79-
protected void processTasks(ListTasksRequest request, ActionListener<List<Task>> nodeOperation) {
80+
protected void doExecute(Task task, ListTasksRequest request, ActionListener<ListTasksResponse> listener) {
81+
// Sanity check only: the task handed to this action is expected to be cancellable.
// The override adds nothing else and delegates straight to the superclass.
assert task instanceof CancellableTask;
82+
super.doExecute(task, request, listener);
83+
}
84+
85+
@Override
86+
protected void processTasks(CancellableTask nodeTask, ListTasksRequest request, ActionListener<List<Task>> nodeOperation) {
8087
if (request.getWaitForCompletion()) {
8188
final ListenableActionFuture<List<Task>> future = new ListenableActionFuture<>();
8289
final List<Task> processedTasks = new ArrayList<>();
@@ -137,8 +144,9 @@ protected void processTasks(ListTasksRequest request, ActionListener<List<Task>>
137144
threadPool,
138145
ThreadPool.Names.SAME
139146
);
147+
nodeTask.addListener(() -> future.onFailure(new TaskCancelledException("task cancelled")));
140148
} else {
141-
super.processTasks(request, nodeOperation);
149+
super.processTasks(nodeTask, request, nodeOperation);
142150
}
143151
}
144152
}

server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ protected String[] resolveNodes(TasksRequest request, DiscoveryNodes discoveryNo
198198
}
199199
}
200200

201-
protected void processTasks(TasksRequest request, ActionListener<List<OperationTask>> nodeOperation) {
201+
protected void processTasks(CancellableTask nodeTask, TasksRequest request, ActionListener<List<OperationTask>> nodeOperation) {
202202
// Default behaviour: resolve the matching tasks via processTasks(request) and hand them to the
// listener immediately. nodeTask is not used by this implementation; it is available to overrides.
nodeOperation.onResponse(processTasks(request));
203203
}
204204

@@ -255,6 +255,7 @@ public void messageReceived(final NodeTaskRequest request, final TransportChanne
255255
assert task instanceof CancellableTask;
256256
TasksRequest tasksRequest = request.tasksRequest;
257257
processTasks(
258+
(CancellableTask) task,
258259
tasksRequest,
259260
new ChannelActionListener<NodeTasksResponse>(channel).delegateFailure(
260261
(l, tasks) -> nodeOperation((CancellableTask) task, l, tasksRequest, tasks)

server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestListTasksAction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.rest.BaseRestHandler;
1919
import org.elasticsearch.rest.RestChannel;
2020
import org.elasticsearch.rest.RestRequest;
21+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
2122
import org.elasticsearch.rest.action.RestChunkedToXContentListener;
2223
import org.elasticsearch.tasks.TaskId;
2324

@@ -49,7 +50,9 @@ public String getName() {
4950
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
5051
final ListTasksRequest listTasksRequest = generateListTasksRequest(request);
5152
// "nodes" is passed as the second argument to param(); presumably the fallback when "group_by" is absent.
final String groupBy = request.param("group_by", "nodes");
52-
return channel -> client.admin().cluster().listTasks(listTasksRequest, listTasksResponseListener(nodesInCluster, groupBy, channel));
53+
// The list-tasks call now goes through a RestCancellableNodeClient built from the request's HTTP channel.
// NOTE(review): presumably this cancels the task when that channel closes -- confirm.
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin()
54+
.cluster()
55+
.listTasks(listTasksRequest, listTasksResponseListener(nodesInCluster, groupBy, channel));
5356
}
5457

5558
public static ListTasksRequest generateListTasksRequest(RestRequest request) {

test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@
1515
import org.elasticsearch.client.ResponseListener;
1616
import org.elasticsearch.core.CheckedConsumer;
1717
import org.elasticsearch.tasks.Task;
18+
import org.elasticsearch.tasks.TaskId;
1819
import org.elasticsearch.tasks.TaskManager;
1920
import org.elasticsearch.transport.Transport;
2021

21-
import static org.elasticsearch.action.support.PlainActionFuture.newFuture;
22-
import static org.mockito.Mockito.mock;
22+
import java.util.Map;
23+
import java.util.concurrent.TimeUnit;
2324

2425
public class ActionTestUtils {
2526

@@ -29,10 +30,11 @@ public static <Request extends ActionRequest, Response extends ActionResponse> R
2930
TransportAction<Request, Response> action,
3031
Request request
3132
) {
32-
PlainActionFuture<Response> future = newFuture();
33-
Task task = mock(Task.class);
34-
action.execute(task, request, future);
35-
return future.actionGet();
33+
return PlainActionFuture.get(
34+
future -> action.execute(request.createTask(1L, "direct", action.actionName, TaskId.EMPTY_TASK_ID, Map.of()), request, future),
35+
10,
36+
TimeUnit.SECONDS
37+
);
3638
}
3739

3840
public static <Request extends ActionRequest, Response extends ActionResponse> Response executeBlockingWithTask(
@@ -41,9 +43,11 @@ public static <Request extends ActionRequest, Response extends ActionResponse> R
4143
TransportAction<Request, Response> action,
4244
Request request
4345
) {
44-
PlainActionFuture<Response> future = newFuture();
45-
taskManager.registerAndExecute("transport", action, request, localConnection, future);
46-
return future.actionGet();
46+
return PlainActionFuture.get(
47+
future -> taskManager.registerAndExecute("transport", action, request, localConnection, future),
48+
10,
49+
TimeUnit.SECONDS
50+
);
4751
}
4852

4953
/**

0 commit comments

Comments
 (0)