2222import com .fasterxml .jackson .core .JsonParseException ;
2323import com .fasterxml .jackson .core .JsonProcessingException ;
2424import com .fasterxml .jackson .core .io .JsonEOFException ;
25+ import com .google .gson .JsonSyntaxException ;
2526import io .a2a .common .A2AHeaders ;
2627import io .a2a .grpc .utils .JSONRPCUtils ;
2728import io .a2a .server .ServerCallContext ;
3132import io .a2a .server .util .async .Internal ;
3233import io .a2a .spec .AgentCard ;
3334import io .a2a .spec .CancelTaskRequest ;
35+ import io .a2a .spec .CancelTaskResponse ;
3436import io .a2a .spec .DeleteTaskPushNotificationConfigRequest ;
37+ import io .a2a .spec .DeleteTaskPushNotificationConfigResponse ;
3538import io .a2a .spec .GetAuthenticatedExtendedCardRequest ;
39+ import io .a2a .spec .GetAuthenticatedExtendedCardResponse ;
3640import io .a2a .spec .GetTaskPushNotificationConfigRequest ;
41+ import io .a2a .spec .GetTaskPushNotificationConfigResponse ;
3742import io .a2a .spec .GetTaskRequest ;
43+ import io .a2a .spec .GetTaskResponse ;
3844import io .a2a .spec .IdJsonMappingException ;
3945import io .a2a .spec .InternalError ;
4046import io .a2a .spec .InvalidParamsError ;
4652import io .a2a .spec .JSONRPCRequest ;
4753import io .a2a .spec .JSONRPCResponse ;
4854import io .a2a .spec .ListTaskPushNotificationConfigRequest ;
55+ import io .a2a .spec .ListTaskPushNotificationConfigResponse ;
4956import io .a2a .spec .ListTasksRequest ;
57+ import io .a2a .spec .ListTasksResponse ;
5058import io .a2a .spec .MethodNotFoundError ;
5159import io .a2a .spec .MethodNotFoundJsonMappingException ;
5260import io .a2a .spec .NonStreamingJSONRPCRequest ;
5361import io .a2a .spec .SendMessageRequest ;
62+ import io .a2a .spec .SendMessageResponse ;
5463import io .a2a .spec .SendStreamingMessageRequest ;
64+ import io .a2a .spec .SendStreamingMessageResponse ;
5565import io .a2a .spec .SetTaskPushNotificationConfigRequest ;
66+ import io .a2a .spec .SetTaskPushNotificationConfigResponse ;
5667import io .a2a .spec .SubscribeToTaskRequest ;
5768import io .a2a .spec .UnsupportedOperationError ;
5869import io .a2a .transport .jsonrpc .handler .JSONRPCHandler ;
@@ -103,16 +114,20 @@ public void invokeJSONRPCHandler(@Body String body, RoutingContext rc) {
103114 streaming = true ;
104115 streamingResponse = processStreamingRequest (request , context );
105116 }
117+ } catch (JSONRPCError e ) {
118+ error = new JSONRPCErrorResponse (e );
106119 } catch (JsonProcessingException e ) {
107120 error = handleError (e );
121+ } catch (JsonSyntaxException e ) {
122+ error = new JSONRPCErrorResponse (new JSONParseError (e .getMessage ()));
108123 } catch (Throwable t ) {
109124 error = new JSONRPCErrorResponse (new InternalError (t .getMessage ()));
110125 } finally {
111126 if (error != null ) {
112127 rc .response ()
113128 .setStatusCode (200 )
114129 .putHeader (CONTENT_TYPE , APPLICATION_JSON )
115- .end (Utils . toJsonString (error ));
130+ .end (serializeResponse (error ));
116131 } else if (streaming ) {
117132 final Multi <? extends JSONRPCResponse <?>> finalStreamingResponse = streamingResponse ;
118133 executor .execute (() -> {
@@ -124,7 +139,7 @@ public void invokeJSONRPCHandler(@Body String body, RoutingContext rc) {
124139 rc .response ()
125140 .setStatusCode (200 )
126141 .putHeader (CONTENT_TYPE , APPLICATION_JSON )
127- .end (Utils . toJsonString (nonStreamingResponse ));
142+ .end (serializeResponse (nonStreamingResponse ));
128143 }
129144 }
130145 }
@@ -163,29 +178,35 @@ public AgentCard getAgentCard() {
163178 return jsonRpcHandler .getAgentCard ();
164179 }
165180
166- private JSONRPCResponse <?> processNonStreamingRequest (
167- NonStreamingJSONRPCRequest <?> request , ServerCallContext context ) {
181+ private JSONRPCResponse <?> processNonStreamingRequest (NonStreamingJSONRPCRequest <?> request , ServerCallContext context ) {
168182 if (request instanceof GetTaskRequest req ) {
169183 return jsonRpcHandler .onGetTask (req , context );
170- } else if (request instanceof CancelTaskRequest req ) {
184+ }
185+ if (request instanceof CancelTaskRequest req ) {
171186 return jsonRpcHandler .onCancelTask (req , context );
172- } else if (request instanceof ListTasksRequest req ) {
187+ }
188+ if (request instanceof ListTasksRequest req ) {
173189 return jsonRpcHandler .onListTasks (req , context );
174- } else if (request instanceof SetTaskPushNotificationConfigRequest req ) {
190+ }
191+ if (request instanceof SetTaskPushNotificationConfigRequest req ) {
175192 return jsonRpcHandler .setPushNotificationConfig (req , context );
176- } else if (request instanceof GetTaskPushNotificationConfigRequest req ) {
193+ }
194+ if (request instanceof GetTaskPushNotificationConfigRequest req ) {
177195 return jsonRpcHandler .getPushNotificationConfig (req , context );
178- } else if (request instanceof SendMessageRequest req ) {
196+ }
197+ if (request instanceof SendMessageRequest req ) {
179198 return jsonRpcHandler .onMessageSend (req , context );
180- } else if (request instanceof ListTaskPushNotificationConfigRequest req ) {
199+ }
200+ if (request instanceof ListTaskPushNotificationConfigRequest req ) {
181201 return jsonRpcHandler .listPushNotificationConfig (req , context );
182- } else if (request instanceof DeleteTaskPushNotificationConfigRequest req ) {
202+ }
203+ if (request instanceof DeleteTaskPushNotificationConfigRequest req ) {
183204 return jsonRpcHandler .deletePushNotificationConfig (req , context );
184- } else if (request instanceof GetAuthenticatedExtendedCardRequest req ) {
205+ }
206+ if (request instanceof GetAuthenticatedExtendedCardRequest req ) {
185207 return jsonRpcHandler .onGetAuthenticatedExtendedCardRequest (req , context );
186- } else {
187- return generateErrorResponse (request , new UnsupportedOperationError ());
188208 }
209+ return generateErrorResponse (request , new UnsupportedOperationError ());
189210 }
190211
191212 private Multi <? extends JSONRPCResponse <?>> processStreamingRequest (
@@ -249,6 +270,51 @@ public String getUsername() {
249270 }
250271 }
251272
273+ private static String serializeResponse (JSONRPCResponse <?> response ) {
274+ // For error responses, use Jackson serialization (errors are standardized)
275+ if (response instanceof JSONRPCErrorResponse error ) {
276+ return JSONRPCUtils .toJsonRPCErrorResponse (error .getId (), error .getError ());
277+ }
278+ if (response .getError () != null ) {
279+ System .out .println ("------------------------------------------------------------" );
280+ System .out .println ("We have an error " + response .getError ());
281+ System .out .println ("------------------------------------------------------------" );
282+ return JSONRPCUtils .toJsonRPCErrorResponse (response .getId (), response .getError ());
283+ }
284+
285+ // Convert domain response to protobuf message and serialize
286+ com .google .protobuf .MessageOrBuilder protoMessage = convertToProto (response );
287+ return JSONRPCUtils .toJsonRPCResultResponse (response .getId (), protoMessage
288+ );
289+ }
290+
291+ private static com .google .protobuf .MessageOrBuilder convertToProto (JSONRPCResponse <?> response ) {
292+ if (response instanceof GetTaskResponse r ) {
293+ return io .a2a .grpc .utils .ProtoUtils .ToProto .task (r .getResult ());
294+ } else if (response instanceof CancelTaskResponse r ) {
295+ return io .a2a .grpc .utils .ProtoUtils .ToProto .task (r .getResult ());
296+ } else if (response instanceof SendMessageResponse r ) {
297+ return io .a2a .grpc .utils .ProtoUtils .ToProto .taskOrMessage (r .getResult ());
298+ } else if (response instanceof ListTasksResponse r ) {
299+ return io .a2a .grpc .utils .ProtoUtils .ToProto .listTasksResult (r .getResult ());
300+ } else if (response instanceof SetTaskPushNotificationConfigResponse r ) {
301+ return io .a2a .grpc .utils .ProtoUtils .ToProto .setTaskPushNotificationConfigResponse (r .getResult ());
302+ } else if (response instanceof GetTaskPushNotificationConfigResponse r ) {
303+ return io .a2a .grpc .utils .ProtoUtils .ToProto .getTaskPushNotificationConfigResponse (r .getResult ());
304+ } else if (response instanceof ListTaskPushNotificationConfigResponse r ) {
305+ return io .a2a .grpc .utils .ProtoUtils .ToProto .listTaskPushNotificationConfigResponse (r .getResult ());
306+ } else if (response instanceof DeleteTaskPushNotificationConfigResponse ) {
307+ // DeleteTaskPushNotificationConfig has no result body, just return empty message
308+ return com .google .protobuf .Empty .getDefaultInstance ();
309+ } else if (response instanceof GetAuthenticatedExtendedCardResponse r ) {
310+ return io .a2a .grpc .utils .ProtoUtils .ToProto .getAuthenticatedExtendedCardResponse (r .getResult ());
311+ } else if (response instanceof SendStreamingMessageResponse r ) {
312+ return io .a2a .grpc .utils .ProtoUtils .ToProto .taskOrMessageStream (r .getResult ());
313+ } else {
314+ throw new IllegalArgumentException ("Unknown response type: " + response .getClass ().getName ());
315+ }
316+ }
317+
252318 // Port of import io.quarkus.vertx.web.runtime.MultiSseSupport, which is considered internal API
253319 private static class MultiSseSupport {
254320
@@ -323,9 +389,15 @@ public Buffer apply(Object o) {
323389 ReactiveRoutes .ServerSentEvent <?> ev = (ReactiveRoutes .ServerSentEvent <?>) o ;
324390 long id = ev .id () != -1 ? ev .id () : count .getAndIncrement ();
325391 String e = ev .event () == null ? "" : "event: " + ev .event () + "\n " ;
326- return Buffer .buffer (e + "data: " + Utils .toJsonString (ev .data ()) + "\n id: " + id + "\n \n " );
392+ String data = ev .data () instanceof JSONRPCResponse
393+ ? serializeResponse ((JSONRPCResponse <?>) ev .data ())
394+ : Utils .toJsonString (ev .data ());
395+ return Buffer .buffer (e + "data: " + data + "\n id: " + id + "\n \n " );
327396 }
328- return Buffer .buffer ("data: " + Utils .toJsonString (o ) + "\n id: " + count .getAndIncrement () + "\n \n " );
397+ String data = o instanceof JSONRPCResponse
398+ ? serializeResponse ((JSONRPCResponse <?>) o )
399+ : Utils .toJsonString (o );
400+ return Buffer .buffer ("data: " + data + "\n id: " + count .getAndIncrement () + "\n \n " );
329401 }
330402 }), rc );
331403 }
0 commit comments