Skip to content

Commit cfd539f

Browse files
committed
Gemini feedback and fixing tests
1 parent 5c34a71 commit cfd539f

File tree

16 files changed

+578
-366
lines changed

16 files changed

+578
-366
lines changed

extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java

Lines changed: 130 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.concurrent.atomic.AtomicInteger;
1818

1919
import io.a2a.extras.common.events.TaskFinalizedEvent;
20+
import io.a2a.json.JsonUtil;
2021
import io.a2a.server.events.EventQueue;
2122
import io.a2a.server.events.EventQueueClosedException;
2223
import io.a2a.server.events.EventQueueItem;
@@ -32,7 +33,7 @@
3233
import io.a2a.spec.TaskState;
3334
import io.a2a.spec.TaskStatus;
3435
import io.a2a.spec.TaskStatusUpdateEvent;
35-
import io.a2a.json.JsonUtil;
36+
import org.junit.jupiter.api.AfterEach;
3637
import org.junit.jupiter.api.BeforeEach;
3738
import org.junit.jupiter.api.Test;
3839

@@ -66,6 +67,61 @@ void setUp() {
6667
.build();
6768
}
6869

70+
/**
71+
* Helper to create a test event with the specified taskId.
72+
* This ensures taskId consistency between queue creation and event creation.
73+
*/
74+
private TaskStatusUpdateEvent createEventForTask(String taskId) {
75+
return new TaskStatusUpdateEvent.Builder()
76+
.taskId(taskId)
77+
.contextId("test-context")
78+
.status(new TaskStatus(TaskState.SUBMITTED))
79+
.isFinal(false)
80+
.build();
81+
}
82+
83+
@AfterEach
84+
void tearDown() {
85+
if (mainEventBusProcessor != null) {
86+
mainEventBusProcessor.setCallback(null); // Clear any test callbacks
87+
EventQueueUtil.stop(mainEventBusProcessor);
88+
}
89+
mainEventBusProcessor = null;
90+
mainEventBus = null;
91+
queueManager = null;
92+
}
93+
94+
/**
95+
* Helper to wait for MainEventBusProcessor to process an event.
96+
* Replaces polling patterns with deterministic callback-based waiting.
97+
*
98+
* @param action the action that triggers event processing
99+
* @throws InterruptedException if waiting is interrupted
100+
* @throws AssertionError if processing doesn't complete within timeout
101+
*/
102+
private void waitForEventProcessing(Runnable action) throws InterruptedException {
103+
CountDownLatch processingLatch = new CountDownLatch(1);
104+
mainEventBusProcessor.setCallback(new io.a2a.server.events.MainEventBusProcessorCallback() {
105+
@Override
106+
public void onEventProcessed(String taskId, io.a2a.spec.Event event) {
107+
processingLatch.countDown();
108+
}
109+
110+
@Override
111+
public void onTaskFinalized(String taskId) {
112+
// Not needed for basic event processing wait
113+
}
114+
});
115+
116+
try {
117+
action.run();
118+
assertTrue(processingLatch.await(5, TimeUnit.SECONDS),
119+
"MainEventBusProcessor should have processed the event within timeout");
120+
} finally {
121+
mainEventBusProcessor.setCallback(null);
122+
}
123+
}
124+
69125
@Test
70126
void testReplicationStrategyTriggeredOnNormalEnqueue() throws InterruptedException {
71127
CountingReplicationStrategy strategy = new CountingReplicationStrategy();
@@ -119,48 +175,45 @@ void testReplicationStrategyWithCountingImplementation() throws InterruptedExcep
119175
@Test
120176
void testReplicatedEventDeliveredToCorrectQueue() throws InterruptedException {
121177
String taskId = "test-task-4";
178+
TaskStatusUpdateEvent eventForTask = createEventForTask(taskId); // Use matching taskId
122179
EventQueue queue = queueManager.createOrTap(taskId);
123180

124-
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent);
125-
queueManager.onReplicatedEvent(replicatedEvent);
181+
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, eventForTask);
126182

127-
Event dequeuedEvent;
128-
try {
129-
dequeuedEvent = queue.dequeueEventItem(100).getEvent();
130-
} catch (EventQueueClosedException e) {
131-
fail("Queue should not be closed");
132-
return;
133-
}
134-
assertEquals(testEvent, dequeuedEvent);
183+
// Use callback to wait for event processing
184+
EventQueueItem item = dequeueEventWithRetry(queue, () -> queueManager.onReplicatedEvent(replicatedEvent));
185+
assertNotNull(item, "Event should be available in queue");
186+
Event dequeuedEvent = item.getEvent();
187+
assertEquals(eventForTask, dequeuedEvent);
135188
}
136189

137190
@Test
138191
void testReplicatedEventCreatesQueueIfNeeded() throws InterruptedException {
139192
String taskId = "non-existent-task";
193+
TaskStatusUpdateEvent eventForTask = createEventForTask(taskId); // Use matching taskId
140194

141195
// Verify no queue exists initially
142196
assertNull(queueManager.get(taskId));
143197

144-
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent);
145-
146-
// Process the replicated event
147-
assertDoesNotThrow(() -> queueManager.onReplicatedEvent(replicatedEvent));
148-
149-
// Verify that a queue was created and the event was enqueued
150-
EventQueue queue = queueManager.get(taskId);
151-
assertNotNull(queue, "Queue should be created when processing replicated event for non-existent task");
152-
153-
// Verify the event was enqueued by dequeuing it
154-
// Need to tap() the MainQueue to get a ChildQueue for consumption
155-
EventQueue childQueue = queue.tap();
156-
Event dequeuedEvent;
157-
try {
158-
dequeuedEvent = childQueue.dequeueEventItem(100).getEvent();
159-
} catch (EventQueueClosedException e) {
160-
fail("Queue should not be closed");
161-
return;
162-
}
163-
assertEquals(testEvent, dequeuedEvent, "The replicated event should be enqueued in the newly created queue");
198+
// Create a ChildQueue BEFORE processing the replicated event
199+
// This ensures the ChildQueue exists when MainEventBusProcessor distributes the event
200+
EventQueue childQueue = queueManager.createOrTap(taskId);
201+
assertNotNull(childQueue, "ChildQueue should be created");
202+
203+
// Verify MainQueue was created
204+
EventQueue mainQueue = queueManager.get(taskId);
205+
assertNotNull(mainQueue, "MainQueue should exist after createOrTap");
206+
207+
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, eventForTask);
208+
209+
// Process the replicated event and wait for distribution
210+
// Use callback to wait for event processing
211+
EventQueueItem item = dequeueEventWithRetry(childQueue, () -> {
212+
assertDoesNotThrow(() -> queueManager.onReplicatedEvent(replicatedEvent));
213+
});
214+
assertNotNull(item, "Event should be available in queue");
215+
Event dequeuedEvent = item.getEvent();
216+
assertEquals(eventForTask, dequeuedEvent, "The replicated event should be enqueued in the newly created queue");
164217
}
165218

166219
@Test
@@ -340,29 +393,29 @@ void testReplicatedEventProcessedWhenTaskActive() throws InterruptedException {
340393
queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider, mainEventBus);
341394

342395
String taskId = "active-task";
396+
TaskStatusUpdateEvent eventForTask = createEventForTask(taskId); // Use matching taskId
343397

344398
// Verify no queue exists initially
345399
assertNull(queueManager.get(taskId));
346400

347-
// Process a replicated event for an active task
348-
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent);
349-
queueManager.onReplicatedEvent(replicatedEvent);
401+
// Create a ChildQueue BEFORE processing the replicated event
402+
// This ensures the ChildQueue exists when MainEventBusProcessor distributes the event
403+
EventQueue childQueue = queueManager.createOrTap(taskId);
404+
assertNotNull(childQueue, "ChildQueue should be created");
350405

351-
// Queue should be created and event should be enqueued
352-
EventQueue queue = queueManager.get(taskId);
353-
assertNotNull(queue, "Queue should be created for active task");
406+
// Verify MainQueue was created
407+
EventQueue mainQueue = queueManager.get(taskId);
408+
assertNotNull(mainQueue, "MainQueue should exist after createOrTap");
354409

355-
// Verify the event was enqueued
356-
// Need to tap() the MainQueue to get a ChildQueue for consumption
357-
EventQueue childQueue = queue.tap();
358-
Event dequeuedEvent;
359-
try {
360-
dequeuedEvent = childQueue.dequeueEventItem(100).getEvent();
361-
} catch (EventQueueClosedException e) {
362-
fail("Queue should not be closed");
363-
return;
364-
}
365-
assertEquals(testEvent, dequeuedEvent, "Event should be enqueued for active task");
410+
// Process a replicated event for an active task
411+
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, eventForTask);
412+
413+
// Verify the event was enqueued and distributed to our ChildQueue
414+
// Use callback to wait for event processing
415+
EventQueueItem item = dequeueEventWithRetry(childQueue, () -> queueManager.onReplicatedEvent(replicatedEvent));
416+
assertNotNull(item, "Event should be available in queue");
417+
Event dequeuedEvent = item.getEvent();
418+
assertEquals(eventForTask, dequeuedEvent, "Event should be enqueued for active task");
366419
}
367420

368421

@@ -474,36 +527,21 @@ void testQueueClosedEventJsonSerialization() throws Exception {
474527
@Test
475528
void testReplicatedQueueClosedEventTerminatesConsumer() throws InterruptedException {
476529
String taskId = "remote-close-test";
530+
TaskStatusUpdateEvent eventForTask = createEventForTask(taskId); // Use matching taskId
477531
EventQueue queue = queueManager.createOrTap(taskId);
478532

479-
// Enqueue a normal event
480-
queue.enqueueEvent(testEvent);
481-
482533
// Simulate receiving QueueClosedEvent from remote node
483534
QueueClosedEvent closedEvent = new QueueClosedEvent(taskId);
484535
ReplicatedEventQueueItem replicatedClosedEvent = new ReplicatedEventQueueItem(taskId, closedEvent);
485-
queueManager.onReplicatedEvent(replicatedClosedEvent);
486536

487-
// Dequeue the normal event first
488-
EventQueueItem item1;
489-
try {
490-
item1 = queue.dequeueEventItem(100);
491-
} catch (EventQueueClosedException e) {
492-
fail("Should not throw on first dequeue");
493-
return;
494-
}
495-
assertNotNull(item1);
496-
assertEquals(testEvent, item1.getEvent());
537+
// Dequeue the normal event first (use callback to wait for async processing)
538+
EventQueueItem item1 = dequeueEventWithRetry(queue, () -> queue.enqueueEvent(eventForTask));
539+
assertNotNull(item1, "First event should be available");
540+
assertEquals(eventForTask, item1.getEvent());
497541

498-
// Next dequeue should get the QueueClosedEvent
499-
EventQueueItem item2;
500-
try {
501-
item2 = queue.dequeueEventItem(100);
502-
} catch (EventQueueClosedException e) {
503-
fail("Should not throw on second dequeue, should return the event");
504-
return;
505-
}
506-
assertNotNull(item2);
542+
// Next dequeue should get the QueueClosedEvent (use callback to wait for async processing)
543+
EventQueueItem item2 = dequeueEventWithRetry(queue, () -> queueManager.onReplicatedEvent(replicatedClosedEvent));
544+
assertNotNull(item2, "QueueClosedEvent should be available");
507545
assertTrue(item2.getEvent() instanceof QueueClosedEvent,
508546
"Second event should be QueueClosedEvent");
509547
}
@@ -562,4 +600,25 @@ public void setActive(boolean active) {
562600
this.active = active;
563601
}
564602
}
603+
604+
/**
605+
* Helper method to dequeue an event after waiting for MainEventBusProcessor distribution.
606+
* Uses callback-based waiting instead of polling for deterministic synchronization.
607+
*
608+
* @param queue the queue to dequeue from
609+
* @param enqueueAction the action that enqueues the event (triggers event processing)
610+
* @return the dequeued EventQueueItem, or null if queue is closed
611+
*/
612+
private EventQueueItem dequeueEventWithRetry(EventQueue queue, Runnable enqueueAction) throws InterruptedException {
613+
// Wait for event to be processed and distributed
614+
waitForEventProcessing(enqueueAction);
615+
616+
// Event is now available, dequeue directly
617+
try {
618+
return queue.dequeueEventItem(100);
619+
} catch (EventQueueClosedException e) {
620+
// Queue closed, return null
621+
return null;
622+
}
623+
}
565624
}

extras/queue-manager-replicated/core/src/test/java/io/a2a/server/events/EventQueueUtil.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,6 @@
11
package io.a2a.server.events;
22

33
public class EventQueueUtil {
4-
// Shared MainEventBus for all tests - ensures events are properly distributed
5-
private static final MainEventBus TEST_EVENT_BUS = new MainEventBus();
6-
7-
// Since EventQueue.builder() is package protected, add a method to expose it
8-
public static EventQueue.EventQueueBuilder getEventQueueBuilder() {
9-
return EventQueue.builder(TEST_EVENT_BUS);
10-
}
11-
124
public static void start(MainEventBusProcessor processor) {
135
processor.start();
146
}

server-common/src/main/java/io/a2a/server/events/EventQueue.java

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -306,22 +306,29 @@ void childClosing(ChildQueue child, boolean immediate) {
306306
* Called by MainEventBusProcessor after TaskStore persistence.
307307
*/
308308
void distributeToChildren(EventQueueItem item) {
309-
synchronized (children) {
310-
int childCount = children.size();
311-
if (LOGGER.isDebugEnabled()) {
312-
LOGGER.debug("MainQueue[{}]: Distributing event {} to {} children",
313-
taskId, item.getEvent().getClass().getSimpleName(), childCount);
314-
}
315-
children.forEach(child -> {
316-
LOGGER.debug("MainQueue[{}]: Enqueueing event {} to child queue",
317-
taskId, item.getEvent().getClass().getSimpleName());
318-
child.internalEnqueueItem(item);
319-
});
320-
if (LOGGER.isDebugEnabled()) {
321-
LOGGER.debug("MainQueue[{}]: Completed distribution of {} to {} children",
322-
taskId, item.getEvent().getClass().getSimpleName(), childCount);
323-
}
309+
int childCount = children.size();
310+
if (LOGGER.isDebugEnabled()) {
311+
LOGGER.debug("MainQueue[{}]: Distributing event {} to {} children",
312+
taskId, item.getEvent().getClass().getSimpleName(), childCount);
324313
}
314+
children.forEach(child -> {
315+
LOGGER.debug("MainQueue[{}]: Enqueueing event {} to child queue",
316+
taskId, item.getEvent().getClass().getSimpleName());
317+
child.internalEnqueueItem(item);
318+
});
319+
if (LOGGER.isDebugEnabled()) {
320+
LOGGER.debug("MainQueue[{}]: Completed distribution of {} to {} children",
321+
taskId, item.getEvent().getClass().getSimpleName(), childCount);
322+
}
323+
}
324+
325+
/**
326+
* Release the semaphore after event processing is complete.
327+
* Called by MainEventBusProcessor in finally block to ensure release even on exceptions.
328+
* Balances the acquire() in enqueueEvent() - protects MainEventBus throughput.
329+
*/
330+
void releaseSemaphore() {
331+
semaphore.release();
325332
}
326333

327334
/**
@@ -393,17 +400,12 @@ public void enqueueItem(EventQueueItem item) {
393400

394401
private void internalEnqueueItem(EventQueueItem item) {
395402
// Internal method called by MainEventBusProcessor to add to local queue
403+
// Note: Semaphore is managed by parent MainQueue (acquire/release), not ChildQueue
396404
Event event = item.getEvent();
397405
if (isClosed()) {
398406
LOGGER.warn("ChildQueue is closed. Event will not be enqueued. {} {}", this, event);
399407
return;
400408
}
401-
try {
402-
semaphore.acquire();
403-
} catch (InterruptedException e) {
404-
Thread.currentThread().interrupt();
405-
throw new RuntimeException("Unable to acquire the semaphore to enqueue the event", e);
406-
}
407409
queue.add(item);
408410
LOGGER.debug("Enqueued event {} {}", event instanceof Throwable ? event.toString() : event, this);
409411
}
@@ -420,7 +422,7 @@ public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueCl
420422
if (item != null) {
421423
Event event = item.getEvent();
422424
LOGGER.debug("Dequeued event item (no wait) {} {}", this, event instanceof Throwable ? event.toString() : event);
423-
semaphore.release();
425+
// Note: Semaphore is managed by parent MainQueue, not released here
424426
}
425427
return item;
426428
}
@@ -430,7 +432,7 @@ public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueCl
430432
if (item != null) {
431433
Event event = item.getEvent();
432434
LOGGER.debug("Dequeued event item (waiting) {} {}", this, event instanceof Throwable ? event.toString() : event);
433-
semaphore.release();
435+
// Note: Semaphore is managed by parent MainQueue, not released here
434436
} else {
435437
LOGGER.trace("Dequeue timeout (null) from ChildQueue {}", System.identityHashCode(this));
436438
}
@@ -471,8 +473,11 @@ protected void doClose(boolean immediate) {
471473
super.doClose(immediate); // Sets closed flag
472474
if (immediate) {
473475
// Immediate close: clear pending events from local queue
476+
int clearedCount = queue.size();
474477
queue.clear();
475-
LOGGER.debug("Cleared ChildQueue for immediate close: {}", this);
478+
// Release semaphore permits for cleared events to prevent deadlock
479+
semaphore.release(clearedCount);
480+
LOGGER.debug("Cleared {} events from ChildQueue for immediate close: {}", clearedCount, this);
476481
}
477482
// For graceful close, let the queue drain naturally through normal consumption
478483
}

0 commit comments

Comments
 (0)