Skip to content

Commit 716f3c6

Browse files
committed
Debug logging
1 parent ee9a9ea commit 716f3c6

File tree

3 files changed

+40
-10
lines changed

3 files changed

+40
-10
lines changed

server-common/src/main/java/io/a2a/server/events/EventConsumer.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,15 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
6060
EventQueueItem item;
6161
Event event;
6262
try {
63+
LOGGER.debug("EventConsumer polling queue {} (error={})", System.identityHashCode(queue), error);
6364
item = queue.dequeueEventItem(QUEUE_WAIT_MILLISECONDS);
6465
if (item == null) {
66+
LOGGER.debug("EventConsumer poll timeout (null item), continuing");
6567
continue;
6668
}
6769
event = item.getEvent();
70+
LOGGER.debug("EventConsumer received event: {} (queue={})",
71+
event.getClass().getSimpleName(), System.identityHashCode(queue));
6872

6973
// Defensive logging for error handling
7074
if (event instanceof Throwable thr) {
@@ -141,8 +145,13 @@ private boolean isStreamTerminatingTask(Task task) {
141145

142146
public EnhancedRunnable.DoneCallback createAgentRunnableDoneCallback() {
143147
return agentRunnable -> {
148+
LOGGER.info("EventConsumer: Agent done callback invoked (hasError={}, queue={})",
149+
agentRunnable.getError() != null, System.identityHashCode(queue));
144150
if (agentRunnable.getError() != null) {
145151
error = agentRunnable.getError();
152+
LOGGER.info("EventConsumer: Set error field from agent callback");
153+
} else {
154+
LOGGER.info("EventConsumer: Agent completed successfully (no error), continuing consumption");
146155
}
147156
};
148157
}

server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,17 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
393393

394394
boolean blocking = params.configuration() != null && Boolean.TRUE.equals(params.configuration().blocking());
395395

396+
// Log blocking behavior from client request
397+
if (params.configuration() != null && params.configuration().blocking() != null) {
398+
LOGGER.info("DefaultRequestHandler: Client requested blocking={} for task {}",
399+
params.configuration().blocking(), taskId);
400+
} else if (params.configuration() != null) {
401+
LOGGER.info("DefaultRequestHandler: Client sent configuration but blocking=null, using default blocking=true for task {}", taskId);
402+
} else {
403+
LOGGER.info("DefaultRequestHandler: Client sent no configuration, using default blocking=true for task {}", taskId);
404+
}
405+
LOGGER.info("DefaultRequestHandler: Final blocking decision: {} for task {}", blocking, taskId);
406+
396407
boolean interruptedOrNonBlocking = false;
397408

398409
EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(taskId, mss.requestContext, queue);
@@ -414,7 +425,8 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
414425
throw new InternalError("No result");
415426
}
416427
interruptedOrNonBlocking = etai.interrupted();
417-
LOGGER.debug("Was interrupted or non-blocking: {}", interruptedOrNonBlocking);
428+
LOGGER.info("DefaultRequestHandler: interruptedOrNonBlocking={} (blocking={}, eventType={})",
429+
interruptedOrNonBlocking, blocking, kind != null ? kind.getClass().getSimpleName() : null);
418430

419431
// For blocking calls that were interrupted (returned on first event),
420432
// wait for agent execution and event processing BEFORE returning to client.
@@ -438,29 +450,31 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
438450
// 2. Close the queue to signal consumption can complete
439451
// 3. Wait for consumption to finish processing events
440452
// 4. Fetch final task state from TaskStore
453+
LOGGER.debug("DefaultRequestHandler: Entering blocking fire-and-forget handling for task {}", taskId);
441454

442455
try {
443456
// Step 1: Wait for agent to finish (with configurable timeout)
444457
if (agentFuture != null) {
445458
try {
446459
agentFuture.get(agentCompletionTimeoutSeconds, SECONDS);
447-
LOGGER.debug("Agent completed for task {}", taskId);
460+
LOGGER.debug("DefaultRequestHandler: Step 1 - Agent completed for task {}", taskId);
448461
} catch (java.util.concurrent.TimeoutException e) {
449462
// Agent still running after timeout - that's fine, events already being processed
450-
LOGGER.debug("Agent still running for task {} after {}s", taskId, agentCompletionTimeoutSeconds);
463+
LOGGER.debug("DefaultRequestHandler: Step 1 - Agent still running for task {} after {}s timeout",
464+
taskId, agentCompletionTimeoutSeconds);
451465
}
452466
}
453467

454468
// Step 2: Close the queue to signal consumption can complete
455469
// For fire-and-forget tasks, there's no final event, so we need to close the queue
456470
// This allows EventConsumer.consumeAll() to exit
457471
queue.close(false, false); // graceful close, don't notify parent yet
458-
LOGGER.debug("Closed queue for task {} to allow consumption completion", taskId);
472+
LOGGER.debug("DefaultRequestHandler: Step 2 - Closed queue for task {} to allow consumption completion", taskId);
459473

460474
// Step 3: Wait for consumption to complete (now that queue is closed)
461475
if (etai.consumptionFuture() != null) {
462476
etai.consumptionFuture().get(consumptionCompletionTimeoutSeconds, SECONDS);
463-
LOGGER.debug("Consumption completed for task {}", taskId);
477+
LOGGER.debug("DefaultRequestHandler: Step 3 - Consumption completed for task {}", taskId);
464478
}
465479
} catch (InterruptedException e) {
466480
Thread.currentThread().interrupt();
@@ -483,11 +497,11 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
483497
Task updatedTask = taskStore.get(nonNullTaskId);
484498
if (updatedTask != null) {
485499
kind = updatedTask;
486-
if (LOGGER.isDebugEnabled()) {
487-
LOGGER.debug("Fetched final task for {} with state {} and {} artifacts",
488-
nonNullTaskId, updatedTask.status().state(),
489-
updatedTask.artifacts().size());
490-
}
500+
LOGGER.debug("DefaultRequestHandler: Step 4 - Fetched final task for {} with state {} and {} artifacts",
501+
taskId, updatedTask.status().state(),
502+
updatedTask.artifacts().size());
503+
} else {
504+
LOGGER.warn("DefaultRequestHandler: Step 4 - Task {} not found in TaskStore!", taskId);
491505
}
492506
}
493507
if (kind instanceof Task taskResult && !taskId.equals(taskResult.id())) {

server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,9 @@ public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer,
152152
boolean isAuthRequired = (event instanceof Task task && task.status().state() == TaskState.AUTH_REQUIRED)
153153
|| (event instanceof TaskStatusUpdateEvent tsue && tsue.status().state() == TaskState.AUTH_REQUIRED);
154154

155+
LOGGER.info("ResultAggregator: Evaluating interrupt (blocking={}, isFinal={}, isAuth={}, eventType={})",
156+
blocking, isFinalEvent, isAuthRequired, event.getClass().getSimpleName());
157+
155158
// Always interrupt on auth_required, as it needs external action.
156159
if (isAuthRequired) {
157160
// auth-required is a special state: the message should be
@@ -161,23 +164,27 @@ public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer,
161164
// new request is expected in order for the agent to make progress,
162165
// so the agent should exit.
163166
shouldInterrupt = true;
167+
LOGGER.info("ResultAggregator: Setting shouldInterrupt=true (AUTH_REQUIRED)");
164168
}
165169
else if (!blocking) {
166170
// For non-blocking calls, interrupt as soon as a task is available.
167171
shouldInterrupt = true;
172+
LOGGER.info("ResultAggregator: Setting shouldInterrupt=true (non-blocking)");
168173
}
169174
else if (blocking) {
170175
// For blocking calls: Interrupt to free Vert.x thread, but continue in background
171176
// Python's async consumption doesn't block threads, but Java's does
172177
// So we interrupt to return quickly, then rely on background consumption
173178
shouldInterrupt = true;
179+
LOGGER.info("ResultAggregator: Setting shouldInterrupt=true (blocking, isFinal={})", isFinalEvent);
174180
if (LOGGER.isDebugEnabled()) {
175181
LOGGER.debug("Blocking call for task {}: {} event, returning with background consumption",
176182
taskIdForLogging(), isFinalEvent ? "final" : "non-final");
177183
}
178184
}
179185

180186
if (shouldInterrupt) {
187+
LOGGER.info("ResultAggregator: Interrupting consumption (setting interrupted=true)");
181188
// Complete the future to unblock the main thread
182189
interrupted.set(true);
183190
completionFuture.complete(null);

0 commit comments

Comments
 (0)