Skip to content

Commit 757312d

Browse files
committed
Debug logging
1 parent e692719 commit 757312d

File tree

3 files changed

+40
-10
lines changed

3 files changed

+40
-10
lines changed

server-common/src/main/java/io/a2a/server/events/EventConsumer.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,15 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
5858
EventQueueItem item;
5959
Event event;
6060
try {
61+
LOGGER.debug("EventConsumer polling queue {} (error={})", System.identityHashCode(queue), error);
6162
item = queue.dequeueEventItem(QUEUE_WAIT_MILLISECONDS);
6263
if (item == null) {
64+
LOGGER.debug("EventConsumer poll timeout (null item), continuing");
6365
continue;
6466
}
6567
event = item.getEvent();
68+
LOGGER.debug("EventConsumer received event: {} (queue={})",
69+
event.getClass().getSimpleName(), System.identityHashCode(queue));
6670

6771
// Defensive logging for error handling
6872
if (event instanceof Throwable thr) {
@@ -124,8 +128,13 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
124128

125129
public EnhancedRunnable.DoneCallback createAgentRunnableDoneCallback() {
126130
return agentRunnable -> {
131+
LOGGER.info("EventConsumer: Agent done callback invoked (hasError={}, queue={})",
132+
agentRunnable.getError() != null, System.identityHashCode(queue));
127133
if (agentRunnable.getError() != null) {
128134
error = agentRunnable.getError();
135+
LOGGER.info("EventConsumer: Set error field from agent callback");
136+
} else {
137+
LOGGER.info("EventConsumer: Agent completed successfully (no error), continuing consumption");
129138
}
130139
};
131140
}

server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,17 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
271271
blocking = false;
272272
}
273273

274+
// Log blocking behavior from client request
275+
if (params.configuration() != null && params.configuration().blocking() != null) {
276+
LOGGER.info("DefaultRequestHandler: Client requested blocking={} for task {}",
277+
params.configuration().blocking(), taskId);
278+
} else if (params.configuration() != null) {
279+
LOGGER.info("DefaultRequestHandler: Client sent configuration but blocking=null, using default blocking=true for task {}", taskId);
280+
} else {
281+
LOGGER.info("DefaultRequestHandler: Client sent no configuration, using default blocking=true for task {}", taskId);
282+
}
283+
LOGGER.info("DefaultRequestHandler: Final blocking decision: {} for task {}", blocking, taskId);
284+
274285
boolean interruptedOrNonBlocking = false;
275286

276287
EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(taskId, mss.requestContext, queue);
@@ -292,7 +303,8 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
292303
throw new InternalError("No result");
293304
}
294305
interruptedOrNonBlocking = etai.interrupted();
295-
LOGGER.debug("Was interrupted or non-blocking: {}", interruptedOrNonBlocking);
306+
LOGGER.info("DefaultRequestHandler: interruptedOrNonBlocking={} (blocking={}, eventType={})",
307+
interruptedOrNonBlocking, blocking, kind != null ? kind.getClass().getSimpleName() : null);
296308

297309
// For blocking calls that were interrupted (returned on first event),
298310
// wait for agent execution and event processing BEFORE returning to client.
@@ -316,29 +328,31 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
316328
// 2. Close the queue to signal consumption can complete
317329
// 3. Wait for consumption to finish processing events
318330
// 4. Fetch final task state from TaskStore
331+
LOGGER.info("DefaultRequestHandler: Entering blocking fire-and-forget handling for task {}", taskId);
319332

320333
try {
321334
// Step 1: Wait for agent to finish (with configurable timeout)
322335
if (agentFuture != null) {
323336
try {
324337
agentFuture.get(agentCompletionTimeoutSeconds, SECONDS);
325-
LOGGER.debug("Agent completed for task {}", taskId);
338+
LOGGER.info("DefaultRequestHandler: Step 1 - Agent completed for task {}", taskId);
326339
} catch (java.util.concurrent.TimeoutException e) {
327340
// Agent still running after timeout - that's fine, events already being processed
328-
LOGGER.debug("Agent still running for task {} after {}s", taskId, agentCompletionTimeoutSeconds);
341+
LOGGER.info("DefaultRequestHandler: Step 1 - Agent still running for task {} after {}s timeout",
342+
taskId, agentCompletionTimeoutSeconds);
329343
}
330344
}
331345

332346
// Step 2: Close the queue to signal consumption can complete
333347
// For fire-and-forget tasks, there's no final event, so we need to close the queue
334348
// This allows EventConsumer.consumeAll() to exit
335349
queue.close(false, false); // graceful close, don't notify parent yet
336-
LOGGER.debug("Closed queue for task {} to allow consumption completion", taskId);
350+
LOGGER.info("DefaultRequestHandler: Step 2 - Closed queue for task {} to allow consumption completion", taskId);
337351

338352
// Step 3: Wait for consumption to complete (now that queue is closed)
339353
if (etai.consumptionFuture() != null) {
340354
etai.consumptionFuture().get(consumptionCompletionTimeoutSeconds, SECONDS);
341-
LOGGER.debug("Consumption completed for task {}", taskId);
355+
LOGGER.info("DefaultRequestHandler: Step 3 - Consumption completed for task {}", taskId);
342356
}
343357
} catch (InterruptedException e) {
344358
Thread.currentThread().interrupt();
@@ -359,11 +373,11 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
359373
Task updatedTask = taskStore.get(taskId);
360374
if (updatedTask != null) {
361375
kind = updatedTask;
362-
if (LOGGER.isDebugEnabled()) {
363-
LOGGER.debug("Fetched final task for {} with state {} and {} artifacts",
364-
taskId, updatedTask.getStatus().state(),
365-
updatedTask.getArtifacts().size());
366-
}
376+
LOGGER.info("DefaultRequestHandler: Step 4 - Fetched final task for {} with state {} and {} artifacts",
377+
taskId, updatedTask.getStatus().state(),
378+
updatedTask.getArtifacts().size());
379+
} else {
380+
LOGGER.warn("DefaultRequestHandler: Step 4 - Task {} not found in TaskStore!", taskId);
367381
}
368382
}
369383
if (kind instanceof Task taskResult && !taskId.equals(taskResult.getId())) {

server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,9 @@ public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer,
145145
boolean isAuthRequired = (event instanceof Task task && task.getStatus().state() == TaskState.AUTH_REQUIRED)
146146
|| (event instanceof TaskStatusUpdateEvent tsue && tsue.getStatus().state() == TaskState.AUTH_REQUIRED);
147147

148+
LOGGER.info("ResultAggregator: Evaluating interrupt (blocking={}, isFinal={}, isAuth={}, eventType={})",
149+
blocking, isFinalEvent, isAuthRequired, event.getClass().getSimpleName());
150+
148151
// Always interrupt on auth_required, as it needs external action.
149152
if (isAuthRequired) {
150153
// auth-required is a special state: the message should be
@@ -154,23 +157,27 @@ public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer,
154157
// new request is expected in order for the agent to make progress,
155158
// so the agent should exit.
156159
shouldInterrupt = true;
160+
LOGGER.info("ResultAggregator: Setting shouldInterrupt=true (AUTH_REQUIRED)");
157161
}
158162
else if (!blocking) {
159163
// For non-blocking calls, interrupt as soon as a task is available.
160164
shouldInterrupt = true;
165+
LOGGER.info("ResultAggregator: Setting shouldInterrupt=true (non-blocking)");
161166
}
162167
else if (blocking) {
163168
// For blocking calls: Interrupt to free Vert.x thread, but continue in background
164169
// Python's async consumption doesn't block threads, but Java's does
165170
// So we interrupt to return quickly, then rely on background consumption
166171
shouldInterrupt = true;
172+
LOGGER.info("ResultAggregator: Setting shouldInterrupt=true (blocking, isFinal={})", isFinalEvent);
167173
if (LOGGER.isDebugEnabled()) {
168174
LOGGER.debug("Blocking call for task {}: {} event, returning with background consumption",
169175
taskIdForLogging(), isFinalEvent ? "final" : "non-final");
170176
}
171177
}
172178

173179
if (shouldInterrupt) {
180+
LOGGER.info("ResultAggregator: Interrupting consumption (setting interrupted=true)");
174181
// Complete the future to unblock the main thread
175182
interrupted.set(true);
176183
completionFuture.complete(null);

0 commit comments

Comments
 (0)