Skip to content

Commit 4b11c00

Browse files
fjumakabir
andauthored
feat: gRPC transport implementation (#211)
This PR contains: * Utility classes for converting from spec classes and gRPC classes and vice versa * An initial gRPC client implementation * An initial gRPC request handler implementation * Tests for the gRPC request handler implementation * gRPC server integration - [X] Follow the [`CONTRIBUTING` Guide](../CONTRIBUTING.md). - [X] Make your Pull Request title in the <https://www.conventionalcommits.org/> specification. - Important Prefixes for [release-please](https://github.com/googleapis/release-please): - `fix:` which represents bug fixes, and correlates to a [SemVer](https://semver.org/) patch. - `feat:` represents a new feature, and correlates to a SemVer minor. - `feat!:`, or `fix!:`, `refactor!:`, etc., which represent a breaking change (indicated by the `!`) and will result in a SemVer major. - [X] Ensure the tests pass - [X] Appropriate READMEs were updated (if necessary) Fixes #210 🦕 --------- Co-authored-by: Kabir Khan <[email protected]>
1 parent 76cc03a commit 4b11c00

File tree

109 files changed

+56119
-184
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

109 files changed

+56119
-184
lines changed

client/pom.xml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@
2222
<artifactId>a2a-java-sdk-common</artifactId>
2323
<version>${project.version}</version>
2424
</dependency>
25-
25+
<dependency>
26+
<groupId>${project.groupId}</groupId>
27+
<artifactId>a2a-java-sdk-grpc</artifactId>
28+
<version>${project.version}</version>
29+
</dependency>
2630
<dependency>
2731
<groupId>${project.groupId}</groupId>
2832
<artifactId>a2a-java-sdk-spec</artifactId>
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
package io.a2a.client;
2+
3+
import static io.a2a.grpc.A2AServiceGrpc.A2AServiceBlockingV2Stub;
4+
import static io.a2a.grpc.A2AServiceGrpc.A2AServiceStub;
5+
import static io.a2a.grpc.utils.ProtoUtils.FromProto;
6+
import static io.a2a.grpc.utils.ProtoUtils.ToProto;
7+
import static io.a2a.util.Assert.checkNotNullParam;
8+
9+
import java.util.function.Consumer;
10+
11+
import io.a2a.client.sse.SSEStreamObserver;
12+
import io.a2a.grpc.A2AServiceGrpc;
13+
import io.a2a.grpc.CancelTaskRequest;
14+
import io.a2a.grpc.CreateTaskPushNotificationConfigRequest;
15+
import io.a2a.grpc.GetTaskPushNotificationConfigRequest;
16+
import io.a2a.grpc.GetTaskRequest;
17+
import io.a2a.grpc.SendMessageRequest;
18+
import io.a2a.grpc.SendMessageResponse;
19+
import io.a2a.grpc.StreamResponse;
20+
import io.a2a.grpc.utils.ProtoUtils;
21+
import io.a2a.spec.A2AServerException;
22+
import io.a2a.spec.AgentCard;
23+
import io.a2a.spec.EventKind;
24+
import io.a2a.spec.GetTaskPushNotificationConfigParams;
25+
import io.a2a.spec.MessageSendParams;
26+
import io.a2a.spec.StreamingEventKind;
27+
import io.a2a.spec.Task;
28+
import io.a2a.spec.TaskIdParams;
29+
import io.a2a.spec.TaskPushNotificationConfig;
30+
import io.a2a.spec.TaskQueryParams;
31+
import io.grpc.Channel;
32+
import io.grpc.StatusRuntimeException;
33+
import io.grpc.stub.StreamObserver;
34+
35+
/**
36+
* A2A Client for interacting with an A2A agent via gRPC.
37+
*/
38+
public class A2AGrpcClient {
39+
40+
private A2AServiceBlockingV2Stub blockingStub;
41+
private A2AServiceStub asyncStub;
42+
private AgentCard agentCard;
43+
44+
/**
45+
* Create an A2A client for interacting with an A2A agent via gRPC.
46+
*
47+
* @param channel the gRPC channel
48+
* @param agentCard the agent card for the A2A server this client will be communicating with
49+
*/
50+
public A2AGrpcClient(Channel channel, AgentCard agentCard) {
51+
checkNotNullParam("channel", channel);
52+
checkNotNullParam("agentCard", agentCard);
53+
this.asyncStub = A2AServiceGrpc.newStub(channel);
54+
this.blockingStub = A2AServiceGrpc.newBlockingV2Stub(channel);
55+
this.agentCard = agentCard;
56+
}
57+
58+
/**
59+
* Send a message to the remote agent.
60+
*
61+
* @param messageSendParams the parameters for the message to be sent
62+
* @return the response, may be a message or a task
63+
* @throws A2AServerException if sending the message fails for any reason
64+
*/
65+
public EventKind sendMessage(MessageSendParams messageSendParams) throws A2AServerException {
66+
SendMessageRequest request = createGrpcSendMessageRequestFromMessageSendParams(messageSendParams);
67+
try {
68+
SendMessageResponse response = blockingStub.sendMessage(request);
69+
if (response.hasMsg()) {
70+
return FromProto.message(response.getMsg());
71+
} else if (response.hasTask()) {
72+
return FromProto.task(response.getTask());
73+
} else {
74+
throw new A2AServerException("Server response did not contain a message or task");
75+
}
76+
} catch (StatusRuntimeException e) {
77+
throw new A2AServerException("Failed to send message: " + e, e);
78+
}
79+
}
80+
81+
/**
82+
* Retrieves the current state and history of a specific task.
83+
*
84+
* @param taskQueryParams the params for the task to be queried
85+
* @return the task
86+
* @throws A2AServerException if retrieving the task fails for any reason
87+
*/
88+
public Task getTask(TaskQueryParams taskQueryParams) throws A2AServerException {
89+
GetTaskRequest.Builder requestBuilder = GetTaskRequest.newBuilder();
90+
requestBuilder.setName("tasks/" + taskQueryParams.id());
91+
if (taskQueryParams.historyLength() != null) {
92+
requestBuilder.setHistoryLength(taskQueryParams.historyLength());
93+
}
94+
GetTaskRequest getTaskRequest = requestBuilder.build();
95+
try {
96+
return FromProto.task(blockingStub.getTask(getTaskRequest));
97+
} catch (StatusRuntimeException e) {
98+
throw new A2AServerException("Failed to get task: " + e, e);
99+
}
100+
}
101+
102+
/**
103+
* Cancel a task that was previously submitted to the A2A server.
104+
*
105+
* @param taskIdParams the params for the task to be cancelled
106+
* @return the updated task
107+
* @throws A2AServerException if cancelling the task fails for any reason
108+
*/
109+
public Task cancelTask(TaskIdParams taskIdParams) throws A2AServerException {
110+
CancelTaskRequest cancelTaskRequest = CancelTaskRequest.newBuilder()
111+
.setName("tasks/" + taskIdParams.id())
112+
.build();
113+
try {
114+
return FromProto.task(blockingStub.cancelTask(cancelTaskRequest));
115+
} catch (StatusRuntimeException e) {
116+
throw new A2AServerException("Failed to cancel task: " + e, e);
117+
}
118+
}
119+
120+
/**
121+
* Set push notification configuration for a task.
122+
*
123+
* @param taskPushNotificationConfig the task push notification configuration
124+
* @return the task push notification config
125+
* @throws A2AServerException if setting the push notification configuration fails for any reason
126+
*/
127+
public TaskPushNotificationConfig setTaskPushNotificationConfig(TaskPushNotificationConfig taskPushNotificationConfig) throws A2AServerException {
128+
String configId = taskPushNotificationConfig.pushNotificationConfig().id();
129+
CreateTaskPushNotificationConfigRequest request = CreateTaskPushNotificationConfigRequest.newBuilder()
130+
.setParent("tasks/" + taskPushNotificationConfig.taskId())
131+
.setConfig(ToProto.taskPushNotificationConfig(taskPushNotificationConfig))
132+
.setConfigId(configId == null ? "" : configId)
133+
.build();
134+
try {
135+
return FromProto.taskPushNotificationConfig(blockingStub.createTaskPushNotificationConfig(request));
136+
} catch (StatusRuntimeException e) {
137+
throw new A2AServerException("Failed to set the task push notification config: " + e, e);
138+
}
139+
}
140+
141+
/**
142+
* Get the push notification configuration for a task.
143+
*
144+
* @param getTaskPushNotificationConfigParams the params for the task
145+
* @return the push notification configuration
146+
* @throws A2AServerException if getting the push notification configuration fails for any reason
147+
*/
148+
public TaskPushNotificationConfig getTaskPushNotificationConfig(GetTaskPushNotificationConfigParams getTaskPushNotificationConfigParams) throws A2AServerException {
149+
GetTaskPushNotificationConfigRequest getTaskPushNotificationConfigRequest = GetTaskPushNotificationConfigRequest.newBuilder()
150+
.setName(getTaskPushNotificationConfigName(getTaskPushNotificationConfigParams))
151+
.build();
152+
try {
153+
return FromProto.taskPushNotificationConfig(blockingStub.getTaskPushNotificationConfig(getTaskPushNotificationConfigRequest));
154+
} catch (StatusRuntimeException e) {
155+
throw new A2AServerException("Failed to get the task push notification config: " + e, e);
156+
}
157+
}
158+
159+
/**
160+
* Send a streaming message request to the remote agent.
161+
*
162+
* @param messageSendParams the parameters for the message to be sent
163+
* @param eventHandler a consumer that will be invoked for each event received from the remote agent
164+
* @param errorHandler a consumer that will be invoked if an error occurs
165+
* @throws A2AServerException if sending the streaming message fails for any reason
166+
*/
167+
public void sendMessageStreaming(MessageSendParams messageSendParams, Consumer<StreamingEventKind> eventHandler,
168+
Consumer<Throwable> errorHandler) throws A2AServerException {
169+
SendMessageRequest request = createGrpcSendMessageRequestFromMessageSendParams(messageSendParams);
170+
StreamObserver<StreamResponse> streamObserver = new SSEStreamObserver(eventHandler, errorHandler);
171+
try {
172+
asyncStub.sendStreamingMessage(request, streamObserver);
173+
} catch (StatusRuntimeException e) {
174+
throw new A2AServerException("Failed to send streaming message: " + e, e);
175+
}
176+
}
177+
178+
private SendMessageRequest createGrpcSendMessageRequestFromMessageSendParams(MessageSendParams messageSendParams) {
179+
SendMessageRequest.Builder builder = SendMessageRequest.newBuilder();
180+
builder.setRequest(ToProto.message(messageSendParams.message()));
181+
if (messageSendParams.configuration() != null) {
182+
builder.setConfiguration(ToProto.messageSendConfiguration(messageSendParams.configuration()));
183+
}
184+
if (messageSendParams.metadata() != null) {
185+
builder.setMetadata(ToProto.struct(messageSendParams.metadata()));
186+
}
187+
return builder.build();
188+
}
189+
190+
private String getTaskPushNotificationConfigName(GetTaskPushNotificationConfigParams getTaskPushNotificationConfigParams) {
191+
StringBuilder name = new StringBuilder();
192+
name.append("tasks/");
193+
name.append(getTaskPushNotificationConfigParams.id());
194+
if (getTaskPushNotificationConfigParams.pushNotificationConfigId() != null) {
195+
name.append("/pushNotificationConfigs/");
196+
name.append(getTaskPushNotificationConfigParams.pushNotificationConfigId());
197+
}
198+
return name.toString();
199+
}
200+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package io.a2a.client.sse;
2+
3+
4+
import static io.a2a.grpc.utils.ProtoUtils.FromProto;
5+
6+
import java.util.function.Consumer;
7+
import java.util.logging.Logger;
8+
9+
import io.a2a.grpc.StreamResponse;
10+
import io.a2a.spec.StreamingEventKind;
11+
import io.grpc.stub.StreamObserver;
12+
13+
public class SSEStreamObserver implements StreamObserver<StreamResponse> {
14+
15+
private static final Logger log = Logger.getLogger(SSEStreamObserver.class.getName());
16+
private final Consumer<StreamingEventKind> eventHandler;
17+
private final Consumer<Throwable> errorHandler;
18+
19+
public SSEStreamObserver(Consumer<StreamingEventKind> eventHandler, Consumer<Throwable> errorHandler) {
20+
this.eventHandler = eventHandler;
21+
this.errorHandler = errorHandler;
22+
}
23+
24+
@Override
25+
public void onNext(StreamResponse response) {
26+
StreamingEventKind event;
27+
switch (response.getPayloadCase()) {
28+
case MSG:
29+
event = FromProto.message(response.getMsg());
30+
break;
31+
case TASK:
32+
event = FromProto.task(response.getTask());
33+
break;
34+
case STATUS_UPDATE:
35+
event = FromProto.taskStatusUpdateEvent(response.getStatusUpdate());
36+
break;
37+
case ARTIFACT_UPDATE:
38+
event = FromProto.taskArtifactUpdateEvent(response.getArtifactUpdate());
39+
break;
40+
default:
41+
log.warning("Invalid stream response " + response.getPayloadCase());
42+
errorHandler.accept(new IllegalStateException("Invalid stream response from server: " + response.getPayloadCase()));
43+
return;
44+
}
45+
eventHandler.accept(event);
46+
}
47+
48+
@Override
49+
public void onError(Throwable t) {
50+
errorHandler.accept(t);
51+
}
52+
53+
@Override
54+
public void onCompleted() {
55+
// done
56+
}
57+
}

grpc/pom.xml

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?xml version="1.0"?>
2+
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"
3+
xmlns="http://maven.apache.org/POM/4.0.0"
4+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<parent>
8+
<groupId>io.github.a2asdk</groupId>
9+
<artifactId>a2a-java-sdk-parent</artifactId>
10+
<version>0.2.6.Beta1-SNAPSHOT</version>
11+
</parent>
12+
<artifactId>a2a-java-sdk-grpc</artifactId>
13+
14+
<packaging>jar</packaging>
15+
16+
<name>Java SDK A2A gRPC</name>
17+
<description>Java SDK for the Agent2Agent Protocol (A2A) - gRPC</description>
18+
19+
<dependencies>
20+
<dependency>
21+
<groupId>io.github.a2asdk</groupId>
22+
<artifactId>a2a-java-sdk-spec</artifactId>
23+
<version>${project.version}</version>
24+
</dependency>
25+
<dependency>
26+
<groupId>com.google.protobuf</groupId>
27+
<artifactId>protobuf-java</artifactId>
28+
</dependency>
29+
<dependency>
30+
<groupId>io.grpc</groupId>
31+
<artifactId>grpc-protobuf</artifactId>
32+
</dependency>
33+
<dependency>
34+
<groupId>io.grpc</groupId>
35+
<artifactId>grpc-stub</artifactId>
36+
</dependency>
37+
</dependencies>
38+
39+
</project>

0 commit comments

Comments
 (0)