33import static io .a2a .server .util .async .AsyncUtils .convertingProcessor ;
44import static io .a2a .server .util .async .AsyncUtils .createTubeConfig ;
55import static io .a2a .server .util .async .AsyncUtils .processor ;
6+ import static java .util .concurrent .TimeUnit .*;
67
78import java .util .ArrayList ;
89import java .util .List ;
@@ -266,7 +267,7 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
266267 // Step 1: Wait for agent to finish (with configurable timeout)
267268 if (agentFuture != null ) {
268269 try {
269- agentFuture .get (agentCompletionTimeoutSeconds , java . util . concurrent . TimeUnit . SECONDS );
270+ agentFuture .get (agentCompletionTimeoutSeconds , SECONDS );
270271 LOGGER .debug ("Agent completed for task {}" , taskId );
271272 } catch (java .util .concurrent .TimeoutException e ) {
272273 // Agent still running after timeout - that's fine, events already being processed
@@ -282,16 +283,22 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
282283
283284 // Step 3: Wait for consumption to complete (now that queue is closed)
284285 if (etai .consumptionFuture () != null ) {
285- etai .consumptionFuture ().get (consumptionCompletionTimeoutSeconds , java . util . concurrent . TimeUnit . SECONDS );
286+ etai .consumptionFuture ().get (consumptionCompletionTimeoutSeconds , SECONDS );
286287 LOGGER .debug ("Consumption completed for task {}" , taskId );
287288 }
288289 } catch (InterruptedException e ) {
289290 Thread .currentThread ().interrupt ();
290- LOGGER .warn ("Interrupted waiting for task {} completion" , taskId , e );
291+ String msg = String .format ("Error waiting for task %s completion" , taskId );
292+ LOGGER .warn (msg , e );
293+ throw new InternalError (msg );
291294 } catch (java .util .concurrent .ExecutionException e ) {
292- LOGGER .warn ("Error during task {} execution" , taskId , e .getCause ());
295+ String msg = String .format ("Error during task %s execution" , taskId );
296+ LOGGER .warn (msg , e .getCause ());
297+ throw new InternalError (msg );
293298 } catch (java .util .concurrent .TimeoutException e ) {
294- LOGGER .warn ("Timeout waiting for consumption to complete for task {}" , taskId );
299+ String msg = String .format ("Timeout waiting for consumption to complete for task %s" , taskId );
300+ LOGGER .warn (msg , taskId );
301+ throw new InternalError (msg );
295302 }
296303
297304 // Step 4: Fetch the final task state from TaskStore (all events have been processed)
0 commit comments