2121import java .util .ArrayList ;
2222import java .util .HashSet ;
2323import java .util .List ;
24+ import java .util .Queue ;
2425import java .util .Set ;
25- import java .util .concurrent .CancellationException ;
26+ import java .util .concurrent .CompletableFuture ;
27+ import java .util .concurrent .ConcurrentLinkedQueue ;
2628import java .util .concurrent .CountDownLatch ;
2729import java .util .concurrent .ExecutionException ;
2830import java .util .concurrent .ExecutorService ;
3335import java .util .concurrent .atomic .AtomicBoolean ;
3436import java .util .concurrent .atomic .AtomicReference ;
3537import java .util .concurrent .atomic .LongAdder ;
38+ import java .util .stream .IntStream ;
3639
3740@ SuiteScopeTestCase
3841public class AsyncSearchConcurrentStatusIT extends AsyncSearchIntegTestCase {
@@ -91,7 +94,12 @@ public void testConcurrentStatusFetchWhileTaskCloses() throws Exception {
9194 // Wait for pollers to be active
9295 CountDownLatch warmed = new CountDownLatch (1 );
9396
94- PollerGroup pollers = createPollers (id , pollerThreads , aggName , stats , warmed );
97+ // Executor and coordination for pollers
98+ ExecutorService pollerExec = Executors .newFixedThreadPool (pollerThreads );
99+ AtomicBoolean running = new AtomicBoolean (true );
100+ Queue <Throwable > failures = new ConcurrentLinkedQueue <>();
101+
102+ CompletableFuture <Void > pollers = createPollers (id , pollerThreads , stats , warmed , pollerExec , running , failures );
95103
96104 // Wait until pollers are issuing requests (warming period)
97105 assertTrue ("pollers did not warm up in time" , warmed .await (timeout .millis (), TimeUnit .MILLISECONDS ));
@@ -117,15 +125,30 @@ public void testConcurrentStatusFetchWhileTaskCloses() throws Exception {
117125 } catch (TimeoutException e ) {
118126 consumer .cancel (true );
119127 fail (e , "Consumer thread did not finish within timeout" );
120- } catch (Exception ignored ) {} finally {
121-
128+ } catch (Exception ignored ) {
129+ // ignored
130+ } finally {
131+ // Stop pollers
132+ running .set (false );
122133 try {
123- pollers .stopAndAwait (timeout );
124- } catch (InterruptedException e ) {
134+ pollers .get (timeout .millis (), TimeUnit .MILLISECONDS );
135+ } catch (TimeoutException te ) {
136+ // The finally block will shut down the pollers forcibly
137+ } catch (ExecutionException ee ) {
138+ failures .add (ExceptionsHelper .unwrapCause (ee .getCause ()));
139+ } catch (InterruptedException ie ) {
125140 Thread .currentThread ().interrupt ();
126- fail ("Interrupted while stopping pollers: " + e .getMessage ());
141+ } finally {
142+ pollerExec .shutdownNow ();
143+ try {
144+ pollerExec .awaitTermination (timeout .millis (), TimeUnit .MILLISECONDS );
145+ } catch (InterruptedException ie ) {
146+ Thread .currentThread ().interrupt ();
147+ fail ("Interrupted while stopping pollers: " + ie .getMessage ());
148+ }
127149 }
128150
151+ // Shut down the consumer executor
129152 consumerExec .shutdown ();
130153 try {
131154 consumerExec .awaitTermination (timeout .millis (), TimeUnit .MILLISECONDS );
@@ -134,13 +157,12 @@ public void testConcurrentStatusFetchWhileTaskCloses() throws Exception {
134157 }
135158 }
136159
137- assertNoWorkerFailures (pollers );
160+ assertNoWorkerFailures (failures );
138161 assertStats (stats );
139162 }
140163 }
141164
142- private void assertNoWorkerFailures (PollerGroup pollers ) {
143- List <Throwable > failures = pollers .getFailures ();
165+ private void assertNoWorkerFailures (Queue <Throwable > failures ) {
144166 assertTrue (
145167 "Unexpected worker failures:\n " + failures .stream ().map (ExceptionsHelper ::stackTrace ).reduce ("" , (a , b ) -> a + "\n ---\n " + b ),
146168 failures .isEmpty ()
@@ -175,57 +197,68 @@ private void consumeAllResponses(SearchResponseIterator it, String aggName) thro
175197 }
176198 }
177199
178- private PollerGroup createPollers (String id , int threads , String aggName , PollStats stats , CountDownLatch warmed ) {
179- final ExecutorService exec = Executors .newFixedThreadPool (threads );
180- final List <Future <?>> futures = new ArrayList <>(threads );
181- final AtomicBoolean running = new AtomicBoolean (true );
182-
183- for (int i = 0 ; i < threads ; i ++) {
184- futures .add (exec .submit (() -> {
185- while (running .get ()) {
186- AsyncSearchResponse resp = null ;
187- try {
188- resp = getAsyncSearch (id );
189- stats .totalCalls .increment ();
200+ private CompletableFuture <Void > createPollers (
201+ String id ,
202+ int threads ,
203+ PollStats stats ,
204+ CountDownLatch warmed ,
205+ ExecutorService pollerExec ,
206+ AtomicBoolean running ,
207+ Queue <Throwable > failures
208+ ) {
209+ @ SuppressWarnings ("unchecked" )
210+ final CompletableFuture <Void >[] tasks = IntStream .range (0 , threads ).mapToObj (i -> CompletableFuture .runAsync (() -> {
211+ while (running .get ()) {
212+ AsyncSearchResponse resp = null ;
213+ try {
214+ resp = getAsyncSearch (id );
215+ stats .totalCalls .increment ();
190216
191- // Once enough requests have been sent, consider pollers "warmed".
192- if (stats .totalCalls .sum () >= threads ) {
193- warmed .countDown ();
194- }
217+ // Once enough requests have been sent, consider pollers "warmed".
218+ if (stats .totalCalls .sum () >= threads ) {
219+ warmed .countDown ();
220+ }
195221
196- if (resp .isRunning ()) {
197- stats .runningResponses .increment ();
198- } else {
199- // Success-only assertions: if reported completed, we must have a proper search response
200- assertNull ("Async search reported completed with failure" , resp .getFailure ());
201- assertNotNull ("Completed async search must carry a SearchResponse" , resp .getSearchResponse ());
202- assertNotNull ("Completed async search must have aggregations" , resp .getSearchResponse ().getAggregations ());
203- assertNotNull (
204- "Completed async search must contain the expected aggregation" ,
205- resp .getSearchResponse ().getAggregations ().get (aggName )
206- );
207- stats .completedResponses .increment ();
208- }
209- } catch (Exception e ) {
210- Throwable cause = ExceptionsHelper .unwrapCause (e );
211- if (cause instanceof ElasticsearchStatusException ) {
212- RestStatus status = ExceptionsHelper .status (cause );
213- if (status == RestStatus .GONE ) {
214- stats .gone410 .increment ();
215- }
222+ if (resp .isRunning ()) {
223+ stats .runningResponses .increment ();
224+ } else {
225+ // Success-only assertions: if reported completed, we must have a proper search response
226+ assertNull ("Async search reported completed with failure" , resp .getFailure ());
227+ assertNotNull ("Completed async search must carry a SearchResponse" , resp .getSearchResponse ());
228+ assertNotNull ("Completed async search must have aggregations" , resp .getSearchResponse ().getAggregations ());
229+ assertNotNull (
230+ "Completed async search must contain the expected aggregation" ,
231+ resp .getSearchResponse ().getAggregations ().get ("terms" )
232+ );
233+ stats .completedResponses .increment ();
234+ }
235+ } catch (Exception e ) {
236+ Throwable cause = ExceptionsHelper .unwrapCause (e );
237+ if (cause instanceof ElasticsearchStatusException ) {
238+ RestStatus status = ExceptionsHelper .status (cause );
239+ if (status == RestStatus .GONE ) {
240+ stats .gone410 .increment ();
216241 } else {
217242 stats .exceptions .increment ();
243+ failures .add (cause );
218244 }
219- } finally {
220- if (resp != null ) {
221- resp .decRef ();
222- }
245+ } else {
246+ stats .exceptions .increment ();
247+ failures .add (cause );
248+ }
249+ } finally {
250+ if (resp != null ) {
251+ resp .decRef ();
223252 }
224253 }
225- return null ;
226- }));
227- }
228- return new PollerGroup (exec , futures , running );
254+ }
255+ }, pollerExec ).whenComplete ((v , ex ) -> {
256+ if (ex != null ) {
257+ failures .add (ExceptionsHelper .unwrapCause (ex ));
258+ }
259+ })).toArray (CompletableFuture []::new );
260+
261+ return CompletableFuture .allOf (tasks );
229262 }
230263
231264 static final class PollStats {
@@ -235,41 +268,4 @@ static final class PollStats {
235268 final LongAdder exceptions = new LongAdder ();
236269 final LongAdder gone410 = new LongAdder ();
237270 }
238-
239- static class PollerGroup {
240- private final ExecutorService exec ;
241- private final List <Future <?>> futures ;
242- // The threads are created and running right away
243- private final AtomicBoolean running ;
244-
245- private PollerGroup (ExecutorService exec , List <Future <?>> futures , AtomicBoolean running ) {
246- this .exec = exec ;
247- this .futures = futures ;
248- this .running = running ;
249- }
250-
251- void stopAndAwait (TimeValue timeout ) throws InterruptedException {
252- running .set (false );
253- exec .shutdown ();
254- if (exec .awaitTermination (timeout .millis (), TimeUnit .MILLISECONDS ) == false ) {
255- exec .shutdownNow ();
256- exec .awaitTermination (timeout .millis (), TimeUnit .MILLISECONDS );
257- }
258- }
259-
260- List <Throwable > getFailures () {
261- List <Throwable > failures = new ArrayList <>();
262- for (Future <?> f : futures ) {
263- try {
264- f .get ();
265- } catch (CancellationException ignored ) {} catch (ExecutionException ee ) {
266- failures .add (ExceptionsHelper .unwrapCause (ee .getCause ()));
267- } catch (InterruptedException ie ) {
268- Thread .currentThread ().interrupt ();
269- if (failures .isEmpty ()) failures .add (ie );
270- }
271- }
272- return failures ;
273- }
274- }
275271}
0 commit comments