@@ -135,13 +135,10 @@ void testReplicatedEventDeliveredToCorrectQueue() throws InterruptedException {
135135 ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem (taskId , testEvent );
136136 queueManager .onReplicatedEvent (replicatedEvent );
137137
138- Event dequeuedEvent ;
139- try {
140- dequeuedEvent = queue .dequeueEventItem (100 ).getEvent ();
141- } catch (EventQueueClosedException e ) {
142- fail ("Queue should not be closed" );
143- return ;
144- }
138+ // Use retry logic to handle async MainEventBusProcessor distribution
139+ EventQueueItem item = dequeueEventWithRetry (queue , 5000 );
140+ assertNotNull (item , "Event should be available in queue" );
141+ Event dequeuedEvent = item .getEvent ();
145142 assertEquals (testEvent , dequeuedEvent );
146143 }
147144
@@ -164,13 +161,11 @@ void testReplicatedEventCreatesQueueIfNeeded() throws InterruptedException {
164161 // Verify the event was enqueued by dequeuing it
165162 // Need to tap() the MainQueue to get a ChildQueue for consumption
166163 EventQueue childQueue = queue .tap ();
167- Event dequeuedEvent ;
168- try {
169- dequeuedEvent = childQueue .dequeueEventItem (100 ).getEvent ();
170- } catch (EventQueueClosedException e ) {
171- fail ("Queue should not be closed" );
172- return ;
173- }
164+
165+ // Use retry logic to handle async MainEventBusProcessor distribution
166+ EventQueueItem item = dequeueEventWithRetry (childQueue , 5000 );
167+ assertNotNull (item , "Event should be available in queue" );
168+ Event dequeuedEvent = item .getEvent ();
174169 assertEquals (testEvent , dequeuedEvent , "The replicated event should be enqueued in the newly created queue" );
175170 }
176171
@@ -355,27 +350,24 @@ void testReplicatedEventProcessedWhenTaskActive() throws InterruptedException {
355350 // Verify no queue exists initially
356351 assertNull (queueManager .get (taskId ));
357352
353+ // Create a ChildQueue BEFORE processing the replicated event
354+ // This ensures the ChildQueue exists when MainEventBusProcessor distributes the event
355+ EventQueue childQueue = queueManager .createOrTap (taskId );
356+ assertNotNull (childQueue , "ChildQueue should be created" );
357+
358+ // Verify MainQueue was created
359+ EventQueue mainQueue = queueManager .get (taskId );
360+ assertNotNull (mainQueue , "MainQueue should exist after createOrTap" );
361+
358362 // Process a replicated event for an active task
359363 ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem (taskId , testEvent );
360364 queueManager .onReplicatedEvent (replicatedEvent );
361365
362- // Queue should be created and event should be enqueued
363- EventQueue queue = queueManager .get (taskId );
364- assertNotNull (queue , "Queue should be created for active task" );
365-
366- // Verify the event was enqueued
367- // Need to tap() the MainQueue to get a ChildQueue for consumption
368- EventQueue childQueue = queue .tap ();
369- Event dequeuedEvent ;
370- try {
371- // Wait up to 5 seconds for event (async processing needs time)
372- EventQueueItem item = childQueue .dequeueEventItem (5000 );
373- assertNotNull (item , "Event should be available in queue" );
374- dequeuedEvent = item .getEvent ();
375- } catch (EventQueueClosedException e ) {
376- fail ("Queue should not be closed" );
377- return ;
378- }
366+ // Verify the event was enqueued and distributed to our ChildQueue
367+ // Use retry logic to handle async MainEventBusProcessor distribution
368+ EventQueueItem item = dequeueEventWithRetry (childQueue , 5000 );
369+ assertNotNull (item , "Event should be available in queue" );
370+ Event dequeuedEvent = item .getEvent ();
379371 assertEquals (testEvent , dequeuedEvent , "Event should be enqueued for active task" );
380372 }
381373
@@ -498,26 +490,14 @@ void testReplicatedQueueClosedEventTerminatesConsumer() throws InterruptedExcept
498490 ReplicatedEventQueueItem replicatedClosedEvent = new ReplicatedEventQueueItem (taskId , closedEvent );
499491 queueManager .onReplicatedEvent (replicatedClosedEvent );
500492
501- // Dequeue the normal event first
502- EventQueueItem item1 ;
503- try {
504- item1 = queue .dequeueEventItem (100 );
505- } catch (EventQueueClosedException e ) {
506- fail ("Should not throw on first dequeue" );
507- return ;
508- }
509- assertNotNull (item1 );
493+ // Dequeue the normal event first (use retry for async processing)
494+ EventQueueItem item1 = dequeueEventWithRetry (queue , 5000 );
495+ assertNotNull (item1 , "First event should be available" );
510496 assertEquals (testEvent , item1 .getEvent ());
511497
512- // Next dequeue should get the QueueClosedEvent
513- EventQueueItem item2 ;
514- try {
515- item2 = queue .dequeueEventItem (100 );
516- } catch (EventQueueClosedException e ) {
517- fail ("Should not throw on second dequeue, should return the event" );
518- return ;
519- }
520- assertNotNull (item2 );
498+ // Next dequeue should get the QueueClosedEvent (use retry for async processing)
499+ EventQueueItem item2 = dequeueEventWithRetry (queue , 5000 );
500+ assertNotNull (item2 , "QueueClosedEvent should be available" );
521501 assertTrue (item2 .getEvent () instanceof QueueClosedEvent ,
522502 "Second event should be QueueClosedEvent" );
523503 }
@@ -576,4 +556,30 @@ public void setActive(boolean active) {
576556 this .active = active ;
577557 }
578558 }
559+
560+ /**
561+ * Helper method to dequeue an event with retry logic to handle async MainEventBusProcessor distribution.
562+ * This handles the race condition where MainEventBusProcessor may not have distributed the event yet.
563+ *
564+ * @param queue the queue to dequeue from
565+ * @param maxWaitMs maximum time to wait in milliseconds (default 5000)
566+ * @return the dequeued EventQueueItem, or null if timeout occurs
567+ */
568+ private EventQueueItem dequeueEventWithRetry (EventQueue queue , long maxWaitMs ) throws InterruptedException {
569+ EventQueueItem item = null ;
570+ long pollStart = System .currentTimeMillis ();
571+ while (item == null && (System .currentTimeMillis () - pollStart ) < maxWaitMs ) {
572+ try {
573+ item = queue .dequeueEventItem (100 ); // Short timeout per attempt
574+ if (item == null ) {
575+ // No event yet, wait a bit before retry
576+ Thread .sleep (50 );
577+ }
578+ } catch (EventQueueClosedException e ) {
579+ // Queue closed, return null
580+ return null ;
581+ }
582+ }
583+ return item ;
584+ }
579585}
0 commit comments