Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -377,7 +378,13 @@ protected void newResponseAsync(
public void testConcurrentlyCompletionAndCancellation() throws InterruptedException {
final var action = getTestTransportNodesAction();

final CancellableTask cancellableTask = new CancellableTask(randomLong(), "transport", "action", "", null, emptyMap());
final CountDownLatch onCancelledLatch = new CountDownLatch(1);
final CancellableTask cancellableTask = new CancellableTask(randomLong(), "transport", "action", "", null, emptyMap()) {
@Override
protected void onCancelled() {
onCancelledLatch.countDown();
}
};

final PlainActionFuture<TestNodesResponse> future = new PlainActionFuture<>();
action.execute(cancellableTask, new TestNodesRequest(), future);
Expand Down Expand Up @@ -414,6 +421,11 @@ public void testConcurrentlyCompletionAndCancellation() throws InterruptedExcept
assertNotNull("expect task cancellation exception, but got\n" + ExceptionsHelper.stackTrace(e), taskCancelledException);
assertThat(e.getMessage(), containsString("task cancelled [simulated]"));
assertTrue(cancellableTask.isCancelled());
// Wait for the latch, the listener for releasing node responses is called before it.
// We need to wait for the latch because the cancellation may be detected in CancellableFanOut#onCompletion with
// the responseHandled flag being true. The flag is set by the cancellation listener which is still in process of
// draining existing responses.
safeAwait(onCancelledLatch);
Comment on lines +424 to +428
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It turns out we still need the latch. It was actually part of the original issue, i.e. the cancellation can comes in after all node responses are collected and right before the final response is sent. In this case, the final response is short circuited to be an exception while the cancellation listener is still doing work.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm ok I think I see, so you mean the final response can be sent in between these two lines:

semaphore.release();
// finally, release refs to all the per-item listeners (without calling onItemFailure, so this is also fast)
cancellableTask.notifyIfCancelled(itemCancellationListener);

On reflection this seems kinda surprising if not an outright bug: we generally prefer to delay sending the response until everything is released.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not the above lines. The final response (cancellation) can be sent at this line

before the node responses are released by this line

Releasables.wrap(Iterators.map(drainedResponses.iterator(), r -> r::decRef)).close();

A potential alternative fix is to add a similar synchronized block to drain node responses right before the first line linked above. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah sorry this is all quite a tangle. I don't think it'll work to drain responses in onCompletion - we could already have drained them in the listener within addReleaseOnCancellationListener but still not quite released them yet.

On reflection it looks like there's other ways we can complete the final listener before releasing all the node-level responses, e.g. here:

try (var ignored = Releasables.wrap(Iterators.map(responses.iterator(), r -> r::decRef))) {
newResponseAsync(task, request, actionContext, responses, exceptions, l);
}

Let's leave this alone then; I appreciate the comment in the test calling out that this is slightly odd.

// All previously captured responses are released due to cancellation
assertTrue(nodeResponses.stream().allMatch(r -> r.hasReferences() == false));
// Wait for the last response to be gathered and assert it is also released by either the concurrent cancellation or
Expand Down