@@ -31,7 +31,13 @@ public class ThreadedActionListenerTests extends ESTestCase {
3131
3232 public void testRejectionHandling () throws InterruptedException {
3333 final var listenerCount = between (1 , 1000 );
34- final var countdownLatch = new CountDownLatch (listenerCount );
34+
35+ // await the completion of some number of listeners before starting to shut the threadpool down ...
36+ final var startLatch = new CountDownLatch (between (1 , listenerCount ));
37+
38+ // ... but ensure that all the submitted listeners are completed somehow, even the ones submitted after the threadpool closes
39+ final var finishLatch = new CountDownLatch (listenerCount );
40+
3541 final var threadPool = new TestThreadPool (
3642 "test" ,
3743 Settings .EMPTY ,
@@ -64,9 +70,10 @@ public void testRejectionHandling() throws InterruptedException {
6470 threadPool .generic ().execute (() -> {
6571 for (int i = 0 ; i < listenerCount ; i ++) {
6672 final var pool = randomFrom (pools );
73+ final var forceExecution = (pool .equals ("fixed-bounded-queue" ) || pool .startsWith ("scaling" )) && randomBoolean ();
6774 final var listener = new ThreadedActionListener <Void >(
6875 threadPool .executor (pool ),
69- ( pool . equals ( "fixed-bounded-queue" ) || pool . startsWith ( "scaling" )) && rarely () ,
76+ forceExecution ,
7077 ActionListener .runAfter (new ActionListener <>() {
7178 @ Override
7279 public void onResponse (Void ignored ) {}
@@ -75,7 +82,9 @@ public void onResponse(Void ignored) {}
7582 public void onFailure (Exception e ) {
7683 assertNull (e .getCause ());
7784 if (e instanceof EsRejectedExecutionException esRejectedExecutionException ) {
78- assertTrue (esRejectedExecutionException .isExecutorShutdown ());
85+ if (pool .equals ("fixed-bounded-queue" ) == false || forceExecution ) {
86+ assertTrue (esRejectedExecutionException .isExecutorShutdown ());
87+ } // else we might have been rejected because of the queue bound and that's ok too
7988 if (e .getSuppressed ().length == 0 ) {
8089 return ;
8190 }
@@ -96,12 +105,14 @@ public void onFailure(Exception e) {
96105 }
97106
98107 }
99- }, countdownLatch ::countDown )
108+ }, finishLatch ::countDown )
100109 );
110+ startLatch .countDown ();
111+ Thread .yield ();
101112 synchronized (closeFlag ) {
102113 if (closeFlag .get () && shutdownUnsafePools .contains (pool )) {
103114 // closing, so tasks submitted to this pool may just be dropped
104- countdownLatch .countDown ();
115+ finishLatch .countDown ();
105116 } else if (randomBoolean ()) {
106117 listener .onResponse (null );
107118 } else {
@@ -111,14 +122,16 @@ public void onFailure(Exception e) {
111122 Thread .yield ();
112123 }
113124 });
125+ startLatch .countDown (); // sometimes shut down before the first listener
126+ safeAwait (startLatch );
114127 } finally {
115128 synchronized (closeFlag ) {
116129 assertTrue (closeFlag .compareAndSet (false , true ));
117130 threadPool .shutdown ();
118131 }
119132 assertTrue (threadPool .awaitTermination (10 , TimeUnit .SECONDS ));
120133 }
121- assertTrue ( countdownLatch . await ( 10 , TimeUnit . SECONDS ) );
134+ safeAwait ( finishLatch );
122135 }
123136
124137 public void testToString () {
0 commit comments