@@ -340,7 +340,8 @@ protected void newResponseAsync(
340340 ActionListener <TestNodesResponse > listener
341341 ) {
342342 boolean waited = false ;
343- // Process node responses in a loop and ensure no ConcurrentModificationException, see also #128852
343+ // Process node responses in a loop and ensure no ConcurrentModificationException will be thrown due to
344+ // concurrent cancellation comes after the loops has started, see also #128852
344345 for (var response : testNodeResponses ) {
345346 if (waited == false ) {
346347 waited = true ;
@@ -363,14 +364,13 @@ protected void newResponseAsync(
363364 completeOneRequest (capturedRequest );
364365 }
365366
366- // Wait for the overall response starts to processing the node responses in a loop
367+ // Wait for the overall response starts to processing the node responses in a loop and then cancel the task.
368+ // It should not interfere with the node response processing.
367369 safeAwait (barrier );
368-
369- // Cancel the task while the overall response is being processed
370370 TaskCancelHelper .cancel (cancellableTask , "simulated" );
371371 safeGet (cancelledFuture );
372372
373- // Let the process continue it should be successful since the cancellation came after processing started
373+ // Let the process continue, and it should be successful
374374 safeAwait (barrier );
375375 assertResponseReleased (safeGet (future ));
376376 }
@@ -399,7 +399,7 @@ protected void onCancelled() {
399399 final var raceBarrier = new CyclicBarrier (3 );
400400 final Thread completeThread = new Thread (() -> {
401401 safeAwait (raceBarrier );
402- completeOneRequest (capturedRequests [capturedRequests .length - 1 ]);
402+ nodeResponses . add ( completeOneRequest (capturedRequests [capturedRequests .length - 1 ]) );
403403 });
404404 final Thread cancelThread = new Thread (() -> {
405405 safeAwait (raceBarrier );
@@ -409,17 +409,17 @@ protected void onCancelled() {
409409 cancelThread .start ();
410410 safeAwait (raceBarrier );
411411
412+ // We expect either a successful response or a cancellation exception. All node responses should be released in both cases.
412413 try {
413414 final var testNodesResponse = future .actionGet (SAFE_AWAIT_TIMEOUT );
414- assertFalse (cancellableTask .isCancelled ());
415415 assertThat (testNodesResponse .getNodes (), hasSize (capturedRequests .length ));
416416 assertResponseReleased (testNodesResponse );
417417 } catch (Exception e ) {
418418 final var taskCancelledException = (TaskCancelledException ) ExceptionsHelper .unwrap (e , TaskCancelledException .class );
419419 assertNotNull ("expect task cancellation exception, but got\n " + ExceptionsHelper .stackTrace (e ), taskCancelledException );
420420 assertThat (e .getMessage (), containsString ("task cancelled [simulated]" ));
421421 assertTrue (cancellableTask .isCancelled ());
422- safeAwait (onCancelledLatch );
422+ safeAwait (onCancelledLatch ); // wait for the latch, the listener for releasing node responses is called before it
423423 assertTrue (nodeResponses .stream ().allMatch (r -> r .hasReferences () == false ));
424424 }
425425
0 commit comments