Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/130303.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 130303
summary: Drain responses on completion for `TransportNodesAction`
area: Distributed
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.core.Releasables;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
Expand All @@ -42,6 +43,7 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.core.Strings.format;

Expand Down Expand Up @@ -99,6 +101,7 @@ protected void doExecute(Task task, NodesRequest request, ActionListener<NodesRe
final ActionContext actionContext = createActionContext(task, request);
final ArrayList<NodeResponse> responses = new ArrayList<>(concreteNodes.length);
final ArrayList<FailedNodeException> exceptions = new ArrayList<>(0);
final AtomicBoolean responsesHandled = new AtomicBoolean(false);

final TransportRequestOptions transportRequestOptions = TransportRequestOptions.timeout(request.timeout());

Expand All @@ -109,12 +112,14 @@ protected void doExecute(Task task, NodesRequest request, ActionListener<NodesRe
private void addReleaseOnCancellationListener() {
if (task instanceof CancellableTask cancellableTask) {
cancellableTask.addListener(() -> {
final List<NodeResponse> drainedResponses;
synchronized (responses) {
drainedResponses = List.copyOf(responses);
responses.clear();
if (responsesHandled.compareAndSet(false, true)) {
final List<NodeResponse> drainedResponses;
synchronized (responses) {
drainedResponses = List.copyOf(responses);
responses.clear();
}
Releasables.wrap(Iterators.map(drainedResponses.iterator(), r -> r::decRef)).close();
}
Releasables.wrap(Iterators.map(drainedResponses.iterator(), r -> r::decRef)).close();
});
}
}
Expand Down Expand Up @@ -161,10 +166,18 @@ protected void onItemFailure(DiscoveryNode discoveryNode, Exception e) {

@Override
protected CheckedConsumer<ActionListener<NodesResponse>, Exception> onCompletion() {
// ref releases all happen-before here so no need to be synchronized
return l -> {
try (var ignored = Releasables.wrap(Iterators.map(responses.iterator(), r -> r::decRef))) {
newResponseAsync(task, request, actionContext, responses, exceptions, l);
if (responsesHandled.compareAndSet(false, true)) {
// ref releases all happen-before here so no need to be synchronized
try (var ignored = Releasables.wrap(Iterators.map(responses.iterator(), r -> r::decRef))) {
newResponseAsync(task, request, actionContext, responses, exceptions, l);
}
} else {
logger.debug("task cancelled after all responses were collected");
assert task instanceof CancellableTask : "expect CancellableTask, but got: " + task;
final var cancellableTask = (CancellableTask) task;
assert cancellableTask.isCancelled();
throw new TaskCancelledException("task cancelled [" + cancellableTask.getReasonCancelled() + "]");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getReasonCancelled is racy according to its Javadocs: "May also be null if the task was just cancelled since we don't set the reason and the cancellation flag atomically." You need to use notifyIfCancelled to get the right behaviour here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Pushed 3d07261. Please let me know if it has used the right listener.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is to address the edge case commented here. But I struggle to write a test for it. Essentially we need the cancel to comes in after all node responses are collected but before the AtomicBoolean responsesHandled is checked. One option is to extract the creation of CancellableFanOut into its own protected method plus wrapping the returned value with a delgating CancellableFanOut. But this requires making the 4 protected methods in CancellableFanOut package private. I am a bit suspicous on whether this is the right path to go down. I am open to suggestions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be content with a test which concurrently completes the action and cancels it, and asserts that we always either get an exception or we get a successful response. I expect such a test would find the bug here pretty reliably.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool I added such a test, see fb71e89

}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -316,6 +317,67 @@ protected Object createActionContext(Task task, TestNodesRequest request) {
assertTrue(cancellableTask.isCancelled()); // keep task alive
}

public void testCompletionAndCancellationShouldMutualExclusivelyHandleResponses() throws Exception {
final var barrier = new CyclicBarrier(2);
final var action = new TestTransportNodesAction(
clusterService,
transportService,
new ActionFilters(Set.of()),
TestNodeRequest::new,
THREAD_POOL.executor(ThreadPool.Names.GENERIC)
) {
@Override
protected void newResponseAsync(
Task task,
TestNodesRequest request,
Void unused,
List<TestNodeResponse> testNodeResponses,
List<FailedNodeException> failures,
ActionListener<TestNodesResponse> listener
) {
boolean waited = false;
for (var response : testNodeResponses) {
if (waited == false) {
waited = true;
safeAwait(barrier);
safeAwait(barrier);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we not just do this?

Suggested change
boolean waited = false;
for (var response : testNodeResponses) {
if (waited == false) {
waited = true;
safeAwait(barrier);
safeAwait(barrier);
}
}
if (testNodeResponses.isEmpty() == false) {
safeAwait(barrier);
safeAwait(barrier);
}

Indeed can we not assert that testNodeResponses is nonempty in this test?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The for-loop is to reproduce the ConcurrentModificationException reported in #128852. The test always passes without it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, could you add a comment to that effect or else this'll get "tidied up"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment added in fdf0b22

super.newResponseAsync(task, request, unused, testNodeResponses, failures, listener);
}
};

final CancellableTask cancellableTask = new CancellableTask(randomLong(), "transport", "action", "", null, emptyMap());
final var cancelledFuture = new PlainActionFuture<Void>();
cancellableTask.addListener(() -> cancelledFuture.onResponse(null));

final PlainActionFuture<TestNodesResponse> future = new PlainActionFuture<>();
action.execute(cancellableTask, new TestNodesRequest(), future);

final List<TestNodeResponse> nodeResponses = new ArrayList<>();
for (var capturedRequest : transport.getCapturedRequestsAndClear()) {
final var response = new TestNodeResponse(capturedRequest.node());
nodeResponses.add(response);
try {
transport.getTransportResponseHandler(capturedRequest.requestId()).handleResponse(response);
} finally {
response.decRef();
}
}

// Wait for the overall response starts to processing the node responses in a loop
safeAwait(barrier);

// Cancel the task while the overall response is being processed
TaskCancelHelper.cancel(cancellableTask, "simulated");
safeGet(cancelledFuture);

// Let the process continue it should be successful since the cancellation came after processing started
safeAwait(barrier);
safeGet(future);
assertTrue(nodeResponses.stream().allMatch(r -> r.hasReferences() == false));
}

@BeforeClass
public static void startThreadPool() {
THREAD_POOL = new TestThreadPool(TransportNodesActionTests.class.getSimpleName());
Expand Down
Loading