Skip to content

Commit f487663

Browse files
committed
Add configurable timeout
1 parent c97d236 commit f487663

File tree

5 files changed

+58
-24
lines changed

5 files changed

+58
-24
lines changed

server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import jakarta.enterprise.context.ApplicationScoped;
2222
import jakarta.inject.Inject;
2323

24+
import org.eclipse.microprofile.config.inject.ConfigProperty;
25+
2426
import io.a2a.server.ServerCallContext;
2527
import io.a2a.server.agentexecution.AgentExecutor;
2628
import io.a2a.server.agentexecution.RequestContext;
@@ -63,8 +65,26 @@
6365
public class DefaultRequestHandler implements RequestHandler {
6466

6567
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRequestHandler.class);
66-
private static final int AGENT_COMPLETION_TIMEOUT_SECONDS = 5;
67-
private static final int CONSUMPTION_COMPLETION_TIMEOUT_SECONDS = 2;
68+
69+
/**
70+
* Timeout in seconds to wait for agent execution to complete in blocking calls.
71+
* This allows slow agents (LLM-based, data processing, external APIs) sufficient time.
72+
* Configurable via: a2a.blocking.agent.timeout.seconds
73+
* Default: 30 seconds
74+
*/
75+
@Inject
76+
@ConfigProperty(name = "a2a.blocking.agent.timeout.seconds", defaultValue = "30")
77+
int agentCompletionTimeoutSeconds;
78+
79+
/**
80+
* Timeout in seconds to wait for event consumption to complete in blocking calls.
81+
* This ensures all events are processed and persisted before returning to client.
82+
* Configurable via: a2a.blocking.consumption.timeout.seconds
83+
* Default: 5 seconds
84+
*/
85+
@Inject
86+
@ConfigProperty(name = "a2a.blocking.consumption.timeout.seconds", defaultValue = "5")
87+
int consumptionCompletionTimeoutSeconds;
6888

6989
private final AgentExecutor agentExecutor;
7090
private final TaskStore taskStore;
@@ -95,6 +115,19 @@ public DefaultRequestHandler(AgentExecutor agentExecutor, TaskStore taskStore,
95115
this.requestContextBuilder = () -> new SimpleRequestContextBuilder(taskStore, false);
96116
}
97117

118+
/**
119+
* For testing
120+
*/
121+
public static DefaultRequestHandler create(AgentExecutor agentExecutor, TaskStore taskStore,
122+
QueueManager queueManager, PushNotificationConfigStore pushConfigStore,
123+
PushNotificationSender pushSender, Executor executor) {
124+
DefaultRequestHandler handler =
125+
new DefaultRequestHandler(agentExecutor, taskStore, queueManager, pushConfigStore, pushSender, executor);
126+
handler.agentCompletionTimeoutSeconds = 5;
127+
handler.consumptionCompletionTimeoutSeconds = 2;
128+
return handler;
129+
}
130+
98131
@Override
99132
public Task onGetTask(TaskQueryParams params, ServerCallContext context) throws JSONRPCError {
100133
LOGGER.debug("onGetTask {}", params.id());
@@ -231,14 +264,14 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
231264
// 4. Fetch final task state from TaskStore
232265

233266
try {
234-
// Step 1: Wait for agent to finish (with short timeout for fast agents)
267+
// Step 1: Wait for agent to finish (with configurable timeout)
235268
if (agentFuture != null) {
236269
try {
237-
agentFuture.get(AGENT_COMPLETION_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS);
270+
agentFuture.get(agentCompletionTimeoutSeconds, java.util.concurrent.TimeUnit.SECONDS);
238271
LOGGER.debug("Agent completed for task {}", taskId);
239272
} catch (java.util.concurrent.TimeoutException e) {
240273
// Agent still running after timeout - that's fine, events already being processed
241-
LOGGER.debug("Agent still running for task {} after {}s", taskId, AGENT_COMPLETION_TIMEOUT_SECONDS);
274+
LOGGER.debug("Agent still running for task {} after {}s", taskId, agentCompletionTimeoutSeconds);
242275
}
243276
}
244277

@@ -250,7 +283,7 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
250283

251284
// Step 3: Wait for consumption to complete (now that queue is closed)
252285
if (etai.consumptionFuture() != null) {
253-
etai.consumptionFuture().get(CONSUMPTION_COMPLETION_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS);
286+
etai.consumptionFuture().get(consumptionCompletionTimeoutSeconds, java.util.concurrent.TimeUnit.SECONDS);
254287
LOGGER.debug("Consumption completed for task {}", taskId);
255288
}
256289
} catch (InterruptedException e) {

server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ public void cancel(RequestContext context, EventQueue eventQueue) throws JSONRPC
9898
PushNotificationConfigStore pushConfigStore = new InMemoryPushNotificationConfigStore();
9999
PushNotificationSender pushSender = new BasePushNotificationSender(pushConfigStore, httpClient);
100100

101-
requestHandler = new DefaultRequestHandler(executor, taskStore, queueManager, pushConfigStore, pushSender, internalExecutor);
101+
requestHandler = DefaultRequestHandler.create(
102+
executor, taskStore, queueManager, pushConfigStore, pushSender, internalExecutor);
102103
}
103104

104105
@AfterEach

server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ void setUp() {
5454
queueManager = new InMemoryQueueManager(taskStore);
5555
agentExecutor = new TestAgentExecutor();
5656

57-
requestHandler = new DefaultRequestHandler(
57+
requestHandler = DefaultRequestHandler.create(
5858
agentExecutor,
5959
taskStore,
6060
queueManager,

transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -280,8 +280,8 @@ public void testOnGetPushNotificationNoPushNotifierConfig() throws Exception {
280280
@Test
281281
public void testOnSetPushNotificationNoPushNotifierConfig() throws Exception {
282282
// Create request handler without a push notifier
283-
DefaultRequestHandler requestHandler =
284-
new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor);
283+
DefaultRequestHandler requestHandler = DefaultRequestHandler.create(
284+
executor, taskStore, queueManager, null, null, internalExecutor);
285285
AgentCard card = AbstractA2ARequestHandlerTest.createAgentCard(false, true, false);
286286
GrpcHandler handler = new TestGrpcHandler(card, requestHandler, internalExecutor);
287287
String NAME = "tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId() + "/pushNotificationConfigs/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId();
@@ -655,8 +655,8 @@ public void testListPushNotificationConfigNotSupported() throws Exception {
655655

656656
@Test
657657
public void testListPushNotificationConfigNoPushConfigStore() {
658-
DefaultRequestHandler requestHandler =
659-
new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor);
658+
DefaultRequestHandler requestHandler = DefaultRequestHandler.create(
659+
executor, taskStore, queueManager, null, null, internalExecutor);
660660
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
661661
taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK);
662662
agentExecutorExecute = (context, eventQueue) -> {
@@ -728,8 +728,8 @@ public void testDeletePushNotificationConfigNotSupported() throws Exception {
728728

729729
@Test
730730
public void testDeletePushNotificationConfigNoPushConfigStore() {
731-
DefaultRequestHandler requestHandler =
732-
new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor);
731+
DefaultRequestHandler requestHandler = DefaultRequestHandler.create(
732+
executor, taskStore, queueManager, null, null, internalExecutor);
733733
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
734734
String NAME = "tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId() + "/pushNotificationConfigs/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId();
735735
DeleteTaskPushNotificationConfigRequest request = DeleteTaskPushNotificationConfigRequest.newBuilder()

transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1112,8 +1112,8 @@ public void testPushNotificationsNotSupportedError() {
11121112
@Test
11131113
public void testOnGetPushNotificationNoPushNotifierConfig() {
11141114
// Create request handler without a push notifier
1115-
DefaultRequestHandler requestHandler =
1116-
new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor);
1115+
DefaultRequestHandler requestHandler = DefaultRequestHandler.create(
1116+
executor, taskStore, queueManager, null, null, internalExecutor);
11171117
AgentCard card = createAgentCard(false, true, false);
11181118
JSONRPCHandler handler = new JSONRPCHandler(card, requestHandler, internalExecutor);
11191119

@@ -1131,8 +1131,8 @@ public void testOnGetPushNotificationNoPushNotifierConfig() {
11311131
@Test
11321132
public void testOnSetPushNotificationNoPushNotifierConfig() {
11331133
// Create request handler without a push notifier
1134-
DefaultRequestHandler requestHandler =
1135-
new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor);
1134+
DefaultRequestHandler requestHandler = DefaultRequestHandler.create(
1135+
executor, taskStore, queueManager, null, null, internalExecutor);
11361136
AgentCard card = createAgentCard(false, true, false);
11371137
JSONRPCHandler handler = new JSONRPCHandler(card, requestHandler, internalExecutor);
11381138

@@ -1222,8 +1222,8 @@ public void testDefaultRequestHandlerWithCustomComponents() {
12221222

12231223
@Test
12241224
public void testOnMessageSendErrorHandling() {
1225-
DefaultRequestHandler requestHandler =
1226-
new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor);
1225+
DefaultRequestHandler requestHandler = DefaultRequestHandler.create(
1226+
executor, taskStore, queueManager, null, null, internalExecutor);
12271227
AgentCard card = createAgentCard(false, true, false);
12281228
JSONRPCHandler handler = new JSONRPCHandler(card, requestHandler, internalExecutor);
12291229

@@ -1377,8 +1377,8 @@ public void testListPushNotificationConfigNotSupported() {
13771377

13781378
@Test
13791379
public void testListPushNotificationConfigNoPushConfigStore() {
1380-
DefaultRequestHandler requestHandler =
1381-
new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor);
1380+
DefaultRequestHandler requestHandler = DefaultRequestHandler.create(
1381+
executor, taskStore, queueManager, null, null, internalExecutor);
13821382
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
13831383
taskStore.save(MINIMAL_TASK);
13841384
agentExecutorExecute = (context, eventQueue) -> {
@@ -1469,8 +1469,8 @@ public void testDeletePushNotificationConfigNotSupported() {
14691469

14701470
@Test
14711471
public void testDeletePushNotificationConfigNoPushConfigStore() {
1472-
DefaultRequestHandler requestHandler =
1473-
new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor);
1472+
DefaultRequestHandler requestHandler = DefaultRequestHandler.create(
1473+
executor, taskStore, queueManager, null, null, internalExecutor);
14741474
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
14751475
taskStore.save(MINIMAL_TASK);
14761476
agentExecutorExecute = (context, eventQueue) -> {

0 commit comments

Comments
 (0)