From a7daa5069447a97dc0737a07ae61c83a9c3caa2c Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Mon, 30 Jun 2025 16:21:25 +1000 Subject: [PATCH 01/15] Drain responses on completion for TransportNodesAction This PR ensures the node responses are copied and drained exclusively in onCompletion so that they do not get concurrently modified by cancellation. Resolves: #128852 --- .../support/nodes/TransportNodesAction.java | 9 ++- .../nodes/TransportNodesActionTests.java | 60 ++++++++++++++++++- 2 files changed, 66 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java index 96b1e1e1efca4..21898a9a9ee1b 100644 --- a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java @@ -163,8 +163,13 @@ protected void onItemFailure(DiscoveryNode discoveryNode, Exception e) { protected CheckedConsumer, Exception> onCompletion() { // ref releases all happen-before here so no need to be synchronized return l -> { - try (var ignored = Releasables.wrap(Iterators.map(responses.iterator(), r -> r::decRef))) { - newResponseAsync(task, request, actionContext, responses, exceptions, l); + final List completedResponses; + synchronized (responses) { + completedResponses = List.copyOf(responses); + responses.clear(); + } + try (var ignored = Releasables.wrap(Iterators.map(completedResponses.iterator(), r -> r::decRef))) { + newResponseAsync(task, request, actionContext, completedResponses, exceptions, l); } }; } diff --git a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java index f954019d5556d..36608360ec082 100644 --- a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java @@ -57,8 +57,10 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.function.ObjLongConsumer; @@ -316,6 +318,62 @@ protected Object createActionContext(Task task, TestNodesRequest request) { assertTrue(cancellableTask.isCancelled()); // keep task alive } + public void testCompletionAndCancellationShouldMutualExclusivelyHandleResponses() { + final var barrier = new CyclicBarrier(2); + final var action = new TestTransportNodesAction( + clusterService, + transportService, + new ActionFilters(Set.of()), + TestNodeRequest::new, + THREAD_POOL.executor(ThreadPool.Names.GENERIC) + ) { + @Override + protected void newResponseAsync( + Task task, + TestNodesRequest request, + Void unused, + List testNodeResponses, + List failures, + ActionListener listener + ) { + final var waited = new AtomicBoolean(); + for (var response : testNodeResponses) { + if (waited.compareAndSet(false, true)) { + safeAwait(barrier); + safeAwait(barrier); + } + } + super.newResponseAsync(task, request, unused, testNodeResponses, failures, listener); + } + }; + + final CancellableTask cancellableTask = new CancellableTask(randomLong(), "transport", "action", "", null, emptyMap()); + final PlainActionFuture future = new PlainActionFuture<>(); + action.execute(cancellableTask, new TestNodesRequest(), future); + + final List nodeResponses = new ArrayList<>(); + for (var capturedRequest : transport.getCapturedRequestsAndClear()) { + final var response = new TestNodeResponse(capturedRequest.node()); + nodeResponses.add(response); + try { + transport.getTransportResponseHandler(capturedRequest.requestId()).handleResponse(response); + } finally { + response.decRef(); + } + } + + // Wait for the overall response starts to processing the node responses in a loop + safeAwait(barrier); + + // Cancel the task while the overall response is being processed + TaskCancelHelper.cancel(cancellableTask, "simulated"); + + // Let the process continue to process the node responses and it should be successful + safeAwait(barrier); + safeGet(future); + assertTrue(nodeResponses.stream().allMatch(r -> r.hasReferences() == false)); + } + @BeforeClass public static void startThreadPool() { THREAD_POOL = new TestThreadPool(TransportNodesActionTests.class.getSimpleName()); @@ -343,7 +401,7 @@ public void setUp() throws Exception { ); transportService.start(); transportService.acceptIncomingRequests(); - int numNodes = randomIntBetween(3, 10); + int numNodes = 5; // randomIntBetween(3, 10); DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); List discoveryNodes = new ArrayList<>(); for (int i = 0; i < numNodes; i++) { From 6e8bfbe9f8476eed9918599e4bf14c3e64a1e221 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Mon, 30 Jun 2025 16:27:16 +1000 Subject: [PATCH 02/15] Update docs/changelog/130303.yaml --- docs/changelog/130303.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/130303.yaml diff --git a/docs/changelog/130303.yaml b/docs/changelog/130303.yaml new file mode 100644 index 0000000000000..aff277f67eba1 --- /dev/null +++ b/docs/changelog/130303.yaml @@ -0,0 +1,5 @@ +pr: 130303 +summary: Drain responses on completion for `TransportNodesAction` +area: Distributed +type: bug +issues: [] From 670a1757e38c798332751c1e3c005177294f3821 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Mon, 30 Jun 2025 16:28:56 +1000 Subject: [PATCH 03/15] unwanted change --- .../action/support/nodes/TransportNodesActionTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java index 36608360ec082..7f657072e9e4c 100644 --- a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java @@ -401,7 +401,7 @@ public void setUp() throws Exception { ); transportService.start(); transportService.acceptIncomingRequests(); - int numNodes = 5; // randomIntBetween(3, 10); + int numNodes = randomIntBetween(3, 10); DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); List discoveryNodes = new ArrayList<>(); for (int i = 0; i < numNodes; i++) { From 4adb4a60731e26057ad9f1bf26b4d9c7e43b7c98 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Mon, 30 Jun 2025 16:50:16 +1000 Subject: [PATCH 04/15] Use atomicBoolean --- .../support/nodes/TransportNodesAction.java | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java index 21898a9a9ee1b..02c41a5f9b415 100644 --- a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java @@ -42,6 +42,7 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.core.Strings.format; @@ -99,6 +100,7 @@ protected void doExecute(Task task, NodesRequest request, ActionListener responses = new ArrayList<>(concreteNodes.length); final ArrayList exceptions = new ArrayList<>(0); + final AtomicBoolean responsesHandled = new AtomicBoolean(false); final TransportRequestOptions transportRequestOptions = TransportRequestOptions.timeout(request.timeout()); @@ -109,12 +111,14 @@ protected void doExecute(Task task, NodesRequest request, ActionListener { - final List drainedResponses; - synchronized (responses) { - drainedResponses = List.copyOf(responses); - responses.clear(); + if (responsesHandled.compareAndSet(false, true)) { + final List drainedResponses; + synchronized (responses) { + drainedResponses = List.copyOf(responses); + responses.clear(); + } + Releasables.wrap(Iterators.map(drainedResponses.iterator(), r -> r::decRef)).close(); } - Releasables.wrap(Iterators.map(drainedResponses.iterator(), r -> r::decRef)).close(); }); } } @@ -163,13 +167,12 @@ protected void onItemFailure(DiscoveryNode discoveryNode, Exception e) { protected CheckedConsumer, Exception> onCompletion() { // ref releases all happen-before here so no need to be synchronized return l -> { - final List completedResponses; - synchronized (responses) { - completedResponses = List.copyOf(responses); - responses.clear(); - } - try (var ignored = Releasables.wrap(Iterators.map(completedResponses.iterator(), r -> r::decRef))) { - newResponseAsync(task, request, actionContext, completedResponses, exceptions, l); + if (responsesHandled.compareAndSet(false, true)) { + try (var ignored = Releasables.wrap(Iterators.map(responses.iterator(), r -> r::decRef))) { + newResponseAsync(task, request, actionContext, responses, exceptions, l); + } + } else { + newResponseAsync(task, request, actionContext, List.of(), exceptions, l); } }; } From f990e43c4f03ed01f052b28736e7518df7b9253d Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Mon, 30 Jun 2025 17:01:45 +1000 Subject: [PATCH 05/15] move comment --- .../action/support/nodes/TransportNodesAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java index 02c41a5f9b415..83f7a283d650f 100644 --- a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java @@ -165,9 +165,9 @@ protected void onItemFailure(DiscoveryNode discoveryNode, Exception e) { @Override protected CheckedConsumer, Exception> onCompletion() { - // ref releases all happen-before here so no need to be synchronized return l -> { if (responsesHandled.compareAndSet(false, true)) { + // ref releases all happen-before here so no need to be synchronized try (var ignored = Releasables.wrap(Iterators.map(responses.iterator(), r -> r::decRef))) { newResponseAsync(task, request, actionContext, responses, exceptions, l); } From 3d6140ec3d19f4a80316742fb44c31ad71fc0355 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Mon, 30 Jun 2025 17:40:34 +1000 Subject: [PATCH 06/15] more edge case --- .../action/support/nodes/TransportNodesAction.java | 7 ++++++- .../support/nodes/TransportNodesActionTests.java | 14 +++++++++----- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java index 83f7a283d650f..d8a69ebc562d9 100644 --- a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java @@ -31,6 +31,7 @@ import org.elasticsearch.core.Releasables; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestHandler; @@ -172,7 +173,11 @@ protected CheckedConsumer, Exception> onCompletion newResponseAsync(task, request, actionContext, responses, exceptions, l); } } else { - newResponseAsync(task, request, actionContext, List.of(), exceptions, l); + logger.debug("task cancelled after all responses were collected"); + assert task instanceof CancellableTask : "expect CancellableTask, but got: " + task; + final var cancellableTask = (CancellableTask) task; + assert cancellableTask.isCancelled(); + throw new TaskCancelledException("task cancelled [" + cancellableTask.getReasonCancelled() + "]"); } }; } diff --git a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java index 7f657072e9e4c..8f9342b89145c 100644 --- a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java @@ -60,7 +60,6 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.function.ObjLongConsumer; @@ -318,7 +317,7 @@ protected Object createActionContext(Task task, TestNodesRequest request) { assertTrue(cancellableTask.isCancelled()); // keep task alive } - public void testCompletionAndCancellationShouldMutualExclusivelyHandleResponses() { + public void testCompletionAndCancellationShouldMutualExclusivelyHandleResponses() throws Exception { final var barrier = new CyclicBarrier(2); final var action = new TestTransportNodesAction( clusterService, @@ -336,9 +335,10 @@ protected void newResponseAsync( List failures, ActionListener listener ) { - final var waited = new AtomicBoolean(); + boolean waited = false; for (var response : testNodeResponses) { - if (waited.compareAndSet(false, true)) { + if (waited == false) { + waited = true; safeAwait(barrier); safeAwait(barrier); } @@ -348,6 +348,9 @@ protected void newResponseAsync( }; final CancellableTask cancellableTask = new CancellableTask(randomLong(), "transport", "action", "", null, emptyMap()); + final var cancelledFuture = new PlainActionFuture(); + cancellableTask.addListener(() -> cancelledFuture.onResponse(null)); + final PlainActionFuture future = new PlainActionFuture<>(); action.execute(cancellableTask, new TestNodesRequest(), future); @@ -367,8 +370,9 @@ protected void newResponseAsync( // Cancel the task while the overall response is being processed TaskCancelHelper.cancel(cancellableTask, "simulated"); + safeGet(cancelledFuture); - // Let the process continue to process the node responses and it should be successful + // Let the process continue it should be successful since the cancellation came after processing started safeAwait(barrier); safeGet(future); assertTrue(nodeResponses.stream().allMatch(r -> r.hasReferences() == false)); From 3d072614922a25bf738610c7116134f0d7051dfd Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Mon, 30 Jun 2025 18:03:48 +1000 Subject: [PATCH 07/15] notify cancel --- .../action/support/nodes/TransportNodesAction.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java index d8a69ebc562d9..116a349a239dc 100644 --- a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java @@ -96,6 +96,8 @@ protected void doExecute(Task task, NodesRequest request, ActionListener, Exception>> finalListener = + new ThreadedActionListener<>(finalExecutor, listener.delegateFailureAndWrap((l, c) -> c.accept(l))); new CancellableFanOut, Exception>>() { final ActionContext actionContext = createActionContext(task, request); @@ -177,7 +179,7 @@ protected CheckedConsumer, Exception> onCompletion assert task instanceof CancellableTask : "expect CancellableTask, but got: " + task; final var cancellableTask = (CancellableTask) task; assert cancellableTask.isCancelled(); - throw new TaskCancelledException("task cancelled [" + cancellableTask.getReasonCancelled() + "]"); + cancellableTask.notifyIfCancelled(finalListener); } }; } @@ -186,11 +188,7 @@ protected CheckedConsumer, Exception> onCompletion public String toString() { return actionName; } - }.run( - task, - Iterators.forArray(concreteNodes), - new ThreadedActionListener<>(finalExecutor, listener.delegateFailureAndWrap((l, c) -> c.accept(l))) - ); + }.run(task, Iterators.forArray(concreteNodes), finalListener); } private Writeable.Reader nodeResponseReader(DiscoveryNode discoveryNode) { From e06c981c1a992892dca082aeebfe2f35ae80b270 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Mon, 30 Jun 2025 08:12:29 +0000 Subject: [PATCH 08/15] [CI] Auto commit changes from spotless --- .../elasticsearch/action/support/nodes/TransportNodesAction.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java index 116a349a239dc..267bf479f0f24 100644 --- a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java @@ -31,7 +31,6 @@ import org.elasticsearch.core.Releasables; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestHandler; From fb71e89b6a100010df9e7bc9f6699fbb9b9fe76e Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Mon, 30 Jun 2025 18:38:13 +1000 Subject: [PATCH 09/15] test concurrently completing and cancelling --- .../nodes/TransportNodesActionTests.java | 67 +++++++++++++++++-- 1 file changed, 60 insertions(+), 7 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java index 8f9342b89145c..2a61d8873bac8 100644 --- a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java @@ -10,6 +10,7 @@ package org.elasticsearch.action.support.nodes; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; @@ -67,7 +68,9 @@ import static java.util.Collections.emptyMap; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; import static org.elasticsearch.test.ClusterServiceUtils.setState; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; import static org.mockito.Mockito.mock; public class TransportNodesActionTests extends ESTestCase { @@ -356,13 +359,7 @@ protected void newResponseAsync( final List nodeResponses = new ArrayList<>(); for (var capturedRequest : transport.getCapturedRequestsAndClear()) { - final var response = new TestNodeResponse(capturedRequest.node()); - nodeResponses.add(response); - try { - transport.getTransportResponseHandler(capturedRequest.requestId()).handleResponse(response); - } finally { - response.decRef(); - } + nodeResponses.add(completeOneRequest(capturedRequest)); } // Wait for the overall response starts to processing the node responses in a loop @@ -378,6 +375,62 @@ protected void newResponseAsync( assertTrue(nodeResponses.stream().allMatch(r -> r.hasReferences() == false)); } + public void testConcurrentlyCompletionAndCancellation() throws InterruptedException { + final var action = getTestTransportNodesAction(); + + final CancellableTask cancellableTask = new CancellableTask(randomLong(), "transport", "action", "", null, emptyMap()); + + final PlainActionFuture future = new PlainActionFuture<>(); + action.execute(cancellableTask, new TestNodesRequest(), future); + + final List nodeResponses = new ArrayList<>(); + final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); + for (int i = 0; i < capturedRequests.length - 1; i++) { + final var capturedRequest = capturedRequests[i]; + nodeResponses.add(completeOneRequest(capturedRequest)); + } + + final var raceBarrier = new CyclicBarrier(3); + final Thread completeThread = new Thread(() -> { + safeAwait(raceBarrier); + completeOneRequest(capturedRequests[capturedRequests.length - 1]); + }); + final Thread cancelThread = new Thread(() -> { + safeAwait(raceBarrier); + TaskCancelHelper.cancel(cancellableTask, "simulated"); + }); + completeThread.start(); + cancelThread.start(); + safeAwait(raceBarrier); + + try { + final var testNodesResponse = future.actionGet(SAFE_AWAIT_TIMEOUT); + assertThat(testNodesResponse.getNodes(), hasSize(capturedRequests.length)); + assertFalse(cancellableTask.isCancelled()); + } catch (Exception e) { + final var taskCancelledException = (TaskCancelledException) ExceptionsHelper.unwrap(e, TaskCancelledException.class); + assertNotNull("expect task cancellation exception, but got\n" + ExceptionsHelper.stackTrace(e), taskCancelledException); + assertThat(e.getMessage(), containsString("task cancelled [simulated]")); + assertTrue(cancellableTask.isCancelled()); + } + assertTrue(nodeResponses.stream().allMatch(r -> r.hasReferences() == false)); + + completeThread.join(10_000); + cancelThread.join(10_000); + assertFalse(completeThread.isAlive()); + assertFalse(cancelThread.isAlive()); + } + + private TestNodeResponse completeOneRequest(CapturingTransport.CapturedRequest capturedRequest) { + final var response = new TestNodeResponse(capturedRequest.node()); + try { + transport.getTransportResponseHandler(capturedRequest.requestId()).handleResponse(response); + } finally { + response.decRef(); + } + return response; + } + @BeforeClass public static void startThreadPool() { THREAD_POOL = new TestThreadPool(TransportNodesActionTests.class.getSimpleName()); From 9dcbbd0f7a98e682f59c8e38c0a33000587439f3 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Mon, 30 Jun 2025 18:39:28 +1000 Subject: [PATCH 10/15] tweak name --- .../action/support/nodes/TransportNodesActionTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java index 2a61d8873bac8..86af3ba7759bc 100644 --- a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java @@ -320,7 +320,7 @@ protected Object createActionContext(Task task, TestNodesRequest request) { assertTrue(cancellableTask.isCancelled()); // keep task alive } - public void testCompletionAndCancellationShouldMutualExclusivelyHandleResponses() throws Exception { + public void testCompletionShouldNotBeInterferedByCancellationAfterProcessingBegins() { final var barrier = new CyclicBarrier(2); final var action = new TestTransportNodesAction( clusterService, From 8612b1a42bf8bef82abf2d06e17ca84b59484b1c Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Mon, 30 Jun 2025 18:56:16 +1000 Subject: [PATCH 11/15] better assertions --- .../nodes/TransportNodesActionTests.java | 36 ++++++++++++++----- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java index 86af3ba7759bc..6d0165fcdb053 100644 --- a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java @@ -58,6 +58,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -320,7 +321,7 @@ protected Object createActionContext(Task task, TestNodesRequest request) { assertTrue(cancellableTask.isCancelled()); // keep task alive } - public void testCompletionShouldNotBeInterferedByCancellationAfterProcessingBegins() { + public void testCompletionShouldNotBeInterferedByCancellationAfterProcessingBegins() throws Exception { final var barrier = new CyclicBarrier(2); final var action = new TestTransportNodesAction( clusterService, @@ -357,9 +358,8 @@ protected void newResponseAsync( final PlainActionFuture future = new PlainActionFuture<>(); action.execute(cancellableTask, new TestNodesRequest(), future); - final List nodeResponses = new ArrayList<>(); for (var capturedRequest : transport.getCapturedRequestsAndClear()) { - nodeResponses.add(completeOneRequest(capturedRequest)); + completeOneRequest(capturedRequest); } // Wait for the overall response starts to processing the node responses in a loop @@ -371,14 +371,19 @@ protected void newResponseAsync( // Let the process continue it should be successful since the cancellation came after processing started safeAwait(barrier); - safeGet(future); - assertTrue(nodeResponses.stream().allMatch(r -> r.hasReferences() == false)); + assertResponseReleased(safeGet(future)); } public void testConcurrentlyCompletionAndCancellation() throws InterruptedException { final var action = getTestTransportNodesAction(); - final CancellableTask cancellableTask = new CancellableTask(randomLong(), "transport", "action", "", null, emptyMap()); + final CountDownLatch onCancelledLatch = new CountDownLatch(1); + final CancellableTask cancellableTask = new CancellableTask(randomLong(), "transport", "action", "", null, emptyMap()) { + @Override + protected void onCancelled() { + onCancelledLatch.countDown(); + } + }; final PlainActionFuture future = new PlainActionFuture<>(); action.execute(cancellableTask, new TestNodesRequest(), future); @@ -405,15 +410,17 @@ public void testConcurrentlyCompletionAndCancellation() throws InterruptedExcept try { final var testNodesResponse = future.actionGet(SAFE_AWAIT_TIMEOUT); - assertThat(testNodesResponse.getNodes(), hasSize(capturedRequests.length)); assertFalse(cancellableTask.isCancelled()); + assertThat(testNodesResponse.getNodes(), hasSize(capturedRequests.length)); + assertResponseReleased(testNodesResponse); } catch (Exception e) { final var taskCancelledException = (TaskCancelledException) ExceptionsHelper.unwrap(e, TaskCancelledException.class); assertNotNull("expect task cancellation exception, but got\n" + ExceptionsHelper.stackTrace(e), taskCancelledException); assertThat(e.getMessage(), containsString("task cancelled [simulated]")); assertTrue(cancellableTask.isCancelled()); + safeAwait(onCancelledLatch); + assertTrue(nodeResponses.stream().allMatch(r -> r.hasReferences() == false)); } - assertTrue(nodeResponses.stream().allMatch(r -> r.hasReferences() == false)); completeThread.join(10_000); cancelThread.join(10_000); @@ -421,6 +428,19 @@ public void testConcurrentlyCompletionAndCancellation() throws InterruptedExcept assertFalse(cancelThread.isAlive()); } + private void assertResponseReleased(TestNodesResponse response) { + final var allResponsesReleasedListener = new SubscribableListener(); + try (var listeners = new RefCountingListener(allResponsesReleasedListener)) { + response.addCloseListener(listeners.acquire()); + for (final var nodeResponse : response.getNodes()) { + nodeResponse.addCloseListener(listeners.acquire()); + } + } + safeAwait(allResponsesReleasedListener); + assertTrue(response.getNodes().stream().noneMatch(TestNodeResponse::hasReferences)); + assertFalse(response.hasReferences()); + } + private TestNodeResponse completeOneRequest(CapturingTransport.CapturedRequest capturedRequest) { final var response = new TestNodeResponse(capturedRequest.node()); try { From db76ef879f1c4cab4badf7f48972c002f273d9ad Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Mon, 30 Jun 2025 18:57:42 +1000 Subject: [PATCH 12/15] complete l --- .../action/support/nodes/TransportNodesAction.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java index 267bf479f0f24..181d935d97f37 100644 --- a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java @@ -95,8 +95,6 @@ protected void doExecute(Task task, NodesRequest request, ActionListener, Exception>> finalListener = - new ThreadedActionListener<>(finalExecutor, listener.delegateFailureAndWrap((l, c) -> c.accept(l))); new CancellableFanOut, Exception>>() { final ActionContext actionContext = createActionContext(task, request); @@ -178,7 +176,7 @@ protected CheckedConsumer, Exception> onCompletion assert task instanceof CancellableTask : "expect CancellableTask, but got: " + task; final var cancellableTask = (CancellableTask) task; assert cancellableTask.isCancelled(); - cancellableTask.notifyIfCancelled(finalListener); + cancellableTask.notifyIfCancelled(l); } }; } @@ -187,7 +185,11 @@ protected CheckedConsumer, Exception> onCompletion public String toString() { return actionName; } - }.run(task, Iterators.forArray(concreteNodes), finalListener); + }.run( + task, + Iterators.forArray(concreteNodes), + new ThreadedActionListener<>(finalExecutor, listener.delegateFailureAndWrap((l, c) -> c.accept(l))) + ); } private Writeable.Reader nodeResponseReader(DiscoveryNode discoveryNode) { From fdf0b22c461e99aab0ef548a17cf8bc6eba2af66 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Mon, 30 Jun 2025 19:02:06 +1000 Subject: [PATCH 13/15] comment for loop --- .../action/support/nodes/TransportNodesActionTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java index 6d0165fcdb053..91bd33db55fb2 100644 --- a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java @@ -340,6 +340,7 @@ protected void newResponseAsync( ActionListener listener ) { boolean waited = false; + // Process node responses in a loop and ensure no ConcurrentModificationException, see also #128852 for (var response : testNodeResponses) { if (waited == false) { waited = true; From b38783d8bfa1299dc719b42c4511077ed76d4fed Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Tue, 1 Jul 2025 12:48:56 +1000 Subject: [PATCH 14/15] remove assertion for task cancellation --- .../support/nodes/TransportNodesActionTests.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java index 91bd33db55fb2..82e2147cbc204 100644 --- a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java @@ -340,7 +340,8 @@ protected void newResponseAsync( ActionListener listener ) { boolean waited = false; - // Process node responses in a loop and ensure no ConcurrentModificationException, see also #128852 + // Process node responses in a loop and ensure no ConcurrentModificationException will be thrown due to + // concurrent cancellation comes after the loops has started, see also #128852 for (var response : testNodeResponses) { if (waited == false) { waited = true; @@ -363,14 +364,13 @@ protected void newResponseAsync( completeOneRequest(capturedRequest); } - // Wait for the overall response starts to processing the node responses in a loop + // Wait for the overall response starts to processing the node responses in a loop and then cancel the task. + // It should not interfere with the node response processing. safeAwait(barrier); - - // Cancel the task while the overall response is being processed TaskCancelHelper.cancel(cancellableTask, "simulated"); safeGet(cancelledFuture); - // Let the process continue it should be successful since the cancellation came after processing started + // Let the process continue, and it should be successful safeAwait(barrier); assertResponseReleased(safeGet(future)); } @@ -399,7 +399,7 @@ protected void onCancelled() { final var raceBarrier = new CyclicBarrier(3); final Thread completeThread = new Thread(() -> { safeAwait(raceBarrier); - completeOneRequest(capturedRequests[capturedRequests.length - 1]); + nodeResponses.add(completeOneRequest(capturedRequests[capturedRequests.length - 1])); }); final Thread cancelThread = new Thread(() -> { safeAwait(raceBarrier); @@ -409,9 +409,9 @@ protected void onCancelled() { cancelThread.start(); safeAwait(raceBarrier); + // We expect either a successful response or a cancellation exception. All node responses should be released in both cases. try { final var testNodesResponse = future.actionGet(SAFE_AWAIT_TIMEOUT); - assertFalse(cancellableTask.isCancelled()); assertThat(testNodesResponse.getNodes(), hasSize(capturedRequests.length)); assertResponseReleased(testNodesResponse); } catch (Exception e) { @@ -419,7 +419,7 @@ protected void onCancelled() { assertNotNull("expect task cancellation exception, but got\n" + ExceptionsHelper.stackTrace(e), taskCancelledException); assertThat(e.getMessage(), containsString("task cancelled [simulated]")); assertTrue(cancellableTask.isCancelled()); - safeAwait(onCancelledLatch); + safeAwait(onCancelledLatch); // wait for the latch, the listener for releasing node responses is called before it assertTrue(nodeResponses.stream().allMatch(r -> r.hasReferences() == false)); } From 5a0f186c6abd7c1210030f3420920a1d25f9ae13 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Tue, 1 Jul 2025 13:06:46 +1000 Subject: [PATCH 15/15] wording --- .../action/support/nodes/TransportNodesActionTests.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java index 82e2147cbc204..d62432c2ee4cd 100644 --- a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java @@ -341,7 +341,7 @@ protected void newResponseAsync( ) { boolean waited = false; // Process node responses in a loop and ensure no ConcurrentModificationException will be thrown due to - // concurrent cancellation comes after the loops has started, see also #128852 + // concurrent cancellation coming after the loop has started, see also #128852 for (var response : testNodeResponses) { if (waited == false) { waited = true; @@ -364,8 +364,8 @@ protected void newResponseAsync( completeOneRequest(capturedRequest); } - // Wait for the overall response starts to processing the node responses in a loop and then cancel the task. - // It should not interfere with the node response processing. + // Wait for the overall response to start processing the node responses in a loop and then cancel the task. + // The cancellation should not interfere with the node response processing. safeAwait(barrier); TaskCancelHelper.cancel(cancellableTask, "simulated"); safeGet(cancelledFuture);