Skip to content

Commit 35df99e

Browse files
committed
Better logging and get server transport tests working
1 parent b852721 commit 35df99e

File tree

6 files changed

+193
-143
lines changed

6 files changed

+193
-143
lines changed

server-common/src/main/java/io/a2a/server/events/EventQueue.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,14 @@ public EventQueue tap() {
294294
return child;
295295
}
296296

297+
/**
298+
* Returns the current number of child queues.
299+
* Useful for debugging and logging event distribution.
300+
*/
301+
public int getChildCount() {
302+
return children.size();
303+
}
304+
297305
@Override
298306
public void enqueueItem(EventQueueItem item) {
299307
// MainQueue must accept events even when closed to support:
@@ -384,10 +392,20 @@ void childClosing(ChildQueue child, boolean immediate) {
384392
*/
385393
void distributeToChildren(EventQueueItem item) {
386394
synchronized (children) {
395+
int childCount = children.size();
396+
if (LOGGER.isDebugEnabled()) {
397+
LOGGER.debug("MainQueue[{}]: Distributing event {} to {} children",
398+
taskId, item.getEvent().getClass().getSimpleName(), childCount);
399+
}
400+
children.forEach(child -> {
401+
LOGGER.debug("MainQueue[{}]: Enqueueing event {} to child queue",
402+
taskId, item.getEvent().getClass().getSimpleName());
403+
child.internalEnqueueItem(item);
404+
});
387405
if (LOGGER.isDebugEnabled()) {
388-
LOGGER.debug("Distributing event to {} children for task {}", children.size(), taskId);
406+
LOGGER.debug("MainQueue[{}]: Completed distribution of {} to {} children",
407+
taskId, item.getEvent().getClass().getSimpleName(), childCount);
389408
}
390-
children.forEach(child -> child.internalEnqueueItem(item));
391409
}
392410
}
393411

server-common/src/main/java/io/a2a/server/events/MainEventBus.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,11 @@ public void submit(String taskId, EventQueue eventQueue, EventQueueItem item) {
2929
}
3030

3131
public MainEventBusContext take() throws InterruptedException {
32-
return queue.take();
32+
LOGGER.debug("MainEventBus: Waiting to take event (current queue size: {})...", queue.size());
33+
MainEventBusContext context = queue.take();
34+
LOGGER.debug("MainEventBus: Took event for task {} (remaining queue size: {})",
35+
context.taskId(), queue.size());
36+
return context;
3337
}
3438

3539
public int size() {

server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,10 @@ public void run() {
9999
LOGGER.info("MainEventBusProcessor processing loop started");
100100
while (running) {
101101
try {
102+
LOGGER.debug("MainEventBusProcessor: Waiting for event from MainEventBus...");
102103
MainEventBusContext context = eventBus.take();
104+
LOGGER.debug("MainEventBusProcessor: Retrieved event for task {} from MainEventBus",
105+
context.taskId());
103106
processEvent(context);
104107
} catch (InterruptedException e) {
105108
Thread.currentThread().interrupt();
@@ -118,7 +121,8 @@ private void processEvent(MainEventBusContext context) {
118121
Event event = context.eventQueueItem().getEvent();
119122
EventQueue eventQueue = context.eventQueue();
120123

121-
LOGGER.debug("Processing event for task {}: {}", taskId, event.getClass().getSimpleName());
124+
LOGGER.debug("MainEventBusProcessor: Processing event for task {}: {} (queue type: {})",
125+
taskId, event.getClass().getSimpleName(), eventQueue.getClass().getSimpleName());
122126

123127
// Step 1: Update TaskStore FIRST (persistence before clients see it)
124128
updateTaskStore(taskId, event);
@@ -128,14 +132,17 @@ private void processEvent(MainEventBusContext context) {
128132

129133
// Step 3: Then distribute to ChildQueues (clients see it AFTER persistence + notification)
130134
if (eventQueue instanceof EventQueue.MainQueue mainQueue) {
135+
int childCount = mainQueue.getChildCount();
136+
LOGGER.debug("MainEventBusProcessor: Distributing event to {} children for task {}", childCount, taskId);
131137
mainQueue.distributeToChildren(context.eventQueueItem());
132-
LOGGER.debug("Distributed event to children for task {}", taskId);
138+
LOGGER.debug("MainEventBusProcessor: Distributed event {} to {} children for task {}",
139+
event.getClass().getSimpleName(), childCount, taskId);
133140
} else {
134-
LOGGER.warn("Expected MainQueue but got {} for task {}",
141+
LOGGER.warn("MainEventBusProcessor: Expected MainQueue but got {} for task {}",
135142
eventQueue.getClass().getSimpleName(), taskId);
136143
}
137144

138-
LOGGER.debug("Completed processing event for task {}", taskId);
145+
LOGGER.debug("MainEventBusProcessor: Completed processing event for task {}", taskId);
139146

140147
// Step 4: Notify callback after all processing is complete
141148
callback.onEventProcessed(taskId, event);

server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -760,14 +760,23 @@ private CompletableFuture<Void> cleanupProducer(CompletableFuture<Void> agentFut
760760
LOGGER.debug("Agent and consumption both completed successfully for task {}", taskId);
761761
}
762762

763-
// Always close the ChildQueue and notify the parent MainQueue
764-
// The parent will close itself when all children are closed (childClosing logic)
765-
// This ensures proper cleanup and removal from QueueManager map
766-
LOGGER.debug("{} call, closing ChildQueue for task {} (immediate=false, notifyParent=true)",
767-
isStreaming ? "Streaming" : "Non-streaming", taskId);
768-
769-
// Always notify parent so MainQueue can clean up when last child closes
770-
queue.close(false, true);
763+
if (isStreaming) {
764+
// For streaming: DON'T close the ChildQueue here
765+
// The ChildQueue must stay open so MainEventBusProcessor can distribute events to it
766+
// and the subscriber can consume them. EventConsumer will close the queue when:
767+
// 1. A final event is detected, or
768+
// 2. The subscriber cancels, or
769+
// 3. EventConsumer completes naturally
770+
LOGGER.debug("Streaming call, NOT closing ChildQueue in cleanup for task {} (EventConsumer will close it)", taskId);
771+
} else {
772+
// For non-streaming: close the ChildQueue and notify the parent MainQueue
773+
// The parent will close itself when all children are closed (childClosing logic)
774+
// This ensures proper cleanup and removal from QueueManager map
775+
LOGGER.debug("Non-streaming call, closing ChildQueue for task {} (immediate=false, notifyParent=true)", taskId);
776+
777+
// Always notify parent so MainQueue can clean up when last child closes
778+
queue.close(false, true);
779+
}
771780

772781
// For replicated environments, the poison pill is now sent via CDI events
773782
// When JpaDatabaseTaskStore.save() persists a final task, it fires TaskFinalizedEvent

transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ public void testPushNotificationsNotSupportedError() throws Exception {
269269
public void testOnGetPushNotificationNoPushNotifierConfig() throws Exception {
270270
// Create request handler without a push notifier
271271
DefaultRequestHandler requestHandler =
272-
new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor);
272+
new DefaultRequestHandler(executor, taskStore, queueManager,null, internalExecutor);
273273
AgentCard card = AbstractA2ARequestHandlerTest.createAgentCard(false, true, false);
274274
GrpcHandler handler = new TestGrpcHandler(card, requestHandler, internalExecutor);
275275
String NAME = "tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId() + "/pushNotificationConfigs/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId();
@@ -281,7 +281,7 @@ public void testOnGetPushNotificationNoPushNotifierConfig() throws Exception {
281281
public void testOnSetPushNotificationNoPushNotifierConfig() throws Exception {
282282
// Create request handler without a push notifier
283283
DefaultRequestHandler requestHandler = DefaultRequestHandler.create(
284-
executor, taskStore, queueManager, null, null, internalExecutor);
284+
executor, taskStore, queueManager, null, internalExecutor);
285285
AgentCard card = AbstractA2ARequestHandlerTest.createAgentCard(false, true, false);
286286
GrpcHandler handler = new TestGrpcHandler(card, requestHandler, internalExecutor);
287287
String NAME = "tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId() + "/pushNotificationConfigs/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId();
@@ -656,7 +656,7 @@ public void testListPushNotificationConfigNotSupported() throws Exception {
656656
@Test
657657
public void testListPushNotificationConfigNoPushConfigStore() {
658658
DefaultRequestHandler requestHandler = DefaultRequestHandler.create(
659-
executor, taskStore, queueManager, null, null, internalExecutor);
659+
executor, taskStore, queueManager, null, internalExecutor);
660660
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
661661
taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK);
662662
agentExecutorExecute = (context, eventQueue) -> {
@@ -729,7 +729,7 @@ public void testDeletePushNotificationConfigNotSupported() throws Exception {
729729
@Test
730730
public void testDeletePushNotificationConfigNoPushConfigStore() {
731731
DefaultRequestHandler requestHandler = DefaultRequestHandler.create(
732-
executor, taskStore, queueManager, null, null, internalExecutor);
732+
executor, taskStore, queueManager, null, internalExecutor);
733733
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
734734
String NAME = "tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId() + "/pushNotificationConfigs/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId();
735735
DeleteTaskPushNotificationConfigRequest request = DeleteTaskPushNotificationConfigRequest.newBuilder()

0 commit comments

Comments
 (0)