Skip to content

Commit 465a5e4

Browse files
authored
fix: Wait for agent completion and ensure all events processed in blo… (#431)
…cking calls Fixes race condition where DefaultRequestHandler.onMessageSend() returns before all task events are fully processed and persisted to TaskStore, resulting in incomplete Task objects being returned to clients (missing artifacts, incorrect state). Root Cause: - Blocking calls interrupted immediately after first event and returned to client before background event consumption completed - Agent execution and event processing happened asynchronously in background - No synchronization to ensure all events were consumed and persisted before returning Task to client Solution (4-step process): 1. Wait for agent to finish enqueueing events (configurable timeout) 2. Close the queue to signal consumption can complete (breaks dependency) 3. Wait for consumption to finish processing events (configurable timeout) 4. Fetch final task state from TaskStore (has all artifacts and correct state) This ensures blocking calls return complete Task objects with all artifacts and correct state, including support for fire-and-forget tasks that never emit final state events. Added unit tests: - testBlockingFireAndForgetReturnsNonFinalTask: Validates fire-and-forget pattern - testBlockingCallReturnsCompleteTaskWithArtifacts: Ensures all artifacts included Fixes #428 🦕
1 parent 5c5970d commit 465a5e4

File tree

6 files changed

+308
-64
lines changed

6 files changed

+308
-64
lines changed

server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java

Lines changed: 106 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static io.a2a.server.util.async.AsyncUtils.convertingProcessor;
44
import static io.a2a.server.util.async.AsyncUtils.createTubeConfig;
55
import static io.a2a.server.util.async.AsyncUtils.processor;
6+
import static java.util.concurrent.TimeUnit.*;
67

78
import java.util.ArrayList;
89
import java.util.List;
@@ -51,11 +52,12 @@
5152
import io.a2a.spec.Task;
5253
import io.a2a.spec.TaskIdParams;
5354
import io.a2a.spec.TaskNotCancelableError;
54-
import io.a2a.spec.TaskState;
5555
import io.a2a.spec.TaskNotFoundError;
5656
import io.a2a.spec.TaskPushNotificationConfig;
5757
import io.a2a.spec.TaskQueryParams;
58+
import io.a2a.spec.TaskState;
5859
import io.a2a.spec.UnsupportedOperationError;
60+
import org.eclipse.microprofile.config.inject.ConfigProperty;
5961
import org.slf4j.Logger;
6062
import org.slf4j.LoggerFactory;
6163

@@ -64,6 +66,26 @@ public class DefaultRequestHandler implements RequestHandler {
6466

6567
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRequestHandler.class);
6668

69+
/**
70+
* Timeout in seconds to wait for agent execution to complete in blocking calls.
71+
* This allows slow agents (LLM-based, data processing, external APIs) sufficient time.
72+
* Configurable via: a2a.blocking.agent.timeout.seconds
73+
* Default: 30 seconds
74+
*/
75+
@Inject
76+
@ConfigProperty(name = "a2a.blocking.agent.timeout.seconds", defaultValue = "30")
77+
int agentCompletionTimeoutSeconds;
78+
79+
/**
80+
* Timeout in seconds to wait for event consumption to complete in blocking calls.
81+
* This ensures all events are processed and persisted before returning to client.
82+
* Configurable via: a2a.blocking.consumption.timeout.seconds
83+
* Default: 5 seconds
84+
*/
85+
@Inject
86+
@ConfigProperty(name = "a2a.blocking.consumption.timeout.seconds", defaultValue = "5")
87+
int consumptionCompletionTimeoutSeconds;
88+
6789
private final AgentExecutor agentExecutor;
6890
private final TaskStore taskStore;
6991
private final QueueManager queueManager;
@@ -93,6 +115,19 @@ public DefaultRequestHandler(AgentExecutor agentExecutor, TaskStore taskStore,
93115
this.requestContextBuilder = () -> new SimpleRequestContextBuilder(taskStore, false);
94116
}
95117

118+
/**
119+
* For testing
120+
*/
121+
public static DefaultRequestHandler create(AgentExecutor agentExecutor, TaskStore taskStore,
122+
QueueManager queueManager, PushNotificationConfigStore pushConfigStore,
123+
PushNotificationSender pushSender, Executor executor) {
124+
DefaultRequestHandler handler =
125+
new DefaultRequestHandler(agentExecutor, taskStore, queueManager, pushConfigStore, pushSender, executor);
126+
handler.agentCompletionTimeoutSeconds = 5;
127+
handler.consumptionCompletionTimeoutSeconds = 2;
128+
return handler;
129+
}
130+
96131
@Override
97132
public Task onGetTask(TaskQueryParams params, ServerCallContext context) throws JSONRPCError {
98133
LOGGER.debug("onGetTask {}", params.id());
@@ -192,6 +227,7 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
192227

193228
EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(taskId, mss.requestContext, queue);
194229
ResultAggregator.EventTypeAndInterrupt etai = null;
230+
EventKind kind = null; // Declare outside try block so it's in scope for return
195231
try {
196232
// Create callback for push notifications during background event processing
197233
Runnable pushNotificationCallback = () -> sendPushNotification(taskId, resultAggregator);
@@ -201,7 +237,10 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
201237
// This callback must be added before we start consuming. Otherwise,
202238
// any errors thrown by the producerRunnable are not picked up by the consumer
203239
producerRunnable.addDoneCallback(consumer.createAgentRunnableDoneCallback());
204-
etai = resultAggregator.consumeAndBreakOnInterrupt(consumer, blocking, pushNotificationCallback);
240+
241+
// Get agent future before consuming (for blocking calls to wait for agent completion)
242+
CompletableFuture<Void> agentFuture = runningAgents.get(taskId);
243+
etai = resultAggregator.consumeAndBreakOnInterrupt(consumer, blocking);
205244

206245
if (etai == null) {
207246
LOGGER.debug("No result, throwing InternalError");
@@ -210,7 +249,69 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
210249
interruptedOrNonBlocking = etai.interrupted();
211250
LOGGER.debug("Was interrupted or non-blocking: {}", interruptedOrNonBlocking);
212251

213-
EventKind kind = etai.eventType();
252+
// For blocking calls that were interrupted (returned on first event),
253+
// wait for agent execution and event processing BEFORE returning to client.
254+
// This ensures the returned Task has all artifacts and current state.
255+
// We do this HERE (not in ResultAggregator) to avoid blocking Vert.x worker threads
256+
// during the consumption loop itself.
257+
kind = etai.eventType();
258+
if (blocking && interruptedOrNonBlocking) {
259+
// For blocking calls: ensure all events are processed before returning
260+
// Order of operations is critical to avoid circular dependency:
261+
// 1. Wait for agent to finish enqueueing events
262+
// 2. Close the queue to signal consumption can complete
263+
// 3. Wait for consumption to finish processing events
264+
// 4. Fetch final task state from TaskStore
265+
266+
try {
267+
// Step 1: Wait for agent to finish (with configurable timeout)
268+
if (agentFuture != null) {
269+
try {
270+
agentFuture.get(agentCompletionTimeoutSeconds, SECONDS);
271+
LOGGER.debug("Agent completed for task {}", taskId);
272+
} catch (java.util.concurrent.TimeoutException e) {
273+
// Agent still running after timeout - that's fine, events already being processed
274+
LOGGER.debug("Agent still running for task {} after {}s", taskId, agentCompletionTimeoutSeconds);
275+
}
276+
}
277+
278+
// Step 2: Close the queue to signal consumption can complete
279+
// For fire-and-forget tasks, there's no final event, so we need to close the queue
280+
// This allows EventConsumer.consumeAll() to exit
281+
queue.close(false, false); // graceful close, don't notify parent yet
282+
LOGGER.debug("Closed queue for task {} to allow consumption completion", taskId);
283+
284+
// Step 3: Wait for consumption to complete (now that queue is closed)
285+
if (etai.consumptionFuture() != null) {
286+
etai.consumptionFuture().get(consumptionCompletionTimeoutSeconds, SECONDS);
287+
LOGGER.debug("Consumption completed for task {}", taskId);
288+
}
289+
} catch (InterruptedException e) {
290+
Thread.currentThread().interrupt();
291+
String msg = String.format("Error waiting for task %s completion", taskId);
292+
LOGGER.warn(msg, e);
293+
throw new InternalError(msg);
294+
} catch (java.util.concurrent.ExecutionException e) {
295+
String msg = String.format("Error during task %s execution", taskId);
296+
LOGGER.warn(msg, e.getCause());
297+
throw new InternalError(msg);
298+
} catch (java.util.concurrent.TimeoutException e) {
299+
String msg = String.format("Timeout waiting for consumption to complete for task %s", taskId);
300+
LOGGER.warn(msg, taskId);
301+
throw new InternalError(msg);
302+
}
303+
304+
// Step 4: Fetch the final task state from TaskStore (all events have been processed)
305+
Task updatedTask = taskStore.get(taskId);
306+
if (updatedTask != null) {
307+
kind = updatedTask;
308+
if (LOGGER.isDebugEnabled()) {
309+
LOGGER.debug("Fetched final task for {} with state {} and {} artifacts",
310+
taskId, updatedTask.getStatus().state(),
311+
updatedTask.getArtifacts().size());
312+
}
313+
}
314+
}
214315
if (kind instanceof Task taskResult && !taskId.equals(taskResult.getId())) {
215316
throw new InternalError("Task ID mismatch in agent response");
216317
}
@@ -227,8 +328,8 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
227328
trackBackgroundTask(cleanupProducer(agentFuture, etai != null ? etai.consumptionFuture() : null, taskId, queue, false));
228329
}
229330

230-
LOGGER.debug("Returning: {}", etai.eventType());
231-
return etai.eventType();
331+
LOGGER.debug("Returning: {}", kind);
332+
return kind;
232333
}
233334

234335
@Override

server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,20 @@
1111
import java.util.concurrent.atomic.AtomicBoolean;
1212
import java.util.concurrent.atomic.AtomicReference;
1313

14-
import org.slf4j.Logger;
15-
import org.slf4j.LoggerFactory;
16-
1714
import io.a2a.server.events.EventConsumer;
1815
import io.a2a.server.events.EventQueueItem;
1916
import io.a2a.spec.A2AServerException;
2017
import io.a2a.spec.Event;
2118
import io.a2a.spec.EventKind;
19+
import io.a2a.spec.InternalError;
2220
import io.a2a.spec.JSONRPCError;
2321
import io.a2a.spec.Message;
2422
import io.a2a.spec.Task;
2523
import io.a2a.spec.TaskState;
2624
import io.a2a.spec.TaskStatusUpdateEvent;
2725
import io.a2a.util.Utils;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
2828

2929
public class ResultAggregator {
3030
private static final Logger LOGGER = LoggerFactory.getLogger(ResultAggregator.class);
@@ -106,10 +106,6 @@ public EventKind consumeAll(EventConsumer consumer) throws JSONRPCError {
106106
}
107107

108108
public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking) throws JSONRPCError {
109-
return consumeAndBreakOnInterrupt(consumer, blocking, null);
110-
}
111-
112-
public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking, Runnable eventCallback) throws JSONRPCError {
113109
Flow.Publisher<EventQueueItem> allItems = consumer.consumeAll();
114110
AtomicReference<Message> message = new AtomicReference<>();
115111
AtomicBoolean interrupted = new AtomicBoolean(false);
@@ -180,11 +176,11 @@ else if (!blocking) {
180176
shouldInterrupt = true;
181177
continueInBackground = true;
182178
}
183-
else {
184-
// For ALL blocking calls (both final and non-final events), use background consumption
185-
// This ensures all events are processed and persisted to TaskStore in background
186-
// Queue lifecycle is now managed by DefaultRequestHandler.cleanupProducer()
187-
// which waits for BOTH agent and consumption futures before closing queues
179+
else if (blocking) {
180+
// For blocking calls: Interrupt to free Vert.x thread, but continue in background
181+
// Python's async consumption doesn't block threads, but Java's does
182+
// So we interrupt to return quickly, then rely on background consumption
183+
// DefaultRequestHandler will fetch the final state from TaskStore
188184
shouldInterrupt = true;
189185
continueInBackground = true;
190186
if (LOGGER.isDebugEnabled()) {
@@ -198,10 +194,17 @@ else if (!blocking) {
198194
interrupted.set(true);
199195
completionFuture.complete(null);
200196

201-
// Signal that cleanup can proceed while consumption continues in background.
202-
// This prevents infinite hangs for fire-and-forget agents that never emit final events.
203-
// Processing continues (return true below) and all events are still persisted to TaskStore.
204-
consumptionCompletionFuture.complete(null);
197+
// For blocking calls, DON'T complete consumptionCompletionFuture here.
198+
// Let it complete naturally when subscription finishes (onComplete callback below).
199+
// This ensures all events are processed and persisted to TaskStore before
200+
// DefaultRequestHandler.cleanupProducer() proceeds with cleanup.
201+
//
202+
// For non-blocking and auth-required calls, complete immediately to allow
203+
// cleanup to proceed while consumption continues in background.
204+
if (!blocking) {
205+
consumptionCompletionFuture.complete(null);
206+
}
207+
// else: blocking calls wait for actual consumption completion in onComplete
205208

206209
// Continue consuming in background - keep requesting events
207210
// Note: continueInBackground is always true when shouldInterrupt is true
@@ -244,8 +247,8 @@ else if (!blocking) {
244247
}
245248
}
246249

247-
// Background consumption continues automatically via the subscription
248-
// returning true in the consumer function keeps the subscription alive
250+
// Note: For blocking calls that were interrupted, the wait logic has been moved
251+
// to DefaultRequestHandler.onMessageSend() to avoid blocking Vert.x worker threads.
249252
// Queue lifecycle is managed by DefaultRequestHandler.cleanupProducer()
250253

251254
Throwable error = errorRef.get();

server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ public void cancel(RequestContext context, EventQueue eventQueue) throws JSONRPC
9898
PushNotificationConfigStore pushConfigStore = new InMemoryPushNotificationConfigStore();
9999
PushNotificationSender pushSender = new BasePushNotificationSender(pushConfigStore, httpClient);
100100

101-
requestHandler = new DefaultRequestHandler(executor, taskStore, queueManager, pushConfigStore, pushSender, internalExecutor);
101+
requestHandler = DefaultRequestHandler.create(
102+
executor, taskStore, queueManager, pushConfigStore, pushSender, internalExecutor);
102103
}
103104

104105
@AfterEach

0 commit comments

Comments
 (0)