|
10 | 10 |
|
11 | 11 | import org.apache.logging.log4j.LogManager;
|
12 | 12 | import org.apache.logging.log4j.Logger;
|
| 13 | +import org.elasticsearch.core.Nullable; |
13 | 14 | import org.elasticsearch.tasks.CancellableTask;
|
14 | 15 | import org.elasticsearch.tasks.Task;
|
15 | 16 | import org.elasticsearch.tasks.TaskInfo;
|
16 |
| -import org.elasticsearch.tasks.TaskManager; |
17 | 17 | import org.elasticsearch.transport.TransportService;
|
18 | 18 |
|
| 19 | +import java.util.ArrayList; |
19 | 20 | import java.util.List;
|
| 21 | +import java.util.Objects; |
20 | 22 | import java.util.concurrent.TimeUnit;
|
21 | 23 | import java.util.stream.Collectors;
|
22 | 24 |
|
| 25 | +import static junit.framework.TestCase.assertFalse; |
23 | 26 | import static junit.framework.TestCase.assertTrue;
|
24 | 27 | import static junit.framework.TestCase.fail;
|
25 | 28 | import static org.elasticsearch.test.ESIntegTestCase.client;
|
@@ -59,30 +62,28 @@ private static void awaitTaskWithPrefix(String actionPrefix, Iterable<TransportS
|
59 | 62 | });
|
60 | 63 | }
|
61 | 64 |
|
62 |
| - public static void assertAllCancellableTasksAreCancelled(String actionPrefix) throws Exception { |
| 65 | + public static void assertAllCancellableTasksAreCancelled(String actionPrefix, @Nullable String opaqueId) throws Exception { |
63 | 66 | logger.info("--> checking that all tasks with prefix {} are marked as cancelled", actionPrefix);
|
64 | 67 |
|
65 | 68 | assertBusy(() -> {
|
66 |
| - boolean foundTask = false; |
| 69 | + var tasks = new ArrayList<CancellableTask>(); |
67 | 70 | for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
|
68 |
| - final TaskManager taskManager = transportService.getTaskManager(); |
| 71 | + var taskManager = transportService.getTaskManager(); |
69 | 72 | assertTrue(taskManager.assertCancellableTaskConsistency());
|
70 |
| - for (CancellableTask cancellableTask : taskManager.getCancellableTasks().values()) { |
71 |
| - if (cancellableTask.getAction().startsWith(actionPrefix)) { |
72 |
| - logger.trace("--> found task with prefix [{}]: [{}]", actionPrefix, cancellableTask); |
73 |
| - foundTask = true; |
74 |
| - assertTrue( |
75 |
| - "task " + cancellableTask.getId() + "/" + cancellableTask.getAction() + " not cancelled", |
76 |
| - cancellableTask.isCancelled() |
77 |
| - ); |
78 |
| - logger.trace("--> Task with prefix [{}] is marked as cancelled: [{}]", actionPrefix, cancellableTask); |
79 |
| - } |
80 |
| - } |
| 73 | + taskManager.getCancellableTasks().values().stream().filter(t -> t.getAction().startsWith(actionPrefix)).forEach(tasks::add); |
81 | 74 | }
|
82 |
| - assertTrue("found no cancellable tasks", foundTask); |
| 75 | + assertFalse("no tasks found for action: " + actionPrefix, tasks.isEmpty()); |
| 76 | + assertTrue( |
| 77 | + tasks.toString(), |
| 78 | + tasks.stream().allMatch(t -> t.isCancelled() && Objects.equals(t.getHeader(Task.X_OPAQUE_ID_HTTP_HEADER), opaqueId)) |
| 79 | + ); |
83 | 80 | }, 30, TimeUnit.SECONDS);
|
84 | 81 | }
|
85 | 82 |
|
| 83 | + public static void assertAllCancellableTasksAreCancelled(String actionPrefix) throws Exception { |
| 84 | + assertAllCancellableTasksAreCancelled(actionPrefix, null); |
| 85 | + } |
| 86 | + |
86 | 87 | public static void assertAllTasksHaveFinished(String actionPrefix) throws Exception {
|
87 | 88 | logger.info("--> checking that all tasks with prefix {} have finished", actionPrefix);
|
88 | 89 | assertBusy(() -> {
|
|
0 commit comments