Skip to content

Commit cd96706

Browse files
authored
[7.17] Wait for task on master in testGetMappingsCancellation (#91709) (#91916) (#91926)
* Wait for task on master in testGetMappingsCancellation (#91709) (#91916) * replace List.of usage
1 parent 4088dd8 commit cd96706

File tree

3 files changed

+34
-7
lines changed

3 files changed

+34
-7
lines changed

qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/RestClusterInfoActionCancellationIT.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.elasticsearch.core.TimeValue;
2727
import org.elasticsearch.rest.RestStatus;
2828
import org.elasticsearch.test.ESIntegTestCase;
29-
import org.elasticsearch.test.junit.annotations.TestLogging;
29+
import org.hamcrest.Matchers;
3030

3131
import java.util.EnumSet;
3232
import java.util.concurrent.CancellationException;
@@ -35,11 +35,10 @@
3535
import static org.elasticsearch.action.support.ActionTestUtils.wrapAsRestResponseListener;
3636
import static org.elasticsearch.test.TaskAssertions.assertAllCancellableTasksAreCancelled;
3737
import static org.elasticsearch.test.TaskAssertions.assertAllTasksHaveFinished;
38-
import static org.elasticsearch.test.TaskAssertions.awaitTaskWithPrefix;
38+
import static org.elasticsearch.test.TaskAssertions.awaitTaskWithPrefixOnMaster;
3939
import static org.hamcrest.core.IsEqual.equalTo;
4040

4141
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
42-
@TestLogging(value = "org.elasticsearch.tasks.TaskManager:TRACE,org.elasticsearch.test.TaskAssertions:TRACE", reason = "debugging")
4342
public class RestClusterInfoActionCancellationIT extends HttpSmokeTestCase {
4443

4544
public void testGetMappingsCancellation() throws Exception {
@@ -77,8 +76,18 @@ private void runTest(String actionName, String endpoint) throws Exception {
7776
final Cancellable cancellable = getRestClient().performRequestAsync(request, wrapAsRestResponseListener(future));
7877

7978
assertThat(future.isDone(), equalTo(false));
80-
awaitTaskWithPrefix(actionName);
81-
79+
awaitTaskWithPrefixOnMaster(actionName);
80+
// To ensure that the task is executing on master, we wait until the first blocked execution of the task registers its cluster state
81+
// observer for further retries. This ensures that a task is not cancelled before we have started its execution, which could result
82+
// in the task being unregistered and the test not being able to find any cancelled tasks.
83+
assertBusy(
84+
() -> assertThat(
85+
internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
86+
.getClusterApplierService()
87+
.getTimeoutClusterStateListenersSize(),
88+
Matchers.greaterThan(0)
89+
)
90+
);
8291
cancellable.cancel();
8392
assertAllCancellableTasksAreCancelled(actionName);
8493

server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -672,4 +672,9 @@ protected boolean applicationMayFail() {
672672
public ClusterApplierRecordingService.Stats getStats() {
673673
return recordingService.getStats();
674674
}
675+
676+
// Exposed only for testing
677+
public int getTimeoutClusterStateListenersSize() {
678+
return timeoutClusterStateListeners.size();
679+
}
675680
}

test/framework/src/main/java/org/elasticsearch/test/TaskAssertions.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.tasks.TaskManager;
1717
import org.elasticsearch.transport.TransportService;
1818

19+
import java.util.Collections;
1920
import java.util.List;
2021
import java.util.concurrent.TimeUnit;
2122
import java.util.stream.Collectors;
@@ -32,10 +33,21 @@ public class TaskAssertions {
3233
private TaskAssertions() {}
3334

3435
public static void awaitTaskWithPrefix(String actionPrefix) throws Exception {
36+
awaitTaskWithPrefix(actionPrefix, internalCluster().getInstances(TransportService.class));
37+
}
38+
39+
public static void awaitTaskWithPrefixOnMaster(String actionPrefix) throws Exception {
40+
awaitTaskWithPrefix(
41+
actionPrefix,
42+
Collections.singletonList(internalCluster().getCurrentMasterNodeInstance(TransportService.class))
43+
);
44+
}
45+
46+
private static void awaitTaskWithPrefix(String actionPrefix, Iterable<TransportService> transportServiceInstances) throws Exception {
3547
logger.info("--> waiting for task with prefix [{}] to start", actionPrefix);
3648

3749
assertBusy(() -> {
38-
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
50+
for (TransportService transportService : transportServiceInstances) {
3951
List<Task> matchingTasks = transportService.getTaskManager()
4052
.getTasks()
4153
.values()
@@ -61,12 +73,13 @@ public static void assertAllCancellableTasksAreCancelled(String actionPrefix) th
6173
assertTrue(taskManager.assertCancellableTaskConsistency());
6274
for (CancellableTask cancellableTask : taskManager.getCancellableTasks().values()) {
6375
if (cancellableTask.getAction().startsWith(actionPrefix)) {
64-
logger.trace("--> found task with prefix [{}] marked as cancelled: [{}]", actionPrefix, cancellableTask);
76+
logger.trace("--> found task with prefix [{}]: [{}]", actionPrefix, cancellableTask);
6577
foundTask = true;
6678
assertTrue(
6779
"task " + cancellableTask.getId() + "/" + cancellableTask.getAction() + " not cancelled",
6880
cancellableTask.isCancelled()
6981
);
82+
logger.trace("--> Task with prefix [{}] is marked as cancelled: [{}]", actionPrefix, cancellableTask);
7083
}
7184
}
7285
}

0 commit comments

Comments
 (0)