Skip to content

Commit 705606e

Browse files
ywangdgmjehovich
authored andcommitted
[Test] Wait for cancellation before assertions (elastic#134532)
Resolves: elastic#134277
1 parent 9324855 commit 705606e

File tree

2 files changed

+13
-4
lines changed

2 files changed

+13
-4
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -543,9 +543,6 @@ tests:
543543
- class: org.elasticsearch.xpack.esql.expression.function.scalar.score.DecayTests
544544
method: "testEvaluateBlockWithNulls {TestCase=<integer>, <integer>, <integer>, <_source> #2}"
545545
issue: https://github.com/elastic/elasticsearch/issues/134679
546-
- class: org.elasticsearch.action.support.nodes.TransportNodesActionTests
547-
method: testConcurrentlyCompletionAndCancellation
548-
issue: https://github.com/elastic/elasticsearch/issues/134277
549546
- class: org.elasticsearch.search.vectors.CachingEnableFilterQueryTests
550547
method: testTermQuery
551548
issue: https://github.com/elastic/elasticsearch/issues/134723

server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import java.util.List;
5959
import java.util.Map;
6060
import java.util.Set;
61+
import java.util.concurrent.CountDownLatch;
6162
import java.util.concurrent.CyclicBarrier;
6263
import java.util.concurrent.Executor;
6364
import java.util.concurrent.TimeUnit;
@@ -377,7 +378,13 @@ protected void newResponseAsync(
377378
public void testConcurrentlyCompletionAndCancellation() throws InterruptedException {
378379
final var action = getTestTransportNodesAction();
379380

380-
final CancellableTask cancellableTask = new CancellableTask(randomLong(), "transport", "action", "", null, emptyMap());
381+
final CountDownLatch onCancelledLatch = new CountDownLatch(1);
382+
final CancellableTask cancellableTask = new CancellableTask(randomLong(), "transport", "action", "", null, emptyMap()) {
383+
@Override
384+
protected void onCancelled() {
385+
onCancelledLatch.countDown();
386+
}
387+
};
381388

382389
final PlainActionFuture<TestNodesResponse> future = new PlainActionFuture<>();
383390
action.execute(cancellableTask, new TestNodesRequest(), future);
@@ -414,6 +421,11 @@ public void testConcurrentlyCompletionAndCancellation() throws InterruptedExcept
414421
assertNotNull("expect task cancellation exception, but got\n" + ExceptionsHelper.stackTrace(e), taskCancelledException);
415422
assertThat(e.getMessage(), containsString("task cancelled [simulated]"));
416423
assertTrue(cancellableTask.isCancelled());
424+
// Wait for the latch, the listener for releasing node responses is called before it.
425+
// We need to wait for the latch because the cancellation may be detected in CancellableFanOut#onCompletion with
426+
// the responseHandled flag being true. The flag is set by the cancellation listener which is still in process of
427+
// draining existing responses.
428+
safeAwait(onCancelledLatch);
417429
// All previously captured responses are released due to cancellation
418430
assertTrue(nodeResponses.stream().allMatch(r -> r.hasReferences() == false));
419431
// Wait for the last response to be gathered and assert it is also released by either the concurrent cancellation or

0 commit comments

Comments
 (0)