Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -377,7 +378,13 @@ protected void newResponseAsync(
public void testConcurrentlyCompletionAndCancellation() throws InterruptedException {
final var action = getTestTransportNodesAction();

final CancellableTask cancellableTask = new CancellableTask(randomLong(), "transport", "action", "", null, emptyMap());
final CountDownLatch onCancelledLatch = new CountDownLatch(1);
final CancellableTask cancellableTask = new CancellableTask(randomLong(), "transport", "action", "", null, emptyMap()) {
@Override
protected void onCancelled() {
onCancelledLatch.countDown();
}
};

final PlainActionFuture<TestNodesResponse> future = new PlainActionFuture<>();
action.execute(cancellableTask, new TestNodesRequest(), future);
Expand Down Expand Up @@ -414,6 +421,11 @@ public void testConcurrentlyCompletionAndCancellation() throws InterruptedExcept
assertNotNull("expect task cancellation exception, but got\n" + ExceptionsHelper.stackTrace(e), taskCancelledException);
assertThat(e.getMessage(), containsString("task cancelled [simulated]"));
assertTrue(cancellableTask.isCancelled());
// Wait for the latch, the listener for releasing node responses is called before it.
// We need to wait for the latch because the cancellation may be detected in CancellableFanOut#onCompletion with
// the responseHandled flag being true. The flag is set by the cancellation listener which is still in process of
// draining existing responses.
safeAwait(onCancelledLatch);
// All previously captured responses are released due to cancellation
assertTrue(nodeResponses.stream().allMatch(r -> r.hasReferences() == false));
// Wait for the last response to be gathered and assert it is also released by either the concurrent cancellation or
Expand Down