99
1010import java .util .ArrayList ;
1111import java .util .List ;
12+ import java .util .concurrent .CountDownLatch ;
1213import java .util .concurrent .Flow ;
14+ import java .util .concurrent .TimeUnit ;
1315import java .util .concurrent .atomic .AtomicReference ;
1416
1517import com .fasterxml .jackson .core .JsonProcessingException ;
@@ -297,13 +299,16 @@ public void run() {
297299 }
298300
299301 @ Test
300- public void testConsumeAllRaisesStoredException () {
302+ public void testConsumeAllRaisesStoredException () throws InterruptedException {
301303 // Set an error in the event consumer
302304 setEventConsumerError (new RuntimeException ("Stored error" ));
303305
304306 Flow .Publisher <Event > publisher = eventConsumer .consumeAll ();
305307 final AtomicReference <Throwable > receivedError = new AtomicReference <>();
306308
309+ final CountDownLatch errorLatch = new CountDownLatch (1 );
310+
311+
307312 publisher .subscribe (new Flow .Subscriber <>() {
308313 @ Override
309314 public void onSubscribe (Flow .Subscription subscription ) {
@@ -313,19 +318,25 @@ public void onSubscribe(Flow.Subscription subscription) {
313318 @ Override
314319 public void onNext (Event item ) {
315320 // Should not be called
321+ errorLatch .countDown ();
316322 }
317323
318324 @ Override
319325 public void onError (Throwable throwable ) {
320326 receivedError .set (throwable );
327+ errorLatch .countDown ();
321328 }
322329
323330 @ Override
324331 public void onComplete () {
325332 // Should not be called
333+ errorLatch .countDown ();
326334 }
327335 });
328336
337+ // Wait for error callback with timeout
338+ assertTrue (errorLatch .await (5 , TimeUnit .SECONDS ), "Test timed out waiting for onError callback." );
339+
329340 assertNotNull (receivedError .get ());
330341 assertEquals ("Stored error" , receivedError .get ().getMessage ());
331342 }
@@ -341,6 +352,7 @@ public void testConsumeAllStopsOnQueueClosed() throws Exception {
341352 Flow .Publisher <Event > publisher = consumer .consumeAll ();
342353 final List <Event > receivedEvents = new ArrayList <>();
343354 final AtomicReference <Boolean > completed = new AtomicReference <>(false );
355+ final CountDownLatch completionLatch = new CountDownLatch (1 );
344356
345357 publisher .subscribe (new Flow .Subscriber <>() {
346358 @ Override
@@ -356,14 +368,19 @@ public void onNext(Event item) {
356368 @ Override
357369 public void onError (Throwable throwable ) {
358370 // Should not be called
371+ completionLatch .countDown ();
359372 }
360373
361374 @ Override
362375 public void onComplete () {
363376 completed .set (true );
377+ completionLatch .countDown ();
364378 }
365379 });
366380
381+ // Wait for completion with timeout
382+ assertTrue (completionLatch .await (5 , TimeUnit .SECONDS ), "Test timed out waiting for onComplete callback." );
383+
367384 // Should complete immediately with no events
368385 assertTrue (completed .get ());
369386 assertEquals (0 , receivedEvents .size ());
@@ -385,6 +402,7 @@ public void testConsumeAllHandlesQueueClosedException() throws Exception {
385402 Flow .Publisher <Event > publisher = consumer .consumeAll ();
386403 final List <Event > receivedEvents = new ArrayList <>();
387404 final AtomicReference <Boolean > completed = new AtomicReference <>(false );
405+ final CountDownLatch completionLatch = new CountDownLatch (1 );
388406
389407 publisher .subscribe (new Flow .Subscriber <>() {
390408 @ Override
@@ -400,14 +418,19 @@ public void onNext(Event item) {
400418 @ Override
401419 public void onError (Throwable throwable ) {
402420 // Should not be called
421+ completionLatch .countDown ();
403422 }
404423
405424 @ Override
406425 public void onComplete () {
407426 completed .set (true );
427+ completionLatch .countDown ();
408428 }
409429 });
410430
431+ // Wait for completion with timeout
432+ assertTrue (completionLatch .await (5 , TimeUnit .SECONDS ), "Test timed out waiting for onComplete callback." );
433+
411434 // Should have received the message and completed
412435 assertTrue (completed .get ());
413436 assertEquals (1 , receivedEvents .size ());
@@ -426,7 +449,7 @@ private Throwable getEventConsumerError() {
426449 java .lang .reflect .Field errorField = EventConsumer .class .getDeclaredField ("error" );
427450 errorField .setAccessible (true );
428451 return (Throwable ) errorField .get (eventConsumer );
429- } catch (Exception e ) {
452+ } catch (NoSuchFieldException | IllegalAccessException e ) {
430453 throw new RuntimeException ("Failed to access error field" , e );
431454 }
432455 }
@@ -436,7 +459,7 @@ private void setEventConsumerError(Throwable error) {
436459 java .lang .reflect .Field errorField = EventConsumer .class .getDeclaredField ("error" );
437460 errorField .setAccessible (true );
438461 errorField .set (eventConsumer , error );
439- } catch (Exception e ) {
462+ } catch (NoSuchFieldException | IllegalAccessException e ) {
440463 throw new RuntimeException ("Failed to set error field" , e );
441464 }
442465 }
0 commit comments