Skip to content

Commit 54c02c8

Browse files
committed
Debug logging
1 parent 881bdbe commit 54c02c8

File tree

3 files changed

+40
-10
lines changed

3 files changed

+40
-10
lines changed

server-common/src/main/java/io/a2a/server/events/EventConsumer.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,15 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
5959
EventQueueItem item;
6060
Event event;
6161
try {
62+
LOGGER.debug("EventConsumer polling queue {} (error={})", System.identityHashCode(queue), error);
6263
item = queue.dequeueEventItem(QUEUE_WAIT_MILLISECONDS);
6364
if (item == null) {
65+
LOGGER.debug("EventConsumer poll timeout (null item), continuing");
6466
continue;
6567
}
6668
event = item.getEvent();
69+
LOGGER.debug("EventConsumer received event: {} (queue={})",
70+
event.getClass().getSimpleName(), System.identityHashCode(queue));
6771

6872
// Defensive logging for error handling
6973
if (event instanceof Throwable thr) {
@@ -125,8 +129,13 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
125129

126130
public EnhancedRunnable.DoneCallback createAgentRunnableDoneCallback() {
127131
return agentRunnable -> {
132+
LOGGER.info("EventConsumer: Agent done callback invoked (hasError={}, queue={})",
133+
agentRunnable.getError() != null, System.identityHashCode(queue));
128134
if (agentRunnable.getError() != null) {
129135
error = agentRunnable.getError();
136+
LOGGER.info("EventConsumer: Set error field from agent callback");
137+
} else {
138+
LOGGER.info("EventConsumer: Agent completed successfully (no error), continuing consumption");
130139
}
131140
};
132141
}

server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,17 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
374374

375375
boolean blocking = params.configuration() != null && Boolean.TRUE.equals(params.configuration().blocking());
376376

377+
// Log blocking behavior from client request
378+
if (params.configuration() != null && params.configuration().blocking() != null) {
379+
LOGGER.info("DefaultRequestHandler: Client requested blocking={} for task {}",
380+
params.configuration().blocking(), taskId);
381+
} else if (params.configuration() != null) {
382+
LOGGER.info("DefaultRequestHandler: Client sent configuration but blocking=null, using default blocking=true for task {}", taskId);
383+
} else {
384+
LOGGER.info("DefaultRequestHandler: Client sent no configuration, using default blocking=true for task {}", taskId);
385+
}
386+
LOGGER.info("DefaultRequestHandler: Final blocking decision: {} for task {}", blocking, taskId);
387+
377388
boolean interruptedOrNonBlocking = false;
378389

379390
EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(taskId, mss.requestContext, queue);
@@ -395,7 +406,8 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
395406
throw new InternalError("No result");
396407
}
397408
interruptedOrNonBlocking = etai.interrupted();
398-
LOGGER.debug("Was interrupted or non-blocking: {}", interruptedOrNonBlocking);
409+
LOGGER.info("DefaultRequestHandler: interruptedOrNonBlocking={} (blocking={}, eventType={})",
410+
interruptedOrNonBlocking, blocking, kind != null ? kind.getClass().getSimpleName() : null);
399411

400412
// For blocking calls that were interrupted (returned on first event),
401413
// wait for agent execution and event processing BEFORE returning to client.
@@ -419,29 +431,31 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
419431
// 2. Close the queue to signal consumption can complete
420432
// 3. Wait for consumption to finish processing events
421433
// 4. Fetch final task state from TaskStore
434+
LOGGER.debug("DefaultRequestHandler: Entering blocking fire-and-forget handling for task {}", taskId);
422435

423436
try {
424437
// Step 1: Wait for agent to finish (with configurable timeout)
425438
if (agentFuture != null) {
426439
try {
427440
agentFuture.get(agentCompletionTimeoutSeconds, SECONDS);
428-
LOGGER.debug("Agent completed for task {}", taskId);
441+
LOGGER.debug("DefaultRequestHandler: Step 1 - Agent completed for task {}", taskId);
429442
} catch (java.util.concurrent.TimeoutException e) {
430443
// Agent still running after timeout - that's fine, events already being processed
431-
LOGGER.debug("Agent still running for task {} after {}s", taskId, agentCompletionTimeoutSeconds);
444+
LOGGER.debug("DefaultRequestHandler: Step 1 - Agent still running for task {} after {}s timeout",
445+
taskId, agentCompletionTimeoutSeconds);
432446
}
433447
}
434448

435449
// Step 2: Close the queue to signal consumption can complete
436450
// For fire-and-forget tasks, there's no final event, so we need to close the queue
437451
// This allows EventConsumer.consumeAll() to exit
438452
queue.close(false, false); // graceful close, don't notify parent yet
439-
LOGGER.debug("Closed queue for task {} to allow consumption completion", taskId);
453+
LOGGER.debug("DefaultRequestHandler: Step 2 - Closed queue for task {} to allow consumption completion", taskId);
440454

441455
// Step 3: Wait for consumption to complete (now that queue is closed)
442456
if (etai.consumptionFuture() != null) {
443457
etai.consumptionFuture().get(consumptionCompletionTimeoutSeconds, SECONDS);
444-
LOGGER.debug("Consumption completed for task {}", taskId);
458+
LOGGER.debug("DefaultRequestHandler: Step 3 - Consumption completed for task {}", taskId);
445459
}
446460
} catch (InterruptedException e) {
447461
Thread.currentThread().interrupt();
@@ -464,11 +478,11 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
464478
Task updatedTask = taskStore.get(nonNullTaskId);
465479
if (updatedTask != null) {
466480
kind = updatedTask;
467-
if (LOGGER.isDebugEnabled()) {
468-
LOGGER.debug("Fetched final task for {} with state {} and {} artifacts",
469-
nonNullTaskId, updatedTask.status().state(),
470-
updatedTask.artifacts().size());
471-
}
481+
LOGGER.debug("DefaultRequestHandler: Step 4 - Fetched final task for {} with state {} and {} artifacts",
482+
taskId, updatedTask.status().state(),
483+
updatedTask.artifacts().size());
484+
} else {
485+
LOGGER.warn("DefaultRequestHandler: Step 4 - Task {} not found in TaskStore!", taskId);
472486
}
473487
}
474488
if (kind instanceof Task taskResult && !taskId.equals(taskResult.id())) {

server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,9 @@ public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer,
152152
boolean isAuthRequired = (event instanceof Task task && task.status().state() == TaskState.AUTH_REQUIRED)
153153
|| (event instanceof TaskStatusUpdateEvent tsue && tsue.status().state() == TaskState.AUTH_REQUIRED);
154154

155+
LOGGER.info("ResultAggregator: Evaluating interrupt (blocking={}, isFinal={}, isAuth={}, eventType={})",
156+
blocking, isFinalEvent, isAuthRequired, event.getClass().getSimpleName());
157+
155158
// Always interrupt on auth_required, as it needs external action.
156159
if (isAuthRequired) {
157160
// auth-required is a special state: the message should be
@@ -161,23 +164,27 @@ public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer,
161164
// new request is expected in order for the agent to make progress,
162165
// so the agent should exit.
163166
shouldInterrupt = true;
167+
LOGGER.info("ResultAggregator: Setting shouldInterrupt=true (AUTH_REQUIRED)");
164168
}
165169
else if (!blocking) {
166170
// For non-blocking calls, interrupt as soon as a task is available.
167171
shouldInterrupt = true;
172+
LOGGER.info("ResultAggregator: Setting shouldInterrupt=true (non-blocking)");
168173
}
169174
else if (blocking) {
170175
// For blocking calls: Interrupt to free Vert.x thread, but continue in background
171176
// Python's async consumption doesn't block threads, but Java's does
172177
// So we interrupt to return quickly, then rely on background consumption
173178
shouldInterrupt = true;
179+
LOGGER.info("ResultAggregator: Setting shouldInterrupt=true (blocking, isFinal={})", isFinalEvent);
174180
if (LOGGER.isDebugEnabled()) {
175181
LOGGER.debug("Blocking call for task {}: {} event, returning with background consumption",
176182
taskIdForLogging(), isFinalEvent ? "final" : "non-final");
177183
}
178184
}
179185

180186
if (shouldInterrupt) {
187+
LOGGER.info("ResultAggregator: Interrupting consumption (setting interrupted=true)");
181188
// Complete the future to unblock the main thread
182189
interrupted.set(true);
183190
completionFuture.complete(null);

0 commit comments

Comments
 (0)