Skip to content

Commit 3456605

Browse files
authored
Merge pull request #120 from fjuma/client
Update the client docs and A2AC
2 parents 80cb1ea + f756492 commit 3456605

File tree

6 files changed

+235
-31
lines changed

6 files changed

+235
-31
lines changed

README.md

Lines changed: 49 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -139,18 +139,6 @@ public class WeatherAgentExecutorProducer {
139139
updater.complete();
140140
}
141141

142-
private String extractTextFromMessage(Message message) {
143-
StringBuilder textBuilder = new StringBuilder();
144-
if (message.getParts() != null) {
145-
for (Part part : message.getParts()) {
146-
if (part instanceof TextPart textPart) {
147-
textBuilder.append(textPart.getText());
148-
}
149-
}
150-
}
151-
return textBuilder.toString();
152-
}
153-
154142
@Override
155143
public void cancel(RequestContext context, EventQueue eventQueue) throws JSONRPCError {
156144
Task task = context.getTask();
@@ -169,6 +157,18 @@ public class WeatherAgentExecutorProducer {
169157
TaskUpdater updater = new TaskUpdater(context, eventQueue);
170158
updater.cancel();
171159
}
160+
161+
private String extractTextFromMessage(Message message) {
162+
StringBuilder textBuilder = new StringBuilder();
163+
if (message.getParts() != null) {
164+
for (Part part : message.getParts()) {
165+
if (part instanceof TextPart textPart) {
166+
textBuilder.append(textPart.getText());
167+
}
168+
}
169+
}
170+
return textBuilder.toString();
171+
}
172172
}
173173
}
174174
```
@@ -202,26 +202,25 @@ OR
202202
</dependency>
203203
```
204204

205-
## Client
205+
## A2A Client
206206

207-
An *initial* [A2AClient](https://github.com/fjuma/a2a-java-sdk/blob/main/src/main/java/io/a2a/client/A2AClient.java) class has been added. This is very much work in progress, we are working on implementing the methods required by the client side of the protocol.
207+
The A2A Java SDK provides a Java client implementation of the [Agent2Agent (A2A) Protocol](https://google-a2a.github.io/A2A), allowing communication with A2A servers.
208208

209209
### Sample Usage
210210

211-
#### Create a client
211+
#### Create an A2A client
212212

213213
```java
214-
// Create an A2AClient (the URL specified is the server agent's URL)
214+
// Create an A2AClient (the URL specified is the server agent's URL, be sure to replace it with the actual URL of the A2A server you want to connect to)
215215
A2AClient client = new A2AClient("http://localhost:1234");
216216
```
217217

218-
#### Send a message
218+
#### Send a message to the A2A server agent
219219

220220
```java
221-
// Send a text message to the server agent
221+
// Send a text message to the A2A server agent
222222
Message message = A2A.toUserMessage("tell me a joke"); // the message ID will be automatically generated for you
223223
MessageSendParams params = new MessageSendParams.Builder()
224-
.id("task-1234") // id is optional
225224
.message(message)
226225
.build();
227226
SendMessageResponse response = client.sendMessage(params);
@@ -234,7 +233,7 @@ if you don't specify it. You can also explicitly specify a message ID like this:
234233
Message message = A2A.toUserMessage("tell me a joke", "message-1234"); // messageId is message-1234
235234
```
236235

237-
#### Get a task
236+
#### Get the current state of a task
238237

239238
```java
240239
// Retrieve the task with id "task-1234"
@@ -245,7 +244,7 @@ GetTaskResponse response = client.getTask("task-1234");
245244
GetTaskResponse response = client.getTask(new TaskQueryParams("task-1234", 10));
246245
```
247246

248-
#### Cancel a task
247+
#### Cancel an ongoing task
249248

250249
```java
251250
// Cancel the task we previously submitted with id "task-1234"
@@ -259,12 +258,12 @@ CancelTaskResponse response = client.cancelTask(new TaskIdParams("task-1234", me
259258
#### Get the push notification configuration for a task
260259

261260
```java
262-
// Get task push notification
263-
GetTaskPushNotificationResponse response = client.getTaskPushNotificationConfig("task-1234");
261+
// Get task push notification configuration
262+
GetTaskPushNotificationConfigResponse response = client.getTaskPushNotificationConfig("task-1234");
264263

265264
// You can also specify additional properties using a map
266265
Map<String, Object> metadata = ...
267-
GetTaskPushNotificationResponse response = client.getTaskPushNotificationConfig(new TaskIdParams("task-1234", metadata));
266+
GetTaskPushNotificationConfigResponse response = client.getTaskPushNotificationConfig(new TaskIdParams("task-1234", metadata));
268267
```
269268

270269
#### Set the push notification configuration for a task
@@ -274,7 +273,7 @@ GetTaskPushNotificationResponse response = client.getTaskPushNotificationConfig(
274273
PushNotificationConfig pushNotificationConfig = new PushNotificationConfig.Builder()
275274
.url("https://example.com/callback")
276275
.authenticationInfo(new AuthenticationInfo(Collections.singletonList("jwt"), null))
277-
.build());
276+
.build();
278277
SetTaskPushNotificationResponse response = client.setTaskPushNotificationConfig("task-1234", pushNotificationConfig);
279278
```
280279

@@ -284,12 +283,11 @@ SetTaskPushNotificationResponse response = client.setTaskPushNotificationConfig(
284283
// Send a text message to the remote agent
285284
Message message = A2A.toUserMessage("tell me some jokes"); // the message ID will be automatically generated for you
286285
MessageSendParams params = new MessageSendParams.Builder()
287-
.id("task-1234") // id is optional
288286
.message(message)
289287
.build();
290288

291289
// Create a handler that will be invoked for Task, Message, TaskStatusUpdateEvent, and TaskArtifactUpdateEvent
292-
Consumer<StreamingEventType> eventHandler = event -> {...};
290+
Consumer<StreamingEventKind> eventHandler = event -> {...};
293291

294292
// Create a handler that will be invoked if an error is received
295293
Consumer<JSONRPCError> errorHandler = error -> {...};
@@ -301,6 +299,23 @@ Runnable failureHandler = () -> {...};
301299
client.sendStreamingMessage(params, eventHandler, errorHandler, failureHandler);
302300
```
303301

302+
#### Resubscribe to a task
303+
304+
```java
305+
// Create a handler that will be invoked for Task, Message, TaskStatusUpdateEvent, and TaskArtifactUpdateEvent
306+
Consumer<StreamingEventKind> eventHandler = event -> {...};
307+
308+
// Create a handler that will be invoked if an error is received
309+
Consumer<JSONRPCError> errorHandler = error -> {...};
310+
311+
// Create a handler that will be invoked in the event of a failure
312+
Runnable failureHandler = () -> {...};
313+
314+
// Resubscribe to an ongoing task with id "task-1234"
315+
TaskIdParams taskIdParams = new TaskIdParams("task-1234");
316+
client.resubscribeToTask("request-1234", taskIdParams, eventHandler, errorHandler, failureHandler);
317+
```
318+
304319
#### Retrieve details about the server agent that this client agent is communicating with
305320
```java
306321
AgentCard serverAgentCard = client.getAgentCard();
@@ -312,7 +327,7 @@ An agent card can also be retrieved using the `A2A#getAgentCard` method:
312327
AgentCard agentCard = A2A.getAgentCard("http://localhost:1234");
313328
```
314329

315-
## Examples
330+
## Additional Examples
316331

317332
### Hello World Example
318333

@@ -324,9 +339,13 @@ A complete example of an A2A client communicating with a Python A2A server is av
324339

325340
The example includes detailed instructions on how to run both the Python server and the Java client using JBang. Check out the [example's README](src/main/java/io/a2a/examples/helloworld/README.md) for more information.
326341

327-
## Server
342+
## License
343+
344+
This project is licensed under the terms of the [Apache 2.0 License](LICENSE).
345+
346+
## Contributing
328347

329-
TODO
348+
See [CONTRIBUTING.md](CONTRIBUTING.md) for contribution guidelines.
330349

331350

332351

core/src/main/java/io/a2a/client/A2AClient.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import static io.a2a.spec.A2A.JSONRPC_VERSION;
77
import static io.a2a.spec.A2A.SEND_MESSAGE_METHOD;
88
import static io.a2a.spec.A2A.SEND_STREAMING_MESSAGE_METHOD;
9+
import static io.a2a.spec.A2A.SEND_TASK_RESUBSCRIPTION_METHOD;
910
import static io.a2a.spec.A2A.SET_TASK_PUSH_NOTIFICATION_CONFIG_METHOD;
1011
import static io.a2a.util.Assert.checkNotNullParam;
1112
import static io.a2a.util.Utils.OBJECT_MAPPER;
@@ -45,6 +46,7 @@
4546
import io.a2a.spec.TaskIdParams;
4647
import io.a2a.spec.TaskPushNotificationConfig;
4748
import io.a2a.spec.TaskQueryParams;
49+
import io.a2a.spec.TaskResubscriptionRequest;
4850

4951
/**
5052
* An A2A client.
@@ -347,6 +349,20 @@ public SetTaskPushNotificationConfigResponse setTaskPushNotificationConfig(Strin
347349
}
348350
}
349351

352+
/**
353+
* Send a streaming message to the remote agent.
354+
*
355+
* @param messageSendParams the parameters for the message to be sent
356+
* @param eventHandler a consumer that will be invoked for each event received from the remote agent
357+
* @param errorHandler a consumer that will be invoked if the remote agent returns an error
358+
* @param failureHandler a consumer that will be invoked if a failure occurs when processing events
359+
* @throws A2AServerException if sending the streaming message fails for any reason
360+
*/
361+
public void sendStreamingMessage(MessageSendParams messageSendParams, Consumer<StreamingEventKind> eventHandler,
362+
Consumer<JSONRPCError> errorHandler, Runnable failureHandler) throws A2AServerException {
363+
sendStreamingMessage(null, messageSendParams, eventHandler, errorHandler, failureHandler);
364+
}
365+
350366
/**
351367
* Send a streaming message to the remote agent.
352368
*
@@ -392,6 +408,65 @@ public void sendStreamingMessage(String requestId, MessageSendParams messageSend
392408
}
393409
}
394410

411+
/**
412+
* Resubscribe to an ongoing task.
413+
*
414+
* @param taskIdParams the params for the task to resubscribe to
415+
* @param eventHandler a consumer that will be invoked for each event received from the remote agent
416+
* @param errorHandler a consumer that will be invoked if the remote agent returns an error
417+
* @param failureHandler a consumer that will be invoked if a failure occurs when processing events
418+
* @throws A2AServerException if resubscribing to the task fails for any reason
419+
*/
420+
public void resubscribeToTask(TaskIdParams taskIdParams, Consumer<StreamingEventKind> eventHandler,
421+
Consumer<JSONRPCError> errorHandler, Runnable failureHandler) throws A2AServerException {
422+
resubscribeToTask(null, taskIdParams, eventHandler, errorHandler, failureHandler);
423+
}
424+
425+
/**
426+
* Resubscribe to an ongoing task.
427+
*
428+
* @param requestId the request ID to use
429+
* @param taskIdParams the params for the task to resubscribe to
430+
* @param eventHandler a consumer that will be invoked for each event received from the remote agent
431+
* @param errorHandler a consumer that will be invoked if the remote agent returns an error
432+
* @param failureHandler a consumer that will be invoked if a failure occurs when processing events
433+
* @throws A2AServerException if resubscribing to the task fails for any reason
434+
*/
435+
public void resubscribeToTask(String requestId, TaskIdParams taskIdParams, Consumer<StreamingEventKind> eventHandler,
436+
Consumer<JSONRPCError> errorHandler, Runnable failureHandler) throws A2AServerException {
437+
checkNotNullParam("taskIdParams", taskIdParams);
438+
checkNotNullParam("eventHandler", eventHandler);
439+
checkNotNullParam("errorHandler", errorHandler);
440+
checkNotNullParam("failureHandler", failureHandler);
441+
442+
TaskResubscriptionRequest.Builder taskResubscriptionRequestBuilder = new TaskResubscriptionRequest.Builder()
443+
.jsonrpc(JSONRPC_VERSION)
444+
.method(SEND_TASK_RESUBSCRIPTION_METHOD)
445+
.params(taskIdParams);
446+
447+
if (requestId != null) {
448+
taskResubscriptionRequestBuilder.id(requestId);
449+
}
450+
451+
AtomicReference<CompletableFuture<Void>> ref = new AtomicReference<>();
452+
SSEEventListener sseEventListener = new SSEEventListener(eventHandler, errorHandler, failureHandler);
453+
TaskResubscriptionRequest taskResubscriptionRequest = taskResubscriptionRequestBuilder.build();
454+
try {
455+
A2AHttpClient.PostBuilder builder = createPostBuilder(taskResubscriptionRequest);
456+
ref.set(builder.postAsyncSSE(
457+
msg -> sseEventListener.onMessage(msg, ref.get()),
458+
throwable -> sseEventListener.onError(throwable, ref.get()),
459+
() -> {
460+
// We don't need to do anything special on completion
461+
}));
462+
463+
} catch (IOException e) {
464+
throw new A2AServerException("Failed to send task resubscription request: " + e);
465+
} catch (InterruptedException e) {
466+
throw new A2AServerException("Task resubscription request timed out: " + e);
467+
}
468+
}
469+
395470
private String sendPostRequest(Object value) throws IOException, InterruptedException {
396471
A2AHttpClient.PostBuilder builder = createPostBuilder(value);
397472
A2AHttpResponse response = builder.post();

core/src/main/java/io/a2a/spec/SendStreamingMessageRequest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import static io.a2a.spec.A2A.SEND_STREAMING_MESSAGE_METHOD;
55
import static io.a2a.util.Utils.defaultIfNull;
66

7+
import java.util.UUID;
8+
79
import com.fasterxml.jackson.annotation.JsonCreator;
810
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
911
import com.fasterxml.jackson.annotation.JsonInclude;
@@ -67,6 +69,9 @@ public Builder params(MessageSendParams params) {
6769
}
6870

6971
public SendStreamingMessageRequest build() {
72+
if (id == null) {
73+
id = UUID.randomUUID().toString();
74+
}
7075
return new SendStreamingMessageRequest(jsonrpc, id, method, params);
7176
}
7277
}

core/src/main/java/io/a2a/spec/TaskResubscriptionRequest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,38 @@ public TaskResubscriptionRequest(@JsonProperty("jsonrpc") String jsonrpc, @JsonP
4040
public TaskResubscriptionRequest(Object id, TaskIdParams params) {
4141
this(null, id, SEND_TASK_RESUBSCRIPTION_METHOD, params);
4242
}
43+
44+
public static class Builder {
45+
private String jsonrpc;
46+
private Object id;
47+
private String method = SEND_TASK_RESUBSCRIPTION_METHOD;
48+
private TaskIdParams params;
49+
50+
public TaskResubscriptionRequest.Builder jsonrpc(String jsonrpc) {
51+
this.jsonrpc = jsonrpc;
52+
return this;
53+
}
54+
55+
public TaskResubscriptionRequest.Builder id(Object id) {
56+
this.id = id;
57+
return this;
58+
}
59+
60+
public TaskResubscriptionRequest.Builder method(String method) {
61+
this.method = method;
62+
return this;
63+
}
64+
65+
public TaskResubscriptionRequest.Builder params(TaskIdParams params) {
66+
this.params = params;
67+
return this;
68+
}
69+
70+
public TaskResubscriptionRequest build() {
71+
if (id == null) {
72+
id = UUID.randomUUID().toString();
73+
}
74+
return new TaskResubscriptionRequest(jsonrpc, id, method, params);
75+
}
76+
}
4377
}

0 commit comments

Comments
 (0)