From e63d87ff51079200b822eb9401fa09d93d177864 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 23 May 2023 11:58:58 +0100 Subject: [PATCH 1/6] Make list-tasks API cancellable In a busy cluster the list-tasks API may retain information about a very large number of tasks while waiting for all nodes to respond. This commit makes the API cancellable so that unnecessary partial results can be released earlier. Relates #96279, which implements the early-release functionality. --- .../tasks/RestListTasksCancellationIT.java | 59 +++++++++++++++++++ .../node/tasks/list/ListTasksRequest.java | 7 +++ .../tasks/list/TransportListTasksAction.java | 12 +++- .../support/tasks/TransportTasksAction.java | 10 +++- .../admin/cluster/RestListTasksAction.java | 5 +- 5 files changed, 87 insertions(+), 6 deletions(-) create mode 100644 qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/action/support/tasks/RestListTasksCancellationIT.java diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/action/support/tasks/RestListTasksCancellationIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/action/support/tasks/RestListTasksCancellationIT.java new file mode 100644 index 0000000000000..5fffdf6a6710c --- /dev/null +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/action/support/tasks/RestListTasksCancellationIT.java @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.support.tasks; + +import org.apache.http.client.methods.HttpGet; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; +import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.Cancellable; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.http.HttpSmokeTestCase; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.action.support.ActionTestUtils.wrapAsRestResponseListener; +import static org.elasticsearch.test.TaskAssertions.assertAllCancellableTasksAreCancelled; +import static org.elasticsearch.test.TaskAssertions.awaitTaskWithPrefix; + +public class RestListTasksCancellationIT extends HttpSmokeTestCase { + + public void testListTasksCancellation() throws Exception { + final Request clusterStateRequest = new Request(HttpGet.METHOD_NAME, "/_cluster/state"); + clusterStateRequest.addParameter("wait_for_metadata_version", Long.toString(Long.MAX_VALUE)); + clusterStateRequest.addParameter("wait_for_timeout", "1h"); + + final PlainActionFuture clusterStateFuture = new PlainActionFuture<>(); + final Cancellable clusterStateCancellable = getRestClient().performRequestAsync( + clusterStateRequest, + wrapAsRestResponseListener(clusterStateFuture) + ); + + awaitTaskWithPrefix(ClusterStateAction.NAME); + + final Request tasksRequest = new Request(HttpGet.METHOD_NAME, "/_tasks"); + tasksRequest.addParameter("actions", ClusterStateAction.NAME); + tasksRequest.addParameter("wait_for_completion", Boolean.toString(true)); + + final PlainActionFuture tasksFuture = new PlainActionFuture<>(); + final Cancellable tasksCancellable = getRestClient().performRequestAsync(tasksRequest, wrapAsRestResponseListener(tasksFuture)); + + awaitTaskWithPrefix(ListTasksAction.NAME); + + tasksCancellable.cancel(); + + assertAllCancellableTasksAreCancelled(ListTasksAction.NAME); + expectThrows(CancellationException.class, () -> tasksFuture.actionGet(10, TimeUnit.SECONDS)); + + clusterStateCancellable.cancel(); + } + +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksRequest.java index 5b0194c81283e..597c9821e48ec 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksRequest.java @@ -14,9 +14,12 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import java.io.IOException; +import java.util.Map; import static org.elasticsearch.action.ValidateActions.addValidationError; import static org.elasticsearch.common.regex.Regex.simpleMatch; @@ -119,4 +122,8 @@ public ListTasksRequest setDescriptions(String... descriptions) { return this; } + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "", parentTaskId, headers); + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TransportListTasksAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TransportListTasksAction.java index d3a9ab80db5ca..eaaebb5d2bb9c 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TransportListTasksAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TransportListTasksAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.RemovedTaskListener; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -76,7 +77,13 @@ protected void taskOperation(CancellableTask actionTask, ListTasksRequest reques } @Override - protected void processTasks(ListTasksRequest request, ActionListener> nodeOperation) { + protected void doExecute(Task task, ListTasksRequest request, ActionListener listener) { + assert task instanceof CancellableTask; + super.doExecute(task, request, listener); + } + + @Override + protected void processTasks(CancellableTask nodeTask, ListTasksRequest request, ActionListener> nodeOperation) { if (request.getWaitForCompletion()) { final ListenableActionFuture> future = new ListenableActionFuture<>(); final List processedTasks = new ArrayList<>(); @@ -137,8 +144,9 @@ protected void processTasks(ListTasksRequest request, ActionListener> threadPool, ThreadPool.Names.SAME ); + nodeTask.addListener(() -> future.onFailure(new TaskCancelledException("task cancelled"))); } else { - super.processTasks(request, nodeOperation); + super.processTasks(nodeTask, request, nodeOperation); } } } diff --git a/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java b/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java index daedacf6fb4ad..624f32bf75100 100644 --- a/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java @@ -88,9 +88,13 @@ protected void doExecute(Task task, TasksRequest request, ActionListener listener) { + private void nodeOperation(CancellableTask nodeTask, NodeTaskRequest nodeTaskRequest, ActionListener listener) { TasksRequest request = nodeTaskRequest.tasksRequest; - processTasks(request, ActionListener.wrap(tasks -> nodeOperation(task, listener, request, tasks), listener::onFailure)); + processTasks( + nodeTask, + request, + ActionListener.wrap(tasks -> nodeOperation(nodeTask, listener, request, tasks), listener::onFailure) + ); } private void nodeOperation( @@ -156,7 +160,7 @@ protected String[] resolveNodes(TasksRequest request, DiscoveryNodes discoveryNo } } - protected void processTasks(TasksRequest request, ActionListener> nodeOperation) { + protected void processTasks(CancellableTask nodeTask, TasksRequest request, ActionListener> nodeOperation) { nodeOperation.onResponse(processTasks(request)); } diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestListTasksAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestListTasksAction.java index 99417fbc962b7..cbf8baa9a2ea9 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestListTasksAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestListTasksAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestCancellableNodeClient; import org.elasticsearch.rest.action.RestChunkedToXContentListener; import org.elasticsearch.tasks.TaskId; @@ -49,7 +50,9 @@ public String getName() { public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { final ListTasksRequest listTasksRequest = generateListTasksRequest(request); final String groupBy = request.param("group_by", "nodes"); - return channel -> client.admin().cluster().listTasks(listTasksRequest, listTasksResponseListener(nodesInCluster, groupBy, channel)); + return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin() + .cluster() + .listTasks(listTasksRequest, listTasksResponseListener(nodesInCluster, groupBy, channel)); } public static ListTasksRequest generateListTasksRequest(RestRequest request) { From 38b79428fb711860fe187d39cca8711f6914bf8e Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 23 May 2023 12:53:11 +0100 Subject: [PATCH 2/6] Fix affected tests --- .../action/support/ActionTestUtils.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java b/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java index b9ae3d0a62e91..7fe278e4892f7 100644 --- a/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java @@ -14,11 +14,13 @@ import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseListener; import org.elasticsearch.core.CheckedConsumer; +import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.transport.Transport; -import static org.elasticsearch.action.support.PlainActionFuture.newFuture; +import java.util.concurrent.TimeUnit; + import static org.mockito.Mockito.mock; public class ActionTestUtils { @@ -29,10 +31,7 @@ public static R TransportAction action, Request request ) { - PlainActionFuture future = newFuture(); - Task task = mock(Task.class); - action.execute(task, request, future); - return future.actionGet(); + return PlainActionFuture.get(future -> action.execute(mock(CancellableTask.class), request, future), 10, TimeUnit.SECONDS); } public static Response executeBlockingWithTask( @@ -41,9 +40,11 @@ public static R TransportAction action, Request request ) { - PlainActionFuture future = newFuture(); - taskManager.registerAndExecute("transport", action, request, localConnection, future); - return future.actionGet(); + return PlainActionFuture.get( + future -> taskManager.registerAndExecute("transport", action, request, localConnection, future), + 10, + TimeUnit.SECONDS + ); } /** From 82cf365d7f74d07e95a0c9c8f3ae1b7cc76181aa Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 23 May 2023 13:39:33 +0100 Subject: [PATCH 3/6] =?UTF-8?q?Mocks=20=F0=9F=91=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../action/support/ActionTestUtils.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java b/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java index 7fe278e4892f7..49d465b18e209 100644 --- a/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java @@ -16,13 +16,13 @@ import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.transport.Transport; +import java.util.Map; import java.util.concurrent.TimeUnit; -import static org.mockito.Mockito.mock; - public class ActionTestUtils { private ActionTestUtils() { /* no construction */ } @@ -31,7 +31,15 @@ public static R TransportAction action, Request request ) { - return PlainActionFuture.get(future -> action.execute(mock(CancellableTask.class), request, future), 10, TimeUnit.SECONDS); + return PlainActionFuture.get( + future -> action.execute( + new CancellableTask(1L, "direct", action.actionName, "", TaskId.EMPTY_TASK_ID, Map.of()), + request, + future + ), + 10, + TimeUnit.SECONDS + ); } public static Response executeBlockingWithTask( From b4701c876c641051ba70a7a4e0d6686d98d38b67 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 23 May 2023 13:44:40 +0100 Subject: [PATCH 4/6] Let request create task --- .../java/org/elasticsearch/action/support/ActionTestUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java b/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java index 49d465b18e209..a7203d40ac5a7 100644 --- a/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java @@ -33,7 +33,7 @@ public static R ) { return PlainActionFuture.get( future -> action.execute( - new CancellableTask(1L, "direct", action.actionName, "", TaskId.EMPTY_TASK_ID, Map.of()), + request.createTask(1L, "direct", action.actionName, TaskId.EMPTY_TASK_ID, Map.of()), request, future ), From 8ea320561f43f1f7784a36f6869746e5297029e2 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 23 May 2023 14:02:32 +0100 Subject: [PATCH 5/6] Spotless --- .../org/elasticsearch/action/support/ActionTestUtils.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java b/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java index a7203d40ac5a7..49c3df17d60dd 100644 --- a/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java @@ -14,7 +14,6 @@ import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseListener; import org.elasticsearch.core.CheckedConsumer; -import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; @@ -32,11 +31,7 @@ public static R Request request ) { return PlainActionFuture.get( - future -> action.execute( - request.createTask(1L, "direct", action.actionName, TaskId.EMPTY_TASK_ID, Map.of()), - request, - future - ), + future -> action.execute(request.createTask(1L, "direct", action.actionName, TaskId.EMPTY_TASK_ID, Map.of()), request, future), 10, TimeUnit.SECONDS ); From aeac3958009c25869c8ca2ed7e581969275b67bb Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 23 May 2023 15:09:06 +0100 Subject: [PATCH 6/6] Fix REST test --- .../tasks/RestListTasksCancellationIT.java | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/action/support/tasks/RestListTasksCancellationIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/action/support/tasks/RestListTasksCancellationIT.java index 5fffdf6a6710c..29e59af7b9f70 100644 --- a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/action/support/tasks/RestListTasksCancellationIT.java +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/action/support/tasks/RestListTasksCancellationIT.java @@ -16,12 +16,14 @@ import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.http.HttpSmokeTestCase; +import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.transport.TransportService; +import java.util.ArrayList; import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; import static org.elasticsearch.action.support.ActionTestUtils.wrapAsRestResponseListener; -import static org.elasticsearch.test.TaskAssertions.assertAllCancellableTasksAreCancelled; import static org.elasticsearch.test.TaskAssertions.awaitTaskWithPrefix; public class RestListTasksCancellationIT extends HttpSmokeTestCase { @@ -42,17 +44,28 @@ public void testListTasksCancellation() throws Exception { final Request tasksRequest = new Request(HttpGet.METHOD_NAME, "/_tasks"); tasksRequest.addParameter("actions", ClusterStateAction.NAME); tasksRequest.addParameter("wait_for_completion", Boolean.toString(true)); + tasksRequest.addParameter("timeout", "1h"); final PlainActionFuture tasksFuture = new PlainActionFuture<>(); final Cancellable tasksCancellable = getRestClient().performRequestAsync(tasksRequest, wrapAsRestResponseListener(tasksFuture)); - awaitTaskWithPrefix(ListTasksAction.NAME); + awaitTaskWithPrefix(ListTasksAction.NAME + "[n]"); tasksCancellable.cancel(); - assertAllCancellableTasksAreCancelled(ListTasksAction.NAME); - expectThrows(CancellationException.class, () -> tasksFuture.actionGet(10, TimeUnit.SECONDS)); + final var taskManagers = new ArrayList(internalCluster().getNodeNames().length); + for (final var transportService : internalCluster().getInstances(TransportService.class)) { + taskManagers.add(transportService.getTaskManager()); + } + assertBusy( + () -> assertFalse( + taskManagers.stream() + .flatMap(taskManager -> taskManager.getCancellableTasks().values().stream()) + .anyMatch(t -> t.getAction().startsWith(ListTasksAction.NAME)) + ) + ); + expectThrows(CancellationException.class, () -> tasksFuture.actionGet(10, TimeUnit.SECONDS)); clusterStateCancellable.cancel(); }