@ywangd
Member

@ywangd ywangd commented Jun 30, 2025

This PR ensures the node responses are copied and drained exclusively in onCompletion so that they do not get concurrently modified by cancellation.

Resolves: #128852
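
To illustrate the idea in the description, here is a minimal sketch of a collector in which cancellation never touches the response list and only the completion path copies and drains it. The class `NodeResponseCollector`, its method names, and the use of `String` for responses are hypothetical stand-ins chosen for this example; they are not the code changed by this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model, not the Elasticsearch class touched by this PR.
final class NodeResponseCollector {
    private final List<String> responses = new ArrayList<>();
    private final AtomicBoolean cancelled = new AtomicBoolean();

    // Called as each node responds; every list mutation holds the same lock.
    void onNodeResponse(String response) {
        synchronized (responses) {
            responses.add(response);
        }
    }

    // Cancellation only records a flag. It never reads or clears the list.
    void onCancelled() {
        cancelled.set(true);
    }

    // The single place where the list is copied and drained.
    List<String> onCompletion() {
        final List<String> snapshot;
        synchronized (responses) {
            snapshot = new ArrayList<>(responses);
            responses.clear();
        }
        if (cancelled.get()) {
            throw new CancellationException("task cancelled");
        }
        return snapshot;
    }

    public static void main(String[] args) {
        final NodeResponseCollector collector = new NodeResponseCollector();
        collector.onNodeResponse("node-1");
        collector.onNodeResponse("node-2");
        System.out.println(collector.onCompletion());
    }
}
```

Because the caller only ever sees the snapshot, a later cancellation cannot modify a list that something else is still iterating over.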

@ywangd added the >bug, v9.0.0, v8.19.0, v9.1.0, :Distributed Coordination/Distributed, and v9.2.0 labels Jun 30, 2025
@elasticsearchmachine added the Team:Distributed Coordination label Jun 30, 2025
@elasticsearchmachine
Collaborator

Hi @ywangd, I've created a changelog YAML for you.

@elasticsearchmachine
Collaborator

Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)

@ywangd added the v9.0.4 label and removed the v9.0.0 label Jun 30, 2025
) {
final var waited = new AtomicBoolean();
for (var response : testNodeResponses) {
if (waited.compareAndSet(false, true)) {
Contributor

This is kind of a convoluted way to wait on a nonempty list. There's no concurrency here so the compareAndSet is a bit of a sledgehammer. Can we just check testNodeResponses.isEmpty()?

Member Author

This is to wait for only the first response. You are right there is no need for AtomicBoolean. I changed it to a primitive boolean variable.

Comment on lines 338 to 345
boolean waited = false;
for (var response : testNodeResponses) {
    if (waited == false) {
        waited = true;
        safeAwait(barrier);
        safeAwait(barrier);
    }
}
Contributor

Can we not just do this?

Suggested change
- boolean waited = false;
- for (var response : testNodeResponses) {
-     if (waited == false) {
-         waited = true;
-         safeAwait(barrier);
-         safeAwait(barrier);
-     }
- }
+ if (testNodeResponses.isEmpty() == false) {
+     safeAwait(barrier);
+     safeAwait(barrier);
+ }

Indeed can we not assert that testNodeResponses is nonempty in this test?

Member Author

The for-loop is to reproduce the ConcurrentModificationException reported in #128852. The test always passes without it.
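
As a rough illustration of why the loop matters, the sketch below shows that an `ArrayList` iterator detects a modification made during iteration, while a plain `isEmpty()` check never creates an iterator and so cannot fail that way. It is single-threaded, with `list.clear()` standing in for the concurrent drain; the class and method names are hypothetical and not taken from the test in this PR.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

// Hypothetical demo, not the test from this PR.
final class IterationVersusIsEmpty {

    // Iterates the list and clears it mid-iteration; returns true if the
    // iterator noticed the modification.
    static boolean iterateWhileClearing(List<String> list) {
        try {
            for (String ignored : list) {
                list.clear(); // stands in for a drain happening during iteration
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Checks isEmpty() and then clears; no iterator is involved, so there is
    // nothing that could detect the modification.
    static boolean checkEmptyThenClear(List<String> list) {
        try {
            if (list.isEmpty() == false) {
                list.clear();
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("for-loop detected modification: "
            + iterateWhileClearing(new ArrayList<>(List.of("a", "b"))));
        System.out.println("isEmpty() detected modification: "
            + checkEmptyThenClear(new ArrayList<>(List.of("a", "b"))));
    }
}
```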

Contributor

I see. Could you add a comment to that effect? Otherwise this'll get "tidied up".

Member Author

Comment added in fdf0b22

assert task instanceof CancellableTask : "expect CancellableTask, but got: " + task;
final var cancellableTask = (CancellableTask) task;
assert cancellableTask.isCancelled();
throw new TaskCancelledException("task cancelled [" + cancellableTask.getReasonCancelled() + "]");
Contributor

getReasonCancelled is racy according to its Javadocs: "May also be null if the task was just cancelled since we don't set the reason and the cancellation flag atomically." You need to use notifyIfCancelled to get the right behaviour here.
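
The race described here can be modelled with a toy class. This is only a sketch: `ToyCancellableTask` is hypothetical, it borrows the method names from the discussion for readability, and its internals are an assumption rather than the Elasticsearch implementation. The point is that an unsynchronized getter can run between the write of the flag and the write of the reason, whereas a notification method that takes the same lock as `cancel` always sees both.

```java
import java.util.function.Consumer;

// Toy model only; the real CancellableTask may be implemented differently.
final class ToyCancellableTask {
    private volatile boolean cancelled;
    private volatile String reason;

    synchronized void cancel(String why) {
        cancelled = true; // an unsynchronized reader may observe the flag here...
        reason = why;     // ...and still read a null reason before this write
    }

    boolean isCancelled() {
        return cancelled;
    }

    // Racy: takes no lock, so it may return null just after cancellation.
    String getReasonCancelled() {
        return reason;
    }

    // Takes the same lock as cancel(), so the flag and reason are read together.
    boolean notifyIfCancelled(Consumer<Exception> onFailure) {
        final String why;
        synchronized (this) {
            if (cancelled == false) {
                return false;
            }
            why = reason;
        }
        onFailure.accept(new IllegalStateException("task cancelled [" + why + "]"));
        return true;
    }

    public static void main(String[] args) {
        final ToyCancellableTask task = new ToyCancellableTask();
        task.cancel("by user request");
        task.notifyIfCancelled(e -> System.out.println(e.getMessage()));
    }
}
```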

Member Author

Thanks. Pushed 3d07261. Please let me know if it has used the right listener.

Comment on lines 176 to 180
logger.debug("task cancelled after all responses were collected");
assert task instanceof CancellableTask : "expect CancellableTask, but got: " + task;
final var cancellableTask = (CancellableTask) task;
assert cancellableTask.isCancelled();
throw new TaskCancelledException("task cancelled [" + cancellableTask.getReasonCancelled() + "]");
Member Author

This change addresses the edge case commented here, but I am struggling to write a test for it. Essentially, we need the cancellation to come in after all node responses are collected but before the AtomicBoolean responsesHandled is checked. One option is to extract the creation of CancellableFanOut into its own protected method and wrap the returned value with a delegating CancellableFanOut. But this requires making the 4 protected methods in CancellableFanOut package-private. I am a bit suspicious about whether this is the right path to go down. I am open to suggestions.

Contributor

I'd be content with a test which concurrently completes the action and cancels it, and asserts that we always either get an exception or we get a successful response. I expect such a test would find the bug here pretty reliably.
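
The shape of such a test might look like the sketch below. It is an assumption-laden stand-in: a `CompletableFuture` plays the role of the action's listener, `ToyTaskCancelledException` is a hypothetical exception type, and a `CyclicBarrier` releases the completing and cancelling threads together. The real test would drive the transport action instead.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the test shape, not the test added in this PR.
final class CompleteVersusCancelRace {

    // Stand-in for the exception a cancelled action would report.
    static final class ToyTaskCancelledException extends RuntimeException {
        ToyTaskCancelledException(String message) {
            super(message);
        }
    }

    // Races one completing thread against one cancelling thread and returns
    // the outcome the caller observed.
    static String runOnce() {
        final CompletableFuture<String> future = new CompletableFuture<>();
        final CyclicBarrier barrier = new CyclicBarrier(2);
        final Thread completer = new Thread(() -> {
            awaitBarrier(barrier);
            future.complete("response"); // no-op if cancellation got there first
        });
        final Thread canceller = new Thread(() -> {
            awaitBarrier(barrier);
            future.completeExceptionally(new ToyTaskCancelledException("task cancelled [test]"));
        });
        completer.start();
        canceller.start();
        try {
            completer.join();
            canceller.join();
            return future.get(10, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof ToyTaskCancelledException) {
                return "cancelled";
            }
            throw new AssertionError("unexpected failure", e);
        } catch (Exception e) {
            throw new AssertionError(e);
        }
    }

    private static void awaitBarrier(CyclicBarrier barrier) {
        try {
            barrier.await(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new AssertionError(e);
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            final String outcome = runOnce();
            if (outcome.equals("response") == false && outcome.equals("cancelled") == false) {
                throw new AssertionError("unexpected outcome: " + outcome);
            }
        }
        System.out.println("every run ended in a response or a cancellation");
    }
}
```

Repeating the race many times is what gives such a test a chance of hitting the narrow window between collecting the last response and handling the result.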

Member Author

Cool I added such a test, see fb71e89

@ywangd ywangd requested a review from DaveCTurner June 30, 2025 08:40

try {
final var testNodesResponse = future.actionGet(SAFE_AWAIT_TIMEOUT);
assertFalse(cancellableTask.isCancelled());
Contributor

I don't think this'll hold in general, we could cancel the task after the completion has already passed the point of no return and then the task's cancellation flag will be set even though it completed successfully.
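
A small model makes the point concrete. In the sketch below, `responsesHandled` plays the role of the point of no return: whichever of completion or cancellation wins the compare-and-set decides the outcome, but a late cancellation still sets the task's flag. The class `LateCancellation` and its methods are hypothetical and only approximate the behaviour discussed here.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the "point of no return", not code from this PR.
final class LateCancellation {
    private final AtomicBoolean responsesHandled = new AtomicBoolean();
    private volatile boolean cancelled;

    // Returns true if completion won and a successful response will be sent.
    boolean complete() {
        return responsesHandled.compareAndSet(false, true);
    }

    // Always sets the cancellation flag; returns true only if it got in
    // before completion and therefore changed the outcome.
    boolean cancel() {
        cancelled = true;
        return responsesHandled.compareAndSet(false, true);
    }

    boolean isCancelled() {
        return cancelled;
    }

    public static void main(String[] args) {
        final LateCancellation task = new LateCancellation();
        final boolean succeeded = task.complete();
        final boolean cancellationWon = task.cancel();
        System.out.println("succeeded=" + succeeded
            + " cancellationWon=" + cancellationWon
            + " isCancelled=" + task.isCancelled());
    }
}
```

In this model a successful completion followed by a cancel leaves `isCancelled()` returning true, which is why asserting that the flag is unset after a successful response cannot hold in general.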

Member Author

Yeah, good point, thanks. I removed that in b38783d, which also contains a few other tweaks.

@ywangd ywangd requested a review from DaveCTurner July 1, 2025 03:00
Contributor

@DaveCTurner left a comment

LGTM

@ywangd
Member Author

ywangd commented Jul 1, 2025

@elasticmachine update branch

@ywangd added the auto-merge-without-approval and auto-backport labels Jul 1, 2025
@ywangd
Member Author

ywangd commented Jul 2, 2025

@elasticmachine update branch

@ywangd
Member Author

ywangd commented Jul 2, 2025

@elasticmachine update branch

@elasticsearchmachine elasticsearchmachine merged commit 74fd66c into elastic:main Jul 3, 2025
33 checks passed
@ywangd ywangd deleted the es-128852-fix branch July 3, 2025 00:26
ywangd added a commit to ywangd/elasticsearch that referenced this pull request Jul 3, 2025
@elasticsearchmachine
Collaborator

💚 Backport successful

Branches: 8.19, 9.1, 9.0

ywangd added a commit to ywangd/elasticsearch that referenced this pull request Jul 3, 2025
elasticsearchmachine pushed a commit that referenced this pull request Jul 3, 2025
…0514)

elasticsearchmachine pushed a commit that referenced this pull request Jul 3, 2025
…0513)

elasticsearchmachine pushed a commit that referenced this pull request Jul 3, 2025
…0515)

mridula-s109 pushed a commit to mridula-s109/elasticsearch that referenced this pull request Jul 3, 2025
Labels

auto-backport, auto-merge-without-approval, >bug, :Distributed Coordination/Distributed, Team:Distributed Coordination, v8.19.0, v9.0.4, v9.1.0, v9.2.0

Development

Successfully merging this pull request may close these issues.

ConcurrentModificationException in TransportClusterStatsAction

4 participants