Skip to content

Commit 54dd78f

Browse files
committed
Fix Gemini feedback
1 parent 8b78c8a commit 54dd78f

File tree

6 files changed

+50
-74
lines changed

6 files changed

+50
-74
lines changed

server-common/src/main/java/io/a2a/server/events/EventQueue.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,8 +471,11 @@ protected void doClose(boolean immediate) {
471471
super.doClose(immediate); // Sets closed flag
472472
if (immediate) {
473473
// Immediate close: clear pending events from local queue
474+
int clearedCount = queue.size();
474475
queue.clear();
475-
LOGGER.debug("Cleared ChildQueue for immediate close: {}", this);
476+
// Release semaphore permits for cleared events to prevent deadlock
477+
semaphore.release(clearedCount);
478+
LOGGER.debug("Cleared {} events from ChildQueue for immediate close: {}", clearedCount, this);
476479
}
477480
// For graceful close, let the queue drain naturally through normal consumption
478481
}

server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class MainEventBusProcessor implements Runnable {
4444
* Default is NOOP to avoid null checks in production code.
4545
* Tests can inject their own callback via setCallback().
4646
*/
47-
private static volatile MainEventBusProcessorCallback callback = MainEventBusProcessorCallback.NOOP;
47+
private volatile MainEventBusProcessorCallback callback = MainEventBusProcessorCallback.NOOP;
4848

4949
private final MainEventBus eventBus;
5050

@@ -71,8 +71,8 @@ public MainEventBusProcessor(MainEventBus eventBus, TaskStore taskStore, PushNot
7171
*
7272
* @param callback the callback to invoke during event processing, or null for NOOP
7373
*/
74-
public static void setCallback(MainEventBusProcessorCallback callback) {
75-
MainEventBusProcessor.callback = callback != null ? callback : MainEventBusProcessorCallback.NOOP;
74+
public void setCallback(MainEventBusProcessorCallback callback) {
75+
this.callback = callback != null ? callback : MainEventBusProcessorCallback.NOOP;
7676
}
7777

7878
@PostConstruct

server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorCallback.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,13 @@
1212
* Usage in tests:
1313
* <pre>
1414
* {@code
15+
* @Inject
16+
* MainEventBusProcessor processor;
17+
*
1518
* @BeforeEach
1619
* void setUp() {
1720
* CountDownLatch latch = new CountDownLatch(3);
18-
* MainEventBusProcessor.setCallback(new MainEventBusProcessorCallback() {
21+
* processor.setCallback(new MainEventBusProcessorCallback() {
1922
* public void onEventProcessed(String taskId, Event event) {
2023
* latch.countDown();
2124
* }
@@ -24,7 +27,7 @@
2427
*
2528
* @AfterEach
2629
* void tearDown() {
27-
* MainEventBusProcessor.setCallback(null); // Reset to NOOP
30+
* processor.setCallback(null); // Reset to NOOP
2831
* }
2932
* }
3033
* </pre>

server-common/src/test/java/io/a2a/server/events/EventQueueTest.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -267,9 +267,13 @@ public void testDequeueEventWhenClosedButHasEvents() throws Exception {
267267
// Enqueue to mainQueue
268268
mainQueue.enqueueEvent(event);
269269

270-
// Wait for event to arrive in childQueue (use peek-like behavior by dequeueing then re-checking)
271-
// Actually, just wait a bit for async processing
272-
Thread.sleep(100); // Give MainEventBusProcessor time to distribute event
270+
// Poll for event to arrive in childQueue (async MainEventBusProcessor distribution)
271+
long startTime = System.currentTimeMillis();
272+
long timeout = 2000;
273+
while (childQueue.size() == 0 && (System.currentTimeMillis() - startTime) < timeout) {
274+
Thread.sleep(50);
275+
}
276+
assertTrue(childQueue.size() > 0, "Event should arrive in ChildQueue within timeout");
273277

274278
childQueue.close(); // Graceful close - events should remain
275279
assertTrue(childQueue.isClosed());

server-common/src/test/java/io/a2a/server/events/EventQueueUtil.java

Lines changed: 25 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -26,77 +26,43 @@ public class EventQueueUtil {
2626
// Counter for generating unique test taskIds
2727
private static final AtomicInteger TASK_ID_COUNTER = new AtomicInteger(0);
2828

29-
// Since EventQueue.builder() is package protected, add a method to expose it
30-
// Note: Now includes MainEventBus requirement and default taskId
31-
// Returns MainQueue - tests should call .tap() if they need to consume events
29+
/**
30+
* Get an EventQueue builder pre-configured with the shared test MainEventBus and a unique taskId.
31+
* <p>
32+
* Note: Returns MainQueue - tests should call .tap() if they need to consume events.
33+
* </p>
34+
*
35+
* @return builder with TEST_EVENT_BUS and unique taskId already set
36+
*/
3237
public static EventQueue.EventQueueBuilder getEventQueueBuilder() {
33-
return new EventQueueBuilderWrapper(
34-
EventQueue.builder(TEST_EVENT_BUS)
35-
.taskId("test-task-" + TASK_ID_COUNTER.incrementAndGet())
36-
);
38+
return EventQueue.builder(TEST_EVENT_BUS)
39+
.taskId("test-task-" + TASK_ID_COUNTER.incrementAndGet());
3740
}
3841

39-
// Get the shared test MainEventBus instance
42+
/**
43+
* Get the shared test MainEventBus instance.
44+
*
45+
* @return the test MainEventBus
46+
*/
4047
public static MainEventBus getTestEventBus() {
4148
return TEST_EVENT_BUS;
4249
}
4350

51+
/**
52+
* Start a MainEventBusProcessor instance.
53+
*
54+
* @param processor the processor to start
55+
*/
4456
public static void start(MainEventBusProcessor processor) {
4557
processor.start();
4658
}
4759

60+
/**
61+
* Stop a MainEventBusProcessor instance.
62+
*
63+
* @param processor the processor to stop
64+
*/
4865
public static void stop(MainEventBusProcessor processor) {
4966
processor.stop();
5067
}
51-
52-
// Wrapper that delegates to actual builder
53-
private static class EventQueueBuilderWrapper extends EventQueue.EventQueueBuilder {
54-
private final EventQueue.EventQueueBuilder delegate;
55-
56-
EventQueueBuilderWrapper(EventQueue.EventQueueBuilder delegate) {
57-
this.delegate = delegate;
58-
}
59-
60-
@Override
61-
public EventQueue.EventQueueBuilder queueSize(int queueSize) {
62-
delegate.queueSize(queueSize);
63-
return this;
64-
}
65-
66-
@Override
67-
public EventQueue.EventQueueBuilder hook(EventEnqueueHook hook) {
68-
delegate.hook(hook);
69-
return this;
70-
}
71-
72-
@Override
73-
public EventQueue.EventQueueBuilder taskId(String taskId) {
74-
delegate.taskId(taskId);
75-
return this;
76-
}
77-
78-
@Override
79-
public EventQueue.EventQueueBuilder addOnCloseCallback(Runnable onCloseCallback) {
80-
delegate.addOnCloseCallback(onCloseCallback);
81-
return this;
82-
}
83-
84-
@Override
85-
public EventQueue.EventQueueBuilder taskStateProvider(TaskStateProvider taskStateProvider) {
86-
delegate.taskStateProvider(taskStateProvider);
87-
return this;
88-
}
89-
90-
@Override
91-
public EventQueue.EventQueueBuilder mainEventBus(MainEventBus mainEventBus) {
92-
delegate.mainEventBus(mainEventBus);
93-
return this;
94-
}
95-
96-
@Override
97-
public EventQueue build() {
98-
// Return MainQueue directly - tests should call .tap() if they need ChildQueue
99-
return delegate.build();
100-
}
101-
}
10268
}

transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ public void onComplete() {
285285
public void testOnMessageStreamNewMessageMultipleEventsSuccess() throws InterruptedException {
286286
// Setup callback to wait for all 3 events to be processed by MainEventBusProcessor
287287
CountDownLatch processingLatch = new CountDownLatch(3);
288-
io.a2a.server.events.MainEventBusProcessor.setCallback(new io.a2a.server.events.MainEventBusProcessorCallback() {
288+
mainEventBusProcessor.setCallback(new io.a2a.server.events.MainEventBusProcessorCallback() {
289289
@Override
290290
public void onEventProcessed(String taskId, io.a2a.spec.Event event) {
291291
processingLatch.countDown();
@@ -407,7 +407,7 @@ public void onComplete() {
407407
assertEquals(MINIMAL_TASK.getId(), receivedStatus.getTaskId());
408408
assertEquals(TaskState.COMPLETED, receivedStatus.getStatus().state());
409409
} finally {
410-
io.a2a.server.events.MainEventBusProcessor.setCallback(null);
410+
mainEventBusProcessor.setCallback(null);
411411
}
412412
}
413413

@@ -679,7 +679,7 @@ public void testGetPushNotificationConfigSuccess() {
679679
public void testOnMessageStreamNewMessageSendPushNotificationSuccess() throws Exception {
680680
// Setup callback to wait for all 3 events to be processed by MainEventBusProcessor
681681
CountDownLatch processingLatch = new CountDownLatch(3);
682-
io.a2a.server.events.MainEventBusProcessor.setCallback(new io.a2a.server.events.MainEventBusProcessorCallback() {
682+
mainEventBusProcessor.setCallback(new io.a2a.server.events.MainEventBusProcessorCallback() {
683683
@Override
684684
public void onEventProcessed(String taskId, io.a2a.spec.Event event) {
685685
processingLatch.countDown();
@@ -799,7 +799,7 @@ public void onComplete() {
799799
assertEquals(1, curr.getArtifacts().get(0).parts().size());
800800
assertEquals("text", ((TextPart)curr.getArtifacts().get(0).parts().get(0)).getText());
801801
} finally {
802-
io.a2a.server.events.MainEventBusProcessor.setCallback(null);
802+
mainEventBusProcessor.setCallback(null);
803803
}
804804
}
805805

@@ -1267,7 +1267,7 @@ public void testOnMessageSendTaskIdMismatch() {
12671267
public void testOnMessageStreamTaskIdMismatch() throws InterruptedException {
12681268
// Setup callback to wait for the 1 event to be processed by MainEventBusProcessor
12691269
CountDownLatch processingLatch = new CountDownLatch(1);
1270-
io.a2a.server.events.MainEventBusProcessor.setCallback(new io.a2a.server.events.MainEventBusProcessorCallback() {
1270+
mainEventBusProcessor.setCallback(new io.a2a.server.events.MainEventBusProcessorCallback() {
12711271
@Override
12721272
public void onEventProcessed(String taskId, io.a2a.spec.Event event) {
12731273
processingLatch.countDown();
@@ -1332,7 +1332,7 @@ public void onComplete() {
13321332
Assertions.assertEquals(1, results.size());
13331333
Assertions.assertInstanceOf(InternalError.class, results.get(0).getError());
13341334
} finally {
1335-
io.a2a.server.events.MainEventBusProcessor.setCallback(null);
1335+
mainEventBusProcessor.setCallback(null);
13361336
}
13371337
}
13381338

0 commit comments

Comments
 (0)