5151import io .a2a .spec .Task ;
5252import io .a2a .spec .TaskIdParams ;
5353import io .a2a .spec .TaskNotCancelableError ;
54- import io .a2a .spec .TaskState ;
5554import io .a2a .spec .TaskNotFoundError ;
5655import io .a2a .spec .TaskPushNotificationConfig ;
5756import io .a2a .spec .TaskQueryParams ;
57+ import io .a2a .spec .TaskState ;
5858import io .a2a .spec .UnsupportedOperationError ;
59+ import org .eclipse .microprofile .config .inject .ConfigProperty ;
5960import org .slf4j .Logger ;
6061import org .slf4j .LoggerFactory ;
6162
@@ -64,6 +65,26 @@ public class DefaultRequestHandler implements RequestHandler {
6465
6566 private static final Logger LOGGER = LoggerFactory .getLogger (DefaultRequestHandler .class );
6667
68+ /**
69+ * Timeout in seconds to wait for agent execution to complete in blocking calls.
70+ * This allows slow agents (LLM-based, data processing, external APIs) sufficient time.
71+ * Configurable via: a2a.blocking.agent.timeout.seconds
72+ * Default: 30 seconds
73+ */
74+ @ Inject
75+ @ ConfigProperty (name = "a2a.blocking.agent.timeout.seconds" , defaultValue = "30" )
76+ int agentCompletionTimeoutSeconds ;
77+
78+ /**
79+ * Timeout in seconds to wait for event consumption to complete in blocking calls.
80+ * This ensures all events are processed and persisted before returning to client.
81+ * Configurable via: a2a.blocking.consumption.timeout.seconds
82+ * Default: 5 seconds
83+ */
84+ @ Inject
85+ @ ConfigProperty (name = "a2a.blocking.consumption.timeout.seconds" , defaultValue = "5" )
86+ int consumptionCompletionTimeoutSeconds ;
87+
6788 private final AgentExecutor agentExecutor ;
6889 private final TaskStore taskStore ;
6990 private final QueueManager queueManager ;
@@ -93,6 +114,19 @@ public DefaultRequestHandler(AgentExecutor agentExecutor, TaskStore taskStore,
93114 this .requestContextBuilder = () -> new SimpleRequestContextBuilder (taskStore , false );
94115 }
95116
117+ /**
118+ * For testing
119+ */
120+ public static DefaultRequestHandler create (AgentExecutor agentExecutor , TaskStore taskStore ,
121+ QueueManager queueManager , PushNotificationConfigStore pushConfigStore ,
122+ PushNotificationSender pushSender , Executor executor ) {
123+ DefaultRequestHandler handler =
124+ new DefaultRequestHandler (agentExecutor , taskStore , queueManager , pushConfigStore , pushSender , executor );
125+ handler .agentCompletionTimeoutSeconds = 5 ;
126+ handler .consumptionCompletionTimeoutSeconds = 2 ;
127+ return handler ;
128+ }
129+
96130 @ Override
97131 public Task onGetTask (TaskQueryParams params , ServerCallContext context ) throws JSONRPCError {
98132 LOGGER .debug ("onGetTask {}" , params .id ());
@@ -192,6 +226,7 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
192226
193227 EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync (taskId , mss .requestContext , queue );
194228 ResultAggregator .EventTypeAndInterrupt etai = null ;
229+ EventKind kind = null ; // Declare outside try block so it's in scope for return
195230 try {
196231 // Create callback for push notifications during background event processing
197232 Runnable pushNotificationCallback = () -> sendPushNotification (taskId , resultAggregator );
@@ -201,7 +236,10 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
201236 // This callback must be added before we start consuming. Otherwise,
202237 // any errors thrown by the producerRunnable are not picked up by the consumer
203238 producerRunnable .addDoneCallback (consumer .createAgentRunnableDoneCallback ());
204- etai = resultAggregator .consumeAndBreakOnInterrupt (consumer , blocking , pushNotificationCallback );
239+
240+ // Get agent future before consuming (for blocking calls to wait for agent completion)
241+ CompletableFuture <Void > agentFuture = runningAgents .get (taskId );
242+ etai = resultAggregator .consumeAndBreakOnInterrupt (consumer , blocking );
205243
206244 if (etai == null ) {
207245 LOGGER .debug ("No result, throwing InternalError" );
@@ -210,7 +248,63 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
210248 interruptedOrNonBlocking = etai .interrupted ();
211249 LOGGER .debug ("Was interrupted or non-blocking: {}" , interruptedOrNonBlocking );
212250
213- EventKind kind = etai .eventType ();
251+ // For blocking calls that were interrupted (returned on first event),
252+ // wait for agent execution and event processing BEFORE returning to client.
253+ // This ensures the returned Task has all artifacts and current state.
254+ // We do this HERE (not in ResultAggregator) to avoid blocking Vert.x worker threads
255+ // during the consumption loop itself.
256+ kind = etai .eventType ();
257+ if (blocking && interruptedOrNonBlocking ) {
258+ // For blocking calls: ensure all events are processed before returning
259+ // Order of operations is critical to avoid circular dependency:
260+ // 1. Wait for agent to finish enqueueing events
261+ // 2. Close the queue to signal consumption can complete
262+ // 3. Wait for consumption to finish processing events
263+ // 4. Fetch final task state from TaskStore
264+
265+ try {
266+ // Step 1: Wait for agent to finish (with configurable timeout)
267+ if (agentFuture != null ) {
268+ try {
269+ agentFuture .get (agentCompletionTimeoutSeconds , java .util .concurrent .TimeUnit .SECONDS );
270+ LOGGER .debug ("Agent completed for task {}" , taskId );
271+ } catch (java .util .concurrent .TimeoutException e ) {
272+ // Agent still running after timeout - that's fine, events already being processed
273+ LOGGER .debug ("Agent still running for task {} after {}s" , taskId , agentCompletionTimeoutSeconds );
274+ }
275+ }
276+
277+ // Step 2: Close the queue to signal consumption can complete
278+ // For fire-and-forget tasks, there's no final event, so we need to close the queue
279+ // This allows EventConsumer.consumeAll() to exit
280+ queue .close (false , false ); // graceful close, don't notify parent yet
281+ LOGGER .debug ("Closed queue for task {} to allow consumption completion" , taskId );
282+
283+ // Step 3: Wait for consumption to complete (now that queue is closed)
284+ if (etai .consumptionFuture () != null ) {
285+ etai .consumptionFuture ().get (consumptionCompletionTimeoutSeconds , java .util .concurrent .TimeUnit .SECONDS );
286+ LOGGER .debug ("Consumption completed for task {}" , taskId );
287+ }
288+ } catch (InterruptedException e ) {
289+ Thread .currentThread ().interrupt ();
290+ LOGGER .warn ("Interrupted waiting for task {} completion" , taskId , e );
291+ } catch (java .util .concurrent .ExecutionException e ) {
292+ LOGGER .warn ("Error during task {} execution" , taskId , e .getCause ());
293+ } catch (java .util .concurrent .TimeoutException e ) {
294+ LOGGER .warn ("Timeout waiting for consumption to complete for task {}" , taskId );
295+ }
296+
297+ // Step 4: Fetch the final task state from TaskStore (all events have been processed)
298+ Task updatedTask = taskStore .get (taskId );
299+ if (updatedTask != null ) {
300+ kind = updatedTask ;
301+ if (LOGGER .isDebugEnabled ()) {
302+ LOGGER .debug ("Fetched final task for {} with state {} and {} artifacts" ,
303+ taskId , updatedTask .getStatus ().state (),
304+ updatedTask .getArtifacts ().size ());
305+ }
306+ }
307+ }
214308 if (kind instanceof Task taskResult && !taskId .equals (taskResult .getId ())) {
215309 throw new InternalError ("Task ID mismatch in agent response" );
216310 }
@@ -227,8 +321,8 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
227321 trackBackgroundTask (cleanupProducer (agentFuture , etai != null ? etai .consumptionFuture () : null , taskId , queue , false ));
228322 }
229323
230- LOGGER .debug ("Returning: {}" , etai . eventType () );
231- return etai . eventType () ;
324+ LOGGER .debug ("Returning: {}" , kind );
325+ return kind ;
232326 }
233327
234328 @ Override
0 commit comments