Skip to content

Commit c55e828

Browse files
committed
fix: [WIP] Additional updates to the client and update AbstractA2AServerTest to be able to make use of the appropriate client based on the transport and update the JSONRPC and gRPC tests to extend this
1 parent 19d2401 commit c55e828

File tree

23 files changed

+1115
-1939
lines changed

23 files changed

+1115
-1939
lines changed

client/base/src/main/java/io/a2a/client/AbstractClient.java

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public AbstractClient(List<BiConsumer<ClientEvent, AgentCard>> consumers, Consum
4545
* Send a message to the remote agent. This method will automatically use
4646
* the streaming or non-streaming approach as determined by the server's
4747
* agent card and the client configuration. The configured client consumers
48-
* and will be used to handle messages, tasks, and update events received
48+
* will be used to handle messages, tasks, and update events received
4949
* from the remote agent. The configured streaming error handler will be used
5050
* if an error occurs during streaming. The configured client push notification
5151
* configuration will get used for streaming.
@@ -56,6 +56,26 @@ public AbstractClient(List<BiConsumer<ClientEvent, AgentCard>> consumers, Consum
5656
*/
5757
public abstract void sendMessage(Message request, ClientCallContext context) throws A2AClientException;
5858

59+
/**
60+
* Send a message to the remote agent. This method will automatically use
61+
* the streaming or non-streaming approach as determined by the server's
62+
* agent card and the client configuration. The specified client consumers
63+
* will be used to handle messages, tasks, and update events received
64+
* from the remote agent. The specified streaming error handler will be used
65+
* if an error occurs during streaming. The configured client push notification
66+
* configuration will get used for streaming.
67+
*
68+
* @param request the message
69+
* @param consumers a list of consumers to pass responses from the remote agent to
70+
* @param streamingErrorHandler an error handler that should be used for the streaming case if an error occurs
71+
* @param context optional client call context for the request (may be {@code null})
72+
* @throws A2AClientException if sending the message fails for any reason
73+
*/
74+
public abstract void sendMessage(Message request,
75+
List<BiConsumer<ClientEvent, AgentCard>> consumers,
76+
Consumer<Throwable> streamingErrorHandler,
77+
ClientCallContext context) throws A2AClientException;
78+
5979
/**
6080
* Send a message to the remote agent. This method will automatically use
6181
* the streaming or non-streaming approach as determined by the server's
@@ -143,13 +163,33 @@ public abstract void deleteTaskPushNotificationConfigurations(
143163
/**
144164
* Resubscribe to a task's event stream.
145165
* This is only available if both the client and server support streaming.
166+
* The configured client consumers will be used to handle messages, tasks,
167+
* and update events received from the remote agent. The configured streaming
168+
* error handler will be used if an error occurs during streaming.
146169
*
147170
* @param request the parameters specifying which task's notification configs to delete
148171
* @param context optional client call context for the request (may be {@code null})
149172
* @throws A2AClientException if resubscribing fails for any reason
150173
*/
151174
public abstract void resubscribe(TaskIdParams request, ClientCallContext context) throws A2AClientException;
152175

176+
/**
177+
* Resubscribe to a task's event stream.
178+
* This is only available if both the client and server support streaming.
179+
* The specified client consumers will be used to handle messages, tasks, and
180+
* update events received from the remote agent. The specified streaming error
181+
* handler will be used if an error occurs during streaming.
182+
*
183+
* @param request the parameters specifying which task's notification configs to delete
184+
* @param consumers a list of consumers to pass responses from the remote agent to
185+
* @param streamingErrorHandler an error handler that should be used for the streaming case if an error occurs
186+
* @param context optional client call context for the request (may be {@code null})
187+
* @throws A2AClientException if resubscribing fails for any reason
188+
*/
189+
public abstract void resubscribe(TaskIdParams request, List<BiConsumer<ClientEvent, AgentCard>> consumers,
190+
Consumer<Throwable> streamingErrorHandler, ClientCallContext context) throws A2AClientException;
191+
192+
153193
/**
154194
* Retrieve the AgentCard.
155195
*
@@ -167,7 +207,7 @@ public abstract void deleteTaskPushNotificationConfigurations(
167207
/**
168208
* Process the event using all configured consumers.
169209
*/
170-
public void consume(ClientEvent clientEventOrMessage, AgentCard agentCard) {
210+
void consume(ClientEvent clientEventOrMessage, AgentCard agentCard) {
171211
for (BiConsumer<ClientEvent, AgentCard> consumer : consumers) {
172212
consumer.accept(clientEventOrMessage, agentCard);
173213
}

client/base/src/main/java/io/a2a/client/Client.java

Lines changed: 77 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,26 @@ public void sendMessage(Message request, ClientCallContext context) throws A2ACl
5858
.metadata(clientConfig.getMetadata())
5959
.build();
6060

61-
sendMessage(messageSendParams, context);
61+
sendMessage(messageSendParams, null, null, context);
62+
}
63+
64+
@Override
65+
public void sendMessage(Message request, List<BiConsumer<ClientEvent, AgentCard>> consumers,
66+
Consumer<Throwable> streamingErrorHandler, ClientCallContext context) throws A2AClientException {
67+
MessageSendConfiguration messageSendConfiguration = new MessageSendConfiguration.Builder()
68+
.acceptedOutputModes(clientConfig.getAcceptedOutputModes())
69+
.blocking(clientConfig.isPolling())
70+
.historyLength(clientConfig.getHistoryLength())
71+
.pushNotification(clientConfig.getPushNotificationConfig())
72+
.build();
73+
74+
MessageSendParams messageSendParams = new MessageSendParams.Builder()
75+
.message(request)
76+
.configuration(messageSendConfiguration)
77+
.metadata(clientConfig.getMetadata())
78+
.build();
79+
80+
sendMessage(messageSendParams, consumers, streamingErrorHandler, context);
6281
}
6382

6483
@Override
@@ -77,7 +96,7 @@ public void sendMessage(Message request, PushNotificationConfig pushNotification
7796
.metadata(metatadata)
7897
.build();
7998

80-
sendMessage(messageSendParams, context);
99+
sendMessage(messageSendParams, null, null, context);
81100
}
82101

83102
@Override
@@ -116,19 +135,13 @@ public void deleteTaskPushNotificationConfigurations(
116135

117136
@Override
118137
public void resubscribe(TaskIdParams request, ClientCallContext context) throws A2AClientException {
119-
if (! clientConfig.isStreaming() || ! agentCard.capabilities().streaming()) {
120-
throw new A2AClientException("Client and/or server does not support resubscription");
121-
}
122-
ClientTaskManager tracker = new ClientTaskManager();
123-
Consumer<StreamingEventKind> eventHandler = event -> {
124-
try {
125-
ClientEvent clientEvent = getClientEvent(event, tracker);
126-
consume(clientEvent, agentCard);
127-
} catch (A2AClientError e) {
128-
getStreamingErrorHandler().accept(e);
129-
}
130-
};
131-
clientTransport.resubscribe(request, eventHandler, getStreamingErrorHandler(), context);
138+
resubscribeToTask(request, null, null, context);
139+
}
140+
141+
@Override
142+
public void resubscribe(TaskIdParams request, List<BiConsumer<ClientEvent, AgentCard>> consumers,
143+
Consumer<Throwable> streamingErrorHandler, ClientCallContext context) throws A2AClientException {
144+
resubscribeToTask(request, consumers, streamingErrorHandler, context);
132145
}
133146

134147
@Override
@@ -159,7 +172,8 @@ private ClientEvent getClientEvent(StreamingEventKind event, ClientTaskManager t
159172
}
160173
}
161174

162-
private void sendMessage(MessageSendParams messageSendParams, ClientCallContext context) throws A2AClientException {
175+
private void sendMessage(MessageSendParams messageSendParams, List<BiConsumer<ClientEvent, AgentCard>> consumers,
176+
Consumer<Throwable> errorHandler, ClientCallContext context) throws A2AClientException {
163177
if (! clientConfig.isStreaming() || ! agentCard.capabilities().streaming()) {
164178
EventKind eventKind = clientTransport.sendMessage(messageSendParams, context);
165179
ClientEvent clientEvent;
@@ -169,18 +183,61 @@ private void sendMessage(MessageSendParams messageSendParams, ClientCallContext
169183
// must be a message
170184
clientEvent = new MessageEvent((Message) eventKind);
171185
}
172-
consume(clientEvent, agentCard);
186+
consume(clientEvent, agentCard, consumers);
173187
} else {
174188
ClientTaskManager tracker = new ClientTaskManager();
189+
Consumer<Throwable> overriddenErrorHandler = getOverriddenErrorHandler(errorHandler);
175190
Consumer<StreamingEventKind> eventHandler = event -> {
176191
try {
177192
ClientEvent clientEvent = getClientEvent(event, tracker);
178-
consume(clientEvent, agentCard);
193+
consume(clientEvent, agentCard, consumers);
179194
} catch (A2AClientError e) {
180-
getStreamingErrorHandler().accept(e);
195+
overriddenErrorHandler.accept(e);
181196
}
182197
};
183-
clientTransport.sendMessageStreaming(messageSendParams, eventHandler, getStreamingErrorHandler(), context);
198+
clientTransport.sendMessageStreaming(messageSendParams, eventHandler, overriddenErrorHandler, context);
199+
}
200+
}
201+
202+
private void resubscribeToTask(TaskIdParams request, List<BiConsumer<ClientEvent, AgentCard>> consumers,
203+
Consumer<Throwable> errorHandler, ClientCallContext context) throws A2AClientException {
204+
if (! clientConfig.isStreaming() || ! agentCard.capabilities().streaming()) {
205+
throw new A2AClientException("Client and/or server does not support resubscription");
206+
}
207+
ClientTaskManager tracker = new ClientTaskManager();
208+
Consumer<Throwable> overriddenErrorHandler = getOverriddenErrorHandler(errorHandler);
209+
Consumer<StreamingEventKind> eventHandler = event -> {
210+
try {
211+
ClientEvent clientEvent = getClientEvent(event, tracker);
212+
consume(clientEvent, agentCard, consumers);
213+
} catch (A2AClientError e) {
214+
overriddenErrorHandler.accept(e);
215+
}
216+
};
217+
clientTransport.resubscribe(request, eventHandler, overriddenErrorHandler, context);
218+
}
219+
220+
private Consumer<Throwable> getOverriddenErrorHandler(Consumer<Throwable> errorHandler) {
221+
return e -> {
222+
if (errorHandler != null) {
223+
errorHandler.accept(e);
224+
} else {
225+
if (getStreamingErrorHandler() != null) {
226+
getStreamingErrorHandler().accept(e);
227+
}
228+
}
229+
};
230+
}
231+
232+
private void consume(ClientEvent clientEvent, AgentCard agentCard, List<BiConsumer<ClientEvent, AgentCard>> consumers) {
233+
if (consumers != null) {
234+
// use specified consumers
235+
for (BiConsumer<ClientEvent, AgentCard> consumer : consumers) {
236+
consumer.accept(clientEvent, agentCard);
237+
}
238+
} else {
239+
// use configured consumers
240+
consume(clientEvent, agentCard);
184241
}
185242
}
186243
}

client/base/src/main/java/io/a2a/client/ClientFactory.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,11 @@ public ClientFactory(ClientConfig clientConfig) {
4242
* @param agentCard the agent card for the remote agent
4343
* @param consumers a list of consumers to pass responses from the remote agent to
4444
* @param streamingErrorHandler an error handler that should be used for the streaming case if an error occurs
45+
* @return the client to use
4546
* @throws A2AClientException if the client cannot be created for any reason
4647
*/
47-
public AbstractClient create(AgentCard agentCard, List<BiConsumer<ClientEvent, AgentCard>> consumers,
48-
Consumer<Throwable> streamingErrorHandler) throws A2AClientException {
48+
public Client create(AgentCard agentCard, List<BiConsumer<ClientEvent, AgentCard>> consumers,
49+
Consumer<Throwable> streamingErrorHandler) throws A2AClientException {
4950
return create(agentCard, consumers, streamingErrorHandler, null);
5051
}
5152

@@ -56,10 +57,11 @@ public AbstractClient create(AgentCard agentCard, List<BiConsumer<ClientEvent, A
5657
* @param consumers a list of consumers to pass responses from the remote agent to
5758
* @param streamingErrorHandler an error handler that should be used for the streaming case if an error occurs
5859
* @param interceptors the optional list of client call interceptors (may be {@code null})
60+
* @return the client to use
5961
* @throws A2AClientException if the client cannot be created for any reason
6062
*/
61-
public AbstractClient create(AgentCard agentCard, List<BiConsumer<ClientEvent, AgentCard>> consumers,
62-
Consumer<Throwable> streamingErrorHandler, List<ClientCallInterceptor> interceptors) throws A2AClientException {
63+
public Client create(AgentCard agentCard, List<BiConsumer<ClientEvent, AgentCard>> consumers,
64+
Consumer<Throwable> streamingErrorHandler, List<ClientCallInterceptor> interceptors) throws A2AClientException {
6365
checkNotNullParam("agentCard", agentCard);
6466
checkNotNullParam("consumers", consumers);
6567
LinkedHashMap<String, String> serverPreferredTransports = getServerPreferredTransports(agentCard);

client/config/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>io.github.a2asdk</groupId>
99
<artifactId>a2a-java-sdk-parent</artifactId>
10-
<version>0.2.6.Beta1-SNAPSHOT</version>
10+
<version>0.3.0.Beta1-SNAPSHOT</version>
1111
<relativePath>../../pom.xml</relativePath>
1212
</parent>
1313
<artifactId>a2a-java-sdk-client-config</artifactId>

client/transport/grpc/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>io.github.a2asdk</groupId>
99
<artifactId>a2a-java-sdk-parent</artifactId>
10-
<version>0.2.6.Beta1-SNAPSHOT</version>
10+
<version>0.3.0.Beta1-SNAPSHOT</version>
1111
<relativePath>../../../pom.xml</relativePath>
1212
</parent>
1313
<artifactId>a2a-java-sdk-client-transport-grpc</artifactId>

client/transport/grpc/src/main/java/io/a2a/client/transport/grpc/EventStreamObserver.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,12 @@ public void onNext(StreamResponse response) {
4848
@Override
4949
public void onError(Throwable t) {
5050
if (errorHandler != null) {
51-
errorHandler.accept(t);
51+
// Map gRPC errors to proper A2A exceptions
52+
if (t instanceof io.grpc.StatusRuntimeException) {
53+
errorHandler.accept(GrpcErrorMapper.mapGrpcError((io.grpc.StatusRuntimeException) t));
54+
} else {
55+
errorHandler.accept(t);
56+
}
5257
}
5358
}
5459

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package io.a2a.client.transport.grpc;
2+
3+
import io.a2a.spec.A2AClientException;
4+
import io.a2a.spec.ContentTypeNotSupportedError;
5+
import io.a2a.spec.InvalidAgentResponseError;
6+
import io.a2a.spec.InvalidParamsError;
7+
import io.a2a.spec.InvalidRequestError;
8+
import io.a2a.spec.JSONParseError;
9+
import io.a2a.spec.MethodNotFoundError;
10+
import io.a2a.spec.PushNotificationNotSupportedError;
11+
import io.a2a.spec.TaskNotCancelableError;
12+
import io.a2a.spec.TaskNotFoundError;
13+
import io.a2a.spec.UnsupportedOperationError;
14+
import io.grpc.Status;
15+
import io.grpc.StatusRuntimeException;
16+
17+
/**
18+
* Utility class to map gRPC StatusRuntimeException to appropriate A2A error types
19+
*/
20+
public class GrpcErrorMapper {
21+
22+
public static A2AClientException mapGrpcError(StatusRuntimeException e) {
23+
return mapGrpcError(e, "gRPC error: ");
24+
}
25+
26+
public static A2AClientException mapGrpcError(StatusRuntimeException e, String errorPrefix) {
27+
Status.Code code = e.getStatus().getCode();
28+
String description = e.getStatus().getDescription();
29+
30+
// Extract the actual error type from the description if possible
31+
// (using description because the same code can map to multiple errors -
32+
// see GrpcHandler#handleError)
33+
if (description != null) {
34+
if (description.contains("TaskNotFoundError")) {
35+
return new A2AClientException(errorPrefix + description, new TaskNotFoundError());
36+
} else if (description.contains("UnsupportedOperationError")) {
37+
return new A2AClientException(errorPrefix + description, new UnsupportedOperationError());
38+
} else if (description.contains("InvalidParamsError")) {
39+
return new A2AClientException(errorPrefix + description, new InvalidParamsError());
40+
} else if (description.contains("InvalidRequestError")) {
41+
return new A2AClientException(errorPrefix + description, new InvalidRequestError());
42+
} else if (description.contains("MethodNotFoundError")) {
43+
return new A2AClientException(errorPrefix + description, new MethodNotFoundError());
44+
} else if (description.contains("TaskNotCancelableError")) {
45+
return new A2AClientException(errorPrefix + description, new TaskNotCancelableError());
46+
} else if (description.contains("PushNotificationNotSupportedError")) {
47+
return new A2AClientException(errorPrefix + description, new PushNotificationNotSupportedError());
48+
} else if (description.contains("JSONParseError")) {
49+
return new A2AClientException(errorPrefix + description, new JSONParseError());
50+
} else if (description.contains("ContentTypeNotSupportedError")) {
51+
return new A2AClientException(errorPrefix + description, new ContentTypeNotSupportedError(null, description, null));
52+
} else if (description.contains("InvalidAgentResponseError")) {
53+
return new A2AClientException(errorPrefix + description, new InvalidAgentResponseError(null, description, null));
54+
}
55+
}
56+
57+
// Fall back to mapping based on status code
58+
switch (code) {
59+
case NOT_FOUND:
60+
return new A2AClientException(errorPrefix + (description != null ? description : e.getMessage()), new TaskNotFoundError());
61+
case UNIMPLEMENTED:
62+
return new A2AClientException(errorPrefix + (description != null ? description : e.getMessage()), new UnsupportedOperationError());
63+
case INVALID_ARGUMENT:
64+
return new A2AClientException(errorPrefix + (description != null ? description : e.getMessage()), new InvalidParamsError());
65+
case INTERNAL:
66+
return new A2AClientException(errorPrefix + (description != null ? description : e.getMessage()), new io.a2a.spec.InternalError(null, e.getMessage(), null));
67+
default:
68+
return new A2AClientException(errorPrefix + e.getMessage(), e);
69+
}
70+
}
71+
}

0 commit comments

Comments
 (0)