Skip to content

Commit 136fc0b

Browse files
authored
Merge branch 'main' into grpc
2 parents f604de3 + 465a5e4 commit 136fc0b

File tree

6 files changed

+308
-64
lines changed

6 files changed

+308
-64
lines changed

server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java

Lines changed: 106 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static io.a2a.server.util.async.AsyncUtils.convertingProcessor;
44
import static io.a2a.server.util.async.AsyncUtils.createTubeConfig;
55
import static io.a2a.server.util.async.AsyncUtils.processor;
6+
import static java.util.concurrent.TimeUnit.*;
67

78
import java.util.ArrayList;
89
import java.util.List;
@@ -51,11 +52,12 @@
5152
import io.a2a.spec.Task;
5253
import io.a2a.spec.TaskIdParams;
5354
import io.a2a.spec.TaskNotCancelableError;
54-
import io.a2a.spec.TaskState;
5555
import io.a2a.spec.TaskNotFoundError;
5656
import io.a2a.spec.TaskPushNotificationConfig;
5757
import io.a2a.spec.TaskQueryParams;
58+
import io.a2a.spec.TaskState;
5859
import io.a2a.spec.UnsupportedOperationError;
60+
import org.eclipse.microprofile.config.inject.ConfigProperty;
5961
import org.slf4j.Logger;
6062
import org.slf4j.LoggerFactory;
6163

@@ -64,6 +66,26 @@ public class DefaultRequestHandler implements RequestHandler {
6466

6567
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRequestHandler.class);
6668

69+
/**
70+
* Timeout in seconds to wait for agent execution to complete in blocking calls.
71+
* This allows slow agents (LLM-based, data processing, external APIs) sufficient time.
72+
* Configurable via: a2a.blocking.agent.timeout.seconds
73+
* Default: 30 seconds
74+
*/
75+
@Inject
76+
@ConfigProperty(name = "a2a.blocking.agent.timeout.seconds", defaultValue = "30")
77+
int agentCompletionTimeoutSeconds;
78+
79+
/**
80+
* Timeout in seconds to wait for event consumption to complete in blocking calls.
81+
* This ensures all events are processed and persisted before returning to client.
82+
* Configurable via: a2a.blocking.consumption.timeout.seconds
83+
* Default: 5 seconds
84+
*/
85+
@Inject
86+
@ConfigProperty(name = "a2a.blocking.consumption.timeout.seconds", defaultValue = "5")
87+
int consumptionCompletionTimeoutSeconds;
88+
6789
private final AgentExecutor agentExecutor;
6890
private final TaskStore taskStore;
6991
private final QueueManager queueManager;
@@ -93,6 +115,19 @@ public DefaultRequestHandler(AgentExecutor agentExecutor, TaskStore taskStore,
93115
this.requestContextBuilder = () -> new SimpleRequestContextBuilder(taskStore, false);
94116
}
95117

118+
/**
119+
* For testing
120+
*/
121+
public static DefaultRequestHandler create(AgentExecutor agentExecutor, TaskStore taskStore,
122+
QueueManager queueManager, PushNotificationConfigStore pushConfigStore,
123+
PushNotificationSender pushSender, Executor executor) {
124+
DefaultRequestHandler handler =
125+
new DefaultRequestHandler(agentExecutor, taskStore, queueManager, pushConfigStore, pushSender, executor);
126+
handler.agentCompletionTimeoutSeconds = 5;
127+
handler.consumptionCompletionTimeoutSeconds = 2;
128+
return handler;
129+
}
130+
96131
@Override
97132
public Task onGetTask(TaskQueryParams params, ServerCallContext context) throws JSONRPCError {
98133
LOGGER.debug("onGetTask {}", params.id());
@@ -192,6 +227,7 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
192227

193228
EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(taskId, mss.requestContext, queue);
194229
ResultAggregator.EventTypeAndInterrupt etai = null;
230+
EventKind kind = null; // Declare outside try block so it's in scope for return
195231
try {
196232
// Create callback for push notifications during background event processing
197233
Runnable pushNotificationCallback = () -> sendPushNotification(taskId, resultAggregator);
@@ -201,7 +237,10 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
201237
// This callback must be added before we start consuming. Otherwise,
202238
// any errors thrown by the producerRunnable are not picked up by the consumer
203239
producerRunnable.addDoneCallback(consumer.createAgentRunnableDoneCallback());
204-
etai = resultAggregator.consumeAndBreakOnInterrupt(consumer, blocking, pushNotificationCallback);
240+
241+
// Get agent future before consuming (for blocking calls to wait for agent completion)
242+
CompletableFuture<Void> agentFuture = runningAgents.get(taskId);
243+
etai = resultAggregator.consumeAndBreakOnInterrupt(consumer, blocking);
205244

206245
if (etai == null) {
207246
LOGGER.debug("No result, throwing InternalError");
@@ -210,7 +249,69 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
210249
interruptedOrNonBlocking = etai.interrupted();
211250
LOGGER.debug("Was interrupted or non-blocking: {}", interruptedOrNonBlocking);
212251

213-
EventKind kind = etai.eventType();
252+
// For blocking calls that were interrupted (returned on first event),
253+
// wait for agent execution and event processing BEFORE returning to client.
254+
// This ensures the returned Task has all artifacts and current state.
255+
// We do this HERE (not in ResultAggregator) to avoid blocking Vert.x worker threads
256+
// during the consumption loop itself.
257+
kind = etai.eventType();
258+
if (blocking && interruptedOrNonBlocking) {
259+
// For blocking calls: ensure all events are processed before returning
260+
// Order of operations is critical to avoid circular dependency:
261+
// 1. Wait for agent to finish enqueueing events
262+
// 2. Close the queue to signal consumption can complete
263+
// 3. Wait for consumption to finish processing events
264+
// 4. Fetch final task state from TaskStore
265+
266+
try {
267+
// Step 1: Wait for agent to finish (with configurable timeout)
268+
if (agentFuture != null) {
269+
try {
270+
agentFuture.get(agentCompletionTimeoutSeconds, SECONDS);
271+
LOGGER.debug("Agent completed for task {}", taskId);
272+
} catch (java.util.concurrent.TimeoutException e) {
273+
// Agent still running after timeout - that's fine, events already being processed
274+
LOGGER.debug("Agent still running for task {} after {}s", taskId, agentCompletionTimeoutSeconds);
275+
}
276+
}
277+
278+
// Step 2: Close the queue to signal consumption can complete
279+
// For fire-and-forget tasks, there's no final event, so we need to close the queue
280+
// This allows EventConsumer.consumeAll() to exit
281+
queue.close(false, false); // graceful close, don't notify parent yet
282+
LOGGER.debug("Closed queue for task {} to allow consumption completion", taskId);
283+
284+
// Step 3: Wait for consumption to complete (now that queue is closed)
285+
if (etai.consumptionFuture() != null) {
286+
etai.consumptionFuture().get(consumptionCompletionTimeoutSeconds, SECONDS);
287+
LOGGER.debug("Consumption completed for task {}", taskId);
288+
}
289+
} catch (InterruptedException e) {
290+
Thread.currentThread().interrupt();
291+
String msg = String.format("Error waiting for task %s completion", taskId);
292+
LOGGER.warn(msg, e);
293+
throw new InternalError(msg);
294+
} catch (java.util.concurrent.ExecutionException e) {
295+
String msg = String.format("Error during task %s execution", taskId);
296+
LOGGER.warn(msg, e.getCause());
297+
throw new InternalError(msg);
298+
} catch (java.util.concurrent.TimeoutException e) {
299+
String msg = String.format("Timeout waiting for consumption to complete for task %s", taskId);
300+
LOGGER.warn(msg, taskId);
301+
throw new InternalError(msg);
302+
}
303+
304+
// Step 4: Fetch the final task state from TaskStore (all events have been processed)
305+
Task updatedTask = taskStore.get(taskId);
306+
if (updatedTask != null) {
307+
kind = updatedTask;
308+
if (LOGGER.isDebugEnabled()) {
309+
LOGGER.debug("Fetched final task for {} with state {} and {} artifacts",
310+
taskId, updatedTask.getStatus().state(),
311+
updatedTask.getArtifacts().size());
312+
}
313+
}
314+
}
214315
if (kind instanceof Task taskResult && !taskId.equals(taskResult.getId())) {
215316
throw new InternalError("Task ID mismatch in agent response");
216317
}
@@ -227,8 +328,8 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
227328
trackBackgroundTask(cleanupProducer(agentFuture, etai != null ? etai.consumptionFuture() : null, taskId, queue, false));
228329
}
229330

230-
LOGGER.debug("Returning: {}", etai.eventType());
231-
return etai.eventType();
331+
LOGGER.debug("Returning: {}", kind);
332+
return kind;
232333
}
233334

234335
@Override

server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,20 @@
1111
import java.util.concurrent.atomic.AtomicBoolean;
1212
import java.util.concurrent.atomic.AtomicReference;
1313

14-
import org.slf4j.Logger;
15-
import org.slf4j.LoggerFactory;
16-
1714
import io.a2a.server.events.EventConsumer;
1815
import io.a2a.server.events.EventQueueItem;
1916
import io.a2a.spec.A2AServerException;
2017
import io.a2a.spec.Event;
2118
import io.a2a.spec.EventKind;
19+
import io.a2a.spec.InternalError;
2220
import io.a2a.spec.JSONRPCError;
2321
import io.a2a.spec.Message;
2422
import io.a2a.spec.Task;
2523
import io.a2a.spec.TaskState;
2624
import io.a2a.spec.TaskStatusUpdateEvent;
2725
import io.a2a.util.Utils;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
2828

2929
public class ResultAggregator {
3030
private static final Logger LOGGER = LoggerFactory.getLogger(ResultAggregator.class);
@@ -106,10 +106,6 @@ public EventKind consumeAll(EventConsumer consumer) throws JSONRPCError {
106106
}
107107

108108
public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking) throws JSONRPCError {
109-
return consumeAndBreakOnInterrupt(consumer, blocking, null);
110-
}
111-
112-
public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking, Runnable eventCallback) throws JSONRPCError {
113109
Flow.Publisher<EventQueueItem> allItems = consumer.consumeAll();
114110
AtomicReference<Message> message = new AtomicReference<>();
115111
AtomicBoolean interrupted = new AtomicBoolean(false);
@@ -180,11 +176,11 @@ else if (!blocking) {
180176
shouldInterrupt = true;
181177
continueInBackground = true;
182178
}
183-
else {
184-
// For ALL blocking calls (both final and non-final events), use background consumption
185-
// This ensures all events are processed and persisted to TaskStore in background
186-
// Queue lifecycle is now managed by DefaultRequestHandler.cleanupProducer()
187-
// which waits for BOTH agent and consumption futures before closing queues
179+
else if (blocking) {
180+
// For blocking calls: Interrupt to free Vert.x thread, but continue in background
181+
// Python's async consumption doesn't block threads, but Java's does
182+
// So we interrupt to return quickly, then rely on background consumption
183+
// DefaultRequestHandler will fetch the final state from TaskStore
188184
shouldInterrupt = true;
189185
continueInBackground = true;
190186
if (LOGGER.isDebugEnabled()) {
@@ -198,10 +194,17 @@ else if (!blocking) {
198194
interrupted.set(true);
199195
completionFuture.complete(null);
200196

201-
// Signal that cleanup can proceed while consumption continues in background.
202-
// This prevents infinite hangs for fire-and-forget agents that never emit final events.
203-
// Processing continues (return true below) and all events are still persisted to TaskStore.
204-
consumptionCompletionFuture.complete(null);
197+
// For blocking calls, DON'T complete consumptionCompletionFuture here.
198+
// Let it complete naturally when subscription finishes (onComplete callback below).
199+
// This ensures all events are processed and persisted to TaskStore before
200+
// DefaultRequestHandler.cleanupProducer() proceeds with cleanup.
201+
//
202+
// For non-blocking and auth-required calls, complete immediately to allow
203+
// cleanup to proceed while consumption continues in background.
204+
if (!blocking) {
205+
consumptionCompletionFuture.complete(null);
206+
}
207+
// else: blocking calls wait for actual consumption completion in onComplete
205208

206209
// Continue consuming in background - keep requesting events
207210
// Note: continueInBackground is always true when shouldInterrupt is true
@@ -244,8 +247,8 @@ else if (!blocking) {
244247
}
245248
}
246249

247-
// Background consumption continues automatically via the subscription
248-
// returning true in the consumer function keeps the subscription alive
250+
// Note: For blocking calls that were interrupted, the wait logic has been moved
251+
// to DefaultRequestHandler.onMessageSend() to avoid blocking Vert.x worker threads.
249252
// Queue lifecycle is managed by DefaultRequestHandler.cleanupProducer()
250253

251254
Throwable error = errorRef.get();

server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ public void cancel(RequestContext context, EventQueue eventQueue) throws JSONRPC
9898
PushNotificationConfigStore pushConfigStore = new InMemoryPushNotificationConfigStore();
9999
PushNotificationSender pushSender = new BasePushNotificationSender(pushConfigStore, httpClient);
100100

101-
requestHandler = new DefaultRequestHandler(executor, taskStore, queueManager, pushConfigStore, pushSender, internalExecutor);
101+
requestHandler = DefaultRequestHandler.create(
102+
executor, taskStore, queueManager, pushConfigStore, pushSender, internalExecutor);
102103
}
103104

104105
@AfterEach

0 commit comments

Comments
 (0)