Skip to content

Commit f756492

Browse files
committed
Add resubscribeToTask to A2AClient
1 parent 1e35b62 commit f756492

File tree

5 files changed

+191
-1
lines changed

5 files changed

+191
-1
lines changed

README.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,23 @@ Runnable failureHandler = () -> {...};
299299
client.sendStreamingMessage(params, eventHandler, errorHandler, failureHandler);
300300
```
301301

302+
#### Resubscribe to a task
303+
304+
```java
305+
// Create a handler that will be invoked for Task, Message, TaskStatusUpdateEvent, and TaskArtifactUpdateEvent
306+
Consumer<StreamingEventKind> eventHandler = event -> {...};
307+
308+
// Create a handler that will be invoked if an error is received
309+
Consumer<JSONRPCError> errorHandler = error -> {...};
310+
311+
// Create a handler that will be invoked in the event of a failure
312+
Runnable failureHandler = () -> {...};
313+
314+
// Resubscribe to an ongoing task with id "task-1234"
315+
TaskIdParams taskIdParams = new TaskIdParams("task-1234");
316+
client.resubscribeToTask("request-1234", taskIdParams, eventHandler, errorHandler, failureHandler);
317+
```
318+
302319
#### Retrieve details about the server agent that this client agent is communicating with
303320
```java
304321
AgentCard serverAgentCard = client.getAgentCard();
@@ -322,6 +339,13 @@ A complete example of an A2A client communicating with a Python A2A server is av
322339

323340
The example includes detailed instructions on how to run both the Python server and the Java client using JBang. Check out the [example's README](src/main/java/io/a2a/examples/helloworld/README.md) for more information.
324341

342+
## License
343+
344+
This project is licensed under the terms of the [Apache 2.0 License](LICENSE).
345+
346+
## Contributing
347+
348+
See [CONTRIBUTING.md](CONTRIBUTING.md) for contribution guidelines.
325349

326350

327351

core/src/main/java/io/a2a/client/A2AClient.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import static io.a2a.spec.A2A.JSONRPC_VERSION;
77
import static io.a2a.spec.A2A.SEND_MESSAGE_METHOD;
88
import static io.a2a.spec.A2A.SEND_STREAMING_MESSAGE_METHOD;
9+
import static io.a2a.spec.A2A.SEND_TASK_RESUBSCRIPTION_METHOD;
910
import static io.a2a.spec.A2A.SET_TASK_PUSH_NOTIFICATION_CONFIG_METHOD;
1011
import static io.a2a.util.Assert.checkNotNullParam;
1112
import static io.a2a.util.Utils.OBJECT_MAPPER;
@@ -45,6 +46,7 @@
4546
import io.a2a.spec.TaskIdParams;
4647
import io.a2a.spec.TaskPushNotificationConfig;
4748
import io.a2a.spec.TaskQueryParams;
49+
import io.a2a.spec.TaskResubscriptionRequest;
4850

4951
/**
5052
* An A2A client.
@@ -406,6 +408,65 @@ public void sendStreamingMessage(String requestId, MessageSendParams messageSend
406408
}
407409
}
408410

411+
/**
412+
* Resubscribe to an ongoing task.
413+
*
414+
* @param taskIdParams the params for the task to resubscribe to
415+
* @param eventHandler a consumer that will be invoked for each event received from the remote agent
416+
* @param errorHandler a consumer that will be invoked if the remote agent returns an error
417+
* @param failureHandler a consumer that will be invoked if a failure occurs when processing events
418+
* @throws A2AServerException if resubscribing to the task fails for any reason
419+
*/
420+
public void resubscribeToTask(TaskIdParams taskIdParams, Consumer<StreamingEventKind> eventHandler,
421+
Consumer<JSONRPCError> errorHandler, Runnable failureHandler) throws A2AServerException {
422+
resubscribeToTask(null, taskIdParams, eventHandler, errorHandler, failureHandler);
423+
}
424+
425+
/**
426+
* Resubscribe to an ongoing task.
427+
*
428+
* @param requestId the request ID to use
429+
* @param taskIdParams the params for the task to resubscribe to
430+
* @param eventHandler a consumer that will be invoked for each event received from the remote agent
431+
* @param errorHandler a consumer that will be invoked if the remote agent returns an error
432+
* @param failureHandler a consumer that will be invoked if a failure occurs when processing events
433+
* @throws A2AServerException if resubscribing to the task fails for any reason
434+
*/
435+
public void resubscribeToTask(String requestId, TaskIdParams taskIdParams, Consumer<StreamingEventKind> eventHandler,
436+
Consumer<JSONRPCError> errorHandler, Runnable failureHandler) throws A2AServerException {
437+
checkNotNullParam("taskIdParams", taskIdParams);
438+
checkNotNullParam("eventHandler", eventHandler);
439+
checkNotNullParam("errorHandler", errorHandler);
440+
checkNotNullParam("failureHandler", failureHandler);
441+
442+
TaskResubscriptionRequest.Builder taskResubscriptionRequestBuilder = new TaskResubscriptionRequest.Builder()
443+
.jsonrpc(JSONRPC_VERSION)
444+
.method(SEND_TASK_RESUBSCRIPTION_METHOD)
445+
.params(taskIdParams);
446+
447+
if (requestId != null) {
448+
taskResubscriptionRequestBuilder.id(requestId);
449+
}
450+
451+
AtomicReference<CompletableFuture<Void>> ref = new AtomicReference<>();
452+
SSEEventListener sseEventListener = new SSEEventListener(eventHandler, errorHandler, failureHandler);
453+
TaskResubscriptionRequest taskResubscriptionRequest = taskResubscriptionRequestBuilder.build();
454+
try {
455+
A2AHttpClient.PostBuilder builder = createPostBuilder(taskResubscriptionRequest);
456+
ref.set(builder.postAsyncSSE(
457+
msg -> sseEventListener.onMessage(msg, ref.get()),
458+
throwable -> sseEventListener.onError(throwable, ref.get()),
459+
() -> {
460+
// We don't need to do anything special on completion
461+
}));
462+
463+
} catch (IOException e) {
464+
throw new A2AServerException("Failed to send task resubscription request: " + e);
465+
} catch (InterruptedException e) {
466+
throw new A2AServerException("Task resubscription request timed out: " + e);
467+
}
468+
}
469+
409470
private String sendPostRequest(Object value) throws IOException, InterruptedException {
410471
A2AHttpClient.PostBuilder builder = createPostBuilder(value);
411472
A2AHttpResponse response = builder.post();

core/src/main/java/io/a2a/spec/TaskResubscriptionRequest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,38 @@ public TaskResubscriptionRequest(@JsonProperty("jsonrpc") String jsonrpc, @JsonP
4040
public TaskResubscriptionRequest(Object id, TaskIdParams params) {
4141
this(null, id, SEND_TASK_RESUBSCRIPTION_METHOD, params);
4242
}
43+
44+
public static class Builder {
45+
private String jsonrpc;
46+
private Object id;
47+
private String method = SEND_TASK_RESUBSCRIPTION_METHOD;
48+
private TaskIdParams params;
49+
50+
public TaskResubscriptionRequest.Builder jsonrpc(String jsonrpc) {
51+
this.jsonrpc = jsonrpc;
52+
return this;
53+
}
54+
55+
public TaskResubscriptionRequest.Builder id(Object id) {
56+
this.id = id;
57+
return this;
58+
}
59+
60+
public TaskResubscriptionRequest.Builder method(String method) {
61+
this.method = method;
62+
return this;
63+
}
64+
65+
public TaskResubscriptionRequest.Builder params(TaskIdParams params) {
66+
this.params = params;
67+
return this;
68+
}
69+
70+
public TaskResubscriptionRequest build() {
71+
if (id == null) {
72+
id = UUID.randomUUID().toString();
73+
}
74+
return new TaskResubscriptionRequest(jsonrpc, id, method, params);
75+
}
76+
}
4377
}

core/src/test/java/io/a2a/client/A2AClientStreamingTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22

33
import static io.a2a.client.JsonStreamingMessages.SEND_MESSAGE_STREAMING_TEST_REQUEST;
44
import static io.a2a.client.JsonStreamingMessages.SEND_MESSAGE_STREAMING_TEST_RESPONSE;
5+
import static io.a2a.client.JsonStreamingMessages.TASK_RESUBSCRIPTION_REQUEST_TEST_RESPONSE;
6+
import static io.a2a.client.JsonStreamingMessages.TASK_RESUBSCRIPTION_TEST_REQUEST;
57
import static org.junit.jupiter.api.Assertions.assertEquals;
8+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
69
import static org.junit.jupiter.api.Assertions.assertNotNull;
710
import static org.junit.jupiter.api.Assertions.assertTrue;
811
import static org.mockserver.model.HttpRequest.request;
@@ -15,12 +18,18 @@
1518
import java.util.concurrent.atomic.AtomicReference;
1619
import java.util.function.Consumer;
1720

21+
import io.a2a.spec.Artifact;
1822
import io.a2a.spec.JSONRPCError;
1923
import io.a2a.spec.Message;
2024
import io.a2a.spec.MessageSendConfiguration;
2125
import io.a2a.spec.MessageSendParams;
26+
import io.a2a.spec.Part;
2227
import io.a2a.spec.StreamingEventKind;
28+
import io.a2a.spec.Task;
29+
import io.a2a.spec.TaskIdParams;
30+
import io.a2a.spec.TaskState;
2331
import io.a2a.spec.TextPart;
32+
2433
import org.junit.jupiter.api.AfterEach;
2534
import org.junit.jupiter.api.BeforeEach;
2635
import org.junit.jupiter.api.Test;
@@ -116,4 +125,53 @@ public void testA2AClientSendStreamingMessage() throws Exception {
116125
assertTrue(eventReceived);
117126
assertNotNull(receivedEvent.get());
118127
}
128+
129+
@Test
130+
public void testA2AClientResubscribeToTask() throws Exception {
131+
this.server.when(
132+
request()
133+
.withMethod("POST")
134+
.withPath("/")
135+
.withBody(JsonBody.json(TASK_RESUBSCRIPTION_TEST_REQUEST, MatchType.STRICT))
136+
137+
)
138+
.respond(
139+
response()
140+
.withStatusCode(200)
141+
.withHeader("Content-Type", "text/event-stream")
142+
.withBody(TASK_RESUBSCRIPTION_REQUEST_TEST_RESPONSE)
143+
);
144+
145+
A2AClient client = new A2AClient("http://localhost:4001");
146+
TaskIdParams taskIdParams = new TaskIdParams("task-1234");
147+
148+
AtomicReference<StreamingEventKind> receivedEvent = new AtomicReference<>();
149+
CountDownLatch latch = new CountDownLatch(1);
150+
Consumer<StreamingEventKind> eventHandler = event -> {
151+
receivedEvent.set(event);
152+
latch.countDown();
153+
};
154+
Consumer<JSONRPCError> errorHandler = error -> {};
155+
Runnable failureHandler = () -> {};
156+
client.resubscribeToTask("request-1234", taskIdParams, eventHandler, errorHandler, failureHandler);
157+
158+
boolean eventReceived = latch.await(10, TimeUnit.SECONDS);
159+
assertTrue(eventReceived);
160+
161+
StreamingEventKind eventKind = receivedEvent.get();;
162+
assertNotNull(eventKind);
163+
assertInstanceOf(Task.class, eventKind);
164+
Task task = (Task) eventKind;
165+
assertEquals("2", task.getId());
166+
assertEquals("context-1234", task.getContextId());
167+
assertEquals(TaskState.COMPLETED, task.getStatus().state());
168+
List<Artifact> artifacts = task.getArtifacts();
169+
assertEquals(1, artifacts.size());
170+
Artifact artifact = artifacts.get(0);
171+
assertEquals("artifact-1", artifact.artifactId());
172+
assertEquals("joke", artifact.name());
173+
Part<?> part = artifact.parts().get(0);
174+
assertEquals(Part.Kind.TEXT, part.getKind());
175+
assertEquals("Why did the chicken cross the road? To get to the other side!", ((TextPart) part).getText());
176+
}
119177
}

core/src/test/java/io/a2a/client/JsonStreamingMessages.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,4 +132,17 @@ public class JsonStreamingMessages {
132132
"event: message\n" +
133133
"data: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"id\":\"2\",\"contextId\":\"context-1234\",\"status\":{\"state\":\"completed\"},\"artifacts\":[{\"artifactId\":\"artifact-1\",\"name\":\"joke\",\"parts\":[{\"kind\":\"text\",\"text\":\"Why did the chicken cross the road? To get to the other side!\"}]}],\"metadata\":{},\"kind\":\"task\"}}\n\n";
134134

135-
}
135+
static final String TASK_RESUBSCRIPTION_REQUEST_TEST_RESPONSE =
136+
"event: message\n" +
137+
"data: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"id\":\"2\",\"contextId\":\"context-1234\",\"status\":{\"state\":\"completed\"},\"artifacts\":[{\"artifactId\":\"artifact-1\",\"name\":\"joke\",\"parts\":[{\"kind\":\"text\",\"text\":\"Why did the chicken cross the road? To get to the other side!\"}]}],\"metadata\":{},\"kind\":\"task\"}}\n\n";
138+
139+
public static final String TASK_RESUBSCRIPTION_TEST_REQUEST = """
140+
{
141+
"jsonrpc": "2.0",
142+
"id": "request-1234",
143+
"method": "tasks/resubscribe",
144+
"params": {
145+
"id": "task-1234"
146+
}
147+
}""";
148+
}

0 commit comments

Comments
 (0)