From c278f50899008ab212315c7aa5c365edf15928c3 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Mon, 8 Sep 2025 15:50:03 +1000 Subject: [PATCH 1/3] [Test] Wait all results to be gathered before assertion The assertion iterates through the list of responses. It needs to wait for the last element to be added to the list otherwise it may encounter ConcurrentModificationException Resolves: #134277 --- muted-tests.yml | 3 --- .../action/support/nodes/TransportNodesActionTests.java | 3 +++ 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index 80cdb68f9ab95..8ec65b41e38f1 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -519,9 +519,6 @@ tests: - class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeForkIT method: test {csv-spec:spatial_shapes.ConvertFromStringParseError} issue: https://github.com/elastic/elasticsearch/issues/134254 -- class: org.elasticsearch.action.support.nodes.TransportNodesActionTests - method: testConcurrentlyCompletionAndCancellation - issue: https://github.com/elastic/elasticsearch/issues/134277 # Examples: # diff --git a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java index d62432c2ee4cd..73e1ce48f2afb 100644 --- a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java @@ -397,9 +397,11 @@ protected void onCancelled() { } final var raceBarrier = new CyclicBarrier(3); + final var completedLatch = new CountDownLatch(1); final Thread completeThread = new Thread(() -> { safeAwait(raceBarrier); nodeResponses.add(completeOneRequest(capturedRequests[capturedRequests.length - 1])); + completedLatch.countDown(); }); final Thread cancelThread = new Thread(() -> { safeAwait(raceBarrier); @@ -420,6 +422,7 @@ protected void onCancelled() { assertThat(e.getMessage(), containsString("task cancelled [simulated]")); assertTrue(cancellableTask.isCancelled()); safeAwait(onCancelledLatch); // wait for the latch, the listener for releasing node responses is called before it + safeAwait(completedLatch); // Wait till all responses are gathered assertTrue(nodeResponses.stream().allMatch(r -> r.hasReferences() == false)); } From 87704aeb9ac84d77f45051279b3859a9d5ed569f Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Tue, 9 Sep 2025 12:57:34 +1000 Subject: [PATCH 2/3] Review feedback --- .../nodes/TransportNodesActionTests.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java index 73e1ce48f2afb..7fd5b64ef308b 100644 --- a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java @@ -63,6 +63,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.ObjLongConsumer; @@ -378,19 +379,14 @@ protected void newResponseAsync( public void testConcurrentlyCompletionAndCancellation() throws InterruptedException { final var action = getTestTransportNodesAction(); - final CountDownLatch onCancelledLatch = new CountDownLatch(1); - final CancellableTask cancellableTask = new CancellableTask(randomLong(), "transport", "action", "", null, emptyMap()) { - @Override - protected void onCancelled() { - onCancelledLatch.countDown(); - } - }; + final CancellableTask cancellableTask = new CancellableTask(randomLong(), "transport", "action", "", null, emptyMap()); final PlainActionFuture future = new PlainActionFuture<>(); action.execute(cancellableTask, new TestNodesRequest(), future); final List nodeResponses = new ArrayList<>(); final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); + // Complete all but the last request for racing completion with cancellation for (int i = 0; i < capturedRequests.length - 1; i++) { final var capturedRequest = capturedRequests[i]; nodeResponses.add(completeOneRequest(capturedRequest)); @@ -398,9 +394,10 @@ protected void onCancelled() { final var raceBarrier = new CyclicBarrier(3); final var completedLatch = new CountDownLatch(1); + final var lastNodeResponseRef = new AtomicReference(); final Thread completeThread = new Thread(() -> { safeAwait(raceBarrier); - nodeResponses.add(completeOneRequest(capturedRequests[capturedRequests.length - 1])); + lastNodeResponseRef.set(completeOneRequest(capturedRequests[capturedRequests.length - 1])); completedLatch.countDown(); }); final Thread cancelThread = new Thread(() -> { @@ -421,9 +418,12 @@ protected void onCancelled() { assertNotNull("expect task cancellation exception, but got\n" + ExceptionsHelper.stackTrace(e), taskCancelledException); assertThat(e.getMessage(), containsString("task cancelled [simulated]")); assertTrue(cancellableTask.isCancelled()); - safeAwait(onCancelledLatch); // wait for the latch, the listener for releasing node responses is called before it - safeAwait(completedLatch); // Wait till all responses are gathered + // All previously captured responses are released due to cancellation assertTrue(nodeResponses.stream().allMatch(r -> r.hasReferences() == false)); + // Wait for the last response to be gathered and assert it is also released by either the concurrent cancellation or + // not tracked in onItemResponse at all due to already cancelled + safeAwait(completedLatch); + assertFalse(lastNodeResponseRef.get().hasReferences()); } completeThread.join(10_000); From fdf51b8e1afa6c6d3a26914d4b28af5430cd5db5 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Wed, 10 Sep 2025 12:10:54 +1000 Subject: [PATCH 3/3] use plainActionFuture --- .../support/nodes/TransportNodesActionTests.java | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java index 7fd5b64ef308b..662899821bc95 100644 --- a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java @@ -58,12 +58,10 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.ObjLongConsumer; @@ -393,12 +391,10 @@ public void testConcurrentlyCompletionAndCancellation() throws InterruptedExcept } final var raceBarrier = new CyclicBarrier(3); - final var completedLatch = new CountDownLatch(1); - final var lastNodeResponseRef = new AtomicReference(); + final var lastResponseFuture = new PlainActionFuture(); final Thread completeThread = new Thread(() -> { safeAwait(raceBarrier); - lastNodeResponseRef.set(completeOneRequest(capturedRequests[capturedRequests.length - 1])); - completedLatch.countDown(); + lastResponseFuture.onResponse(completeOneRequest(capturedRequests[capturedRequests.length - 1])); }); final Thread cancelThread = new Thread(() -> { safeAwait(raceBarrier); @@ -422,8 +418,7 @@ public void testConcurrentlyCompletionAndCancellation() throws InterruptedExcept assertTrue(nodeResponses.stream().allMatch(r -> r.hasReferences() == false)); // Wait for the last response to be gathered and assert it is also released by either the concurrent cancellation or // not tracked in onItemResponse at all due to already cancelled - safeAwait(completedLatch); - assertFalse(lastNodeResponseRef.get().hasReferences()); + assertFalse(safeGet(lastResponseFuture).hasReferences()); } completeThread.join(10_000);