Skip to content

Commit 2ac771e

Browse files
committed
Another test timing issue
1 parent 128a8df commit 2ac771e

File tree

1 file changed

+55
-49
lines changed

1 file changed

+55
-49
lines changed

extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java

Lines changed: 55 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -135,13 +135,10 @@ void testReplicatedEventDeliveredToCorrectQueue() throws InterruptedException {
135135
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent);
136136
queueManager.onReplicatedEvent(replicatedEvent);
137137

138-
Event dequeuedEvent;
139-
try {
140-
dequeuedEvent = queue.dequeueEventItem(100).getEvent();
141-
} catch (EventQueueClosedException e) {
142-
fail("Queue should not be closed");
143-
return;
144-
}
138+
// Use retry logic to handle async MainEventBusProcessor distribution
139+
EventQueueItem item = dequeueEventWithRetry(queue, 5000);
140+
assertNotNull(item, "Event should be available in queue");
141+
Event dequeuedEvent = item.getEvent();
145142
assertEquals(testEvent, dequeuedEvent);
146143
}
147144

@@ -164,13 +161,11 @@ void testReplicatedEventCreatesQueueIfNeeded() throws InterruptedException {
164161
// Verify the event was enqueued by dequeuing it
165162
// Need to tap() the MainQueue to get a ChildQueue for consumption
166163
EventQueue childQueue = queue.tap();
167-
Event dequeuedEvent;
168-
try {
169-
dequeuedEvent = childQueue.dequeueEventItem(100).getEvent();
170-
} catch (EventQueueClosedException e) {
171-
fail("Queue should not be closed");
172-
return;
173-
}
164+
165+
// Use retry logic to handle async MainEventBusProcessor distribution
166+
EventQueueItem item = dequeueEventWithRetry(childQueue, 5000);
167+
assertNotNull(item, "Event should be available in queue");
168+
Event dequeuedEvent = item.getEvent();
174169
assertEquals(testEvent, dequeuedEvent, "The replicated event should be enqueued in the newly created queue");
175170
}
176171

@@ -355,27 +350,24 @@ void testReplicatedEventProcessedWhenTaskActive() throws InterruptedException {
355350
// Verify no queue exists initially
356351
assertNull(queueManager.get(taskId));
357352

353+
// Create a ChildQueue BEFORE processing the replicated event
354+
// This ensures the ChildQueue exists when MainEventBusProcessor distributes the event
355+
EventQueue childQueue = queueManager.createOrTap(taskId);
356+
assertNotNull(childQueue, "ChildQueue should be created");
357+
358+
// Verify MainQueue was created
359+
EventQueue mainQueue = queueManager.get(taskId);
360+
assertNotNull(mainQueue, "MainQueue should exist after createOrTap");
361+
358362
// Process a replicated event for an active task
359363
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent);
360364
queueManager.onReplicatedEvent(replicatedEvent);
361365

362-
// Queue should be created and event should be enqueued
363-
EventQueue queue = queueManager.get(taskId);
364-
assertNotNull(queue, "Queue should be created for active task");
365-
366-
// Verify the event was enqueued
367-
// Need to tap() the MainQueue to get a ChildQueue for consumption
368-
EventQueue childQueue = queue.tap();
369-
Event dequeuedEvent;
370-
try {
371-
// Wait up to 5 seconds for event (async processing needs time)
372-
EventQueueItem item = childQueue.dequeueEventItem(5000);
373-
assertNotNull(item, "Event should be available in queue");
374-
dequeuedEvent = item.getEvent();
375-
} catch (EventQueueClosedException e) {
376-
fail("Queue should not be closed");
377-
return;
378-
}
366+
// Verify the event was enqueued and distributed to our ChildQueue
367+
// Use retry logic to handle async MainEventBusProcessor distribution
368+
EventQueueItem item = dequeueEventWithRetry(childQueue, 5000);
369+
assertNotNull(item, "Event should be available in queue");
370+
Event dequeuedEvent = item.getEvent();
379371
assertEquals(testEvent, dequeuedEvent, "Event should be enqueued for active task");
380372
}
381373

@@ -498,26 +490,14 @@ void testReplicatedQueueClosedEventTerminatesConsumer() throws InterruptedExcept
498490
ReplicatedEventQueueItem replicatedClosedEvent = new ReplicatedEventQueueItem(taskId, closedEvent);
499491
queueManager.onReplicatedEvent(replicatedClosedEvent);
500492

501-
// Dequeue the normal event first
502-
EventQueueItem item1;
503-
try {
504-
item1 = queue.dequeueEventItem(100);
505-
} catch (EventQueueClosedException e) {
506-
fail("Should not throw on first dequeue");
507-
return;
508-
}
509-
assertNotNull(item1);
493+
// Dequeue the normal event first (use retry for async processing)
494+
EventQueueItem item1 = dequeueEventWithRetry(queue, 5000);
495+
assertNotNull(item1, "First event should be available");
510496
assertEquals(testEvent, item1.getEvent());
511497

512-
// Next dequeue should get the QueueClosedEvent
513-
EventQueueItem item2;
514-
try {
515-
item2 = queue.dequeueEventItem(100);
516-
} catch (EventQueueClosedException e) {
517-
fail("Should not throw on second dequeue, should return the event");
518-
return;
519-
}
520-
assertNotNull(item2);
498+
// Next dequeue should get the QueueClosedEvent (use retry for async processing)
499+
EventQueueItem item2 = dequeueEventWithRetry(queue, 5000);
500+
assertNotNull(item2, "QueueClosedEvent should be available");
521501
assertTrue(item2.getEvent() instanceof QueueClosedEvent,
522502
"Second event should be QueueClosedEvent");
523503
}
@@ -576,4 +556,30 @@ public void setActive(boolean active) {
576556
this.active = active;
577557
}
578558
}
559+
560+
/**
561+
* Helper method to dequeue an event with retry logic to handle async MainEventBusProcessor distribution.
562+
* This handles the race condition where MainEventBusProcessor may not have distributed the event yet.
563+
*
564+
* @param queue the queue to dequeue from
565+
* @param maxWaitMs maximum time to wait in milliseconds (default 5000)
566+
* @return the dequeued EventQueueItem, or null if timeout occurs
567+
*/
568+
private EventQueueItem dequeueEventWithRetry(EventQueue queue, long maxWaitMs) throws InterruptedException {
569+
EventQueueItem item = null;
570+
long pollStart = System.currentTimeMillis();
571+
while (item == null && (System.currentTimeMillis() - pollStart) < maxWaitMs) {
572+
try {
573+
item = queue.dequeueEventItem(100); // Short timeout per attempt
574+
if (item == null) {
575+
// No event yet, wait a bit before retry
576+
Thread.sleep(50);
577+
}
578+
} catch (EventQueueClosedException e) {
579+
// Queue closed, return null
580+
return null;
581+
}
582+
}
583+
return item;
584+
}
579585
}

0 commit comments

Comments
 (0)