Skip to content

Commit 2b29f36

Browse files
ehsavoieclaude
andcommitted
refactor: Update SSEEventListener to use ProtoUtils.FromProto.streamingEventKind
Updated SSEEventListener to use the new ProtoUtils.FromProto.streamingEventKind() method instead of Jackson for deserializing streaming events. Changes: - Replaced Jackson JsonNode parsing with JSONRPCUtils.parseResponseEvent() - Now uses ProtoUtils.FromProto.streamingEventKind() to convert proto StreamResponse to domain StreamingEventKind - Removed dependency on Utils.OBJECT_MAPPER - Simplified error handling - JSONRPCUtils.parseResponseEvent() throws JSONRPCError which is caught and passed to errorHandler This ensures consistent use of protobuf JSON parsing throughout the streaming implementation and leverages the StreamResponseMapper we created earlier. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent dea14ea commit 2b29f36

File tree

1 file changed

+16
-25
lines changed

1 file changed

+16
-25
lines changed

client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/sse/SSEEventListener.java

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package io.a2a.client.transport.jsonrpc.sse;
22

33
import com.fasterxml.jackson.core.JsonProcessingException;
4-
import com.fasterxml.jackson.databind.JsonNode;
54
import io.a2a.spec.JSONRPCError;
65
import io.a2a.spec.StreamingEventKind;
76
import io.a2a.spec.TaskStatusUpdateEvent;
@@ -10,8 +9,9 @@
109
import java.util.function.Consumer;
1110
import java.util.logging.Logger;
1211

13-
import static io.a2a.util.Utils.OBJECT_MAPPER;
14-
12+
import io.a2a.grpc.StreamResponse;
13+
import io.a2a.grpc.utils.JSONRPCUtils;
14+
import io.a2a.grpc.utils.ProtoUtils;
1515
import org.jspecify.annotations.Nullable;
1616

1717
public class SSEEventListener {
@@ -29,11 +29,7 @@ public SSEEventListener(Consumer<StreamingEventKind> eventHandler,
2929
}
3030

3131
public void onMessage(String message, @Nullable Future<Void> completableFuture) {
32-
try {
33-
handleMessage(OBJECT_MAPPER.readTree(message), completableFuture);
34-
} catch (JsonProcessingException e) {
35-
log.warning("Failed to parse JSON message: " + message);
36-
}
32+
handleMessage(message, completableFuture);
3733
}
3834

3935
public void onError(Throwable throwable, @Nullable Future<Void> future) {
@@ -63,25 +59,20 @@ public void onComplete() {
6359
}
6460
}
6561

66-
private void handleMessage(JsonNode jsonNode, @Nullable Future<Void> future) {
62+
private void handleMessage(String message, @Nullable Future<Void> future) {
6763
try {
68-
if (jsonNode.has("error")) {
69-
JSONRPCError error = OBJECT_MAPPER.treeToValue(jsonNode.get("error"), JSONRPCError.class);
70-
if (errorHandler != null) {
71-
errorHandler.accept(error);
72-
}
73-
} else if (jsonNode.has("result")) {
74-
// result can be a Task, Message, TaskStatusUpdateEvent, or TaskArtifactUpdateEvent
75-
JsonNode result = jsonNode.path("result");
76-
StreamingEventKind event = OBJECT_MAPPER.treeToValue(result, StreamingEventKind.class);
77-
eventHandler.accept(event);
78-
if (event instanceof TaskStatusUpdateEvent && ((TaskStatusUpdateEvent) event).isFinal()) {
79-
if (future != null) {
80-
future.cancel(true); // close SSE channel
81-
}
64+
StreamResponse response = JSONRPCUtils.parseResponseEvent(message);
65+
66+
StreamingEventKind event = ProtoUtils.FromProto.streamingEventKind(response);
67+
eventHandler.accept(event);
68+
if (event instanceof TaskStatusUpdateEvent && ((TaskStatusUpdateEvent) event).isFinal()) {
69+
if (future != null) {
70+
future.cancel(true); // close SSE channel
8271
}
83-
} else {
84-
throw new IllegalArgumentException("Unknown message type");
72+
}
73+
} catch (JSONRPCError error) {
74+
if (errorHandler != null) {
75+
errorHandler.accept(error);
8576
}
8677
} catch (JsonProcessingException e) {
8778
throw new RuntimeException(e);

0 commit comments

Comments
 (0)