Skip to content

Commit c9defab

Browse files
committed
Gemini feedback
1 parent f0b3a37 commit c9defab

File tree

9 files changed

+59
-70
lines changed

9 files changed

+59
-70
lines changed

extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.concurrent.atomic.AtomicInteger;
1818

1919
import io.a2a.extras.common.events.TaskFinalizedEvent;
20+
import io.a2a.json.JsonUtil;
2021
import io.a2a.server.events.EventQueue;
2122
import io.a2a.server.events.EventQueueClosedException;
2223
import io.a2a.server.events.EventQueueItem;
@@ -32,7 +33,7 @@
3233
import io.a2a.spec.TaskState;
3334
import io.a2a.spec.TaskStatus;
3435
import io.a2a.spec.TaskStatusUpdateEvent;
35-
import io.a2a.json.JsonUtil;
36+
import org.junit.jupiter.api.AfterEach;
3637
import org.junit.jupiter.api.BeforeEach;
3738
import org.junit.jupiter.api.Test;
3839

@@ -66,6 +67,16 @@ void setUp() {
6667
.build();
6768
}
6869

70+
@AfterEach
71+
void tearDown() {
72+
if (mainEventBusProcessor != null) {
73+
EventQueueUtil.stop(mainEventBusProcessor);
74+
}
75+
mainEventBusProcessor = null;
76+
mainEventBus = null;
77+
queueManager = null;
78+
}
79+
6980
@Test
7081
void testReplicationStrategyTriggeredOnNormalEnqueue() throws InterruptedException {
7182
CountingReplicationStrategy strategy = new CountingReplicationStrategy();

extras/queue-manager-replicated/core/src/test/java/io/a2a/server/events/EventQueueUtil.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,6 @@
11
package io.a2a.server.events;
22

33
public class EventQueueUtil {
4-
// Shared MainEventBus for all tests - ensures events are properly distributed
5-
private static final MainEventBus TEST_EVENT_BUS = new MainEventBus();
6-
7-
// Since EventQueue.builder() is package protected, add a method to expose it
8-
public static EventQueue.EventQueueBuilder getEventQueueBuilder() {
9-
return EventQueue.builder(TEST_EVENT_BUS);
10-
}
11-
124
public static void start(MainEventBusProcessor processor) {
135
processor.start();
146
}

server-common/src/main/java/io/a2a/server/events/MainEventBusContext.java

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,10 @@
22

33
import java.util.Objects;
44

5-
class MainEventBusContext {
6-
private final String taskId;
7-
private final EventQueue eventQueue;
8-
private final EventQueueItem eventQueueItem;
9-
10-
public MainEventBusContext(String taskId, EventQueue eventQueue, EventQueueItem eventQueueItem) {
11-
this.taskId = Objects.requireNonNull(taskId, "taskId cannot be null");
12-
this.eventQueue = Objects.requireNonNull(eventQueue, "eventQueue cannot be null");
13-
this.eventQueueItem = Objects.requireNonNull(eventQueueItem, "eventQueueItem cannot be null");
14-
}
15-
16-
public String taskId() {
17-
return taskId;
18-
}
19-
20-
public EventQueue eventQueue() {
21-
return eventQueue;
22-
}
23-
24-
public EventQueueItem eventQueueItem() {
25-
return eventQueueItem;
5+
record MainEventBusContext(String taskId, EventQueue eventQueue, EventQueueItem eventQueueItem) {
6+
MainEventBusContext {
7+
Objects.requireNonNull(taskId, "taskId cannot be null");
8+
Objects.requireNonNull(eventQueue, "eventQueue cannot be null");
9+
Objects.requireNonNull(eventQueueItem, "eventQueueItem cannot be null");
2610
}
27-
2811
}

server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,12 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
375375

376376
// Cleanup as background task to avoid blocking Vert.x threads
377377
// Pass the consumption future to ensure cleanup waits for background consumption to complete
378-
cleanupProducer(agentFuture, etai != null ? etai.consumptionFuture() : null, taskId, queue, false);
378+
cleanupProducer(agentFuture, etai != null ? etai.consumptionFuture() : null, taskId, queue, false)
379+
.whenComplete((res, err) -> {
380+
if (err != null) {
381+
LOGGER.error("Error during async cleanup for task {}", taskId, err);
382+
}
383+
});
379384
}
380385

381386
LOGGER.debug("Returning: {}", kind);
@@ -508,7 +513,12 @@ public void onComplete() {
508513
CompletableFuture<Void> agentFuture = runningAgents.remove(taskId.get());
509514
LOGGER.debug("Removed agent for task {} from runningAgents in finally block, size after: {}", taskId.get(), runningAgents.size());
510515

511-
cleanupProducer(agentFuture, null, taskId.get(), queue, true);
516+
cleanupProducer(agentFuture, null, taskId.get(), queue, true)
517+
.whenComplete((res, err) -> {
518+
if (err != null) {
519+
LOGGER.error("Error during async cleanup for streaming task {}", taskId.get(), err);
520+
}
521+
});
512522
}
513523
}
514524

server-common/src/test/java/io/a2a/server/events/EventConsumerTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void init() {
6969
mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore, NOOP_PUSHNOTIFICATION_SENDER);
7070
EventQueueUtil.start(mainEventBusProcessor);
7171

72-
eventQueue = EventQueueUtil.getEventQueueBuilder()
72+
eventQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus)
7373
.mainEventBus(mainEventBus)
7474
.build().tap();
7575
eventConsumer = new EventConsumer(eventQueue);
@@ -364,7 +364,7 @@ public void onComplete() {
364364

365365
@Test
366366
public void testConsumeAllStopsOnQueueClosed() throws Exception {
367-
EventQueue queue = EventQueueUtil.getEventQueueBuilder()
367+
EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus)
368368
.mainEventBus(mainEventBus)
369369
.build().tap();
370370
EventConsumer consumer = new EventConsumer(queue);
@@ -412,7 +412,7 @@ public void onComplete() {
412412

413413
@Test
414414
public void testConsumeAllHandlesQueueClosedException() throws Exception {
415-
EventQueue queue = EventQueueUtil.getEventQueueBuilder()
415+
EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus)
416416
.mainEventBus(mainEventBus)
417417
.build().tap();
418418
EventConsumer consumer = new EventConsumer(queue);
@@ -472,7 +472,7 @@ public void onComplete() {
472472

473473
@Test
474474
public void testConsumeAllTerminatesOnQueueClosedEvent() throws Exception {
475-
EventQueue queue = EventQueueUtil.getEventQueueBuilder()
475+
EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus)
476476
.mainEventBus(mainEventBus)
477477
.build().tap();
478478
EventConsumer consumer = new EventConsumer(queue);

server-common/src/test/java/io/a2a/server/events/EventQueueTest.java

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public void init() {
6363
mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore, NOOP_PUSHNOTIFICATION_SENDER);
6464
EventQueueUtil.start(mainEventBusProcessor);
6565

66-
eventQueue = EventQueueUtil.getEventQueueBuilder()
66+
eventQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus)
6767
.taskId("test-task")
6868
.mainEventBus(mainEventBus)
6969
.build().tap();
@@ -80,37 +80,36 @@ public void cleanup() {
8080
* Helper to create a queue with MainEventBus configured (for tests that need event distribution).
8181
*/
8282
private EventQueue createQueueWithEventBus(String taskId) {
83-
return EventQueueUtil.getEventQueueBuilder()
83+
return EventQueueUtil.getEventQueueBuilder(mainEventBus)
8484
.taskId(taskId)
85-
.mainEventBus(mainEventBus)
8685
.build();
8786
}
8887

8988
@Test
9089
public void testConstructorDefaultQueueSize() {
91-
EventQueue queue = EventQueueUtil.getEventQueueBuilder().build();
90+
EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
9291
assertEquals(EventQueue.DEFAULT_QUEUE_SIZE, queue.getQueueSize());
9392
}
9493

9594
@Test
9695
public void testConstructorCustomQueueSize() {
9796
int customSize = 500;
98-
EventQueue queue = EventQueueUtil.getEventQueueBuilder().queueSize(customSize).build();
97+
EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus).queueSize(customSize).build();
9998
assertEquals(customSize, queue.getQueueSize());
10099
}
101100

102101
@Test
103102
public void testConstructorInvalidQueueSize() {
104103
// Test zero queue size
105-
assertThrows(IllegalArgumentException.class, () -> EventQueueUtil.getEventQueueBuilder().queueSize(0).build());
104+
assertThrows(IllegalArgumentException.class, () -> EventQueueUtil.getEventQueueBuilder(mainEventBus).queueSize(0).build());
106105

107106
// Test negative queue size
108-
assertThrows(IllegalArgumentException.class, () -> EventQueueUtil.getEventQueueBuilder().queueSize(-10).build());
107+
assertThrows(IllegalArgumentException.class, () -> EventQueueUtil.getEventQueueBuilder(mainEventBus).queueSize(-10).build());
109108
}
110109

111110
@Test
112111
public void testTapCreatesChildQueue() {
113-
EventQueue parentQueue = EventQueueUtil.getEventQueueBuilder().build();
112+
EventQueue parentQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
114113
EventQueue childQueue = parentQueue.tap();
115114

116115
assertNotNull(childQueue);
@@ -120,7 +119,7 @@ public void testTapCreatesChildQueue() {
120119

121120
@Test
122121
public void testTapOnChildQueueThrowsException() {
123-
EventQueue parentQueue = EventQueueUtil.getEventQueueBuilder().build();
122+
EventQueue parentQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
124123
EventQueue childQueue = parentQueue.tap();
125124

126125
assertThrows(IllegalStateException.class, () -> childQueue.tap());
@@ -225,8 +224,7 @@ public void testCloseImmediatePropagationToChildren() throws Exception {
225224

226225
@Test
227226
public void testEnqueueEventWhenClosed() throws Exception {
228-
EventQueue mainQueue = EventQueueUtil.getEventQueueBuilder()
229-
.mainEventBus(mainEventBus)
227+
EventQueue mainQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus)
230228
.build();
231229
EventQueue childQueue = mainQueue.tap();
232230
Event event = Utils.unmarshalFrom(MINIMAL_TASK, Task.class);
@@ -252,7 +250,7 @@ public void testEnqueueEventWhenClosed() throws Exception {
252250

253251
@Test
254252
public void testDequeueEventWhenClosedAndEmpty() throws Exception {
255-
EventQueue queue = EventQueueUtil.getEventQueueBuilder().build().tap();
253+
EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build().tap();
256254
queue.close();
257255
assertTrue(queue.isClosed());
258256

@@ -262,8 +260,7 @@ public void testDequeueEventWhenClosedAndEmpty() throws Exception {
262260

263261
@Test
264262
public void testDequeueEventWhenClosedButHasEvents() throws Exception {
265-
EventQueue mainQueue = EventQueueUtil.getEventQueueBuilder()
266-
.mainEventBus(mainEventBus)
263+
EventQueue mainQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus)
267264
.build();
268265
EventQueue childQueue = mainQueue.tap();
269266
Event event = Utils.unmarshalFrom(MINIMAL_TASK, Task.class);
@@ -409,7 +406,7 @@ public void testCloseIdempotent() throws Exception {
409406
assertTrue(eventQueue.isClosed());
410407

411408
// Test with immediate close as well
412-
EventQueue eventQueue2 = EventQueueUtil.getEventQueueBuilder().build();
409+
EventQueue eventQueue2 = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
413410
eventQueue2.close(true);
414411
assertTrue(eventQueue2.isClosed());
415412

@@ -423,7 +420,7 @@ public void testCloseIdempotent() throws Exception {
423420
*/
424421
@Test
425422
public void testCloseChildQueues() throws Exception {
426-
EventQueue mainQueue = EventQueueUtil.getEventQueueBuilder().build();
423+
EventQueue mainQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
427424
EventQueue childQueue = mainQueue.tap();
428425
assertTrue(childQueue != null);
429426

@@ -433,7 +430,7 @@ public void testCloseChildQueues() throws Exception {
433430
assertFalse(childQueue.isClosed()); // Child NOT closed on graceful parent close
434431

435432
// Immediate close - parent force-closes all children
436-
EventQueue mainQueue2 = EventQueueUtil.getEventQueueBuilder().build();
433+
EventQueue mainQueue2 = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
437434
EventQueue childQueue2 = mainQueue2.tap();
438435
mainQueue2.close(true); // immediate=true
439436
assertTrue(mainQueue2.isClosed());
@@ -446,7 +443,7 @@ public void testCloseChildQueues() throws Exception {
446443
*/
447444
@Test
448445
public void testMainQueueReferenceCountingStaysOpenWithActiveChildren() throws Exception {
449-
EventQueue mainQueue = EventQueueUtil.getEventQueueBuilder().build();
446+
EventQueue mainQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
450447
EventQueue child1 = mainQueue.tap();
451448
EventQueue child2 = mainQueue.tap();
452449

server-common/src/test/java/io/a2a/server/events/EventQueueUtil.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,6 @@
33
import java.util.concurrent.atomic.AtomicInteger;
44

55
public class EventQueueUtil {
6-
// Shared MainEventBus for all tests (to avoid creating one per test)
7-
// Each test creates its own MainEventBusProcessor in @BeforeEach/@AfterEach for proper isolation
8-
private static final MainEventBus TEST_EVENT_BUS = new MainEventBus();
9-
106
// Counter for generating unique test taskIds
117
private static final AtomicInteger TASK_ID_COUNTER = new AtomicInteger(0);
128

@@ -18,8 +14,8 @@ public class EventQueueUtil {
1814
*
1915
* @return builder with TEST_EVENT_BUS and unique taskId already set
2016
*/
21-
public static EventQueue.EventQueueBuilder getEventQueueBuilder() {
22-
return EventQueue.builder(TEST_EVENT_BUS)
17+
public static EventQueue.EventQueueBuilder getEventQueueBuilder(MainEventBus eventBus) {
18+
return EventQueue.builder(eventBus)
2319
.taskId("test-task-" + TASK_ID_COUNTER.incrementAndGet());
2420
}
2521

server-common/src/test/java/io/a2a/server/events/InMemoryQueueManagerTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public void tearDown() {
4949
@Test
5050
public void testAddNewQueue() {
5151
String taskId = "test_task_id";
52-
EventQueue queue = EventQueueUtil.getEventQueueBuilder().build();
52+
EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
5353

5454
queueManager.add(taskId, queue);
5555

@@ -60,8 +60,8 @@ public void testAddNewQueue() {
6060
@Test
6161
public void testAddExistingQueueThrowsException() {
6262
String taskId = "test_task_id";
63-
EventQueue queue1 = EventQueueUtil.getEventQueueBuilder().build();
64-
EventQueue queue2 = EventQueueUtil.getEventQueueBuilder().build();
63+
EventQueue queue1 = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
64+
EventQueue queue2 = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
6565

6666
queueManager.add(taskId, queue1);
6767

@@ -73,7 +73,7 @@ public void testAddExistingQueueThrowsException() {
7373
@Test
7474
public void testGetExistingQueue() {
7575
String taskId = "test_task_id";
76-
EventQueue queue = EventQueueUtil.getEventQueueBuilder().build();
76+
EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
7777

7878
queueManager.add(taskId, queue);
7979
EventQueue result = queueManager.get(taskId);
@@ -90,7 +90,7 @@ public void testGetNonexistentQueue() {
9090
@Test
9191
public void testTapExistingQueue() {
9292
String taskId = "test_task_id";
93-
EventQueue queue = EventQueueUtil.getEventQueueBuilder().build();
93+
EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
9494

9595
queueManager.add(taskId, queue);
9696
EventQueue tappedQueue = queueManager.tap(taskId);
@@ -111,7 +111,7 @@ public void testTapNonexistentQueue() {
111111
@Test
112112
public void testCloseExistingQueue() {
113113
String taskId = "test_task_id";
114-
EventQueue queue = EventQueueUtil.getEventQueueBuilder().build();
114+
EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
115115

116116
queueManager.add(taskId, queue);
117117
queueManager.close(taskId);
@@ -146,7 +146,7 @@ public void testCreateOrTapNewQueue() {
146146
@Test
147147
public void testCreateOrTapExistingQueue() {
148148
String taskId = "test_task_id";
149-
EventQueue originalQueue = EventQueueUtil.getEventQueueBuilder().build();
149+
EventQueue originalQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
150150

151151
queueManager.add(taskId, originalQueue);
152152
EventQueue result = queueManager.createOrTap(taskId);
@@ -168,7 +168,7 @@ public void testConcurrentOperations() throws InterruptedException, ExecutionExc
168168
// Add tasks concurrently
169169
List<CompletableFuture<String>> addFutures = taskIds.stream()
170170
.map(taskId -> CompletableFuture.supplyAsync(() -> {
171-
EventQueue queue = EventQueueUtil.getEventQueueBuilder().build();
171+
EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
172172
queueManager.add(taskId, queue);
173173
return taskId;
174174
}))

server-common/src/test/java/io/a2a/server/tasks/TaskUpdaterTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public void init() {
5959
mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore, NOOP_PUSHNOTIFICATION_SENDER);
6060
EventQueueUtil.start(mainEventBusProcessor);
6161

62-
eventQueue = EventQueueUtil.getEventQueueBuilder()
62+
eventQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus)
6363
.taskId(TEST_TASK_ID)
6464
.mainEventBus(mainEventBus)
6565
.build().tap();

0 commit comments

Comments
 (0)