Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -513,9 +513,6 @@ tests:
- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeForkIT
method: test {csv-spec:spatial_shapes.ConvertFromStringParseError}
issue: https://github.com/elastic/elasticsearch/issues/134254
- class: org.elasticsearch.action.support.nodes.TransportNodesActionTests
method: testConcurrentlyCompletionAndCancellation
issue: https://github.com/elastic/elasticsearch/issues/134277
- class: org.elasticsearch.xpack.core.datastreams.TimeSeriesFeatureSetUsageTests
method: testEqualsAndHashcode
issue: https://github.com/elastic/elasticsearch/issues/134332
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.ObjLongConsumer;

Expand Down Expand Up @@ -378,28 +379,26 @@ protected void newResponseAsync(
public void testConcurrentlyCompletionAndCancellation() throws InterruptedException {
final var action = getTestTransportNodesAction();

final CountDownLatch onCancelledLatch = new CountDownLatch(1);
final CancellableTask cancellableTask = new CancellableTask(randomLong(), "transport", "action", "", null, emptyMap()) {
@Override
protected void onCancelled() {
onCancelledLatch.countDown();
}
};
final CancellableTask cancellableTask = new CancellableTask(randomLong(), "transport", "action", "", null, emptyMap());

final PlainActionFuture<TestNodesResponse> future = new PlainActionFuture<>();
action.execute(cancellableTask, new TestNodesRequest(), future);

final List<TestNodeResponse> nodeResponses = new ArrayList<>();
final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
// Complete all but the last request for racing completion with cancellation
for (int i = 0; i < capturedRequests.length - 1; i++) {
final var capturedRequest = capturedRequests[i];
nodeResponses.add(completeOneRequest(capturedRequest));
}

final var raceBarrier = new CyclicBarrier(3);
final var completedLatch = new CountDownLatch(1);
final var lastNodeResponseRef = new AtomicReference<TestNodeResponse>();
final Thread completeThread = new Thread(() -> {
safeAwait(raceBarrier);
nodeResponses.add(completeOneRequest(capturedRequests[capturedRequests.length - 1]));
lastNodeResponseRef.set(completeOneRequest(capturedRequests[capturedRequests.length - 1]));
completedLatch.countDown();
});
final Thread cancelThread = new Thread(() -> {
safeAwait(raceBarrier);
Expand All @@ -419,8 +418,12 @@ protected void onCancelled() {
assertNotNull("expect task cancellation exception, but got\n" + ExceptionsHelper.stackTrace(e), taskCancelledException);
assertThat(e.getMessage(), containsString("task cancelled [simulated]"));
assertTrue(cancellableTask.isCancelled());
safeAwait(onCancelledLatch); // wait for the latch, the listener for releasing node responses is called before it
// All previously captured responses are released due to cancellation
assertTrue(nodeResponses.stream().allMatch(r -> r.hasReferences() == false));
// Wait for the last response to be gathered and assert it is also released by either the concurrent cancellation or
// not tracked in onItemResponse at all due to already cancelled
safeAwait(completedLatch);
assertFalse(lastNodeResponseRef.get().hasReferences());
}

completeThread.join(10_000);
Expand Down