Skip to content

Commit e63d87f

Browse files
committed
Make list-tasks API cancellable
In a busy cluster the list-tasks API may retain information about a very large number of tasks while waiting for all nodes to respond. This commit makes the API cancellable so that unnecessary partial results can be released earlier. Relates elastic#96279, which implements the early-release functionality.
1 parent d1ec14f commit e63d87f

File tree

5 files changed

+87
-6
lines changed

5 files changed

+87
-6
lines changed
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.action.support.tasks;
10+
11+
import org.apache.http.client.methods.HttpGet;
12+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
13+
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
14+
import org.elasticsearch.action.support.PlainActionFuture;
15+
import org.elasticsearch.client.Cancellable;
16+
import org.elasticsearch.client.Request;
17+
import org.elasticsearch.client.Response;
18+
import org.elasticsearch.http.HttpSmokeTestCase;
19+
20+
import java.util.concurrent.CancellationException;
21+
import java.util.concurrent.TimeUnit;
22+
23+
import static org.elasticsearch.action.support.ActionTestUtils.wrapAsRestResponseListener;
24+
import static org.elasticsearch.test.TaskAssertions.assertAllCancellableTasksAreCancelled;
25+
import static org.elasticsearch.test.TaskAssertions.awaitTaskWithPrefix;
26+
27+
public class RestListTasksCancellationIT extends HttpSmokeTestCase {
28+
29+
public void testListTasksCancellation() throws Exception {
30+
final Request clusterStateRequest = new Request(HttpGet.METHOD_NAME, "/_cluster/state");
31+
clusterStateRequest.addParameter("wait_for_metadata_version", Long.toString(Long.MAX_VALUE));
32+
clusterStateRequest.addParameter("wait_for_timeout", "1h");
33+
34+
final PlainActionFuture<Response> clusterStateFuture = new PlainActionFuture<>();
35+
final Cancellable clusterStateCancellable = getRestClient().performRequestAsync(
36+
clusterStateRequest,
37+
wrapAsRestResponseListener(clusterStateFuture)
38+
);
39+
40+
awaitTaskWithPrefix(ClusterStateAction.NAME);
41+
42+
final Request tasksRequest = new Request(HttpGet.METHOD_NAME, "/_tasks");
43+
tasksRequest.addParameter("actions", ClusterStateAction.NAME);
44+
tasksRequest.addParameter("wait_for_completion", Boolean.toString(true));
45+
46+
final PlainActionFuture<Response> tasksFuture = new PlainActionFuture<>();
47+
final Cancellable tasksCancellable = getRestClient().performRequestAsync(tasksRequest, wrapAsRestResponseListener(tasksFuture));
48+
49+
awaitTaskWithPrefix(ListTasksAction.NAME);
50+
51+
tasksCancellable.cancel();
52+
53+
assertAllCancellableTasksAreCancelled(ListTasksAction.NAME);
54+
expectThrows(CancellationException.class, () -> tasksFuture.actionGet(10, TimeUnit.SECONDS));
55+
56+
clusterStateCancellable.cancel();
57+
}
58+
59+
}

server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksRequest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,12 @@
1414
import org.elasticsearch.common.Strings;
1515
import org.elasticsearch.common.io.stream.StreamInput;
1616
import org.elasticsearch.common.io.stream.StreamOutput;
17+
import org.elasticsearch.tasks.CancellableTask;
1718
import org.elasticsearch.tasks.Task;
19+
import org.elasticsearch.tasks.TaskId;
1820

1921
import java.io.IOException;
22+
import java.util.Map;
2023

2124
import static org.elasticsearch.action.ValidateActions.addValidationError;
2225
import static org.elasticsearch.common.regex.Regex.simpleMatch;
@@ -119,4 +122,8 @@ public ListTasksRequest setDescriptions(String... descriptions) {
119122
return this;
120123
}
121124

125+
@Override
126+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
127+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
128+
}
122129
}

server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TransportListTasksAction.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.tasks.CancellableTask;
2525
import org.elasticsearch.tasks.RemovedTaskListener;
2626
import org.elasticsearch.tasks.Task;
27+
import org.elasticsearch.tasks.TaskCancelledException;
2728
import org.elasticsearch.tasks.TaskInfo;
2829
import org.elasticsearch.threadpool.ThreadPool;
2930
import org.elasticsearch.transport.TransportService;
@@ -76,7 +77,13 @@ protected void taskOperation(CancellableTask actionTask, ListTasksRequest reques
7677
}
7778

7879
@Override
79-
protected void processTasks(ListTasksRequest request, ActionListener<List<Task>> nodeOperation) {
80+
protected void doExecute(Task task, ListTasksRequest request, ActionListener<ListTasksResponse> listener) {
81+
assert task instanceof CancellableTask;
82+
super.doExecute(task, request, listener);
83+
}
84+
85+
@Override
86+
protected void processTasks(CancellableTask nodeTask, ListTasksRequest request, ActionListener<List<Task>> nodeOperation) {
8087
if (request.getWaitForCompletion()) {
8188
final ListenableActionFuture<List<Task>> future = new ListenableActionFuture<>();
8289
final List<Task> processedTasks = new ArrayList<>();
@@ -137,8 +144,9 @@ protected void processTasks(ListTasksRequest request, ActionListener<List<Task>>
137144
threadPool,
138145
ThreadPool.Names.SAME
139146
);
147+
nodeTask.addListener(() -> future.onFailure(new TaskCancelledException("task cancelled")));
140148
} else {
141-
super.processTasks(request, nodeOperation);
149+
super.processTasks(nodeTask, request, nodeOperation);
142150
}
143151
}
144152
}

server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,13 @@ protected void doExecute(Task task, TasksRequest request, ActionListener<TasksRe
8888
new AsyncAction(task, request, listener).start();
8989
}
9090

91-
private void nodeOperation(CancellableTask task, NodeTaskRequest nodeTaskRequest, ActionListener<NodeTasksResponse> listener) {
91+
private void nodeOperation(CancellableTask nodeTask, NodeTaskRequest nodeTaskRequest, ActionListener<NodeTasksResponse> listener) {
9292
TasksRequest request = nodeTaskRequest.tasksRequest;
93-
processTasks(request, ActionListener.wrap(tasks -> nodeOperation(task, listener, request, tasks), listener::onFailure));
93+
processTasks(
94+
nodeTask,
95+
request,
96+
ActionListener.wrap(tasks -> nodeOperation(nodeTask, listener, request, tasks), listener::onFailure)
97+
);
9498
}
9599

96100
private void nodeOperation(
@@ -156,7 +160,7 @@ protected String[] resolveNodes(TasksRequest request, DiscoveryNodes discoveryNo
156160
}
157161
}
158162

159-
protected void processTasks(TasksRequest request, ActionListener<List<OperationTask>> nodeOperation) {
163+
protected void processTasks(CancellableTask nodeTask, TasksRequest request, ActionListener<List<OperationTask>> nodeOperation) {
160164
nodeOperation.onResponse(processTasks(request));
161165
}
162166

server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestListTasksAction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.rest.BaseRestHandler;
1919
import org.elasticsearch.rest.RestChannel;
2020
import org.elasticsearch.rest.RestRequest;
21+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
2122
import org.elasticsearch.rest.action.RestChunkedToXContentListener;
2223
import org.elasticsearch.tasks.TaskId;
2324

@@ -49,7 +50,9 @@ public String getName() {
4950
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
5051
final ListTasksRequest listTasksRequest = generateListTasksRequest(request);
5152
final String groupBy = request.param("group_by", "nodes");
52-
return channel -> client.admin().cluster().listTasks(listTasksRequest, listTasksResponseListener(nodesInCluster, groupBy, channel));
53+
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin()
54+
.cluster()
55+
.listTasks(listTasksRequest, listTasksResponseListener(nodesInCluster, groupBy, channel));
5356
}
5457

5558
public static ListTasksRequest generateListTasksRequest(RestRequest request) {

0 commit comments

Comments
 (0)