diff --git a/muted-tests.yml b/muted-tests.yml index 50e6b26239636..87e3fc66c0d77 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -202,9 +202,6 @@ tests: - class: org.elasticsearch.packaging.test.DockerTests method: test151MachineDependentHeapWithSizeOverride issue: https://github.com/elastic/elasticsearch/issues/123437 -- class: org.elasticsearch.action.admin.cluster.node.tasks.CancellableTasksIT - method: testChildrenTasksCancelledOnTimeout - issue: https://github.com/elastic/elasticsearch/issues/123568 - class: org.elasticsearch.xpack.searchablesnapshots.FrozenSearchableSnapshotsIntegTests method: testCreateAndRestorePartialSearchableSnapshot issue: https://github.com/elastic/elasticsearch/issues/123773 diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksIT.java index 3f3ee0c5598f9..e95267f4752b5 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksIT.java @@ -51,6 +51,7 @@ import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import org.junit.After; +import org.junit.Before; import java.io.IOException; import java.util.ArrayList; @@ -63,12 +64,14 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsStringIgnoringCase; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) @@ -79,6 +82,13 @@ public class CancellableTasksIT extends ESIntegTestCase { static final Map arrivedLatches = ConcurrentCollections.newConcurrentMap(); static final Map beforeExecuteLatches = ConcurrentCollections.newConcurrentMap(); static final Map completedLatches = ConcurrentCollections.newConcurrentMap(); + static final Set cancelledRequests = ConcurrentCollections.newConcurrentSet(); + + @Before + public void clearCollections() { + List.of(beforeSendLatches, arrivedLatches, beforeExecuteLatches, completedLatches).forEach(Map::clear); + cancelledRequests.clear(); + } @After public void ensureBansAndCancellationsConsistency() throws Exception { @@ -368,16 +378,65 @@ public void testRemoveBanParentsOnDisconnect() throws Exception { public void testChildrenTasksCancelledOnTimeout() throws Exception { Set nodes = clusterService().state().nodes().stream().collect(Collectors.toSet()); + final Supplier getNumChildLocalCancellationsWithNoChildrenFound = () -> nodes.stream() + .mapToLong( + node -> internalCluster().getInstance(TransportService.class, node.getName()) + .getTaskManager() + .getNumChildLocalCancellationsWithNoChildrenFound() + ) + .sum(); + final var startingNumChildLocalCancellationsWithNoChildrenFound = getNumChildLocalCancellationsWithNoChildrenFound.get(); final TestRequest rootRequest = generateTestRequest(nodes, 0, between(1, 4), true); ActionFuture rootTaskFuture = client().execute(TransportTestAction.ACTION, rootRequest); allowEntireRequest(rootRequest); waitForRootTask(rootTaskFuture, true); ensureBansAndCancellationsConsistency(); - // Make sure all descendent requests have completed - for (TestRequest subRequest : rootRequest.descendants()) { + // Make sure all descendant requests have completed + final var descendants = rootRequest.descendants(); + for (TestRequest subRequest : descendants) { safeAwait(completedLatches.get(subRequest)); } + + // Give any remaining scheduled subrequest child tasks time to run and detect cancellation. + for (int i = 0; i < 3 && cancelledRequests.size() != descendants.size(); ++i) { + safeSleep(TransportTestAction.SCHEDULE_INTERVAL_MILLIS); + } + + // Note that the design of TransportTestAction can produce a race condition where a subrequest times out and attempts to cancel + // the child task before the child task has been added to the task manager on the remote node. Here we assert that at least one + // child task has been cancelled due to a subrequest timeout, and then add a safeguard to manually cancel any remaining tasks. + // See https://github.com/elastic/elasticsearch/issues/123568 for details about transport-level timeouts and options considered + // when this test failed. + final var numCancelledBeforeIntervention = cancelledRequests.size(); + assertThat( + "expected child tasks to have been cancelled due to parent task timeouts", + numCancelledBeforeIntervention, + greaterThan(0) + ); + + assertBusy(() -> { + for (final var node : nodes) { + final var taskManager = internalCluster().getInstance(TransportService.class, node.getName()).getTaskManager(); + taskManager.getCancellableTasks() + .values() + .stream() + .filter(task -> task.getAction().equals(TransportTestAction.ACTION.name())) + .forEach(task -> { + logger.info("Manually cancelling orphaned task [{}]", task); + taskManager.cancel(task, "all completed latches have been pulled, cancelling orphaned tasks", () -> {}); + }); + } + assertThat("expected all root request descendants to be cancelled", cancelledRequests.size(), equalTo(descendants.size())); + }, 30, TimeUnit.SECONDS); + + final var numChildLocalCancellationsWithNoChildrenFound = getNumChildLocalCancellationsWithNoChildrenFound.get() + - startingNumChildLocalCancellationsWithNoChildrenFound; + assertThat( + "expected cancellation interventions to be caused by child local cancellations with no children found", + numChildLocalCancellationsWithNoChildrenFound + numCancelledBeforeIntervention, + equalTo((long) cancelledRequests.size()) + ); } static TaskId getRootTaskId(TestRequest request) throws Exception { @@ -507,6 +566,7 @@ public void writeTo(StreamOutput out) throws IOException { public static class TransportTestAction extends HandledTransportAction { public static ActionType ACTION = new ActionType<>("internal::test_action"); + static final long SCHEDULE_INTERVAL_MILLIS = 10L; private final TransportService transportService; private final NodeClient client; @@ -536,9 +596,10 @@ protected void doRun() throws Exception { if (request.timeout) { // Repeat work until cancelled if (((CancellableTask) task).isCancelled() == false) { - schedule(task, request, TimeValue.timeValueMillis(10), listener); + schedule(task, request, TimeValue.timeValueMillis(SCHEDULE_INTERVAL_MILLIS), listener); return; } + cancelledRequests.add(request); } else { ((CancellableTask) task).ensureNotCancelled(); } diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java index 249bb1d43119c..06b03f3fca006 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java @@ -90,6 +90,8 @@ public class TaskManager implements ClusterStateApplier { private final List removedTaskListeners = new CopyOnWriteArrayList<>(); + private final AtomicLong numChildLocalCancellationsWithNoChildrenFound = new AtomicLong(); + // For testing public TaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { this(settings, threadPool, taskHeaders, Tracer.NOOP); @@ -306,6 +308,16 @@ public void cancelChildLocal(TaskId parentTaskId, long childRequestId, String re } child.cancel(reason); } + } else { + if (logger.isTraceEnabled()) { + logger.trace( + "No children list found for cancelChildLocal() with parent task [{}], request ID [{}], and reason [{}]", + parentTaskId, + childRequestId, + reason + ); + } + numChildLocalCancellationsWithNoChildrenFound.incrementAndGet(); } } } @@ -823,4 +835,8 @@ public void cancelTaskAndDescendants(CancellableTask task, String reason, boolea public Set getTaskHeaders() { return taskHeaders; } + + public long getNumChildLocalCancellationsWithNoChildrenFound() { + return numChildLocalCancellationsWithNoChildrenFound.get(); + } }