Skip to content

Commit fb6af2a

Browse files
committed
WIP: converting json-rpc to the new payload format
Signed-off-by: Emmanuel Hugonnet <[email protected]>
1 parent 7640e16 commit fb6af2a

File tree

20 files changed

+275
-69
lines changed

20 files changed

+275
-69
lines changed

client/transport/grpc/src/main/java/io/a2a/client/transport/grpc/GrpcTransport.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import io.a2a.spec.TaskIdParams;
4545
import io.a2a.spec.TaskPushNotificationConfig;
4646
import io.a2a.spec.TaskQueryParams;
47-
import io.a2a.spec.TaskResubscriptionRequest;
4847
import io.grpc.Channel;
4948
import io.grpc.Metadata;
5049
import io.grpc.StatusRuntimeException;
@@ -295,7 +294,7 @@ public void resubscribe(TaskIdParams request, Consumer<StreamingEventKind> event
295294
SubscribeToTaskRequest grpcRequest = SubscribeToTaskRequest.newBuilder()
296295
.setName("tasks/" + request.id())
297296
.build();
298-
PayloadAndHeaders payloadAndHeaders = applyInterceptors(TaskResubscriptionRequest.METHOD,
297+
PayloadAndHeaders payloadAndHeaders = applyInterceptors(io.a2a.spec.SubscribeToTaskRequest.METHOD,
299298
grpcRequest, agentCard, context);
300299

301300
StreamObserver<StreamResponse> streamObserver = new EventStreamObserver(eventConsumer, errorConsumer);

client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransport.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
import io.a2a.spec.TaskIdParams;
5858
import io.a2a.spec.TaskPushNotificationConfig;
5959
import io.a2a.spec.TaskQueryParams;
60-
import io.a2a.spec.TaskResubscriptionRequest;
60+
import io.a2a.spec.SubscribeToTaskRequest;
6161
import io.a2a.client.transport.jsonrpc.sse.SSEEventListener;
6262
import java.util.concurrent.CompletableFuture;
6363
import java.util.concurrent.atomic.AtomicReference;
@@ -328,13 +328,13 @@ public void resubscribe(TaskIdParams request, Consumer<StreamingEventKind> event
328328
checkNotNullParam("request", request);
329329
checkNotNullParam("eventConsumer", eventConsumer);
330330
checkNotNullParam("errorConsumer", errorConsumer);
331-
TaskResubscriptionRequest taskResubscriptionRequest = new TaskResubscriptionRequest.Builder()
331+
SubscribeToTaskRequest taskResubscriptionRequest = new SubscribeToTaskRequest.Builder()
332332
.jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
333-
.method(TaskResubscriptionRequest.METHOD)
333+
.method(SubscribeToTaskRequest.METHOD)
334334
.params(request)
335335
.build(); // id will be randomly generated
336336

337-
PayloadAndHeaders payloadAndHeaders = applyInterceptors(TaskResubscriptionRequest.METHOD,
337+
PayloadAndHeaders payloadAndHeaders = applyInterceptors(SubscribeToTaskRequest.METHOD,
338338
taskResubscriptionRequest, agentCard, context);
339339

340340
AtomicReference<CompletableFuture<Void>> ref = new AtomicReference<>();

client/transport/jsonrpc/src/test/java/io/a2a/client/transport/jsonrpc/JsonStreamingMessages.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public class JsonStreamingMessages {
106106
public static final String SEND_MESSAGE_STREAMING_TEST_REQUEST = """
107107
{
108108
"jsonrpc": "2.0",
109-
"method": "message/stream",
109+
"method": SendStreamingMessage,
110110
"params": {
111111
"message": {
112112
"role": "user",
@@ -138,7 +138,7 @@ public class JsonStreamingMessages {
138138
public static final String TASK_RESUBSCRIPTION_TEST_REQUEST = """
139139
{
140140
"jsonrpc": "2.0",
141-
"method": "tasks/resubscribe",
141+
"method": "SubscribeToTask",
142142
"params": {
143143
"id": "task-1234"
144144
}

client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ public void resubscribe(TaskIdParams request, Consumer<StreamingEventKind> event
371371
checkNotNullParam("request", request);
372372
io.a2a.grpc.SubscribeToTaskRequest.Builder builder = io.a2a.grpc.SubscribeToTaskRequest.newBuilder();
373373
builder.setName("tasks/" + request.id());
374-
PayloadAndHeaders payloadAndHeaders = applyInterceptors(io.a2a.spec.TaskResubscriptionRequest.METHOD, builder,
374+
PayloadAndHeaders payloadAndHeaders = applyInterceptors(io.a2a.spec.SubscribeToTaskRequest.METHOD, builder,
375375
agentCard, context);
376376
AtomicReference<CompletableFuture<Void>> ref = new AtomicReference<>();
377377
RestSSEEventListener sseEventListener = new RestSSEEventListener(eventConsumer, errorConsumer);

client/transport/rest/src/test/java/io/a2a/client/transport/rest/JsonRestMessages.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -646,7 +646,7 @@ public class JsonRestMessages {
646646
public static final String TASK_RESUBSCRIPTION_TEST_REQUEST = """
647647
{
648648
"jsonrpc": "2.0",
649-
"method": "tasks/resubscribe",
649+
"method": "SubscribeToTask",
650650
"params": {
651651
"id": "task-1234"
652652
}

reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
import com.fasterxml.jackson.core.JsonParseException;
2323
import com.fasterxml.jackson.core.JsonProcessingException;
2424
import com.fasterxml.jackson.core.io.JsonEOFException;
25-
import com.fasterxml.jackson.databind.JsonNode;
2625
import io.a2a.common.A2AHeaders;
26+
import io.a2a.grpc.utils.JSONRPCUtils;
2727
import io.a2a.server.ServerCallContext;
2828
import io.a2a.server.auth.UnauthenticatedUser;
2929
import io.a2a.server.auth.User;
@@ -53,8 +53,7 @@
5353
import io.a2a.spec.SendMessageRequest;
5454
import io.a2a.spec.SendStreamingMessageRequest;
5555
import io.a2a.spec.SetTaskPushNotificationConfigRequest;
56-
import io.a2a.spec.StreamingJSONRPCRequest;
57-
import io.a2a.spec.TaskResubscriptionRequest;
56+
import io.a2a.spec.SubscribeToTaskRequest;
5857
import io.a2a.spec.UnsupportedOperationError;
5958
import io.a2a.transport.jsonrpc.handler.JSONRPCHandler;
6059
import io.a2a.util.Utils;
@@ -90,26 +89,20 @@ public class A2AServerRoutes {
9089
@Route(path = "/", methods = {Route.HttpMethod.POST}, consumes = {APPLICATION_JSON}, type = Route.HandlerType.BLOCKING)
9190
@Authenticated
9291
public void invokeJSONRPCHandler(@Body String body, RoutingContext rc) {
92+
9393
boolean streaming = false;
9494
ServerCallContext context = createCallContext(rc);
9595
JSONRPCResponse<?> nonStreamingResponse = null;
9696
Multi<? extends JSONRPCResponse<?>> streamingResponse = null;
9797
JSONRPCErrorResponse error = null;
9898
try {
99-
JsonNode node = Utils.OBJECT_MAPPER.readTree(body);
100-
JsonNode method = node != null ? node.get("method") : null;
101-
streaming = method != null && (SendStreamingMessageRequest.METHOD.equals(method.asText())
102-
|| TaskResubscriptionRequest.METHOD.equals(method.asText()));
103-
String methodName = (method != null && method.isTextual()) ? method.asText() : null;
104-
if (methodName != null) {
105-
context.getState().put(METHOD_NAME_KEY, methodName);
106-
}
107-
if (streaming) {
108-
StreamingJSONRPCRequest<?> request = Utils.OBJECT_MAPPER.treeToValue(node, StreamingJSONRPCRequest.class);
109-
streamingResponse = processStreamingRequest(request, context);
99+
JSONRPCRequest<?> request = JSONRPCUtils.parseBody(body);
100+
context.getState().put(METHOD_NAME_KEY, request.getMethod());
101+
if (request instanceof NonStreamingJSONRPCRequest nonStreamingRequest) {
102+
nonStreamingResponse = processNonStreamingRequest(nonStreamingRequest, context);
110103
} else {
111-
NonStreamingJSONRPCRequest<?> request = Utils.OBJECT_MAPPER.treeToValue(node, NonStreamingJSONRPCRequest.class);
112-
nonStreamingResponse = processNonStreamingRequest(request, context);
104+
streamingResponse = processStreamingRequest(request, context);
105+
nonStreamingResponse = processNonStreamingRequest((NonStreamingJSONRPCRequest<?>) request, context);
113106
}
114107
} catch (JsonProcessingException e) {
115108
error = handleError(e);
@@ -201,8 +194,8 @@ private Multi<? extends JSONRPCResponse<?>> processStreamingRequest(
201194
Flow.Publisher<? extends JSONRPCResponse<?>> publisher;
202195
if (request instanceof SendStreamingMessageRequest req) {
203196
publisher = jsonRpcHandler.onMessageSendStream(req, context);
204-
} else if (request instanceof TaskResubscriptionRequest req) {
205-
publisher = jsonRpcHandler.onResubscribeToTask(req, context);
197+
} else if (request instanceof SubscribeToTaskRequest req) {
198+
publisher = jsonRpcHandler.onSubscribeToTask(req, context);
206199
} else {
207200
return Multi.createFrom().item(generateErrorResponse(request, new UnsupportedOperationError()));
208201
}

reference/jsonrpc/src/test/java/io/a2a/server/apps/quarkus/A2AServerRoutesTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import io.a2a.spec.SendStreamingMessageResponse;
3434
import io.a2a.spec.SetTaskPushNotificationConfigRequest;
3535
import io.a2a.spec.SetTaskPushNotificationConfigResponse;
36-
import io.a2a.spec.TaskResubscriptionRequest;
36+
import io.a2a.spec.SubscribeToTaskRequest;
3737
import io.a2a.transport.jsonrpc.handler.JSONRPCHandler;
3838
import io.vertx.core.MultiMap;
3939
import io.vertx.core.http.HttpServerRequest;
@@ -143,7 +143,7 @@ public void testSendStreamingMessage_MethodNameSetInContext() {
143143
String jsonRpcRequest = """
144144
{
145145
"jsonrpc": "2.0",
146-
"method": "message/stream",
146+
"method": "SendStreamingMessage",
147147
"params": {
148148
"message": {
149149
"role": "user",
@@ -249,7 +249,7 @@ public void testTaskResubscription_MethodNameSetInContext() {
249249
String jsonRpcRequest = """
250250
{
251251
"jsonrpc": "2.0",
252-
"method": "tasks/resubscribe",
252+
"method": "SubscribeToTask",
253253
"params": {
254254
"id": "de38c76d-d54c-436c-8b9f-4c2703648d64"
255255
}
@@ -258,7 +258,7 @@ public void testTaskResubscription_MethodNameSetInContext() {
258258

259259
@SuppressWarnings("unchecked")
260260
Flow.Publisher<SendStreamingMessageResponse> mockPublisher = mock(Flow.Publisher.class);
261-
when(mockJsonRpcHandler.onResubscribeToTask(any(TaskResubscriptionRequest.class),
261+
when(mockJsonRpcHandler.onSubscribeToTask(any(SubscribeToTaskRequest.class),
262262
any(ServerCallContext.class))).thenReturn(mockPublisher);
263263

264264
ArgumentCaptor<ServerCallContext> contextCaptor = ArgumentCaptor.forClass(ServerCallContext.class);
@@ -267,11 +267,11 @@ public void testTaskResubscription_MethodNameSetInContext() {
267267
routes.invokeJSONRPCHandler(jsonRpcRequest, mockRoutingContext);
268268

269269
// Assert
270-
verify(mockJsonRpcHandler).onResubscribeToTask(any(TaskResubscriptionRequest.class),
270+
verify(mockJsonRpcHandler).onSubscribeToTask(any(SubscribeToTaskRequest.class),
271271
contextCaptor.capture());
272272
ServerCallContext capturedContext = contextCaptor.getValue();
273273
assertNotNull(capturedContext);
274-
assertEquals(TaskResubscriptionRequest.METHOD, capturedContext.getState().get(METHOD_NAME_KEY));
274+
assertEquals(SubscribeToTaskRequest.METHOD, capturedContext.getState().get(METHOD_NAME_KEY));
275275
}
276276

277277
@Test

reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
import io.a2a.spec.SendMessageRequest;
5555
import io.a2a.spec.SendStreamingMessageRequest;
5656
import io.a2a.spec.SetTaskPushNotificationConfigRequest;
57-
import io.a2a.spec.TaskResubscriptionRequest;
57+
import io.a2a.spec.SubscribeToTaskRequest;
5858
import org.jspecify.annotations.Nullable;
5959

6060
@Singleton
@@ -220,7 +220,7 @@ private void sendResponse(RoutingContext rc, @Nullable HTTPRestResponse response
220220
@Route(regex = "^/v1/tasks/([^/]+):subscribe$", order = 1, methods = {Route.HttpMethod.POST}, type = Route.HandlerType.BLOCKING)
221221
public void resubscribeTask(RoutingContext rc) {
222222
String taskId = rc.pathParam("param0");
223-
ServerCallContext context = createCallContext(rc, TaskResubscriptionRequest.METHOD);
223+
ServerCallContext context = createCallContext(rc, SubscribeToTaskRequest.METHOD);
224224
HTTPRestStreamingResponse streamingResponse = null;
225225
HTTPRestResponse error = null;
226226
try {

reference/rest/src/test/java/io/a2a/server/rest/quarkus/A2AServerRoutesTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import io.a2a.spec.SendMessageRequest;
2525
import io.a2a.spec.SendStreamingMessageRequest;
2626
import io.a2a.spec.SetTaskPushNotificationConfigRequest;
27-
import io.a2a.spec.TaskResubscriptionRequest;
27+
import io.a2a.spec.SubscribeToTaskRequest;
2828
import io.a2a.transport.rest.handler.RestHandler;
2929
import io.a2a.transport.rest.handler.RestHandler.HTTPRestResponse;
3030
import io.vertx.core.Future;
@@ -194,7 +194,7 @@ public void testResubscribeTask_MethodNameSetInContext() {
194194
verify(mockRestHandler).resubscribeTask(eq("task123"), contextCaptor.capture());
195195
ServerCallContext capturedContext = contextCaptor.getValue();
196196
assertNotNull(capturedContext);
197-
assertEquals(TaskResubscriptionRequest.METHOD, capturedContext.getState().get(METHOD_NAME_KEY));
197+
assertEquals(SubscribeToTaskRequest.METHOD, capturedContext.getState().get(METHOD_NAME_KEY));
198198
}
199199

200200
@Test

server-common/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@
2121
<groupId>${project.groupId}</groupId>
2222
<artifactId>a2a-java-sdk-spec</artifactId>
2323
</dependency>
24+
<dependency>
25+
<groupId>${project.groupId}</groupId>
26+
<artifactId>a2a-java-sdk-spec-grpc</artifactId>
27+
</dependency>
2428
<dependency>
2529
<groupId>${project.groupId}</groupId>
2630
<artifactId>a2a-java-sdk-http-client</artifactId>

0 commit comments

Comments
 (0)