Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,6 @@ tests:
- class: org.elasticsearch.packaging.test.DockerTests
method: test151MachineDependentHeapWithSizeOverride
issue: https://github.com/elastic/elasticsearch/issues/123437
- class: org.elasticsearch.action.admin.cluster.node.tasks.CancellableTasksIT
method: testChildrenTasksCancelledOnTimeout
issue: https://github.com/elastic/elasticsearch/issues/123568
- class: org.elasticsearch.xpack.searchablesnapshots.FrozenSearchableSnapshotsIntegTests
method: testCreateAndRestorePartialSearchableSnapshot
issue: https://github.com/elastic/elasticsearch/issues/123773
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -63,12 +64,14 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsStringIgnoringCase;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
Expand All @@ -79,6 +82,13 @@ public class CancellableTasksIT extends ESIntegTestCase {
// Static, concurrent per-request bookkeeping keyed by TestRequest. clearCollections() empties these
// before each test.
static final Map<TestRequest, CountDownLatch> arrivedLatches = ConcurrentCollections.newConcurrentMap();
static final Map<TestRequest, CountDownLatch> beforeExecuteLatches = ConcurrentCollections.newConcurrentMap();
// Awaited by testChildrenTasksCancelledOnTimeout for every descendant of the root request.
static final Map<TestRequest, CountDownLatch> completedLatches = ConcurrentCollections.newConcurrentMap();
// Requests whose task, running with request.timeout set, was observed to be cancelled in TransportTestAction.
static final Set<TestRequest> cancelledRequests = ConcurrentCollections.newConcurrentSet();

/**
 * Empties the static latch maps and the cancelled-request set before each test.
 */
@Before
public void clearCollections() {
for (final var latchesByRequest : List.of(beforeSendLatches, arrivedLatches, beforeExecuteLatches, completedLatches)) {
latchesByRequest.clear();
}
cancelledRequests.clear();
}

@After
public void ensureBansAndCancellationsConsistency() throws Exception {
Expand Down Expand Up @@ -368,16 +378,65 @@ public void testRemoveBanParentsOnDisconnect() throws Exception {

/**
 * Runs a root request across the cluster nodes and checks that every descendant request ends up in
 * {@code cancelledRequests}: at least one without intervention, and the remainder after the test
 * manually cancels any leftover {@link TransportTestAction} tasks. Finally checks that the manually
 * cancelled count matches the increase in the task managers'
 * "child local cancellations with no children found" counters.
 */
public void testChildrenTasksCancelledOnTimeout() throws Exception {
Set<DiscoveryNode> nodes = clusterService().state().nodes().stream().collect(Collectors.toSet());
// Sums the per-node TaskManager counter over all nodes; sampled now and again at the end so only the delta is compared.
final Supplier<Long> getNumChildLocalCancellationsWithNoChildrenFound = () -> nodes.stream()
.mapToLong(
node -> internalCluster().getInstance(TransportService.class, node.getName())
.getTaskManager()
.getNumChildLocalCancellationsWithNoChildrenFound()
)
.sum();
final var startingNumChildLocalCancellationsWithNoChildrenFound = getNumChildLocalCancellationsWithNoChildrenFound.get();
// NOTE(review): the trailing `true` presumably enables the timeout behaviour that TransportTestAction reads as
// request.timeout - confirm against generateTestRequest.
final TestRequest rootRequest = generateTestRequest(nodes, 0, between(1, 4), true);
ActionFuture<TestResponse> rootTaskFuture = client().execute(TransportTestAction.ACTION, rootRequest);
allowEntireRequest(rootRequest);
waitForRootTask(rootTaskFuture, true);
ensureBansAndCancellationsConsistency();

// NOTE(review): the next comment and loop header look like the pre-change lines of this diff hunk; they are repeated in
// corrected form immediately below. Read as plain Java, the first loop header would leave an unclosed brace - confirm
// and keep only the second form when applying the change.
// Make sure all descendent requests have completed
for (TestRequest subRequest : rootRequest.descendants()) {
// Make sure all descendant requests have completed
final var descendants = rootRequest.descendants();
for (TestRequest subRequest : descendants) {
safeAwait(completedLatches.get(subRequest));
}

// Give any remaining scheduled subrequest child tasks time to run and detect cancellation.
// At most three sleeps of one schedule interval each; stops early once every descendant is recorded as cancelled.
for (int i = 0; i < 3 && cancelledRequests.size() != descendants.size(); ++i) {
safeSleep(TransportTestAction.SCHEDULE_INTERVAL_MILLIS);
}

// Note that the design of TransportTestAction can produce a race condition where a subrequest times out and attempts to cancel
// the child task before the child task has been added to the task manager on the remote node. Here we assert that at least one
// child task has been cancelled due to a subrequest timeout, and then add a safeguard to manually cancel any remaining tasks.
// See https://github.com/elastic/elasticsearch/issues/123568 for details about transport-level timeouts and options considered
// when this test failed.
// Snapshot taken before any manual cancellation so the final accounting can separate the two sources.
final var numCancelledBeforeIntervention = cancelledRequests.size();
assertThat(
"expected child tasks to have been cancelled due to parent task timeouts",
numCancelledBeforeIntervention,
greaterThan(0)
);

// Manually cancel every remaining TransportTestAction task on each node, then require that all descendants have been
// recorded as cancelled; assertBusy is given a 30 second limit.
assertBusy(() -> {
for (final var node : nodes) {
final var taskManager = internalCluster().getInstance(TransportService.class, node.getName()).getTaskManager();
taskManager.getCancellableTasks()
.values()
.stream()
.filter(task -> task.getAction().equals(TransportTestAction.ACTION.name()))
.forEach(task -> {
logger.info("Manually cancelling orphaned task [{}]", task);
taskManager.cancel(task, "all completed latches have been pulled, cancelling orphaned tasks", () -> {});
});
}
assertThat("expected all root request descendants to be cancelled", cancelledRequests.size(), equalTo(descendants.size()));
}, 30, TimeUnit.SECONDS);

// Accounting check: counter increase during this test + requests cancelled without intervention == total cancelled requests.
final var numChildLocalCancellationsWithNoChildrenFound = getNumChildLocalCancellationsWithNoChildrenFound.get()
- startingNumChildLocalCancellationsWithNoChildrenFound;
assertThat(
"expected cancellation interventions to be caused by child local cancellations with no children found",
numChildLocalCancellationsWithNoChildrenFound + numCancelledBeforeIntervention,
equalTo((long) cancelledRequests.size())
);
}

static TaskId getRootTaskId(TestRequest request) throws Exception {
Expand Down Expand Up @@ -507,6 +566,7 @@ public void writeTo(StreamOutput out) throws IOException {
public static class TransportTestAction extends HandledTransportAction<TestRequest, TestResponse> {

public static ActionType<TestResponse> ACTION = new ActionType<>("internal::test_action");
static final long SCHEDULE_INTERVAL_MILLIS = 10L;
private final TransportService transportService;
private final NodeClient client;

Expand Down Expand Up @@ -536,9 +596,10 @@ protected void doRun() throws Exception {
if (request.timeout) {
// Repeat work until cancelled
if (((CancellableTask) task).isCancelled() == false) {
schedule(task, request, TimeValue.timeValueMillis(10), listener);
schedule(task, request, TimeValue.timeValueMillis(SCHEDULE_INTERVAL_MILLIS), listener);
return;
}
cancelledRequests.add(request);
} else {
((CancellableTask) task).ensureNotCancelled();
}
Expand Down
16 changes: 16 additions & 0 deletions server/src/main/java/org/elasticsearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ public class TaskManager implements ClusterStateApplier {

private final List<RemovedTaskListener> removedTaskListeners = new CopyOnWriteArrayList<>();

// Incremented by cancelChildLocal() on the branch that logs "No children list found"; read through
// getNumChildLocalCancellationsWithNoChildrenFound().
private final AtomicLong numChildLocalCancellationsWithNoChildrenFound = new AtomicLong();

// For testing
public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders) {
this(settings, threadPool, taskHeaders, Tracer.NOOP);
Expand Down Expand Up @@ -306,6 +308,16 @@ public void cancelChildLocal(TaskId parentTaskId, long childRequestId, String re
}
child.cancel(reason);
}
} else {
if (logger.isTraceEnabled()) {
logger.trace(
"No children list found for cancelChildLocal() with parent task [{}], request ID [{}], and reason [{}]",
parentTaskId,
childRequestId,
reason
);
}
numChildLocalCancellationsWithNoChildrenFound.incrementAndGet();
}
}
}
Expand Down Expand Up @@ -823,4 +835,8 @@ public void cancelTaskAndDescendants(CancellableTask task, String reason, boolea
/**
 * Returns the task headers held by this task manager.
 *
 * @return the {@code taskHeaders} set, as stored (no copy is made here)
 */
public Set<String> getTaskHeaders() {
return this.taskHeaders;
}

/**
 * Returns the current value of the counter that {@code cancelChildLocal} increments when it finds no
 * children list for the given parent task.
 *
 * @return the number of such calls recorded so far by this task manager
 */
public long getNumChildLocalCancellationsWithNoChildrenFound() {
final long currentCount = numChildLocalCancellationsWithNoChildrenFound.get();
return currentCount;
}
}