|
58 | 58 | import java.util.List; |
59 | 59 | import java.util.Map; |
60 | 60 | import java.util.Set; |
| 61 | +import java.util.concurrent.CountDownLatch; |
61 | 62 | import java.util.concurrent.CyclicBarrier; |
62 | 63 | import java.util.concurrent.Executor; |
63 | 64 | import java.util.concurrent.TimeUnit; |
@@ -377,7 +378,13 @@ protected void newResponseAsync( |
377 | 378 | public void testConcurrentlyCompletionAndCancellation() throws InterruptedException { |
378 | 379 | final var action = getTestTransportNodesAction(); |
379 | 380 |
|
380 | | - final CancellableTask cancellableTask = new CancellableTask(randomLong(), "transport", "action", "", null, emptyMap()); |
| 381 | + final CountDownLatch onCancelledLatch = new CountDownLatch(1); |
| 382 | + final CancellableTask cancellableTask = new CancellableTask(randomLong(), "transport", "action", "", null, emptyMap()) { |
| 383 | + @Override |
| 384 | + protected void onCancelled() { |
| 385 | + onCancelledLatch.countDown(); |
| 386 | + } |
| 387 | + }; |
381 | 388 |
|
382 | 389 | final PlainActionFuture<TestNodesResponse> future = new PlainActionFuture<>(); |
383 | 390 | action.execute(cancellableTask, new TestNodesRequest(), future); |
@@ -414,6 +421,11 @@ public void testConcurrentlyCompletionAndCancellation() throws InterruptedExcept |
414 | 421 | assertNotNull("expect task cancellation exception, but got\n" + ExceptionsHelper.stackTrace(e), taskCancelledException); |
415 | 422 | assertThat(e.getMessage(), containsString("task cancelled [simulated]")); |
416 | 423 | assertTrue(cancellableTask.isCancelled()); |
| 424 | + // Wait for the latch, the listener for releasing node responses is called before it. |
| 425 | + // We need to wait for the latch because the cancellation may be detected in CancellableFanOut#onCompletion with |
| 426 | + // the responseHandled flag being true. The flag is set by the cancellation listener which is still in process of |
| 427 | + // draining existing responses. |
| 428 | + safeAwait(onCancelledLatch); |
417 | 429 | // All previously captured responses are released due to cancellation |
418 | 430 | assertTrue(nodeResponses.stream().allMatch(r -> r.hasReferences() == false)); |
419 | 431 | // Wait for the last response to be gathered and assert it is also released by either the concurrent cancellation or |
|
0 commit comments