Skip to content

Commit 2532703

Browse files
committed
Gemini feedback and fixing tests
1 parent 8b78c8a commit 2532703

File tree

9 files changed

+130
-120
lines changed

9 files changed

+130
-120
lines changed

extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,10 @@ void testReplicatedEventProcessedWhenTaskActive() throws InterruptedException {
357357
EventQueue childQueue = queue.tap();
358358
Event dequeuedEvent;
359359
try {
360-
dequeuedEvent = childQueue.dequeueEventItem(100).getEvent();
360+
// Wait up to 5 seconds for event (async processing needs time)
361+
EventQueueItem item = childQueue.dequeueEventItem(5000);
362+
assertNotNull(item, "Event should be available in queue");
363+
dequeuedEvent = item.getEvent();
361364
} catch (EventQueueClosedException e) {
362365
fail("Queue should not be closed");
363366
return;

server-common/src/main/java/io/a2a/server/events/EventQueue.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -306,21 +306,19 @@ void childClosing(ChildQueue child, boolean immediate) {
306306
* Called by MainEventBusProcessor after TaskStore persistence.
307307
*/
308308
void distributeToChildren(EventQueueItem item) {
309-
synchronized (children) {
310-
int childCount = children.size();
311-
if (LOGGER.isDebugEnabled()) {
312-
LOGGER.debug("MainQueue[{}]: Distributing event {} to {} children",
313-
taskId, item.getEvent().getClass().getSimpleName(), childCount);
314-
}
315-
children.forEach(child -> {
316-
LOGGER.debug("MainQueue[{}]: Enqueueing event {} to child queue",
317-
taskId, item.getEvent().getClass().getSimpleName());
318-
child.internalEnqueueItem(item);
319-
});
320-
if (LOGGER.isDebugEnabled()) {
321-
LOGGER.debug("MainQueue[{}]: Completed distribution of {} to {} children",
322-
taskId, item.getEvent().getClass().getSimpleName(), childCount);
323-
}
309+
int childCount = children.size();
310+
if (LOGGER.isDebugEnabled()) {
311+
LOGGER.debug("MainQueue[{}]: Distributing event {} to {} children",
312+
taskId, item.getEvent().getClass().getSimpleName(), childCount);
313+
}
314+
children.forEach(child -> {
315+
LOGGER.debug("MainQueue[{}]: Enqueueing event {} to child queue",
316+
taskId, item.getEvent().getClass().getSimpleName());
317+
child.internalEnqueueItem(item);
318+
});
319+
if (LOGGER.isDebugEnabled()) {
320+
LOGGER.debug("MainQueue[{}]: Completed distribution of {} to {} children",
321+
taskId, item.getEvent().getClass().getSimpleName(), childCount);
324322
}
325323
}
326324

@@ -471,8 +469,11 @@ protected void doClose(boolean immediate) {
471469
super.doClose(immediate); // Sets closed flag
472470
if (immediate) {
473471
// Immediate close: clear pending events from local queue
472+
int clearedCount = queue.size();
474473
queue.clear();
475-
LOGGER.debug("Cleared ChildQueue for immediate close: {}", this);
474+
// Release semaphore permits for cleared events to prevent deadlock
475+
semaphore.release(clearedCount);
476+
LOGGER.debug("Cleared {} events from ChildQueue for immediate close: {}", clearedCount, this);
476477
}
477478
// For graceful close, let the queue drain naturally through normal consumption
478479
}

server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class MainEventBusProcessor implements Runnable {
4444
* Default is NOOP to avoid null checks in production code.
4545
* Tests can inject their own callback via setCallback().
4646
*/
47-
private static volatile MainEventBusProcessorCallback callback = MainEventBusProcessorCallback.NOOP;
47+
private volatile MainEventBusProcessorCallback callback = MainEventBusProcessorCallback.NOOP;
4848

4949
private final MainEventBus eventBus;
5050

@@ -71,14 +71,14 @@ public MainEventBusProcessor(MainEventBus eventBus, TaskStore taskStore, PushNot
7171
*
7272
* @param callback the callback to invoke during event processing, or null for NOOP
7373
*/
74-
public static void setCallback(MainEventBusProcessorCallback callback) {
75-
MainEventBusProcessor.callback = callback != null ? callback : MainEventBusProcessorCallback.NOOP;
74+
public void setCallback(MainEventBusProcessorCallback callback) {
75+
this.callback = callback != null ? callback : MainEventBusProcessorCallback.NOOP;
7676
}
7777

7878
@PostConstruct
7979
void start() {
8080
processorThread = new Thread(this, "MainEventBusProcessor");
81-
processorThread.setDaemon(false); // Keep JVM alive
81+
processorThread.setDaemon(true); // Allow JVM to exit even if this thread is running
8282
processorThread.start();
8383
LOGGER.info("MainEventBusProcessor started");
8484
}
@@ -98,9 +98,13 @@ void stop() {
9898
if (processorThread != null) {
9999
processorThread.interrupt();
100100
try {
101+
long start = System.currentTimeMillis();
101102
processorThread.join(5000); // Wait up to 5 seconds
103+
long elapsed = System.currentTimeMillis() - start;
104+
LOGGER.info("MainEventBusProcessor thread stopped in {}ms", elapsed);
102105
} catch (InterruptedException e) {
103106
Thread.currentThread().interrupt();
107+
LOGGER.warn("Interrupted while waiting for MainEventBusProcessor thread to stop");
104108
}
105109
}
106110
LOGGER.info("MainEventBusProcessor stopped");

server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorCallback.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,13 @@
1212
* Usage in tests:
1313
* <pre>
1414
* {@code
15+
* @Inject
16+
* MainEventBusProcessor processor;
17+
*
1518
* @BeforeEach
1619
* void setUp() {
1720
* CountDownLatch latch = new CountDownLatch(3);
18-
* MainEventBusProcessor.setCallback(new MainEventBusProcessorCallback() {
21+
* processor.setCallback(new MainEventBusProcessorCallback() {
1922
* public void onEventProcessed(String taskId, Event event) {
2023
* latch.countDown();
2124
* }
@@ -24,7 +27,7 @@
2427
*
2528
* @AfterEach
2629
* void tearDown() {
27-
* MainEventBusProcessor.setCallback(null); // Reset to NOOP
30+
* processor.setCallback(null); // Reset to NOOP
2831
* }
2932
* }
3033
* </pre>

server-common/src/test/java/io/a2a/server/events/EventConsumerTest.java

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import java.util.concurrent.atomic.AtomicReference;
1616

1717
import com.fasterxml.jackson.core.JsonProcessingException;
18+
import io.a2a.server.tasks.InMemoryTaskStore;
19+
import io.a2a.server.tasks.PushNotificationSender;
1820
import io.a2a.spec.A2AError;
1921
import io.a2a.spec.A2AServerException;
2022
import io.a2a.spec.Artifact;
@@ -28,14 +30,18 @@
2830
import io.a2a.spec.TaskStatusUpdateEvent;
2931
import io.a2a.spec.TextPart;
3032
import io.a2a.util.Utils;
33+
import org.junit.jupiter.api.AfterEach;
3134
import org.junit.jupiter.api.BeforeEach;
3235
import org.junit.jupiter.api.Test;
3336

3437
public class EventConsumerTest {
3538

39+
private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {};
40+
3641
private EventQueue eventQueue;
3742
private EventConsumer eventConsumer;
38-
43+
private MainEventBus mainEventBus;
44+
private MainEventBusProcessor mainEventBusProcessor;
3945

4046
private static final String MINIMAL_TASK = """
4147
{
@@ -57,10 +63,25 @@ public class EventConsumerTest {
5763

5864
@BeforeEach
5965
public void init() {
60-
eventQueue = EventQueueUtil.getEventQueueBuilder().build().tap();
66+
// Set up MainEventBus and processor for production-like test environment
67+
InMemoryTaskStore taskStore = new InMemoryTaskStore();
68+
mainEventBus = new MainEventBus();
69+
mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore, NOOP_PUSHNOTIFICATION_SENDER);
70+
EventQueueUtil.start(mainEventBusProcessor);
71+
72+
eventQueue = EventQueueUtil.getEventQueueBuilder()
73+
.mainEventBus(mainEventBus)
74+
.build().tap();
6175
eventConsumer = new EventConsumer(eventQueue);
6276
}
6377

78+
@AfterEach
79+
public void cleanup() {
80+
if (mainEventBusProcessor != null) {
81+
EventQueueUtil.stop(mainEventBusProcessor);
82+
}
83+
}
84+
6485
@Test
6586
public void testConsumeOneTaskEvent() throws Exception {
6687
Task event = Utils.unmarshalFrom(MINIMAL_TASK, Task.TYPE_REFERENCE);
@@ -343,7 +364,9 @@ public void onComplete() {
343364

344365
@Test
345366
public void testConsumeAllStopsOnQueueClosed() throws Exception {
346-
EventQueue queue = EventQueueUtil.getEventQueueBuilder().build().tap();
367+
EventQueue queue = EventQueueUtil.getEventQueueBuilder()
368+
.mainEventBus(mainEventBus)
369+
.build().tap();
347370
EventConsumer consumer = new EventConsumer(queue);
348371

349372
// Close the queue immediately
@@ -389,7 +412,9 @@ public void onComplete() {
389412

390413
@Test
391414
public void testConsumeAllHandlesQueueClosedException() throws Exception {
392-
EventQueue queue = EventQueueUtil.getEventQueueBuilder().build().tap();
415+
EventQueue queue = EventQueueUtil.getEventQueueBuilder()
416+
.mainEventBus(mainEventBus)
417+
.build().tap();
393418
EventConsumer consumer = new EventConsumer(queue);
394419

395420
// Add a message event (which will complete the stream)
@@ -447,7 +472,9 @@ public void onComplete() {
447472

448473
@Test
449474
public void testConsumeAllTerminatesOnQueueClosedEvent() throws Exception {
450-
EventQueue queue = EventQueueUtil.getEventQueueBuilder().build().tap();
475+
EventQueue queue = EventQueueUtil.getEventQueueBuilder()
476+
.mainEventBus(mainEventBus)
477+
.build().tap();
451478
EventConsumer consumer = new EventConsumer(queue);
452479

453480
// Enqueue a QueueClosedEvent (poison pill)

server-common/src/test/java/io/a2a/server/events/EventQueueTest.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,9 @@ public void testCloseImmediatePropagationToChildren() throws Exception {
225225

226226
@Test
227227
public void testEnqueueEventWhenClosed() throws Exception {
228-
EventQueue mainQueue = EventQueueUtil.getEventQueueBuilder().build();
228+
EventQueue mainQueue = EventQueueUtil.getEventQueueBuilder()
229+
.mainEventBus(mainEventBus)
230+
.build();
229231
EventQueue childQueue = mainQueue.tap();
230232
Event event = Utils.unmarshalFrom(MINIMAL_TASK, Task.TYPE_REFERENCE);
231233

@@ -260,16 +262,22 @@ public void testDequeueEventWhenClosedAndEmpty() throws Exception {
260262

261263
@Test
262264
public void testDequeueEventWhenClosedButHasEvents() throws Exception {
263-
EventQueue mainQueue = EventQueueUtil.getEventQueueBuilder().build();
265+
EventQueue mainQueue = EventQueueUtil.getEventQueueBuilder()
266+
.mainEventBus(mainEventBus)
267+
.build();
264268
EventQueue childQueue = mainQueue.tap();
265269
Event event = Utils.unmarshalFrom(MINIMAL_TASK, Task.TYPE_REFERENCE);
266270

267271
// Enqueue to mainQueue
268272
mainQueue.enqueueEvent(event);
269273

270-
// Wait for event to arrive in childQueue (use peek-like behavior by dequeueing then re-checking)
271-
// Actually, just wait a bit for async processing
272-
Thread.sleep(100); // Give MainEventBusProcessor time to distribute event
274+
// Poll for event to arrive in childQueue (async MainEventBusProcessor distribution)
275+
long startTime = System.currentTimeMillis();
276+
long timeout = 2000;
277+
while (childQueue.size() == 0 && (System.currentTimeMillis() - startTime) < timeout) {
278+
Thread.sleep(50);
279+
}
280+
assertTrue(childQueue.size() > 0, "Event should arrive in ChildQueue within timeout");
273281

274282
childQueue.close(); // Graceful close - events should remain
275283
assertTrue(childQueue.isClosed());
Lines changed: 21 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,102 +1,43 @@
11
package io.a2a.server.events;
22

3-
import io.a2a.server.tasks.InMemoryTaskStore;
4-
import io.a2a.server.tasks.PushNotificationSender;
5-
import io.a2a.server.tasks.TaskStateProvider;
63
import java.util.concurrent.atomic.AtomicInteger;
74

85
public class EventQueueUtil {
96
// Shared MainEventBus for all tests (to avoid creating one per test)
7+
// Each test creates its own MainEventBusProcessor in @BeforeEach/@AfterEach for proper isolation
108
private static final MainEventBus TEST_EVENT_BUS = new MainEventBus();
119

12-
// Shared MainEventBusProcessor for all tests (automatically processes events)
13-
private static final MainEventBusProcessor TEST_PROCESSOR;
14-
15-
static {
16-
// Initialize and start the processor once for all tests
17-
InMemoryTaskStore testTaskStore = new InMemoryTaskStore();
18-
PushNotificationSender testPushSender = taskId -> {}; // No-op for tests
19-
TEST_PROCESSOR = new MainEventBusProcessor(TEST_EVENT_BUS, testTaskStore, testPushSender);
20-
TEST_PROCESSOR.start(); // Start background thread
21-
22-
// Register shutdown hook to stop processor
23-
Runtime.getRuntime().addShutdownHook(new Thread(() -> TEST_PROCESSOR.stop()));
24-
}
25-
2610
// Counter for generating unique test taskIds
2711
private static final AtomicInteger TASK_ID_COUNTER = new AtomicInteger(0);
2812

29-
// Since EventQueue.builder() is package protected, add a method to expose it
30-
// Note: Now includes MainEventBus requirement and default taskId
31-
// Returns MainQueue - tests should call .tap() if they need to consume events
13+
/**
14+
* Get an EventQueue builder pre-configured with the shared test MainEventBus and a unique taskId.
15+
* <p>
16+
* Note: Returns MainQueue - tests should call .tap() if they need to consume events.
17+
* </p>
18+
*
19+
* @return builder with TEST_EVENT_BUS and unique taskId already set
20+
*/
3221
public static EventQueue.EventQueueBuilder getEventQueueBuilder() {
33-
return new EventQueueBuilderWrapper(
34-
EventQueue.builder(TEST_EVENT_BUS)
35-
.taskId("test-task-" + TASK_ID_COUNTER.incrementAndGet())
36-
);
37-
}
38-
39-
// Get the shared test MainEventBus instance
40-
public static MainEventBus getTestEventBus() {
41-
return TEST_EVENT_BUS;
22+
return EventQueue.builder(TEST_EVENT_BUS)
23+
.taskId("test-task-" + TASK_ID_COUNTER.incrementAndGet());
4224
}
4325

26+
/**
27+
* Start a MainEventBusProcessor instance.
28+
*
29+
* @param processor the processor to start
30+
*/
4431
public static void start(MainEventBusProcessor processor) {
4532
processor.start();
4633
}
4734

35+
/**
36+
* Stop a MainEventBusProcessor instance.
37+
*
38+
* @param processor the processor to stop
39+
*/
4840
public static void stop(MainEventBusProcessor processor) {
4941
processor.stop();
5042
}
51-
52-
// Wrapper that delegates to actual builder
53-
private static class EventQueueBuilderWrapper extends EventQueue.EventQueueBuilder {
54-
private final EventQueue.EventQueueBuilder delegate;
55-
56-
EventQueueBuilderWrapper(EventQueue.EventQueueBuilder delegate) {
57-
this.delegate = delegate;
58-
}
59-
60-
@Override
61-
public EventQueue.EventQueueBuilder queueSize(int queueSize) {
62-
delegate.queueSize(queueSize);
63-
return this;
64-
}
65-
66-
@Override
67-
public EventQueue.EventQueueBuilder hook(EventEnqueueHook hook) {
68-
delegate.hook(hook);
69-
return this;
70-
}
71-
72-
@Override
73-
public EventQueue.EventQueueBuilder taskId(String taskId) {
74-
delegate.taskId(taskId);
75-
return this;
76-
}
77-
78-
@Override
79-
public EventQueue.EventQueueBuilder addOnCloseCallback(Runnable onCloseCallback) {
80-
delegate.addOnCloseCallback(onCloseCallback);
81-
return this;
82-
}
83-
84-
@Override
85-
public EventQueue.EventQueueBuilder taskStateProvider(TaskStateProvider taskStateProvider) {
86-
delegate.taskStateProvider(taskStateProvider);
87-
return this;
88-
}
89-
90-
@Override
91-
public EventQueue.EventQueueBuilder mainEventBus(MainEventBus mainEventBus) {
92-
delegate.mainEventBus(mainEventBus);
93-
return this;
94-
}
95-
96-
@Override
97-
public EventQueue build() {
98-
// Return MainQueue directly - tests should call .tap() if they need ChildQueue
99-
return delegate.build();
100-
}
101-
}
10243
}

0 commit comments

Comments
 (0)