Skip to content

Commit 3e93dd2

Browse files
kabirclaude
andcommitted
fix: Remove awaitQueuePollerStart to eliminate thread blocking bottleneck
Problem: - TCK tests timing out in CI with 11 leaked agents (183 registered, 172 removed) - Agent-executor threads blocking up to 10 seconds waiting for queue polling to start - Cascading delays when Vert.x worker threads are busy (11 × 10s = 110s+ wasted time) - Tests timeout at 5 minutes before cleanup can execute Root Cause: Agent-executor threads (limited pool of 15) were blocking in awaitQueuePollerStart() waiting for Vert.x worker threads to become available and start polling queues. This created a resource bottleneck where valuable executor threads sat idle. Solution: Remove synchronous wait and move queue lifecycle entirely to the consumer: - Agent-executor thread: Returns immediately after execution, no blocking - Consumer (Vert.x worker): Handles all queue lifecycle via EventConsumer.consumeAll() - Background cleanup: Manages ChildQueue/MainQueue reference counting Changes: - DefaultRequestHandler.registerAndExecuteAgentAsync(): * Removed awaitQueuePollerStart() call * Removed immediate queue closing in whenComplete() * Added architecture documentation - Queue lifecycle now managed entirely by EventConsumer.consumeAll() which closes queue on final events/errors Benefits: - Eliminates cascading 10-second delays - Better thread utilization (no idle blocking) - Simpler architecture (consumer owns lifecycle) - No race conditions (consumer running before it can close queue) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 09b1c17 commit 3e93dd2

File tree

2 files changed

+24
-18
lines changed

2 files changed

+24
-18
lines changed

server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,17 @@ private boolean shouldAddPushInfo(MessageSendParams params) {
483483
return pushConfigStore != null && params.configuration() != null && params.configuration().pushNotificationConfig() != null;
484484
}
485485

486+
/**
487+
* Register and execute the agent asynchronously in the agent-executor thread pool.
488+
*
489+
* Queue Lifecycle Architecture:
490+
* - Agent-executor thread: Executes agent and enqueues events, returns immediately
491+
* - Vert.x worker thread (consumer): Polls queue, processes events, closes queue on final event
492+
* - Background cleanup: Manages ChildQueue/MainQueue lifecycle after agent completes
493+
*
494+
* This design avoids blocking agent-executor threads waiting for consumer polling to start,
495+
* eliminating cascading delays when Vert.x worker threads are busy.
496+
*/
486497
private EnhancedRunnable registerAndExecuteAgentAsync(String taskId, RequestContext requestContext, EventQueue queue) {
487498
LOGGER.debug("Registering agent execution for task {}, runningAgents.size() before: {}", taskId, runningAgents.size());
488499
logThreadStats("AGENT START");
@@ -492,11 +503,9 @@ public void run() {
492503
LOGGER.debug("Agent execution starting for task {}", taskId);
493504
agentExecutor.execute(requestContext, queue);
494505
LOGGER.debug("Agent execution completed for task {}", taskId);
495-
try {
496-
queueManager.awaitQueuePollerStart(queue);
497-
} catch (InterruptedException e) {
498-
Thread.currentThread().interrupt();
499-
}
506+
// No longer wait for queue poller to start - the consumer (which is guaranteed
507+
// to be running on the Vert.x worker thread) will handle queue lifecycle.
508+
// This avoids blocking agent-executor threads waiting for worker threads.
500509
}
501510
};
502511

@@ -505,18 +514,11 @@ public void run() {
505514
if (err != null) {
506515
LOGGER.error("Agent execution failed for task {}", taskId, err);
507516
runnable.setError(err);
508-
// Close queue on error
509-
queue.close();
510-
} else {
511-
// Only close queue if task is in a final state
512-
Task task = taskStore.get(taskId);
513-
if (task != null && task.getStatus().state().isFinal()) {
514-
LOGGER.debug("Task {} in final state {}, closing queue", taskId, task.getStatus().state());
515-
queue.close();
516-
} else {
517-
LOGGER.debug("Task {} not in final state or not yet created, keeping queue open", taskId);
518-
}
517+
// Don't close queue here - let the consumer handle it via error callback
518+
// This ensures the consumer (which may not have started polling yet) gets the error
519519
}
520+
// Queue lifecycle is now managed entirely by EventConsumer.consumeAll()
521+
// which closes the queue on final events. No need to close here.
520522
logThreadStats("AGENT COMPLETE END");
521523
runnable.invokeDoneCallbacks();
522524
});

tck/src/main/resources/application.properties

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,12 @@ a2a.executor.core-pool-size=5
88
a2a.executor.max-pool-size=15
99
a2a.executor.keep-alive-seconds=60
1010

11-
# Increase Vert.x worker thread blocking timeout to prevent thread abandonment
12-
# Default is 60s, increase to 300s (5 minutes) to match TCK timeout
11+
# TEMPORARY WORKAROUND: Increase Vert.x worker thread blocking timeout to prevent thread abandonment
12+
# Default is 60s, increased to 300s (5 minutes) to match TCK timeout
13+
# TODO: Remove this workaround once proper fix is implemented:
14+
# - Make EventQueue polling non-blocking with async/reactive patterns
15+
# - Or add timeout to consumeAndBreakOnInterrupt() to return early
16+
# See: Investigation of TCK timeout issues in CI (21 agents leaked due to thread abandonment)
1317
quarkus.vertx.max-worker-execute-time=300s
1418

1519
# Enable debug logging for troubleshooting SSE streaming test timeouts

0 commit comments

Comments
 (0)