Skip to content

Commit fdc1760

Browse files
committed
Gemini feedback and fixing tests
1 parent e25166d commit fdc1760

File tree

16 files changed

+578
-367
lines changed

16 files changed

+578
-367
lines changed

extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java

Lines changed: 129 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.a2a.spec.TaskState;
3434
import io.a2a.spec.TaskStatus;
3535
import io.a2a.spec.TaskStatusUpdateEvent;
36+
import org.junit.jupiter.api.AfterEach;
3637
import org.junit.jupiter.api.BeforeEach;
3738
import org.junit.jupiter.api.Test;
3839

@@ -66,6 +67,61 @@ void setUp() {
6667
.build();
6768
}
6869

70+
/**
71+
* Helper to create a test event with the specified taskId.
72+
* This ensures taskId consistency between queue creation and event creation.
73+
*/
74+
private TaskStatusUpdateEvent createEventForTask(String taskId) {
75+
return TaskStatusUpdateEvent.builder()
76+
.taskId(taskId)
77+
.contextId("test-context")
78+
.status(new TaskStatus(TaskState.SUBMITTED))
79+
.isFinal(false)
80+
.build();
81+
}
82+
83+
@AfterEach
84+
void tearDown() {
85+
if (mainEventBusProcessor != null) {
86+
mainEventBusProcessor.setCallback(null); // Clear any test callbacks
87+
EventQueueUtil.stop(mainEventBusProcessor);
88+
}
89+
mainEventBusProcessor = null;
90+
mainEventBus = null;
91+
queueManager = null;
92+
}
93+
94+
/**
95+
* Helper to wait for MainEventBusProcessor to process an event.
96+
* Replaces polling patterns with deterministic callback-based waiting.
97+
*
98+
* @param action the action that triggers event processing
99+
* @throws InterruptedException if waiting is interrupted
100+
* @throws AssertionError if processing doesn't complete within timeout
101+
*/
102+
private void waitForEventProcessing(Runnable action) throws InterruptedException {
103+
CountDownLatch processingLatch = new CountDownLatch(1);
104+
mainEventBusProcessor.setCallback(new io.a2a.server.events.MainEventBusProcessorCallback() {
105+
@Override
106+
public void onEventProcessed(String taskId, io.a2a.spec.Event event) {
107+
processingLatch.countDown();
108+
}
109+
110+
@Override
111+
public void onTaskFinalized(String taskId) {
112+
// Not needed for basic event processing wait
113+
}
114+
});
115+
116+
try {
117+
action.run();
118+
assertTrue(processingLatch.await(5, TimeUnit.SECONDS),
119+
"MainEventBusProcessor should have processed the event within timeout");
120+
} finally {
121+
mainEventBusProcessor.setCallback(null);
122+
}
123+
}
124+
69125
@Test
70126
void testReplicationStrategyTriggeredOnNormalEnqueue() throws InterruptedException {
71127
CountingReplicationStrategy strategy = new CountingReplicationStrategy();
@@ -119,48 +175,45 @@ void testReplicationStrategyWithCountingImplementation() throws InterruptedExcep
119175
@Test
120176
void testReplicatedEventDeliveredToCorrectQueue() throws InterruptedException {
121177
String taskId = "test-task-4";
178+
TaskStatusUpdateEvent eventForTask = createEventForTask(taskId); // Use matching taskId
122179
EventQueue queue = queueManager.createOrTap(taskId);
123180

124-
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent);
125-
queueManager.onReplicatedEvent(replicatedEvent);
181+
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, eventForTask);
126182

127-
Event dequeuedEvent;
128-
try {
129-
dequeuedEvent = queue.dequeueEventItem(100).getEvent();
130-
} catch (EventQueueClosedException e) {
131-
fail("Queue should not be closed");
132-
return;
133-
}
134-
assertEquals(testEvent, dequeuedEvent);
183+
// Use callback to wait for event processing
184+
EventQueueItem item = dequeueEventWithRetry(queue, () -> queueManager.onReplicatedEvent(replicatedEvent));
185+
assertNotNull(item, "Event should be available in queue");
186+
Event dequeuedEvent = item.getEvent();
187+
assertEquals(eventForTask, dequeuedEvent);
135188
}
136189

137190
@Test
138191
void testReplicatedEventCreatesQueueIfNeeded() throws InterruptedException {
139192
String taskId = "non-existent-task";
193+
TaskStatusUpdateEvent eventForTask = createEventForTask(taskId); // Use matching taskId
140194

141195
// Verify no queue exists initially
142196
assertNull(queueManager.get(taskId));
143197

144-
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent);
145-
146-
// Process the replicated event
147-
assertDoesNotThrow(() -> queueManager.onReplicatedEvent(replicatedEvent));
148-
149-
// Verify that a queue was created and the event was enqueued
150-
EventQueue queue = queueManager.get(taskId);
151-
assertNotNull(queue, "Queue should be created when processing replicated event for non-existent task");
152-
153-
// Verify the event was enqueued by dequeuing it
154-
// Need to tap() the MainQueue to get a ChildQueue for consumption
155-
EventQueue childQueue = queue.tap();
156-
Event dequeuedEvent;
157-
try {
158-
dequeuedEvent = childQueue.dequeueEventItem(100).getEvent();
159-
} catch (EventQueueClosedException e) {
160-
fail("Queue should not be closed");
161-
return;
162-
}
163-
assertEquals(testEvent, dequeuedEvent, "The replicated event should be enqueued in the newly created queue");
198+
// Create a ChildQueue BEFORE processing the replicated event
199+
// This ensures the ChildQueue exists when MainEventBusProcessor distributes the event
200+
EventQueue childQueue = queueManager.createOrTap(taskId);
201+
assertNotNull(childQueue, "ChildQueue should be created");
202+
203+
// Verify MainQueue was created
204+
EventQueue mainQueue = queueManager.get(taskId);
205+
assertNotNull(mainQueue, "MainQueue should exist after createOrTap");
206+
207+
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, eventForTask);
208+
209+
// Process the replicated event and wait for distribution
210+
// Use callback to wait for event processing
211+
EventQueueItem item = dequeueEventWithRetry(childQueue, () -> {
212+
assertDoesNotThrow(() -> queueManager.onReplicatedEvent(replicatedEvent));
213+
});
214+
assertNotNull(item, "Event should be available in queue");
215+
Event dequeuedEvent = item.getEvent();
216+
assertEquals(eventForTask, dequeuedEvent, "The replicated event should be enqueued in the newly created queue");
164217
}
165218

166219
@Test
@@ -340,29 +393,29 @@ void testReplicatedEventProcessedWhenTaskActive() throws InterruptedException {
340393
queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider, mainEventBus);
341394

342395
String taskId = "active-task";
396+
TaskStatusUpdateEvent eventForTask = createEventForTask(taskId); // Use matching taskId
343397

344398
// Verify no queue exists initially
345399
assertNull(queueManager.get(taskId));
346400

347-
// Process a replicated event for an active task
348-
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent);
349-
queueManager.onReplicatedEvent(replicatedEvent);
401+
// Create a ChildQueue BEFORE processing the replicated event
402+
// This ensures the ChildQueue exists when MainEventBusProcessor distributes the event
403+
EventQueue childQueue = queueManager.createOrTap(taskId);
404+
assertNotNull(childQueue, "ChildQueue should be created");
350405

351-
// Queue should be created and event should be enqueued
352-
EventQueue queue = queueManager.get(taskId);
353-
assertNotNull(queue, "Queue should be created for active task");
406+
// Verify MainQueue was created
407+
EventQueue mainQueue = queueManager.get(taskId);
408+
assertNotNull(mainQueue, "MainQueue should exist after createOrTap");
354409

355-
// Verify the event was enqueued
356-
// Need to tap() the MainQueue to get a ChildQueue for consumption
357-
EventQueue childQueue = queue.tap();
358-
Event dequeuedEvent;
359-
try {
360-
dequeuedEvent = childQueue.dequeueEventItem(100).getEvent();
361-
} catch (EventQueueClosedException e) {
362-
fail("Queue should not be closed");
363-
return;
364-
}
365-
assertEquals(testEvent, dequeuedEvent, "Event should be enqueued for active task");
410+
// Process a replicated event for an active task
411+
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, eventForTask);
412+
413+
// Verify the event was enqueued and distributed to our ChildQueue
414+
// Use callback to wait for event processing
415+
EventQueueItem item = dequeueEventWithRetry(childQueue, () -> queueManager.onReplicatedEvent(replicatedEvent));
416+
assertNotNull(item, "Event should be available in queue");
417+
Event dequeuedEvent = item.getEvent();
418+
assertEquals(eventForTask, dequeuedEvent, "Event should be enqueued for active task");
366419
}
367420

368421

@@ -474,36 +527,21 @@ void testQueueClosedEventJsonSerialization() throws Exception {
474527
@Test
475528
void testReplicatedQueueClosedEventTerminatesConsumer() throws InterruptedException {
476529
String taskId = "remote-close-test";
530+
TaskStatusUpdateEvent eventForTask = createEventForTask(taskId); // Use matching taskId
477531
EventQueue queue = queueManager.createOrTap(taskId);
478532

479-
// Enqueue a normal event
480-
queue.enqueueEvent(testEvent);
481-
482533
// Simulate receiving QueueClosedEvent from remote node
483534
QueueClosedEvent closedEvent = new QueueClosedEvent(taskId);
484535
ReplicatedEventQueueItem replicatedClosedEvent = new ReplicatedEventQueueItem(taskId, closedEvent);
485-
queueManager.onReplicatedEvent(replicatedClosedEvent);
486536

487-
// Dequeue the normal event first
488-
EventQueueItem item1;
489-
try {
490-
item1 = queue.dequeueEventItem(100);
491-
} catch (EventQueueClosedException e) {
492-
fail("Should not throw on first dequeue");
493-
return;
494-
}
495-
assertNotNull(item1);
496-
assertEquals(testEvent, item1.getEvent());
537+
// Dequeue the normal event first (use callback to wait for async processing)
538+
EventQueueItem item1 = dequeueEventWithRetry(queue, () -> queue.enqueueEvent(eventForTask));
539+
assertNotNull(item1, "First event should be available");
540+
assertEquals(eventForTask, item1.getEvent());
497541

498-
// Next dequeue should get the QueueClosedEvent
499-
EventQueueItem item2;
500-
try {
501-
item2 = queue.dequeueEventItem(100);
502-
} catch (EventQueueClosedException e) {
503-
fail("Should not throw on second dequeue, should return the event");
504-
return;
505-
}
506-
assertNotNull(item2);
542+
// Next dequeue should get the QueueClosedEvent (use callback to wait for async processing)
543+
EventQueueItem item2 = dequeueEventWithRetry(queue, () -> queueManager.onReplicatedEvent(replicatedClosedEvent));
544+
assertNotNull(item2, "QueueClosedEvent should be available");
507545
assertTrue(item2.getEvent() instanceof QueueClosedEvent,
508546
"Second event should be QueueClosedEvent");
509547
}
@@ -562,4 +600,25 @@ public void setActive(boolean active) {
562600
this.active = active;
563601
}
564602
}
603+
604+
/**
605+
* Helper method to dequeue an event after waiting for MainEventBusProcessor distribution.
606+
* Uses callback-based waiting instead of polling for deterministic synchronization.
607+
*
608+
* @param queue the queue to dequeue from
609+
* @param enqueueAction the action that enqueues the event (triggers event processing)
610+
* @return the dequeued EventQueueItem, or null if queue is closed
611+
*/
612+
private EventQueueItem dequeueEventWithRetry(EventQueue queue, Runnable enqueueAction) throws InterruptedException {
613+
// Wait for event to be processed and distributed
614+
waitForEventProcessing(enqueueAction);
615+
616+
// Event is now available, dequeue directly
617+
try {
618+
return queue.dequeueEventItem(100);
619+
} catch (EventQueueClosedException e) {
620+
// Queue closed, return null
621+
return null;
622+
}
623+
}
565624
}

extras/queue-manager-replicated/core/src/test/java/io/a2a/server/events/EventQueueUtil.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,6 @@
11
package io.a2a.server.events;
22

33
public class EventQueueUtil {
4-
// Shared MainEventBus for all tests - ensures events are properly distributed
5-
private static final MainEventBus TEST_EVENT_BUS = new MainEventBus();
6-
7-
// Since EventQueue.builder() is package protected, add a method to expose it
8-
public static EventQueue.EventQueueBuilder getEventQueueBuilder() {
9-
return EventQueue.builder(TEST_EVENT_BUS);
10-
}
11-
124
public static void start(MainEventBusProcessor processor) {
135
processor.start();
146
}

server-common/src/main/java/io/a2a/server/events/EventQueue.java

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -470,22 +470,29 @@ void childClosing(ChildQueue child, boolean immediate) {
470470
* Called by MainEventBusProcessor after TaskStore persistence.
471471
*/
472472
void distributeToChildren(EventQueueItem item) {
473-
synchronized (children) {
474-
int childCount = children.size();
475-
if (LOGGER.isDebugEnabled()) {
476-
LOGGER.debug("MainQueue[{}]: Distributing event {} to {} children",
477-
taskId, item.getEvent().getClass().getSimpleName(), childCount);
478-
}
479-
children.forEach(child -> {
480-
LOGGER.debug("MainQueue[{}]: Enqueueing event {} to child queue",
481-
taskId, item.getEvent().getClass().getSimpleName());
482-
child.internalEnqueueItem(item);
483-
});
484-
if (LOGGER.isDebugEnabled()) {
485-
LOGGER.debug("MainQueue[{}]: Completed distribution of {} to {} children",
486-
taskId, item.getEvent().getClass().getSimpleName(), childCount);
487-
}
473+
int childCount = children.size();
474+
if (LOGGER.isDebugEnabled()) {
475+
LOGGER.debug("MainQueue[{}]: Distributing event {} to {} children",
476+
taskId, item.getEvent().getClass().getSimpleName(), childCount);
488477
}
478+
children.forEach(child -> {
479+
LOGGER.debug("MainQueue[{}]: Enqueueing event {} to child queue",
480+
taskId, item.getEvent().getClass().getSimpleName());
481+
child.internalEnqueueItem(item);
482+
});
483+
if (LOGGER.isDebugEnabled()) {
484+
LOGGER.debug("MainQueue[{}]: Completed distribution of {} to {} children",
485+
taskId, item.getEvent().getClass().getSimpleName(), childCount);
486+
}
487+
}
488+
489+
/**
490+
* Release the semaphore after event processing is complete.
491+
* Called by MainEventBusProcessor in finally block to ensure release even on exceptions.
492+
* Balances the acquire() in enqueueEvent() - protects MainEventBus throughput.
493+
*/
494+
void releaseSemaphore() {
495+
semaphore.release();
489496
}
490497

491498
/**
@@ -557,17 +564,12 @@ public void enqueueItem(EventQueueItem item) {
557564

558565
private void internalEnqueueItem(EventQueueItem item) {
559566
// Internal method called by MainEventBusProcessor to add to local queue
567+
// Note: Semaphore is managed by parent MainQueue (acquire/release), not ChildQueue
560568
Event event = item.getEvent();
561569
if (isClosed()) {
562570
LOGGER.warn("ChildQueue is closed. Event will not be enqueued. {} {}", this, event);
563571
return;
564572
}
565-
try {
566-
semaphore.acquire();
567-
} catch (InterruptedException e) {
568-
Thread.currentThread().interrupt();
569-
throw new RuntimeException("Unable to acquire the semaphore to enqueue the event", e);
570-
}
571573
queue.add(item);
572574
LOGGER.debug("Enqueued event {} {}", event instanceof Throwable ? event.toString() : event, this);
573575
}
@@ -585,7 +587,7 @@ public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueCl
585587
if (item != null) {
586588
Event event = item.getEvent();
587589
LOGGER.debug("Dequeued event item (no wait) {} {}", this, event instanceof Throwable ? event.toString() : event);
588-
semaphore.release();
590+
// Note: Semaphore is managed by parent MainQueue, not released here
589591
}
590592
return item;
591593
}
@@ -595,7 +597,7 @@ public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueCl
595597
if (item != null) {
596598
Event event = item.getEvent();
597599
LOGGER.debug("Dequeued event item (waiting) {} {}", this, event instanceof Throwable ? event.toString() : event);
598-
semaphore.release();
600+
// Note: Semaphore is managed by parent MainQueue, not released here
599601
} else {
600602
LOGGER.trace("Dequeue timeout (null) from ChildQueue {}", System.identityHashCode(this));
601603
}
@@ -636,8 +638,11 @@ protected void doClose(boolean immediate) {
636638
super.doClose(immediate); // Sets closed flag
637639
if (immediate) {
638640
// Immediate close: clear pending events from local queue
641+
int clearedCount = queue.size();
639642
queue.clear();
640-
LOGGER.debug("Cleared ChildQueue for immediate close: {}", this);
643+
// Release semaphore permits for cleared events to prevent deadlock
644+
semaphore.release(clearedCount);
645+
LOGGER.debug("Cleared {} events from ChildQueue for immediate close: {}", clearedCount, this);
641646
}
642647
// For graceful close, let the queue drain naturally through normal consumption
643648
}

0 commit comments

Comments
 (0)