Skip to content

Commit afe2050

Browse files
committed
Adding streaming send message and setPushNotificationConfig
Signed-off-by: Emmanuel Hugonnet <[email protected]>
1 parent ef62516 commit afe2050

File tree

13 files changed

+653
-209
lines changed

13 files changed

+653
-209
lines changed

client/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@
4646
<artifactId>protobuf-java-util</artifactId>
4747
<type>jar</type>
4848
</dependency>
49+
<dependency>
50+
<groupId>org.slf4j</groupId>
51+
<artifactId>slf4j-jdk14</artifactId>
52+
<scope>test</scope>
53+
</dependency>
4954
</dependencies>
5055

5156
</project>
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package io.a2a.client.sse;
2+
3+
import static io.a2a.grpc.StreamResponse.PayloadCase.ARTIFACT_UPDATE;
4+
import static io.a2a.grpc.StreamResponse.PayloadCase.MSG;
5+
import static io.a2a.grpc.StreamResponse.PayloadCase.STATUS_UPDATE;
6+
import static io.a2a.grpc.StreamResponse.PayloadCase.TASK;
7+
8+
import java.util.concurrent.Future;
9+
import java.util.function.Consumer;
10+
import java.util.logging.Logger;
11+
12+
import com.google.protobuf.InvalidProtocolBufferException;
13+
import com.google.protobuf.util.JsonFormat;
14+
import io.a2a.grpc.StreamResponse;
15+
import io.a2a.grpc.utils.ProtoUtils;
16+
import io.a2a.spec.StreamingEventKind;
17+
import java.util.logging.Level;
18+
19+
public class JSONRestSSEEventListener {
20+
21+
private static final Logger log = Logger.getLogger(JSONRestSSEEventListener.class.getName());
22+
private final Consumer<StreamingEventKind> eventHandler;
23+
private final Consumer<Throwable> errorHandler;
24+
25+
public JSONRestSSEEventListener(Consumer<StreamingEventKind> eventHandler,
26+
Consumer<Throwable> errorHandler) {
27+
this.eventHandler = eventHandler;
28+
this.errorHandler = errorHandler;
29+
}
30+
31+
public void onMessage(String message, Future<Void> completableFuture) {
32+
try {
33+
io.a2a.grpc.StreamResponse.Builder builder = io.a2a.grpc.StreamResponse.newBuilder();
34+
JsonFormat.parser().merge(message, builder);
35+
handleMessage(builder.build(), completableFuture);
36+
} catch (InvalidProtocolBufferException e) {
37+
Logger.getLogger(JSONRestSSEEventListener.class.getName()).log(Level.SEVERE, null, e);
38+
errorHandler.accept(e);
39+
}
40+
}
41+
42+
public void onError(Throwable throwable, Future<Void> future) {
43+
if (errorHandler != null) {
44+
errorHandler.accept(throwable);
45+
}
46+
future.cancel(true); // close SSE channel
47+
}
48+
49+
private void handleMessage(StreamResponse response, Future<Void> future) {
50+
StreamingEventKind event;
51+
switch (response.getPayloadCase()) {
52+
case MSG:
53+
event = ProtoUtils.FromProto.message(response.getMsg());
54+
break;
55+
case TASK:
56+
event = ProtoUtils.FromProto.task(response.getTask());
57+
break;
58+
case STATUS_UPDATE:
59+
event = ProtoUtils.FromProto.taskStatusUpdateEvent(response.getStatusUpdate());
60+
break;
61+
case ARTIFACT_UPDATE:
62+
event = ProtoUtils.FromProto.taskArtifactUpdateEvent(response.getArtifactUpdate());
63+
break;
64+
default:
65+
log.warning("Invalid stream response " + response.getPayloadCase());
66+
errorHandler.accept(new IllegalStateException("Invalid stream response from server: " + response.getPayloadCase()));
67+
return;
68+
}
69+
eventHandler.accept(event);
70+
}
71+
72+
}

client/src/main/java/io/a2a/client/transport/JSONRestTransport.java

Lines changed: 154 additions & 31 deletions
Large diffs are not rendered by default.
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package io.a2a.client.transport;
2+
3+
import java.util.List;
4+
5+
import io.a2a.client.ClientCallInterceptor;
6+
import io.a2a.client.ClientConfig;
7+
import io.a2a.spec.AgentCard;
8+
import io.a2a.spec.TransportProtocol;
9+
10+
public class JSONRestTransportProvider implements ClientTransportProvider {
11+
12+
@Override
13+
public ClientTransport create(ClientConfig clientConfig, AgentCard agentCard,
14+
String agentUrl, List<ClientCallInterceptor> interceptors) {
15+
return new JSONRestTransport(clientConfig.getHttpClient(), agentCard, agentUrl, interceptors);
16+
}
17+
18+
@Override
19+
public String getTransportProtocol() {
20+
return TransportProtocol.HTTP_JSON.asString();
21+
}
22+
}

client/src/main/java/io/a2a/http/A2AHttpClient.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ public interface A2AHttpClient {
1010

1111
PostBuilder createPost();
1212

13+
DeleteBuilder createDelete();
14+
1315
interface Builder<T extends Builder<T>> {
1416
T url(String s);
1517
T addHeader(String name, String value);
@@ -31,4 +33,8 @@ CompletableFuture<Void> postAsyncSSE(
3133
Consumer<Throwable> errorConsumer,
3234
Runnable completeRunnable) throws IOException, InterruptedException;
3335
}
36+
37+
interface DeleteBuilder extends Builder<DeleteBuilder> {
38+
A2AHttpResponse delete() throws IOException, InterruptedException;
39+
}
3440
}

client/src/main/java/io/a2a/http/JdkA2AHttpClient.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ public PostBuilder createPost() {
3535
return new JdkPostBuilder();
3636
}
3737

38+
@Override
39+
public DeleteBuilder createDelete() {
40+
return new JdkDeleteBuilder();
41+
}
42+
3843
private abstract class JdkBuilder<T extends Builder<T>> implements Builder<T> {
3944
private String url;
4045
private Map<String, String> headers = new HashMap<>();
@@ -147,6 +152,18 @@ public CompletableFuture<Void> getAsyncSSE(
147152
}
148153
}
149154

155+
private class JdkDeleteBuilder extends JdkBuilder<DeleteBuilder> implements A2AHttpClient.DeleteBuilder {
156+
157+
@Override
158+
public A2AHttpResponse delete() throws IOException, InterruptedException {
159+
HttpRequest request = super.createRequestBuilder().DELETE().build();
160+
HttpResponse<String> response =
161+
httpClient.send(request, BodyHandlers.ofString(StandardCharsets.UTF_8));
162+
return new JdkHttpResponse(response);
163+
}
164+
165+
}
166+
150167
private class JdkPostBuilder extends JdkBuilder<PostBuilder> implements A2AHttpClient.PostBuilder {
151168
String body = "";
152169

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
io.a2a.client.transport.JSONRPCTransportProvider
2-
io.a2a.client.transport.GrpcTransportProvider
2+
io.a2a.client.transport.GrpcTransportProvider
3+
io.a2a.client.transport.JSONRestTransportProvider

client/src/test/java/io/a2a/client/A2ACardResolverTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,11 @@ public PostBuilder createPost() {
128128
return null;
129129
}
130130

131+
@Override
132+
public DeleteBuilder createDelete() {
133+
return null;
134+
}
135+
131136
class TestGetBuilder implements A2AHttpClient.GetBuilder {
132137

133138
@Override

0 commit comments

Comments
 (0)