Skip to content

Commit 32886d3

Browse files
committed
Fix server-common tests
1 parent 12bdd3a commit 32886d3

File tree

9 files changed

+134
-35
lines changed

9 files changed

+134
-35
lines changed

server-common/src/main/java/io/a2a/server/events/EventQueue.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -316,13 +316,12 @@ public void enqueueItem(EventQueueItem item) {
316316
queue.add(item);
317317
LOGGER.debug("Enqueued event {} {}", event instanceof Throwable ? event.toString() : event, this);
318318

319-
// Submit to MainEventBus instead of immediate distribution to children
319+
// Submit to MainEventBus for centralized persistence + distribution
320320
if (mainEventBus != null && taskId != null) {
321321
mainEventBus.submit(taskId, this, item);
322322
} else {
323-
// Fallback: immediate distribution (for tests/non-bus mode)
324-
LOGGER.warn("MainEventBus not configured for task {}, using immediate distribution", taskId);
325-
children.forEach(eq -> eq.internalEnqueueItem(item));
323+
// This should not happen in properly configured systems
324+
LOGGER.error("MainEventBus not configured for task {} - events will NOT be distributed to children!", taskId);
326325
}
327326

328327
// Trigger replication hook if configured (KEEP for inter-process replication)

server-common/src/main/java/io/a2a/server/events/InMemoryQueueManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ public class InMemoryQueueManager implements QueueManager {
1818
private final EventQueueFactory factory;
1919
private final TaskStateProvider taskStateProvider;
2020

21-
@Inject
2221
MainEventBus mainEventBus;
2322

2423
@Inject
25-
public InMemoryQueueManager(TaskStateProvider taskStateProvider) {
24+
public InMemoryQueueManager(TaskStateProvider taskStateProvider, MainEventBus mainEventBus) {
25+
this.mainEventBus = mainEventBus;
2626
this.factory = new DefaultEventQueueFactory();
2727
this.taskStateProvider = taskStateProvider;
2828
}

server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,20 @@
3333
public class MainEventBusProcessor implements Runnable {
3434
private static final Logger LOGGER = LoggerFactory.getLogger(MainEventBusProcessor.class);
3535

36-
@Inject
37-
MainEventBus eventBus;
3836

39-
@Inject
40-
TaskStore taskStore;
37+
private final MainEventBus eventBus;
38+
39+
private final TaskStore taskStore;
4140

4241
private volatile boolean running = true;
4342
private Thread processorThread;
4443

44+
@Inject
45+
public MainEventBusProcessor(MainEventBus eventBus, TaskStore taskStore) {
46+
this.eventBus = eventBus;
47+
this.taskStore = taskStore;
48+
}
49+
4550
@PostConstruct
4651
void start() {
4752
processorThread = new Thread(this, "MainEventBusProcessor");
@@ -91,10 +96,10 @@ private void processEvent(MainEventBusContext context) {
9196

9297
LOGGER.debug("Processing event for task {}: {}", taskId, event.getClass().getSimpleName());
9398

94-
// Step 1: Update TaskStore FIRST
99+
// Step 1: Update TaskStore FIRST (persistence before clients see it)
95100
updateTaskStore(taskId, event);
96101

97-
// Step 2: Then distribute to ChildQueues
102+
// Step 2: Then distribute to ChildQueues (clients see it AFTER persistence)
98103
if (eventQueue instanceof EventQueue.MainQueue mainQueue) {
99104
mainQueue.distributeToChildren(context.eventQueueItem());
100105
LOGGER.debug("Distributed event to children for task {}", taskId);

server-common/src/test/java/io/a2a/server/events/EventQueueTest.java

Lines changed: 49 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,19 @@
2020
import io.a2a.spec.TaskNotFoundError;
2121
import io.a2a.spec.TaskState;
2222
import io.a2a.spec.TaskStatus;
23+
import io.a2a.server.tasks.InMemoryTaskStore;
2324
import io.a2a.spec.TaskStatusUpdateEvent;
2425
import io.a2a.spec.TextPart;
2526
import io.a2a.util.Utils;
27+
import org.junit.jupiter.api.AfterEach;
2628
import org.junit.jupiter.api.BeforeEach;
2729
import org.junit.jupiter.api.Test;
2830

2931
public class EventQueueTest {
3032

3133
private EventQueue eventQueue;
34+
private MainEventBus mainEventBus;
35+
private MainEventBusProcessor mainEventBusProcessor;
3236

3337
private static final String MINIMAL_TASK = """
3438
{
@@ -51,8 +55,33 @@ public class EventQueueTest {
5155

5256
@BeforeEach
5357
public void init() {
54-
eventQueue = EventQueue.builder().build();
58+
// Set up MainEventBus and processor for production-like test environment
59+
InMemoryTaskStore taskStore = new InMemoryTaskStore();
60+
mainEventBus = new MainEventBus();
61+
mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore);
62+
EventQueueUtil.start(mainEventBusProcessor);
63+
64+
eventQueue = EventQueue.builder()
65+
.taskId("test-task")
66+
.mainEventBus(mainEventBus)
67+
.build();
68+
}
69+
70+
@AfterEach
71+
public void cleanup() {
72+
if (mainEventBusProcessor != null) {
73+
EventQueueUtil.stop(mainEventBusProcessor);
74+
}
75+
}
5576

77+
/**
78+
* Helper to create a queue with MainEventBus configured (for tests that need event distribution).
79+
*/
80+
private EventQueue createQueueWithEventBus(String taskId) {
81+
return EventQueue.builder()
82+
.taskId(taskId)
83+
.mainEventBus(mainEventBus)
84+
.build();
5685
}
5786

5887
@Test
@@ -97,23 +126,24 @@ public void testTapOnChildQueueThrowsException() {
97126

98127
@Test
99128
public void testEnqueueEventPropagagesToChildren() throws Exception {
100-
EventQueue parentQueue = EventQueue.builder().build();
129+
EventQueue parentQueue = createQueueWithEventBus("test-propagate");
101130
EventQueue childQueue = parentQueue.tap();
102131

103132
Event event = Utils.unmarshalFrom(MINIMAL_TASK, Task.TYPE_REFERENCE);
104133
parentQueue.enqueueEvent(event);
105134

106135
// Event should be available in both parent and child queues
107-
Event parentEvent = parentQueue.dequeueEventItem(-1).getEvent();
108-
Event childEvent = childQueue.dequeueEventItem(-1).getEvent();
136+
// Note: MainEventBusProcessor runs async, so we use dequeueEventItem with timeout
137+
Event parentEvent = parentQueue.dequeueEventItem(5000).getEvent();
138+
Event childEvent = childQueue.dequeueEventItem(5000).getEvent();
109139

110140
assertSame(event, parentEvent);
111141
assertSame(event, childEvent);
112142
}
113143

114144
@Test
115145
public void testMultipleChildQueuesReceiveEvents() throws Exception {
116-
EventQueue parentQueue = EventQueue.builder().build();
146+
EventQueue parentQueue = createQueueWithEventBus("test-multiple");
117147
EventQueue childQueue1 = parentQueue.tap();
118148
EventQueue childQueue2 = parentQueue.tap();
119149

@@ -124,42 +154,43 @@ public void testMultipleChildQueuesReceiveEvents() throws Exception {
124154
parentQueue.enqueueEvent(event2);
125155

126156
// All queues should receive both events
127-
assertSame(event1, parentQueue.dequeueEventItem(-1).getEvent());
128-
assertSame(event2, parentQueue.dequeueEventItem(-1).getEvent());
157+
// Note: Use timeout for async processing
158+
assertSame(event1, parentQueue.dequeueEventItem(5000).getEvent());
159+
assertSame(event2, parentQueue.dequeueEventItem(5000).getEvent());
129160

130-
assertSame(event1, childQueue1.dequeueEventItem(-1).getEvent());
131-
assertSame(event2, childQueue1.dequeueEventItem(-1).getEvent());
161+
assertSame(event1, childQueue1.dequeueEventItem(5000).getEvent());
162+
assertSame(event2, childQueue1.dequeueEventItem(5000).getEvent());
132163

133-
assertSame(event1, childQueue2.dequeueEventItem(-1).getEvent());
134-
assertSame(event2, childQueue2.dequeueEventItem(-1).getEvent());
164+
assertSame(event1, childQueue2.dequeueEventItem(5000).getEvent());
165+
assertSame(event2, childQueue2.dequeueEventItem(5000).getEvent());
135166
}
136167

137168
@Test
138169
public void testChildQueueDequeueIndependently() throws Exception {
139-
EventQueue parentQueue = EventQueue.builder().build();
170+
EventQueue parentQueue = createQueueWithEventBus("test-independent");
140171
EventQueue childQueue1 = parentQueue.tap();
141172
EventQueue childQueue2 = parentQueue.tap();
142173

143174
Event event = Utils.unmarshalFrom(MINIMAL_TASK, Task.TYPE_REFERENCE);
144175
parentQueue.enqueueEvent(event);
145176

146-
// Dequeue from child1 first
147-
Event child1Event = childQueue1.dequeueEventItem(-1).getEvent();
177+
// Dequeue from child1 first (use timeout for async processing)
178+
Event child1Event = childQueue1.dequeueEventItem(5000).getEvent();
148179
assertSame(event, child1Event);
149180

150181
// child2 should still have the event available
151-
Event child2Event = childQueue2.dequeueEventItem(-1).getEvent();
182+
Event child2Event = childQueue2.dequeueEventItem(5000).getEvent();
152183
assertSame(event, child2Event);
153184

154185
// Parent should still have the event available
155-
Event parentEvent = parentQueue.dequeueEventItem(-1).getEvent();
186+
Event parentEvent = parentQueue.dequeueEventItem(5000).getEvent();
156187
assertSame(event, parentEvent);
157188
}
158189

159190

160191
@Test
161192
public void testCloseImmediatePropagationToChildren() throws Exception {
162-
EventQueue parentQueue = EventQueue.builder().build();
193+
EventQueue parentQueue = createQueueWithEventBus("test-close");
163194
EventQueue childQueue = parentQueue.tap();
164195

165196
// Add events to both parent and child
@@ -168,7 +199,7 @@ public void testCloseImmediatePropagationToChildren() throws Exception {
168199

169200
assertFalse(childQueue.isClosed());
170201
try {
171-
assertNotNull(childQueue.dequeueEventItem(-1)); // Child has the event
202+
assertNotNull(childQueue.dequeueEventItem(5000)); // Child has the event (use timeout)
172203
} catch (EventQueueClosedException e) {
173204
// This is fine if queue closed before dequeue
174205
}

server-common/src/test/java/io/a2a/server/events/EventQueueUtil.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,12 @@ public class EventQueueUtil {
55
public static EventQueue.EventQueueBuilder getEventQueueBuilder() {
66
return EventQueue.builder();
77
}
8+
9+
public static void start(MainEventBusProcessor processor) {
10+
processor.start();
11+
}
12+
13+
public static void stop(MainEventBusProcessor processor) {
14+
processor.stop();
15+
}
816
}

server-common/src/test/java/io/a2a/server/events/InMemoryQueueManagerTest.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,34 @@
1414
import java.util.concurrent.ExecutionException;
1515
import java.util.stream.IntStream;
1616

17+
import io.a2a.server.tasks.InMemoryTaskStore;
1718
import io.a2a.server.tasks.MockTaskStateProvider;
19+
import org.junit.jupiter.api.AfterEach;
1820
import org.junit.jupiter.api.BeforeEach;
1921
import org.junit.jupiter.api.Test;
2022

2123
public class InMemoryQueueManagerTest {
2224

2325
private InMemoryQueueManager queueManager;
2426
private MockTaskStateProvider taskStateProvider;
27+
private InMemoryTaskStore taskStore;
28+
private MainEventBus mainEventBus;
29+
private MainEventBusProcessor mainEventBusProcessor;
2530

2631
@BeforeEach
2732
public void setUp() {
2833
taskStateProvider = new MockTaskStateProvider();
29-
queueManager = new InMemoryQueueManager(taskStateProvider);
34+
taskStore = new InMemoryTaskStore();
35+
mainEventBus = new MainEventBus();
36+
mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore);
37+
EventQueueUtil.start(mainEventBusProcessor);
38+
39+
queueManager = new InMemoryQueueManager(taskStateProvider, mainEventBus);
40+
}
41+
42+
@AfterEach
43+
public void tearDown() {
44+
EventQueueUtil.stop(mainEventBusProcessor);
3045
}
3146

3247
@Test

server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121
import io.a2a.server.agentexecution.RequestContext;
2222
import io.a2a.server.events.EventQueue;
2323
import io.a2a.server.events.EventQueueItem;
24+
import io.a2a.server.events.EventQueueUtil;
2425
import io.a2a.server.events.InMemoryQueueManager;
26+
import io.a2a.server.events.MainEventBus;
27+
import io.a2a.server.events.MainEventBusProcessor;
2528
import io.a2a.server.tasks.BasePushNotificationSender;
2629
import io.a2a.server.tasks.InMemoryPushNotificationConfigStore;
2730
import io.a2a.server.tasks.InMemoryTaskStore;
@@ -71,6 +74,8 @@ public class AbstractA2ARequestHandlerTest {
7174
protected AgentExecutorMethod agentExecutorCancel;
7275
protected InMemoryQueueManager queueManager;
7376
protected TestHttpClient httpClient;
77+
protected MainEventBus mainEventBus;
78+
protected MainEventBusProcessor mainEventBusProcessor;
7479

7580
protected final Executor internalExecutor = Executors.newCachedThreadPool();
7681

@@ -94,7 +99,14 @@ public void cancel(RequestContext context, EventQueue eventQueue) throws JSONRPC
9499

95100
InMemoryTaskStore inMemoryTaskStore = new InMemoryTaskStore();
96101
taskStore = inMemoryTaskStore;
97-
queueManager = new InMemoryQueueManager(inMemoryTaskStore);
102+
103+
// Create MainEventBus and MainEventBusProcessor (production code path)
104+
mainEventBus = new MainEventBus();
105+
mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore);
106+
EventQueueUtil.start(mainEventBusProcessor);
107+
108+
queueManager = new InMemoryQueueManager(inMemoryTaskStore, mainEventBus);
109+
98110
httpClient = new TestHttpClient();
99111
PushNotificationConfigStore pushConfigStore = new InMemoryPushNotificationConfigStore();
100112
PushNotificationSender pushSender = new BasePushNotificationSender(pushConfigStore, httpClient);
@@ -107,6 +119,11 @@ public void cancel(RequestContext context, EventQueue eventQueue) throws JSONRPC
107119
public void cleanup() {
108120
agentExecutorExecute = null;
109121
agentExecutorCancel = null;
122+
123+
// Stop MainEventBusProcessor background thread
124+
if (mainEventBusProcessor != null) {
125+
EventQueueUtil.stop(mainEventBusProcessor);
126+
}
110127
}
111128

112129
protected static AgentCard createAgentCard(boolean streaming, boolean pushNotifications, boolean stateTransitionHistory) {

server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
import io.a2a.server.agentexecution.RequestContext;
2020
import io.a2a.server.auth.UnauthenticatedUser;
2121
import io.a2a.server.events.EventQueue;
22+
import io.a2a.server.events.EventQueueUtil;
2223
import io.a2a.server.events.InMemoryQueueManager;
24+
import io.a2a.server.events.MainEventBus;
25+
import io.a2a.server.events.MainEventBusProcessor;
2326
import io.a2a.server.tasks.InMemoryPushNotificationConfigStore;
2427
import io.a2a.server.tasks.InMemoryTaskStore;
2528
import io.a2a.server.tasks.TaskUpdater;
@@ -32,6 +35,7 @@
3235
import io.a2a.spec.TaskState;
3336
import io.a2a.spec.TaskStatus;
3437
import io.a2a.spec.TextPart;
38+
import org.junit.jupiter.api.AfterEach;
3539
import org.junit.jupiter.api.BeforeEach;
3640
import org.junit.jupiter.api.Test;
3741
import org.junit.jupiter.api.Timeout;
@@ -50,12 +54,21 @@ public class DefaultRequestHandlerTest {
5054
private InMemoryQueueManager queueManager;
5155
private TestAgentExecutor agentExecutor;
5256
private ServerCallContext serverCallContext;
57+
private MainEventBus mainEventBus;
58+
private MainEventBusProcessor mainEventBusProcessor;
5359

5460
@BeforeEach
5561
void setUp() {
5662
taskStore = new InMemoryTaskStore();
63+
64+
// Create MainEventBus and MainEventBusProcessor (production code path)
65+
mainEventBus = new MainEventBus();
66+
mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore);
67+
EventQueueUtil.start(mainEventBusProcessor);
68+
5769
// Pass taskStore as TaskStateProvider to queueManager for task-aware queue management
58-
queueManager = new InMemoryQueueManager(taskStore);
70+
queueManager = new InMemoryQueueManager(taskStore, mainEventBus);
71+
5972
agentExecutor = new TestAgentExecutor();
6073

6174
requestHandler = DefaultRequestHandler.create(
@@ -70,6 +83,14 @@ void setUp() {
7083
serverCallContext = new ServerCallContext(UnauthenticatedUser.INSTANCE, Map.of(), Set.of());
7184
}
7285

86+
@AfterEach
87+
void tearDown() {
88+
// Stop MainEventBusProcessor background thread
89+
if (mainEventBusProcessor != null) {
90+
EventQueueUtil.stop(mainEventBusProcessor);
91+
}
92+
}
93+
7394
/**
7495
* Test that multiple blocking messages to the same task work correctly
7596
* when agent doesn't emit final events (fire-and-forget pattern).

0 commit comments

Comments
 (0)