Skip to content

Commit a7026d0

Browse files
committed
refactor: Refactor SSEEventListener to support accepting JSONRPCResponse as a parameter.
Signed-off-by: Sun Yuhan <sunyuhan1998@users.noreply.github.com>
1 parent 19b9249 commit a7026d0

File tree

2 files changed

+21
-38
lines changed

2 files changed

+21
-38
lines changed

client/src/main/java/io/a2a/client/A2AClient.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -413,15 +413,7 @@ public void sendStreamingMessage(String requestId, MessageSendParams messageSend
413413
SendStreamingMessageRequest sendStreamingMessageRequest = sendStreamingMessageRequestBuilder.build();
414414
try {
415415
transport.sendMessageStreaming(sendStreamingMessageRequest, agentUrl, SEND_MESSAGE_RESPONSE_REFERENCE,
416-
response -> {
417-
String msg;
418-
try {
419-
msg = OBJECT_MAPPER.writeValueAsString(response);
420-
} catch (JsonProcessingException e) {
421-
throw new RuntimeException(e);
422-
}
423-
sseEventListener.onMessage(msg, ref.get());
424-
},
416+
response -> sseEventListener.onMessage(response, ref.get()),
425417
throwable -> sseEventListener.onError(throwable, ref.get()),
426418
() -> {
427419
// We don't need to do anything special on completion
@@ -478,15 +470,7 @@ public void resubscribeToTask(String requestId, TaskIdParams taskIdParams, Consu
478470
TaskResubscriptionRequest taskResubscriptionRequest = taskResubscriptionRequestBuilder.build();
479471
try {
480472
transport.sendMessageStreaming(taskResubscriptionRequest, agentUrl, GET_TASK_RESPONSE_REFERENCE,
481-
response -> {
482-
String msg;
483-
try {
484-
msg = OBJECT_MAPPER.writeValueAsString(response);
485-
} catch (JsonProcessingException e) {
486-
throw new RuntimeException(e);
487-
}
488-
sseEventListener.onMessage(msg, ref.get());
489-
},
473+
response -> sseEventListener.onMessage(response, ref.get()),
490474
throwable -> sseEventListener.onError(throwable, ref.get()),
491475
() -> {
492476
// We don't need to do anything special on completion

client/src/main/java/io/a2a/client/sse/SSEEventListener.java

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@
77
import java.util.logging.Logger;
88

99
import com.fasterxml.jackson.core.JsonProcessingException;
10-
import com.fasterxml.jackson.databind.JsonNode;
1110
import io.a2a.spec.JSONRPCError;
11+
import io.a2a.spec.JSONRPCResponse;
12+
import io.a2a.spec.SendStreamingMessageResponse;
1213
import io.a2a.spec.StreamingEventKind;
1314
import io.a2a.spec.TaskStatusUpdateEvent;
1415

@@ -26,36 +27,34 @@ public SSEEventListener(Consumer<StreamingEventKind> eventHandler, Consumer<JSON
2627

2728
public void onMessage(String message, Future<Void> completableFuture) {
2829
try {
29-
handleMessage(OBJECT_MAPPER.readTree(message),completableFuture);
30+
SendStreamingMessageResponse sendStreamingMessageResponse = OBJECT_MAPPER.readValue(message, SendStreamingMessageResponse.class);
31+
handleMessage(sendStreamingMessageResponse,completableFuture);
3032
} catch (JsonProcessingException e) {
3133
log.warning("Failed to parse JSON message: " + message);
3234
}
3335
}
3436

37+
public void onMessage(JSONRPCResponse<?> response, Future<Void> completableFuture) {
38+
handleMessage(response,completableFuture);
39+
}
40+
3541
public void onError(Throwable throwable, Future<Void> future) {
3642
failureHandler.run();
3743
future.cancel(true); // close SSE channel
3844
}
3945

40-
private void handleMessage(JsonNode jsonNode, Future<Void> future) {
41-
try {
42-
if (jsonNode.has("error")) {
43-
JSONRPCError error = OBJECT_MAPPER.treeToValue(jsonNode.get("error"), JSONRPCError.class);
44-
errorHandler.accept(error);
45-
} else if (jsonNode.has("result")) {
46-
// result can be a Task, Message, TaskStatusUpdateEvent, or TaskArtifactUpdateEvent
47-
JsonNode result = jsonNode.path("result");
48-
StreamingEventKind event = OBJECT_MAPPER.treeToValue(result, StreamingEventKind.class);
49-
eventHandler.accept(event);
50-
if (event instanceof TaskStatusUpdateEvent && ((TaskStatusUpdateEvent) event).isFinal()) {
51-
future.cancel(true); // close SSE channel
52-
}
53-
} else {
54-
throw new IllegalArgumentException("Unknown message type");
46+
private void handleMessage(JSONRPCResponse<?> response, Future<Void> future) {
47+
if (null != response.getError()) {
48+
errorHandler.accept(response.getError());
49+
} else if (null != response.getResult()) {
50+
// result can be a Task, Message, TaskStatusUpdateEvent, or TaskArtifactUpdateEvent
51+
StreamingEventKind event = (StreamingEventKind) response.getResult();
52+
eventHandler.accept(event);
53+
if (event instanceof TaskStatusUpdateEvent && ((TaskStatusUpdateEvent) event).isFinal()) {
54+
future.cancel(true); // close SSE channel
5555
}
56-
} catch (JsonProcessingException e) {
57-
throw new RuntimeException(e);
56+
} else {
57+
throw new IllegalArgumentException("Unknown message type");
5858
}
5959
}
60-
6160
}

0 commit comments

Comments
 (0)