Skip to content

Commit ece9af8

Browse files
committed
fix: Wait for agent completion and ensure all events processed in blocking calls
Fixes a race condition where DefaultRequestHandler.onMessageSend() returns before all task events are fully processed and persisted to TaskStore, resulting in incomplete Task objects being returned to clients (missing artifacts, incorrect state).

Root Cause:
- Blocking calls were interrupted immediately after the first event and returned to the client before background event consumption completed
- Agent execution and event processing happened asynchronously in the background
- There was no synchronization to ensure all events were consumed and persisted before returning the Task to the client

Solution (4-step process):
1. Wait for the agent to finish enqueueing events (configurable via a2a.blocking.agent.timeout.seconds, default 30s; the test factory uses 5s)
2. Close the queue to signal that consumption can complete (breaks the circular dependency between agent and consumer)
3. Wait for consumption to finish processing events (configurable via a2a.blocking.consumption.timeout.seconds, default 5s; the test factory uses 2s)
4. Fetch the final task state from TaskStore (has all artifacts and the correct state)

This ensures blocking calls return complete Task objects with all artifacts and correct state, including support for fire-and-forget tasks that never emit final state events.

Added unit tests:
- testBlockingFireAndForgetReturnsNonFinalTask: Validates the fire-and-forget pattern
- testBlockingCallReturnsCompleteTaskWithArtifacts: Ensures all artifacts are included
1 parent 5c5970d commit ece9af8

File tree

6 files changed

+306
-58
lines changed

6 files changed

+306
-58
lines changed

server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java

Lines changed: 99 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import jakarta.enterprise.context.ApplicationScoped;
2222
import jakarta.inject.Inject;
2323

24+
import org.eclipse.microprofile.config.inject.ConfigProperty;
25+
2426
import io.a2a.server.ServerCallContext;
2527
import io.a2a.server.agentexecution.AgentExecutor;
2628
import io.a2a.server.agentexecution.RequestContext;
@@ -64,6 +66,26 @@ public class DefaultRequestHandler implements RequestHandler {
6466

6567
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRequestHandler.class);
6668

69+
/**
70+
* Timeout in seconds to wait for agent execution to complete in blocking calls.
71+
* This allows slow agents (LLM-based, data processing, external APIs) sufficient time.
72+
* Configurable via: a2a.blocking.agent.timeout.seconds
73+
* Default: 30 seconds
74+
*/
75+
@Inject
76+
@ConfigProperty(name = "a2a.blocking.agent.timeout.seconds", defaultValue = "30")
77+
int agentCompletionTimeoutSeconds;
78+
79+
/**
80+
* Timeout in seconds to wait for event consumption to complete in blocking calls.
81+
* This ensures all events are processed and persisted before returning to client.
82+
* Configurable via: a2a.blocking.consumption.timeout.seconds
83+
* Default: 5 seconds
84+
*/
85+
@Inject
86+
@ConfigProperty(name = "a2a.blocking.consumption.timeout.seconds", defaultValue = "5")
87+
int consumptionCompletionTimeoutSeconds;
88+
6789
private final AgentExecutor agentExecutor;
6890
private final TaskStore taskStore;
6991
private final QueueManager queueManager;
@@ -93,6 +115,19 @@ public DefaultRequestHandler(AgentExecutor agentExecutor, TaskStore taskStore,
93115
this.requestContextBuilder = () -> new SimpleRequestContextBuilder(taskStore, false);
94116
}
95117

118+
/**
119+
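* Factory for tests: builds a handler without config injection and sets the blocking-call timeouts directly (5s agent completion, 2s consumption completion).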
120+
*/
121+
public static DefaultRequestHandler create(AgentExecutor agentExecutor, TaskStore taskStore,
122+
QueueManager queueManager, PushNotificationConfigStore pushConfigStore,
123+
PushNotificationSender pushSender, Executor executor) {
124+
DefaultRequestHandler handler =
125+
new DefaultRequestHandler(agentExecutor, taskStore, queueManager, pushConfigStore, pushSender, executor);
126+
handler.agentCompletionTimeoutSeconds = 5;
127+
handler.consumptionCompletionTimeoutSeconds = 2;
128+
return handler;
129+
}
130+
96131
@Override
97132
public Task onGetTask(TaskQueryParams params, ServerCallContext context) throws JSONRPCError {
98133
LOGGER.debug("onGetTask {}", params.id());
@@ -192,6 +227,7 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
192227

193228
EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(taskId, mss.requestContext, queue);
194229
ResultAggregator.EventTypeAndInterrupt etai = null;
230+
EventKind kind = null; // Declare outside try block so it's in scope for return
195231
try {
196232
// Create callback for push notifications during background event processing
197233
Runnable pushNotificationCallback = () -> sendPushNotification(taskId, resultAggregator);
@@ -201,7 +237,10 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
201237
// This callback must be added before we start consuming. Otherwise,
202238
// any errors thrown by the producerRunnable are not picked up by the consumer
203239
producerRunnable.addDoneCallback(consumer.createAgentRunnableDoneCallback());
204-
etai = resultAggregator.consumeAndBreakOnInterrupt(consumer, blocking, pushNotificationCallback);
240+
241+
// Get agent future before consuming (for blocking calls to wait for agent completion)
242+
CompletableFuture<Void> agentFuture = runningAgents.get(taskId);
243+
etai = resultAggregator.consumeAndBreakOnInterrupt(consumer, blocking, pushNotificationCallback, agentFuture);
205244

206245
if (etai == null) {
207246
LOGGER.debug("No result, throwing InternalError");
@@ -210,7 +249,63 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
210249
interruptedOrNonBlocking = etai.interrupted();
211250
LOGGER.debug("Was interrupted or non-blocking: {}", interruptedOrNonBlocking);
212251

213-
EventKind kind = etai.eventType();
252+
// For blocking calls that were interrupted (returned on first event),
253+
// wait for agent execution and event processing BEFORE returning to client.
254+
// This ensures the returned Task has all artifacts and current state.
255+
// We do this HERE (not in ResultAggregator) to avoid blocking Vert.x worker threads
256+
// during the consumption loop itself.
257+
kind = etai.eventType();
258+
if (blocking && interruptedOrNonBlocking) {
259+
// For blocking calls: ensure all events are processed before returning
260+
// Order of operations is critical to avoid circular dependency:
261+
// 1. Wait for agent to finish enqueueing events
262+
// 2. Close the queue to signal consumption can complete
263+
// 3. Wait for consumption to finish processing events
264+
// 4. Fetch final task state from TaskStore
265+
266+
try {
267+
// Step 1: Wait for agent to finish (with configurable timeout)
268+
if (agentFuture != null) {
269+
try {
270+
agentFuture.get(agentCompletionTimeoutSeconds, java.util.concurrent.TimeUnit.SECONDS);
271+
LOGGER.debug("Agent completed for task {}", taskId);
272+
} catch (java.util.concurrent.TimeoutException e) {
273+
// Agent still running after the timeout - continue anyway: the queue is closed below and the task state persisted so far is returned
274+
LOGGER.debug("Agent still running for task {} after {}s", taskId, agentCompletionTimeoutSeconds);
275+
}
276+
}
277+
278+
// Step 2: Close the queue to signal consumption can complete
279+
// For fire-and-forget tasks, there's no final event, so we need to close the queue
280+
// This allows EventConsumer.consumeAll() to exit
281+
queue.close(false, false); // graceful close, don't notify parent yet
282+
LOGGER.debug("Closed queue for task {} to allow consumption completion", taskId);
283+
284+
// Step 3: Wait for consumption to complete (now that queue is closed)
285+
if (etai.consumptionFuture() != null) {
286+
etai.consumptionFuture().get(consumptionCompletionTimeoutSeconds, java.util.concurrent.TimeUnit.SECONDS);
287+
LOGGER.debug("Consumption completed for task {}", taskId);
288+
}
289+
} catch (InterruptedException e) {
290+
Thread.currentThread().interrupt();
291+
LOGGER.warn("Interrupted waiting for task {} completion", taskId, e);
292+
} catch (java.util.concurrent.ExecutionException e) {
293+
LOGGER.warn("Error during task {} execution", taskId, e.getCause());
294+
} catch (java.util.concurrent.TimeoutException e) {
295+
LOGGER.warn("Timeout waiting for consumption to complete for task {}", taskId);
296+
}
297+
298+
// Step 4: Fetch the final task state from TaskStore (all events have been processed)
299+
Task updatedTask = taskStore.get(taskId);
300+
if (updatedTask != null) {
301+
kind = updatedTask;
302+
if (LOGGER.isDebugEnabled()) {
303+
LOGGER.debug("Fetched final task for {} with state {} and {} artifacts",
304+
taskId, updatedTask.getStatus().state(),
305+
updatedTask.getArtifacts().size());
306+
}
307+
}
308+
}
214309
if (kind instanceof Task taskResult && !taskId.equals(taskResult.getId())) {
215310
throw new InternalError("Task ID mismatch in agent response");
216311
}
@@ -227,8 +322,8 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
227322
trackBackgroundTask(cleanupProducer(agentFuture, etai != null ? etai.consumptionFuture() : null, taskId, queue, false));
228323
}
229324

230-
LOGGER.debug("Returning: {}", etai.eventType());
231-
return etai.eventType();
325+
LOGGER.debug("Returning: {}", kind);
326+
return kind;
232327
}
233328

234329
@Override

server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,20 @@
1111
import java.util.concurrent.atomic.AtomicBoolean;
1212
import java.util.concurrent.atomic.AtomicReference;
1313

14-
import org.slf4j.Logger;
15-
import org.slf4j.LoggerFactory;
16-
1714
import io.a2a.server.events.EventConsumer;
1815
import io.a2a.server.events.EventQueueItem;
1916
import io.a2a.spec.A2AServerException;
2017
import io.a2a.spec.Event;
2118
import io.a2a.spec.EventKind;
19+
import io.a2a.spec.InternalError;
2220
import io.a2a.spec.JSONRPCError;
2321
import io.a2a.spec.Message;
2422
import io.a2a.spec.Task;
2523
import io.a2a.spec.TaskState;
2624
import io.a2a.spec.TaskStatusUpdateEvent;
2725
import io.a2a.util.Utils;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
2828

2929
public class ResultAggregator {
3030
private static final Logger LOGGER = LoggerFactory.getLogger(ResultAggregator.class);
@@ -106,10 +106,14 @@ public EventKind consumeAll(EventConsumer consumer) throws JSONRPCError {
106106
}
107107

108108
public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking) throws JSONRPCError {
109-
return consumeAndBreakOnInterrupt(consumer, blocking, null);
109+
return consumeAndBreakOnInterrupt(consumer, blocking, null, null);
110110
}
111111

112112
public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking, Runnable eventCallback) throws JSONRPCError {
113+
return consumeAndBreakOnInterrupt(consumer, blocking, eventCallback, null);
114+
}
115+
116+
public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking, Runnable eventCallback, CompletableFuture<Void> agentFuture) throws JSONRPCError {
113117
Flow.Publisher<EventQueueItem> allItems = consumer.consumeAll();
114118
AtomicReference<Message> message = new AtomicReference<>();
115119
AtomicBoolean interrupted = new AtomicBoolean(false);
@@ -180,11 +184,11 @@ else if (!blocking) {
180184
shouldInterrupt = true;
181185
continueInBackground = true;
182186
}
183-
else {
184-
// For ALL blocking calls (both final and non-final events), use background consumption
185-
// This ensures all events are processed and persisted to TaskStore in background
186-
// Queue lifecycle is now managed by DefaultRequestHandler.cleanupProducer()
187-
// which waits for BOTH agent and consumption futures before closing queues
187+
else if (blocking) {
188+
// For blocking calls: Interrupt to free Vert.x thread, but continue in background
189+
// Python's async consumption doesn't block threads, but Java's does
190+
// So we interrupt to return quickly, then rely on background consumption
191+
// DefaultRequestHandler will fetch the final state from TaskStore
188192
shouldInterrupt = true;
189193
continueInBackground = true;
190194
if (LOGGER.isDebugEnabled()) {
@@ -198,10 +202,17 @@ else if (!blocking) {
198202
interrupted.set(true);
199203
completionFuture.complete(null);
200204

201-
// Signal that cleanup can proceed while consumption continues in background.
202-
// This prevents infinite hangs for fire-and-forget agents that never emit final events.
203-
// Processing continues (return true below) and all events are still persisted to TaskStore.
204-
consumptionCompletionFuture.complete(null);
205+
// For blocking calls, DON'T complete consumptionCompletionFuture here.
206+
// Let it complete naturally when subscription finishes (onComplete callback below).
207+
// This ensures all events are processed and persisted to TaskStore before
208+
// DefaultRequestHandler.cleanupProducer() proceeds with cleanup.
209+
//
210+
// For non-blocking calls (the check below tests only `blocking`, so an auth-required interrupt in a blocking call is not covered), complete immediately to allow
211+
// cleanup to proceed while consumption continues in background.
212+
if (!blocking) {
213+
consumptionCompletionFuture.complete(null);
214+
}
215+
// else: blocking calls wait for actual consumption completion in onComplete
205216

206217
// Continue consuming in background - keep requesting events
207218
// Note: continueInBackground is always true when shouldInterrupt is true
@@ -244,8 +255,8 @@ else if (!blocking) {
244255
}
245256
}
246257

247-
// Background consumption continues automatically via the subscription
248-
// returning true in the consumer function keeps the subscription alive
258+
// Note: For blocking calls that were interrupted, the wait logic has been moved
259+
// to DefaultRequestHandler.onMessageSend() to avoid blocking Vert.x worker threads.
249260
// Queue lifecycle is managed by DefaultRequestHandler.cleanupProducer()
250261

251262
Throwable error = errorRef.get();

server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ public void cancel(RequestContext context, EventQueue eventQueue) throws JSONRPC
9898
PushNotificationConfigStore pushConfigStore = new InMemoryPushNotificationConfigStore();
9999
PushNotificationSender pushSender = new BasePushNotificationSender(pushConfigStore, httpClient);
100100

101-
requestHandler = new DefaultRequestHandler(executor, taskStore, queueManager, pushConfigStore, pushSender, internalExecutor);
101+
requestHandler = DefaultRequestHandler.create(
102+
executor, taskStore, queueManager, pushConfigStore, pushSender, internalExecutor);
102103
}
103104

104105
@AfterEach

0 commit comments

Comments
 (0)