Skip to content

Commit a7bb042

Browse files
kabir and claude committed
fix: Use @Internal Executor instead of ForkJoinPool for event consumption

Problem:
- ResultAggregator.consumeAndBreakOnInterrupt() used CompletableFuture.runAsync() without executor parameter
- Defaulted to ForkJoinPool.commonPool (only 3 threads on CI)
- During concurrent request bursts (6+ simultaneous), ForkJoinPool saturates
- Some consumer subscriptions never scheduled, Vert.x threads block forever
- Agents accumulate in runningAgents map, never cleaned up

Root Cause:
- ForkJoinPool.commonPool designed for CPU-bound tasks
- Our queue polling loop blocks for I/O (queue.dequeueEvent(500ms))
- Blocking I/O saturates limited ForkJoinPool workers
- Subsequent tasks queued but never execute

Solution:
- Pass @Internal Executor to ResultAggregator constructor
- Use executor in runAsync() call instead of default ForkJoinPool
- @Internal executor configured with 15 threads for I/O-bound operations
- Can handle concurrent request bursts without saturation

Evidence:
- Run 2/3 logs: Agents accumulate during 6-request burst at 30s intervals
- Thread dumps: Vert.x workers blocked at completionFuture.join()
- ForkJoinPool stops after initial requests, subsequent tasks never poll queues
- Detailed investigation in claudedocs/TCK_TIMEOUT_INVESTIGATION.md

Related:
- Issue #248
- Commit 3e93dd2 (removed awaitQueuePollerStart workaround)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
1 parent 6572cdb commit a7bb042

File tree

3 files changed

+18
-11
lines changed

3 files changed

+18
-11
lines changed

server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public Task onCancelTask(TaskIdParams params, ServerCallContext context) throws
138138
taskStore,
139139
null);
140140

141-
ResultAggregator resultAggregator = new ResultAggregator(taskManager, null);
141+
ResultAggregator resultAggregator = new ResultAggregator(taskManager, null, executor);
142142

143143
EventQueue queue = queueManager.tap(task.getId());
144144
if (queue == null) {
@@ -180,7 +180,7 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
180180
LOGGER.debug("Request context taskId: {}", taskId);
181181

182182
EventQueue queue = queueManager.createOrTap(taskId);
183-
ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager, null);
183+
ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager, null, executor);
184184

185185
boolean blocking = true; // Default to blocking behavior
186186
if (params.configuration() != null && Boolean.FALSE.equals(params.configuration().blocking())) {
@@ -239,7 +239,7 @@ public Flow.Publisher<StreamingEventKind> onMessageSendStream(
239239
AtomicReference<String> taskId = new AtomicReference<>(mss.requestContext.getTaskId());
240240
EventQueue queue = queueManager.createOrTap(taskId.get());
241241
LOGGER.debug("Created/tapped queue for task {}: {}", taskId.get(), queue);
242-
ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager, null);
242+
ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager, null, executor);
243243

244244
EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(taskId.get(), mss.requestContext, queue);
245245

@@ -422,7 +422,7 @@ public Flow.Publisher<StreamingEventKind> onResubscribeToTask(
422422
}
423423

424424
TaskManager taskManager = new TaskManager(task.getId(), task.getContextId(), taskStore, null);
425-
ResultAggregator resultAggregator = new ResultAggregator(taskManager, null);
425+
ResultAggregator resultAggregator = new ResultAggregator(taskManager, null, executor);
426426
EventQueue queue = queueManager.tap(task.getId());
427427

428428
if (queue == null) {

server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import java.util.concurrent.CompletableFuture;
88
import java.util.concurrent.CompletionException;
9+
import java.util.concurrent.Executor;
910
import java.util.concurrent.Flow;
1011
import java.util.concurrent.atomic.AtomicBoolean;
1112
import java.util.concurrent.atomic.AtomicReference;
@@ -23,11 +24,13 @@
2324

2425
public class ResultAggregator {
2526
private final TaskManager taskManager;
27+
private final Executor executor;
2628
private volatile Message message;
2729

28-
public ResultAggregator(TaskManager taskManager, Message message) {
30+
public ResultAggregator(TaskManager taskManager, Message message, Executor executor) {
2931
this.taskManager = taskManager;
3032
this.message = message;
33+
this.executor = executor;
3134
}
3235

3336
public EventKind getCurrentResult() {
@@ -102,6 +105,8 @@ public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer,
102105
// CRITICAL: The subscription itself must run on a background thread to avoid blocking
103106
// the Vert.x worker thread. EventConsumer.consumeAll() starts a polling loop that
104107
// blocks in dequeueEvent(), so we must subscribe from a background thread.
108+
// Use the @Internal executor (not ForkJoinPool.commonPool) to avoid saturation
109+
// during concurrent request bursts.
105110
CompletableFuture.runAsync(() -> {
106111
consumer(
107112
createTubeConfig(),
@@ -195,7 +200,7 @@ else if (!blocking) {
195200
}
196201
}
197202
);
198-
});
203+
}, executor);
199204

200205
// Wait for completion or interruption
201206
try {

server-common/src/test/java/io/a2a/server/tasks/ResultAggregatorTest.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import static org.mockito.Mockito.when;
1111

1212
import java.util.Collections;
13+
import java.util.concurrent.Executor;
1314

1415
import io.a2a.server.events.EventConsumer;
1516
import io.a2a.server.events.EventQueue;
@@ -39,11 +40,12 @@ public class ResultAggregatorTest {
3940
private TaskManager mockTaskManager;
4041

4142
private ResultAggregator aggregator;
43+
private final Executor directExecutor = Runnable::run;
4244

4345
@BeforeEach
4446
void setUp() {
4547
MockitoAnnotations.openMocks(this);
46-
aggregator = new ResultAggregator(mockTaskManager, null);
48+
aggregator = new ResultAggregator(mockTaskManager, null, directExecutor);
4749
}
4850

4951
// Helper methods for creating sample data
@@ -69,7 +71,7 @@ private Task createSampleTask(String taskId, TaskState statusState, String conte
6971
@Test
7072
void testConstructorWithMessage() {
7173
Message initialMessage = createSampleMessage("initial", "msg1", Message.Role.USER);
72-
ResultAggregator aggregatorWithMessage = new ResultAggregator(mockTaskManager, initialMessage);
74+
ResultAggregator aggregatorWithMessage = new ResultAggregator(mockTaskManager, initialMessage, directExecutor);
7375

7476
// Test that the message is properly stored by checking getCurrentResult
7577
assertEquals(initialMessage, aggregatorWithMessage.getCurrentResult());
@@ -80,7 +82,7 @@ void testConstructorWithMessage() {
8082
@Test
8183
void testGetCurrentResultWithMessageSet() {
8284
Message sampleMessage = createSampleMessage("hola", "msg1", Message.Role.USER);
83-
ResultAggregator aggregatorWithMessage = new ResultAggregator(mockTaskManager, sampleMessage);
85+
ResultAggregator aggregatorWithMessage = new ResultAggregator(mockTaskManager, sampleMessage, directExecutor);
8486

8587
EventKind result = aggregatorWithMessage.getCurrentResult();
8688

@@ -115,7 +117,7 @@ void testConstructorStoresTaskManagerCorrectly() {
115117

116118
@Test
117119
void testConstructorWithNullMessage() {
118-
ResultAggregator aggregatorWithNullMessage = new ResultAggregator(mockTaskManager, null);
120+
ResultAggregator aggregatorWithNullMessage = new ResultAggregator(mockTaskManager, null, directExecutor);
119121
Task expectedTask = createSampleTask("null_msg_task", TaskState.WORKING, "ctx1");
120122
when(mockTaskManager.getTask()).thenReturn(expectedTask);
121123

@@ -175,7 +177,7 @@ void testMultipleGetCurrentResultCalls() {
175177
void testGetCurrentResultWithMessageTakesPrecedence() {
176178
// Test that when both message and task are available, message takes precedence
177179
Message message = createSampleMessage("priority message", "pri1", Message.Role.USER);
178-
ResultAggregator messageAggregator = new ResultAggregator(mockTaskManager, message);
180+
ResultAggregator messageAggregator = new ResultAggregator(mockTaskManager, message, directExecutor);
179181

180182
// Even if we set up the task manager to return something, message should take precedence
181183
Task task = createSampleTask("should_not_be_returned", TaskState.WORKING, "ctx1");

0 commit comments

Comments
 (0)