Skip to content

Commit a3f2721

Browse files
kabirclaude
andcommitted
fix: Use thread pool executor in ResultAggregatorTest instead of direct executor
Problem: - Tests used directExecutor (Runnable::run) which executes tasks synchronously - When ResultAggregator calls CompletableFuture.runAsync(task, directExecutor), the task runs on the calling thread instead of a background thread - In SSE streaming tests on REST transport, this blocks the Vert.x worker thread - Client times out waiting for response headers (httpx.ReadTimeout) Root Cause: - directExecutor executes runAsync() tasks synchronously on calling thread - This defeats the purpose of using runAsync() for non-blocking operation - Real production code uses @internal Executor (thread pool) which works correctly Solution: - Use Executors.newCachedThreadPool() in tests instead of directExecutor - Now runAsync() properly schedules work on background threads - Vert.x worker threads don't block during consumer subscription Evidence: - test_sse_header_compliance and test_sse_event_format_compliance timed out - Only on REST transport in CI (works locally, works on JSON-RPC/gRPC) - Server logs show onMessageSendStream completing but client never receives response 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent a7bb042 commit a3f2721

File tree

1 file changed

+9
-6
lines changed

1 file changed

+9
-6
lines changed

server-common/src/test/java/io/a2a/server/tasks/ResultAggregatorTest.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import java.util.Collections;
1313
import java.util.concurrent.Executor;
14+
import java.util.concurrent.Executors;
1415

1516
import io.a2a.server.events.EventConsumer;
1617
import io.a2a.server.events.EventQueue;
@@ -40,12 +41,14 @@ public class ResultAggregatorTest {
4041
private TaskManager mockTaskManager;
4142

4243
private ResultAggregator aggregator;
43-
private final Executor directExecutor = Runnable::run;
44+
// Use a real thread pool executor instead of direct executor
45+
// to avoid blocking the calling thread during async operations
46+
private final Executor testExecutor = Executors.newCachedThreadPool();
4447

4548
@BeforeEach
4649
void setUp() {
4750
MockitoAnnotations.openMocks(this);
48-
aggregator = new ResultAggregator(mockTaskManager, null, directExecutor);
51+
aggregator = new ResultAggregator(mockTaskManager, null, testExecutor);
4952
}
5053

5154
// Helper methods for creating sample data
@@ -71,7 +74,7 @@ private Task createSampleTask(String taskId, TaskState statusState, String conte
7174
@Test
7275
void testConstructorWithMessage() {
7376
Message initialMessage = createSampleMessage("initial", "msg1", Message.Role.USER);
74-
ResultAggregator aggregatorWithMessage = new ResultAggregator(mockTaskManager, initialMessage, directExecutor);
77+
ResultAggregator aggregatorWithMessage = new ResultAggregator(mockTaskManager, initialMessage, testExecutor);
7578

7679
// Test that the message is properly stored by checking getCurrentResult
7780
assertEquals(initialMessage, aggregatorWithMessage.getCurrentResult());
@@ -82,7 +85,7 @@ void testConstructorWithMessage() {
8285
@Test
8386
void testGetCurrentResultWithMessageSet() {
8487
Message sampleMessage = createSampleMessage("hola", "msg1", Message.Role.USER);
85-
ResultAggregator aggregatorWithMessage = new ResultAggregator(mockTaskManager, sampleMessage, directExecutor);
88+
ResultAggregator aggregatorWithMessage = new ResultAggregator(mockTaskManager, sampleMessage, testExecutor);
8689

8790
EventKind result = aggregatorWithMessage.getCurrentResult();
8891

@@ -117,7 +120,7 @@ void testConstructorStoresTaskManagerCorrectly() {
117120

118121
@Test
119122
void testConstructorWithNullMessage() {
120-
ResultAggregator aggregatorWithNullMessage = new ResultAggregator(mockTaskManager, null, directExecutor);
123+
ResultAggregator aggregatorWithNullMessage = new ResultAggregator(mockTaskManager, null, testExecutor);
121124
Task expectedTask = createSampleTask("null_msg_task", TaskState.WORKING, "ctx1");
122125
when(mockTaskManager.getTask()).thenReturn(expectedTask);
123126

@@ -177,7 +180,7 @@ void testMultipleGetCurrentResultCalls() {
177180
void testGetCurrentResultWithMessageTakesPrecedence() {
178181
// Test that when both message and task are available, message takes precedence
179182
Message message = createSampleMessage("priority message", "pri1", Message.Role.USER);
180-
ResultAggregator messageAggregator = new ResultAggregator(mockTaskManager, message, directExecutor);
183+
ResultAggregator messageAggregator = new ResultAggregator(mockTaskManager, message, testExecutor);
181184

182185
// Even if we set up the task manager to return something, message should take precedence
183186
Task task = createSampleTask("should_not_be_returned", TaskState.WORKING, "ctx1");

0 commit comments

Comments
 (0)