-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Drain responses on completion for TransportNodesAction #130303
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
a7daa50
6e8bfbe
670a175
4adb4a6
f990e43
3d6140e
3d07261
e06c981
fb71e89
9dcbbd0
8612b1a
db76ef8
fdf0b22
b521449
b38783d
d40d44b
5a0f186
976203b
6b938fa
7c2c42d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| pr: 130303 | ||
| summary: Drain responses on completion for `TransportNodesAction` | ||
| area: Distributed | ||
| type: bug | ||
| issues: [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -57,8 +57,10 @@ | |
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.concurrent.CyclicBarrier; | ||
| import java.util.concurrent.Executor; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.function.Function; | ||
| import java.util.function.ObjLongConsumer; | ||
|
|
@@ -316,6 +318,62 @@ protected Object createActionContext(Task task, TestNodesRequest request) { | |
| assertTrue(cancellableTask.isCancelled()); // keep task alive | ||
| } | ||
|
|
||
| public void testCompletionAndCancellationShouldMutualExclusivelyHandleResponses() { | ||
| final var barrier = new CyclicBarrier(2); | ||
| final var action = new TestTransportNodesAction( | ||
| clusterService, | ||
| transportService, | ||
| new ActionFilters(Set.of()), | ||
| TestNodeRequest::new, | ||
| THREAD_POOL.executor(ThreadPool.Names.GENERIC) | ||
| ) { | ||
| @Override | ||
| protected void newResponseAsync( | ||
| Task task, | ||
| TestNodesRequest request, | ||
| Void unused, | ||
| List<TestNodeResponse> testNodeResponses, | ||
| List<FailedNodeException> failures, | ||
| ActionListener<TestNodesResponse> listener | ||
| ) { | ||
| final var waited = new AtomicBoolean(); | ||
| for (var response : testNodeResponses) { | ||
| if (waited.compareAndSet(false, true)) { | ||
|
||
| safeAwait(barrier); | ||
| safeAwait(barrier); | ||
| } | ||
| } | ||
| super.newResponseAsync(task, request, unused, testNodeResponses, failures, listener); | ||
| } | ||
| }; | ||
|
|
||
| final CancellableTask cancellableTask = new CancellableTask(randomLong(), "transport", "action", "", null, emptyMap()); | ||
| final PlainActionFuture<TestNodesResponse> future = new PlainActionFuture<>(); | ||
| action.execute(cancellableTask, new TestNodesRequest(), future); | ||
|
|
||
| final List<TestNodeResponse> nodeResponses = new ArrayList<>(); | ||
| for (var capturedRequest : transport.getCapturedRequestsAndClear()) { | ||
| final var response = new TestNodeResponse(capturedRequest.node()); | ||
| nodeResponses.add(response); | ||
| try { | ||
| transport.getTransportResponseHandler(capturedRequest.requestId()).handleResponse(response); | ||
| } finally { | ||
| response.decRef(); | ||
| } | ||
| } | ||
|
|
||
| // Wait for the overall response starts to processing the node responses in a loop | ||
| safeAwait(barrier); | ||
|
|
||
| // Cancel the task while the overall response is being processed | ||
| TaskCancelHelper.cancel(cancellableTask, "simulated"); | ||
|
|
||
| // Let the process continue to process the node responses and it should be successful | ||
| safeAwait(barrier); | ||
| safeGet(future); | ||
| assertTrue(nodeResponses.stream().allMatch(r -> r.hasReferences() == false)); | ||
| } | ||
|
|
||
| @BeforeClass | ||
| public static void startThreadPool() { | ||
| THREAD_POOL = new TestThreadPool(TransportNodesActionTests.class.getSimpleName()); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.