Skip to content

Commit f43ef4f

Browse files
kabirclaude
andcommitted
fix: Use @internal executor for streaming subscriptions to prevent ForkJoinPool saturation
Root cause: Transport handlers used CompletableFuture.runAsync() without executor parameter, defaulting to ForkJoinPool.commonPool (3 threads on CI). During concurrent test execution, pool saturation prevented streaming subscriptions from starting, causing SSE tests to timeout waiting for HTTP headers. Changes: - RestHandler: Add @internal Executor injection, use in runAsync() - GrpcHandler: Add abstract getExecutor(), use in convertToStreamResponse() - QuarkusGrpcHandler: Implement getExecutor() with @internal Executor - JSONRPCHandler: Add @internal Executor injection, use in convertToSendStreamingMessageResponse() - All test classes: Update handler instantiations to pass internalExecutor Fixes SSE streaming test timeouts (test_sse_header_compliance, test_sse_event_format_compliance) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 97a28b8 commit f43ef4f

File tree

7 files changed

+132
-100
lines changed

7 files changed

+132
-100
lines changed

reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/QuarkusGrpcHandler.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33
import jakarta.enterprise.inject.Instance;
44
import jakarta.inject.Inject;
55

6+
import java.util.concurrent.Executor;
7+
68
import io.a2a.server.PublicAgentCard;
79
import io.a2a.server.requesthandlers.RequestHandler;
10+
import io.a2a.server.util.async.Internal;
811
import io.a2a.spec.AgentCard;
912
import io.a2a.transport.grpc.handler.CallContextFactory;
1013
import io.a2a.transport.grpc.handler.GrpcHandler;
@@ -20,14 +23,17 @@ public class QuarkusGrpcHandler extends GrpcHandler {
2023
private final AgentCard agentCard;
2124
private final RequestHandler requestHandler;
2225
private final Instance<CallContextFactory> callContextFactoryInstance;
26+
private final Executor executor;
2327

2428
@Inject
2529
public QuarkusGrpcHandler(@PublicAgentCard AgentCard agentCard,
2630
RequestHandler requestHandler,
27-
Instance<CallContextFactory> callContextFactoryInstance) {
31+
Instance<CallContextFactory> callContextFactoryInstance,
32+
@Internal Executor executor) {
2833
this.agentCard = agentCard;
2934
this.requestHandler = requestHandler;
3035
this.callContextFactoryInstance = callContextFactoryInstance;
36+
this.executor = executor;
3137
}
3238

3339
@Override
@@ -44,4 +50,9 @@ protected AgentCard getAgentCard() {
4450
protected CallContextFactory getCallContextFactory() {
4551
return callContextFactoryInstance.isUnsatisfied() ? null : callContextFactoryInstance.get();
4652
}
53+
54+
@Override
55+
protected Executor getExecutor() {
56+
return executor;
57+
}
4758
}

transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.util.Map;
1212
import java.util.Set;
1313
import java.util.concurrent.CompletableFuture;
14+
import java.util.concurrent.Executor;
1415

1516
import io.grpc.Context;
1617
import java.util.concurrent.Flow;
@@ -294,7 +295,7 @@ public void onComplete() {
294295
responseObserver.onCompleted();
295296
}
296297
});
297-
});
298+
}, getExecutor());
298299
}
299300

300301
@Override
@@ -504,6 +505,8 @@ public static void setStreamingSubscribedRunnable(Runnable runnable) {
504505

505506
protected abstract CallContextFactory getCallContextFactory();
506507

508+
protected abstract Executor getExecutor();
509+
507510
/**
508511
* Attempts to extract the X-A2A-Extensions header from the current gRPC context.
509512
* This will only work if a server interceptor has been configured to capture

transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java

Lines changed: 39 additions & 32 deletions
Large diffs are not rendered by default.

transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import java.util.List;
99
import java.util.concurrent.CompletableFuture;
10+
import java.util.concurrent.Executor;
1011
import java.util.concurrent.Flow;
1112

1213
import io.a2a.server.AgentCardValidator;
@@ -44,6 +45,7 @@
4445
import io.a2a.spec.TaskNotFoundError;
4546
import io.a2a.spec.TaskPushNotificationConfig;
4647
import io.a2a.spec.TaskResubscriptionRequest;
48+
import io.a2a.server.util.async.Internal;
4749
import mutiny.zero.ZeroPublisher;
4850

4951
@ApplicationScoped
@@ -52,23 +54,26 @@ public class JSONRPCHandler {
5254
private AgentCard agentCard;
5355
private Instance<AgentCard> extendedAgentCard;
5456
private RequestHandler requestHandler;
57+
private final Executor executor;
5558

5659
protected JSONRPCHandler() {
60+
this.executor = null;
5761
}
5862

5963
@Inject
6064
public JSONRPCHandler(@PublicAgentCard AgentCard agentCard, @ExtendedAgentCard Instance<AgentCard> extendedAgentCard,
61-
RequestHandler requestHandler) {
65+
RequestHandler requestHandler, @Internal Executor executor) {
6266
this.agentCard = agentCard;
6367
this.extendedAgentCard = extendedAgentCard;
6468
this.requestHandler = requestHandler;
65-
69+
this.executor = executor;
70+
6671
// Validate transport configuration
6772
AgentCardValidator.validateTransportConfiguration(agentCard);
6873
}
6974

70-
public JSONRPCHandler(@PublicAgentCard AgentCard agentCard, RequestHandler requestHandler) {
71-
this(agentCard, null, requestHandler);
75+
public JSONRPCHandler(@PublicAgentCard AgentCard agentCard, RequestHandler requestHandler, Executor executor) {
76+
this(agentCard, null, requestHandler, executor);
7277
}
7378

7479
public SendMessageResponse onMessageSend(SendMessageRequest request, ServerCallContext context) {
@@ -278,7 +283,7 @@ public void onComplete() {
278283
tube.complete();
279284
}
280285
});
281-
});
286+
}, executor);
282287
});
283288
}
284289
}

0 commit comments

Comments
 (0)