Skip to content

Commit 39edeca

Browse files
committed
Try to handle the MultiSseSupport async
1 parent da3c2ad commit 39edeca

File tree

1 file changed

+10
-1
lines changed

1 file changed

+10
-1
lines changed

quarkus-sdk/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE;
44
import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;
55

6+
import java.util.concurrent.Executor;
7+
import java.util.concurrent.Executors;
68
import java.util.concurrent.Flow;
79
import java.util.concurrent.atomic.AtomicLong;
810
import java.util.function.Function;
@@ -66,6 +68,8 @@ public class A2AServerRoutes {
6668
@ExtendedAgentCard
6769
Instance<AgentCard> extendedAgentCard;
6870

71+
private final Executor executor = Executors.newCachedThreadPool();
72+
6973
@Route(path = "/", methods = {Route.HttpMethod.POST}, consumes = {APPLICATION_JSON}, type = Route.HandlerType.BLOCKING)
7074
public void invokeJSONRPCHandler(@Body String body, RoutingContext rc) {
7175
boolean streaming = false;
@@ -93,7 +97,12 @@ public void invokeJSONRPCHandler(@Body String body, RoutingContext rc) {
9397
.putHeader(CONTENT_TYPE, APPLICATION_JSON)
9498
.end(Json.encodeToBuffer(error));
9599
} else if (streaming) {
96-
MultiSseSupport.subscribeObject(streamingResponse.map(i -> (Object)i), rc);
100+
final Multi<? extends JSONRPCResponse<?>> finalStreamingResponse = streamingResponse;
101+
executor.execute(() -> {
102+
MultiSseSupport.subscribeObject(
103+
finalStreamingResponse.map(i -> (Object)i), rc);
104+
});
105+
97106
} else {
98107
rc.response()
99108
.setStatusCode(200)

0 commit comments

Comments
 (0)