diff --git a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java index 4ecabbaa06ee1..e09f0b0fddd20 100644 --- a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java @@ -58,6 +58,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -377,7 +378,13 @@ protected void newResponseAsync( public void testConcurrentlyCompletionAndCancellation() throws InterruptedException { final var action = getTestTransportNodesAction(); - final CancellableTask cancellableTask = new CancellableTask(randomLong(), "transport", "action", "", null, emptyMap()); + final CountDownLatch onCancelledLatch = new CountDownLatch(1); + final CancellableTask cancellableTask = new CancellableTask(randomLong(), "transport", "action", "", null, emptyMap()) { + @Override + protected void onCancelled() { + onCancelledLatch.countDown(); + } + }; final PlainActionFuture future = new PlainActionFuture<>(); action.execute(cancellableTask, new TestNodesRequest(), future); @@ -414,6 +421,11 @@ public void testConcurrentlyCompletionAndCancellation() throws InterruptedExcept assertNotNull("expect task cancellation exception, but got\n" + ExceptionsHelper.stackTrace(e), taskCancelledException); assertThat(e.getMessage(), containsString("task cancelled [simulated]")); assertTrue(cancellableTask.isCancelled()); + // Wait for the latch, the listener for releasing node responses is called before it. + // We need to wait for the latch because the cancellation may be detected in CancellableFanOut#onCompletion with + // the responseHandled flag being true. The flag is set by the cancellation listener which is still in process of + // draining existing responses. + safeAwait(onCancelledLatch); // All previously captured responses are released due to cancellation assertTrue(nodeResponses.stream().allMatch(r -> r.hasReferences() == false)); // Wait for the last response to be gathered and assert it is also released by either the concurrent cancellation or