Skip to content

Commit c97d236

Browse files
kabirclaude
andcommitted
fix: Wait for agent completion and ensure all events processed in blocking calls
Fixes race condition where DefaultRequestHandler.onMessageSend() returns before all task events are fully processed and persisted to TaskStore, resulting in incomplete Task objects being returned to clients (missing artifacts, incorrect state). Root Cause: - Blocking calls interrupted immediately after first event and returned to client before background event consumption completed - Agent execution and event processing happened asynchronously in background - No synchronization to ensure all events were consumed and persisted before returning Task to client Solution (4-step process): 1. Wait for agent to finish enqueueing events (5s timeout) 2. Close the queue to signal consumption can complete (breaks dependency) 3. Wait for consumption to finish processing events (2s timeout) 4. Fetch final task state from TaskStore (has all artifacts and correct state) This ensures blocking calls return complete Task objects with all artifacts and correct state, including support for fire-and-forget tasks that never emit final state events. Added unit tests: - testBlockingFireAndForgetReturnsNonFinalTask: Validates fire-and-forget pattern - testBlockingCallReturnsCompleteTaskWithArtifacts: Ensures all artifacts included 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent f36db5e commit c97d236

File tree

4 files changed

+254
-40
lines changed

4 files changed

+254
-40
lines changed

server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@
6363
public class DefaultRequestHandler implements RequestHandler {
6464

6565
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRequestHandler.class);
66+
private static final int AGENT_COMPLETION_TIMEOUT_SECONDS = 5;
67+
private static final int CONSUMPTION_COMPLETION_TIMEOUT_SECONDS = 2;
6668

6769
private final AgentExecutor agentExecutor;
6870
private final TaskStore taskStore;
@@ -192,6 +194,7 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
192194

193195
EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(taskId, mss.requestContext, queue);
194196
ResultAggregator.EventTypeAndInterrupt etai = null;
197+
EventKind kind = null; // Declare outside try block so it's in scope for return
195198
try {
196199
// Create callback for push notifications during background event processing
197200
Runnable pushNotificationCallback = () -> sendPushNotification(taskId, resultAggregator);
@@ -201,7 +204,10 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
201204
// This callback must be added before we start consuming. Otherwise,
202205
// any errors thrown by the producerRunnable are not picked up by the consumer
203206
producerRunnable.addDoneCallback(consumer.createAgentRunnableDoneCallback());
204-
etai = resultAggregator.consumeAndBreakOnInterrupt(consumer, blocking, pushNotificationCallback);
207+
208+
// Get agent future before consuming (for blocking calls to wait for agent completion)
209+
CompletableFuture<Void> agentFuture = runningAgents.get(taskId);
210+
etai = resultAggregator.consumeAndBreakOnInterrupt(consumer, blocking, pushNotificationCallback, agentFuture);
205211

206212
if (etai == null) {
207213
LOGGER.debug("No result, throwing InternalError");
@@ -210,7 +216,63 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
210216
interruptedOrNonBlocking = etai.interrupted();
211217
LOGGER.debug("Was interrupted or non-blocking: {}", interruptedOrNonBlocking);
212218

213-
EventKind kind = etai.eventType();
219+
// For blocking calls that were interrupted (returned on first event),
220+
// wait for agent execution and event processing BEFORE returning to client.
221+
// This ensures the returned Task has all artifacts and current state.
222+
// We do this HERE (not in ResultAggregator) to avoid blocking Vert.x worker threads
223+
// during the consumption loop itself.
224+
kind = etai.eventType();
225+
if (blocking && interruptedOrNonBlocking) {
226+
// For blocking calls: ensure all events are processed before returning
227+
// Order of operations is critical to avoid circular dependency:
228+
// 1. Wait for agent to finish enqueueing events
229+
// 2. Close the queue to signal consumption can complete
230+
// 3. Wait for consumption to finish processing events
231+
// 4. Fetch final task state from TaskStore
232+
233+
try {
234+
// Step 1: Wait for agent to finish (with short timeout for fast agents)
235+
if (agentFuture != null) {
236+
try {
237+
agentFuture.get(AGENT_COMPLETION_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS);
238+
LOGGER.debug("Agent completed for task {}", taskId);
239+
} catch (java.util.concurrent.TimeoutException e) {
240+
// Agent still running after timeout - that's fine, events already being processed
241+
LOGGER.debug("Agent still running for task {} after {}s", taskId, AGENT_COMPLETION_TIMEOUT_SECONDS);
242+
}
243+
}
244+
245+
// Step 2: Close the queue to signal consumption can complete
246+
// For fire-and-forget tasks, there's no final event, so we need to close the queue
247+
// This allows EventConsumer.consumeAll() to exit
248+
queue.close(false, false); // graceful close, don't notify parent yet
249+
LOGGER.debug("Closed queue for task {} to allow consumption completion", taskId);
250+
251+
// Step 3: Wait for consumption to complete (now that queue is closed)
252+
if (etai.consumptionFuture() != null) {
253+
etai.consumptionFuture().get(CONSUMPTION_COMPLETION_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS);
254+
LOGGER.debug("Consumption completed for task {}", taskId);
255+
}
256+
} catch (InterruptedException e) {
257+
Thread.currentThread().interrupt();
258+
LOGGER.warn("Interrupted waiting for task {} completion", taskId, e);
259+
} catch (java.util.concurrent.ExecutionException e) {
260+
LOGGER.warn("Error during task {} execution", taskId, e.getCause());
261+
} catch (java.util.concurrent.TimeoutException e) {
262+
LOGGER.warn("Timeout waiting for consumption to complete for task {}", taskId);
263+
}
264+
265+
// Step 4: Fetch the final task state from TaskStore (all events have been processed)
266+
Task updatedTask = taskStore.get(taskId);
267+
if (updatedTask != null) {
268+
kind = updatedTask;
269+
if (LOGGER.isDebugEnabled()) {
270+
LOGGER.debug("Fetched final task for {} with state {} and {} artifacts",
271+
taskId, updatedTask.getStatus().state(),
272+
updatedTask.getArtifacts().size());
273+
}
274+
}
275+
}
214276
if (kind instanceof Task taskResult && !taskId.equals(taskResult.getId())) {
215277
throw new InternalError("Task ID mismatch in agent response");
216278
}
@@ -227,8 +289,8 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
227289
trackBackgroundTask(cleanupProducer(agentFuture, etai != null ? etai.consumptionFuture() : null, taskId, queue, false));
228290
}
229291

230-
LOGGER.debug("Returning: {}", etai.eventType());
231-
return etai.eventType();
292+
LOGGER.debug("Returning: {}", kind);
293+
return kind;
232294
}
233295

234296
@Override

server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,20 @@
1111
import java.util.concurrent.atomic.AtomicBoolean;
1212
import java.util.concurrent.atomic.AtomicReference;
1313

14-
import org.slf4j.Logger;
15-
import org.slf4j.LoggerFactory;
16-
1714
import io.a2a.server.events.EventConsumer;
1815
import io.a2a.server.events.EventQueueItem;
1916
import io.a2a.spec.A2AServerException;
2017
import io.a2a.spec.Event;
2118
import io.a2a.spec.EventKind;
19+
import io.a2a.spec.InternalError;
2220
import io.a2a.spec.JSONRPCError;
2321
import io.a2a.spec.Message;
2422
import io.a2a.spec.Task;
2523
import io.a2a.spec.TaskState;
2624
import io.a2a.spec.TaskStatusUpdateEvent;
2725
import io.a2a.util.Utils;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
2828

2929
public class ResultAggregator {
3030
private static final Logger LOGGER = LoggerFactory.getLogger(ResultAggregator.class);
@@ -106,10 +106,14 @@ public EventKind consumeAll(EventConsumer consumer) throws JSONRPCError {
106106
}
107107

108108
public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking) throws JSONRPCError {
109-
return consumeAndBreakOnInterrupt(consumer, blocking, null);
109+
return consumeAndBreakOnInterrupt(consumer, blocking, null, null);
110110
}
111111

112112
public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking, Runnable eventCallback) throws JSONRPCError {
113+
return consumeAndBreakOnInterrupt(consumer, blocking, eventCallback, null);
114+
}
115+
116+
public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking, Runnable eventCallback, CompletableFuture<Void> agentFuture) throws JSONRPCError {
113117
Flow.Publisher<EventQueueItem> allItems = consumer.consumeAll();
114118
AtomicReference<Message> message = new AtomicReference<>();
115119
AtomicBoolean interrupted = new AtomicBoolean(false);
@@ -180,11 +184,11 @@ else if (!blocking) {
180184
shouldInterrupt = true;
181185
continueInBackground = true;
182186
}
183-
else {
184-
// For ALL blocking calls (both final and non-final events), use background consumption
185-
// This ensures all events are processed and persisted to TaskStore in background
186-
// Queue lifecycle is now managed by DefaultRequestHandler.cleanupProducer()
187-
// which waits for BOTH agent and consumption futures before closing queues
187+
else if (blocking) {
188+
// For blocking calls: Interrupt to free Vert.x thread, but continue in background
189+
// Python's async consumption doesn't block threads, but Java's does
190+
// So we interrupt to return quickly, then rely on background consumption
191+
// DefaultRequestHandler will fetch the final state from TaskStore
188192
shouldInterrupt = true;
189193
continueInBackground = true;
190194
if (LOGGER.isDebugEnabled()) {
@@ -198,10 +202,17 @@ else if (!blocking) {
198202
interrupted.set(true);
199203
completionFuture.complete(null);
200204

201-
// Signal that cleanup can proceed while consumption continues in background.
202-
// This prevents infinite hangs for fire-and-forget agents that never emit final events.
203-
// Processing continues (return true below) and all events are still persisted to TaskStore.
204-
consumptionCompletionFuture.complete(null);
205+
// For blocking calls, DON'T complete consumptionCompletionFuture here.
206+
// Let it complete naturally when subscription finishes (onComplete callback below).
207+
// This ensures all events are processed and persisted to TaskStore before
208+
// DefaultRequestHandler.cleanupProducer() proceeds with cleanup.
209+
//
210+
// For non-blocking and auth-required calls, complete immediately to allow
211+
// cleanup to proceed while consumption continues in background.
212+
if (!blocking) {
213+
consumptionCompletionFuture.complete(null);
214+
}
215+
// else: blocking calls wait for actual consumption completion in onComplete
205216

206217
// Continue consuming in background - keep requesting events
207218
// Note: continueInBackground is always true when shouldInterrupt is true
@@ -244,8 +255,8 @@ else if (!blocking) {
244255
}
245256
}
246257

247-
// Background consumption continues automatically via the subscription
248-
// returning true in the consumer function keeps the subscription alive
258+
// Note: For blocking calls that were interrupted, the wait logic has been moved
259+
// to DefaultRequestHandler.onMessageSend() to avoid blocking Vert.x worker threads.
249260
// Queue lifecycle is managed by DefaultRequestHandler.cleanupProducer()
250261

251262
Throwable error = errorRef.get();

0 commit comments

Comments
 (0)