1- package io .a2a .server .apps ;
1+ package io .a2a .server .apps . jakarta ;
22
3+ import java .util .concurrent .Executor ;
4+ import java .util .concurrent .Executors ;
35import java .util .concurrent .Flow ;
46
57import jakarta .enterprise .inject .Instance ;
@@ -56,6 +58,12 @@ public class A2AServerResource {
5658 @ ExtendedAgentCard
5759 Instance <AgentCard > extendedAgentCard ;
5860
61+ // Hook so testing can wait until the async Subscription is subscribed.
62+ private static volatile Runnable streamingIsSubscribedRunnable ;
63+
64+
65+ private final Executor executor = Executors .newCachedThreadPool ();
66+
5967 /**
6068 * Handles incoming POST requests to the main A2A endpoint. Dispatches the
6169 * request to the appropriate JSON-RPC handler method and returns the response.
@@ -78,7 +86,9 @@ public JSONRPCResponse<?> handleNonStreamingRequests(NonStreamingJSONRPCRequest<
7886 @ Consumes (MediaType .APPLICATION_JSON )
7987 @ Produces (MediaType .SERVER_SENT_EVENTS )
8088 public void handleStreamingRequests (StreamingJSONRPCRequest <?> request , @ Context SseEventSink sseEventSink , @ Context Sse sse ) {
81- processStreamingRequest (request , sseEventSink , sse );
89+ System .out .println ("=====> Streaming" );
90+ executor .execute (() -> processStreamingRequest (request , sseEventSink , sse ));
91+ System .out .println ("=====> Streaming - done" );
8292 }
8393
8494 /**
@@ -156,10 +166,17 @@ private void handleStreamingResponse(Flow.Publisher<? extends JSONRPCResponse<?>
156166 public void onSubscribe (Flow .Subscription subscription ) {
157167 this .subscription = subscription ;
158168 subscription .request (Long .MAX_VALUE );
169+ System .out .println ("SUBSCRIBING!" );
170+ // Notify tests that we are subscribed
171+ Runnable runnable = streamingIsSubscribedRunnable ;
172+ if (runnable != null ) {
173+ runnable .run ();
174+ }
159175 }
160176
161177 @ Override
162178 public void onNext (JSONRPCResponse <?> item ) {
179+
163180 sseEventSink .send (sse .newEventBuilder ()
164181 .mediaType (MediaType .APPLICATION_JSON_TYPE )
165182 .data (item )
@@ -183,6 +200,10 @@ private JSONRPCResponse<?> generateErrorResponse(JSONRPCRequest<?> request, JSON
183200 return new JSONRPCErrorResponse (request .getId (), error );
184201 }
185202
203+ static void setStreamingIsSubscribedRunnable (Runnable streamingIsSubscribedRunnable ) {
204+ A2AServerResource .streamingIsSubscribedRunnable = streamingIsSubscribedRunnable ;
205+ }
206+
186207 @ Provider
187208 public class JsonParseExceptionMapper implements ExceptionMapper <JsonParseException > {
188209
0 commit comments