Skip to content

Commit 6dca4df

Browse files
committed
Use own MainEventBus for each test rather than a shared instance
1 parent 5fa114c commit 6dca4df

File tree

6 files changed

+68
-36
lines changed

6 files changed

+68
-36
lines changed

.github/workflows/build-and-test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ jobs:
2626
distribution: 'temurin'
2727
cache: maven
2828
- name: Build with Maven and run tests
29-
run: mvn -B package --file pom.xml -fae
29+
run: mvn -B install --file pom.xml -fae
3030
- name: Upload Test Reports
3131
if: failure()
3232
uses: actions/upload-artifact@v4

extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,10 @@ void testReplicatedEventProcessedWhenTaskActive() throws InterruptedException {
357357
EventQueue childQueue = queue.tap();
358358
Event dequeuedEvent;
359359
try {
360-
dequeuedEvent = childQueue.dequeueEventItem(100).getEvent();
360+
// Wait up to 5 seconds for event (async processing needs time)
361+
EventQueueItem item = childQueue.dequeueEventItem(5000);
362+
assertNotNull(item, "Event should be available in queue");
363+
dequeuedEvent = item.getEvent();
361364
} catch (EventQueueClosedException e) {
362365
fail("Queue should not be closed");
363366
return;

server-common/src/test/java/io/a2a/server/events/EventConsumerTest.java

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import java.util.concurrent.atomic.AtomicReference;
1616

1717
import com.fasterxml.jackson.core.JsonProcessingException;
18+
import io.a2a.server.tasks.InMemoryTaskStore;
19+
import io.a2a.server.tasks.PushNotificationSender;
1820
import io.a2a.spec.A2AError;
1921
import io.a2a.spec.A2AServerException;
2022
import io.a2a.spec.Artifact;
@@ -28,14 +30,18 @@
2830
import io.a2a.spec.TaskStatusUpdateEvent;
2931
import io.a2a.spec.TextPart;
3032
import io.a2a.util.Utils;
33+
import org.junit.jupiter.api.AfterEach;
3134
import org.junit.jupiter.api.BeforeEach;
3235
import org.junit.jupiter.api.Test;
3336

3437
public class EventConsumerTest {
3538

39+
private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {};
40+
3641
private EventQueue eventQueue;
3742
private EventConsumer eventConsumer;
38-
43+
private MainEventBus mainEventBus;
44+
private MainEventBusProcessor mainEventBusProcessor;
3945

4046
private static final String MINIMAL_TASK = """
4147
{
@@ -57,10 +63,25 @@ public class EventConsumerTest {
5763

5864
@BeforeEach
5965
public void init() {
60-
eventQueue = EventQueueUtil.getEventQueueBuilder().build().tap();
66+
// Set up MainEventBus and processor for production-like test environment
67+
InMemoryTaskStore taskStore = new InMemoryTaskStore();
68+
mainEventBus = new MainEventBus();
69+
mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore, NOOP_PUSHNOTIFICATION_SENDER);
70+
EventQueueUtil.start(mainEventBusProcessor);
71+
72+
eventQueue = EventQueueUtil.getEventQueueBuilder()
73+
.mainEventBus(mainEventBus)
74+
.build().tap();
6175
eventConsumer = new EventConsumer(eventQueue);
6276
}
6377

78+
@AfterEach
79+
public void cleanup() {
80+
if (mainEventBusProcessor != null) {
81+
EventQueueUtil.stop(mainEventBusProcessor);
82+
}
83+
}
84+
6485
@Test
6586
public void testConsumeOneTaskEvent() throws Exception {
6687
Task event = Utils.unmarshalFrom(MINIMAL_TASK, Task.TYPE_REFERENCE);
@@ -343,7 +364,9 @@ public void onComplete() {
343364

344365
@Test
345366
public void testConsumeAllStopsOnQueueClosed() throws Exception {
346-
EventQueue queue = EventQueueUtil.getEventQueueBuilder().build().tap();
367+
EventQueue queue = EventQueueUtil.getEventQueueBuilder()
368+
.mainEventBus(mainEventBus)
369+
.build().tap();
347370
EventConsumer consumer = new EventConsumer(queue);
348371

349372
// Close the queue immediately
@@ -389,7 +412,9 @@ public void onComplete() {
389412

390413
@Test
391414
public void testConsumeAllHandlesQueueClosedException() throws Exception {
392-
EventQueue queue = EventQueueUtil.getEventQueueBuilder().build().tap();
415+
EventQueue queue = EventQueueUtil.getEventQueueBuilder()
416+
.mainEventBus(mainEventBus)
417+
.build().tap();
393418
EventConsumer consumer = new EventConsumer(queue);
394419

395420
// Add a message event (which will complete the stream)
@@ -447,7 +472,9 @@ public void onComplete() {
447472

448473
@Test
449474
public void testConsumeAllTerminatesOnQueueClosedEvent() throws Exception {
450-
EventQueue queue = EventQueueUtil.getEventQueueBuilder().build().tap();
475+
EventQueue queue = EventQueueUtil.getEventQueueBuilder()
476+
.mainEventBus(mainEventBus)
477+
.build().tap();
451478
EventConsumer consumer = new EventConsumer(queue);
452479

453480
// Enqueue a QueueClosedEvent (poison pill)

server-common/src/test/java/io/a2a/server/events/EventQueueTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,9 @@ public void testCloseImmediatePropagationToChildren() throws Exception {
225225

226226
@Test
227227
public void testEnqueueEventWhenClosed() throws Exception {
228-
EventQueue mainQueue = EventQueueUtil.getEventQueueBuilder().build();
228+
EventQueue mainQueue = EventQueueUtil.getEventQueueBuilder()
229+
.mainEventBus(mainEventBus)
230+
.build();
229231
EventQueue childQueue = mainQueue.tap();
230232
Event event = Utils.unmarshalFrom(MINIMAL_TASK, Task.TYPE_REFERENCE);
231233

@@ -260,7 +262,9 @@ public void testDequeueEventWhenClosedAndEmpty() throws Exception {
260262

261263
@Test
262264
public void testDequeueEventWhenClosedButHasEvents() throws Exception {
263-
EventQueue mainQueue = EventQueueUtil.getEventQueueBuilder().build();
265+
EventQueue mainQueue = EventQueueUtil.getEventQueueBuilder()
266+
.mainEventBus(mainEventBus)
267+
.build();
264268
EventQueue childQueue = mainQueue.tap();
265269
Event event = Utils.unmarshalFrom(MINIMAL_TASK, Task.TYPE_REFERENCE);
266270

server-common/src/test/java/io/a2a/server/events/EventQueueUtil.java

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,12 @@
11
package io.a2a.server.events;
22

3-
import io.a2a.server.tasks.InMemoryTaskStore;
4-
import io.a2a.server.tasks.PushNotificationSender;
5-
import io.a2a.server.tasks.TaskStateProvider;
63
import java.util.concurrent.atomic.AtomicInteger;
74

85
public class EventQueueUtil {
96
// Shared MainEventBus for all tests (to avoid creating one per test)
7+
// Each test creates its own MainEventBusProcessor in @BeforeEach/@AfterEach for proper isolation
108
private static final MainEventBus TEST_EVENT_BUS = new MainEventBus();
119

12-
// Shared MainEventBusProcessor for all tests (automatically processes events)
13-
private static final MainEventBusProcessor TEST_PROCESSOR;
14-
15-
static {
16-
// Initialize and start the processor once for all tests
17-
InMemoryTaskStore testTaskStore = new InMemoryTaskStore();
18-
PushNotificationSender testPushSender = taskId -> {}; // No-op for tests
19-
TEST_PROCESSOR = new MainEventBusProcessor(TEST_EVENT_BUS, testTaskStore, testPushSender);
20-
TEST_PROCESSOR.start(); // Start background thread
21-
22-
// Register shutdown hook to stop processor
23-
Runtime.getRuntime().addShutdownHook(new Thread(() -> TEST_PROCESSOR.stop()));
24-
}
25-
2610
// Counter for generating unique test taskIds
2711
private static final AtomicInteger TASK_ID_COUNTER = new AtomicInteger(0);
2812

@@ -39,15 +23,6 @@ public static EventQueue.EventQueueBuilder getEventQueueBuilder() {
3923
.taskId("test-task-" + TASK_ID_COUNTER.incrementAndGet());
4024
}
4125

42-
/**
43-
* Get the shared test MainEventBus instance.
44-
*
45-
* @return the test MainEventBus
46-
*/
47-
public static MainEventBus getTestEventBus() {
48-
return TEST_EVENT_BUS;
49-
}
50-
5126
/**
5227
* Start a MainEventBusProcessor instance.
5328
*

server-common/src/test/java/io/a2a/server/tasks/TaskUpdaterTest.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@
1616
import io.a2a.server.events.EventQueue;
1717
import io.a2a.server.events.EventQueueItem;
1818
import io.a2a.server.events.EventQueueUtil;
19+
import io.a2a.server.events.MainEventBus;
20+
import io.a2a.server.events.MainEventBusProcessor;
1921
import io.a2a.spec.Event;
2022
import io.a2a.spec.Message;
2123
import io.a2a.spec.Part;
2224
import io.a2a.spec.TaskArtifactUpdateEvent;
2325
import io.a2a.spec.TaskState;
2426
import io.a2a.spec.TaskStatusUpdateEvent;
2527
import io.a2a.spec.TextPart;
28+
import org.junit.jupiter.api.AfterEach;
2629
import org.junit.jupiter.api.BeforeEach;
2730
import org.junit.jupiter.api.Test;
2831

@@ -39,21 +42,41 @@ public class TaskUpdaterTest {
3942

4043
private static final List<Part<?>> SAMPLE_PARTS = List.of(new TextPart("Test message"));
4144

45+
private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {};
46+
4247
EventQueue eventQueue;
48+
private MainEventBus mainEventBus;
49+
private MainEventBusProcessor mainEventBusProcessor;
4350
private TaskUpdater taskUpdater;
4451

4552

4653

4754
@BeforeEach
4855
public void init() {
49-
eventQueue = EventQueueUtil.getEventQueueBuilder().build().tap();
56+
// Set up MainEventBus and processor for production-like test environment
57+
InMemoryTaskStore taskStore = new InMemoryTaskStore();
58+
mainEventBus = new MainEventBus();
59+
mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore, NOOP_PUSHNOTIFICATION_SENDER);
60+
EventQueueUtil.start(mainEventBusProcessor);
61+
62+
eventQueue = EventQueueUtil.getEventQueueBuilder()
63+
.taskId(TEST_TASK_ID)
64+
.mainEventBus(mainEventBus)
65+
.build().tap();
5066
RequestContext context = new RequestContext.Builder()
5167
.setTaskId(TEST_TASK_ID)
5268
.setContextId(TEST_TASK_CONTEXT_ID)
5369
.build();
5470
taskUpdater = new TaskUpdater(context, eventQueue);
5571
}
5672

73+
@AfterEach
74+
public void cleanup() {
75+
if (mainEventBusProcessor != null) {
76+
EventQueueUtil.stop(mainEventBusProcessor);
77+
}
78+
}
79+
5780
@Test
5881
public void testAddArtifactWithCustomIdAndName() throws Exception {
5982
taskUpdater.addArtifact(SAMPLE_PARTS, "custom-artifact-id", "Custom Artifact", null);

0 commit comments

Comments
 (0)