@@ -83,13 +83,45 @@ private TaskStatusUpdateEvent createEventForTask(String taskId) {
8383 @ AfterEach
8484 void tearDown () {
8585 if (mainEventBusProcessor != null ) {
86+ mainEventBusProcessor .setCallback (null ); // Clear any test callbacks
8687 EventQueueUtil .stop (mainEventBusProcessor );
8788 }
8889 mainEventBusProcessor = null ;
8990 mainEventBus = null ;
9091 queueManager = null ;
9192 }
9293
94+ /**
95+ * Helper to wait for MainEventBusProcessor to process an event.
96+ * Replaces polling patterns with deterministic callback-based waiting.
97+ *
98+ * @param action the action that triggers event processing
99+ * @throws InterruptedException if waiting is interrupted
100+ * @throws AssertionError if processing doesn't complete within timeout
101+ */
102+ private void waitForEventProcessing (Runnable action ) throws InterruptedException {
103+ CountDownLatch processingLatch = new CountDownLatch (1 );
104+ mainEventBusProcessor .setCallback (new io .a2a .server .events .MainEventBusProcessorCallback () {
105+ @ Override
106+ public void onEventProcessed (String taskId , io .a2a .spec .Event event ) {
107+ processingLatch .countDown ();
108+ }
109+
110+ @ Override
111+ public void onTaskFinalized (String taskId ) {
112+ // Not needed for basic event processing wait
113+ }
114+ });
115+
116+ try {
117+ action .run ();
118+ assertTrue (processingLatch .await (5 , TimeUnit .SECONDS ),
119+ "MainEventBusProcessor should have processed the event within timeout" );
120+ } finally {
121+ mainEventBusProcessor .setCallback (null );
122+ }
123+ }
124+
93125 @ Test
94126 void testReplicationStrategyTriggeredOnNormalEnqueue () throws InterruptedException {
95127 CountingReplicationStrategy strategy = new CountingReplicationStrategy ();
@@ -147,10 +179,9 @@ void testReplicatedEventDeliveredToCorrectQueue() throws InterruptedException {
147179 EventQueue queue = queueManager .createOrTap (taskId );
148180
149181 ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem (taskId , eventForTask );
150- queueManager .onReplicatedEvent (replicatedEvent );
151182
152- // Use retry logic to handle async MainEventBusProcessor distribution
153- EventQueueItem item = dequeueEventWithRetry (queue , 5000 );
183+ // Use callback to wait for event processing
184+ EventQueueItem item = dequeueEventWithRetry (queue , () -> queueManager . onReplicatedEvent ( replicatedEvent ) );
154185 assertNotNull (item , "Event should be available in queue" );
155186 Event dequeuedEvent = item .getEvent ();
156187 assertEquals (eventForTask , dequeuedEvent );
@@ -175,12 +206,11 @@ void testReplicatedEventCreatesQueueIfNeeded() throws InterruptedException {
175206
176207 ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem (taskId , eventForTask );
177208
178- // Process the replicated event
179- assertDoesNotThrow (() -> queueManager .onReplicatedEvent (replicatedEvent ));
180-
181- // Verify the event was enqueued and distributed to our ChildQueue
182- // Use retry logic to handle async MainEventBusProcessor distribution
183- EventQueueItem item = dequeueEventWithRetry (childQueue , 5000 );
209+ // Process the replicated event and wait for distribution
210+ // Use callback to wait for event processing
211+ EventQueueItem item = dequeueEventWithRetry (childQueue , () -> {
212+ assertDoesNotThrow (() -> queueManager .onReplicatedEvent (replicatedEvent ));
213+ });
184214 assertNotNull (item , "Event should be available in queue" );
185215 Event dequeuedEvent = item .getEvent ();
186216 assertEquals (eventForTask , dequeuedEvent , "The replicated event should be enqueued in the newly created queue" );
@@ -379,11 +409,10 @@ void testReplicatedEventProcessedWhenTaskActive() throws InterruptedException {
379409
380410 // Process a replicated event for an active task
381411 ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem (taskId , eventForTask );
382- queueManager .onReplicatedEvent (replicatedEvent );
383412
384413 // Verify the event was enqueued and distributed to our ChildQueue
385- // Use retry logic to handle async MainEventBusProcessor distribution
386- EventQueueItem item = dequeueEventWithRetry (childQueue , 5000 );
414+ // Use callback to wait for event processing
415+ EventQueueItem item = dequeueEventWithRetry (childQueue , () -> queueManager . onReplicatedEvent ( replicatedEvent ) );
387416 assertNotNull (item , "Event should be available in queue" );
388417 Event dequeuedEvent = item .getEvent ();
389418 assertEquals (eventForTask , dequeuedEvent , "Event should be enqueued for active task" );
@@ -501,21 +530,17 @@ void testReplicatedQueueClosedEventTerminatesConsumer() throws InterruptedExcept
501530 TaskStatusUpdateEvent eventForTask = createEventForTask (taskId ); // Use matching taskId
502531 EventQueue queue = queueManager .createOrTap (taskId );
503532
504- // Enqueue a normal event
505- queue .enqueueEvent (eventForTask );
506-
507533 // Simulate receiving QueueClosedEvent from remote node
508534 QueueClosedEvent closedEvent = new QueueClosedEvent (taskId );
509535 ReplicatedEventQueueItem replicatedClosedEvent = new ReplicatedEventQueueItem (taskId , closedEvent );
510- queueManager .onReplicatedEvent (replicatedClosedEvent );
511536
512- // Dequeue the normal event first (use retry for async processing)
513- EventQueueItem item1 = dequeueEventWithRetry (queue , 5000 );
537+ // Dequeue the normal event first (use callback to wait for async processing)
538+ EventQueueItem item1 = dequeueEventWithRetry (queue , () -> queue . enqueueEvent ( eventForTask ) );
514539 assertNotNull (item1 , "First event should be available" );
515540 assertEquals (eventForTask , item1 .getEvent ());
516541
517- // Next dequeue should get the QueueClosedEvent (use retry for async processing)
518- EventQueueItem item2 = dequeueEventWithRetry (queue , 5000 );
542+ // Next dequeue should get the QueueClosedEvent (use callback to wait for async processing)
543+ EventQueueItem item2 = dequeueEventWithRetry (queue , () -> queueManager . onReplicatedEvent ( replicatedClosedEvent ) );
519544 assertNotNull (item2 , "QueueClosedEvent should be available" );
520545 assertTrue (item2 .getEvent () instanceof QueueClosedEvent ,
521546 "Second event should be QueueClosedEvent" );
@@ -577,28 +602,23 @@ public void setActive(boolean active) {
577602 }
578603
579604 /**
580- * Helper method to dequeue an event with retry logic to handle async MainEventBusProcessor distribution.
581- * This handles the race condition where MainEventBusProcessor may not have distributed the event yet .
605+ * Helper method to dequeue an event after waiting for MainEventBusProcessor distribution.
606+ * Uses callback-based waiting instead of polling for deterministic synchronization .
582607 *
583608 * @param queue the queue to dequeue from
584- * @param maxWaitMs maximum time to wait in milliseconds (default 5000 )
585- * @return the dequeued EventQueueItem, or null if timeout occurs
609+ * @param enqueueAction the action that enqueues the event (triggers event processing )
610+ * @return the dequeued EventQueueItem, or null if queue is closed
586611 */
587- private EventQueueItem dequeueEventWithRetry (EventQueue queue , long maxWaitMs ) throws InterruptedException {
588- EventQueueItem item = null ;
589- long pollStart = System .currentTimeMillis ();
590- while (item == null && (System .currentTimeMillis () - pollStart ) < maxWaitMs ) {
591- try {
592- item = queue .dequeueEventItem (100 ); // Short timeout per attempt
593- if (item == null ) {
594- // No event yet, wait a bit before retry
595- Thread .sleep (50 );
596- }
597- } catch (EventQueueClosedException e ) {
598- // Queue closed, return null
599- return null ;
600- }
612+ private EventQueueItem dequeueEventWithRetry (EventQueue queue , Runnable enqueueAction ) throws InterruptedException {
613+ // Wait for event to be processed and distributed
614+ waitForEventProcessing (enqueueAction );
615+
616+ // Event is now available, dequeue directly
617+ try {
618+ return queue .dequeueEventItem (100 );
619+ } catch (EventQueueClosedException e ) {
620+ // Queue closed, return null
621+ return null ;
601622 }
602- return item ;
603623 }
604624}
0 commit comments