Skip to content

Commit aa41de5

Browse files
committed
fix: Encapsulate the payload of Push notifications
The payload of push notifications is encapsulated by the `kind` of payload as specified in [§ 4.3.3. Push Notification Payload](https://a2a-protocol.org/latest/specification/#433-push-notification-payload) This commit only works for `task` payload and will required more work to support additional payloads (tracked by #490). This fixes #491 Signed-off-by: Jeff Mesnil <[email protected]>
1 parent 686e40a commit aa41de5

File tree

4 files changed

+44
-17
lines changed

4 files changed

+44
-17
lines changed

server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,25 @@
33
import static io.a2a.client.http.A2AHttpClient.APPLICATION_JSON;
44
import static io.a2a.client.http.A2AHttpClient.CONTENT_TYPE;
55
import static io.a2a.common.A2AHeaders.X_A2A_NOTIFICATION_TOKEN;
6-
import jakarta.enterprise.context.ApplicationScoped;
7-
import jakarta.inject.Inject;
86

97
import java.io.IOException;
108
import java.util.List;
119
import java.util.concurrent.CompletableFuture;
1210
import java.util.concurrent.ExecutionException;
1311

12+
import jakarta.enterprise.context.ApplicationScoped;
13+
import jakarta.inject.Inject;
14+
1415
import com.fasterxml.jackson.core.JsonProcessingException;
16+
import org.slf4j.Logger;
17+
import org.slf4j.LoggerFactory;
1518

1619
import io.a2a.client.http.A2AHttpClient;
1720
import io.a2a.client.http.JdkA2AHttpClient;
1821
import io.a2a.spec.PushNotificationConfig;
1922
import io.a2a.spec.Task;
2023
import io.a2a.util.Utils;
2124

22-
import org.slf4j.Logger;
23-
import org.slf4j.LoggerFactory;
24-
2525
@ApplicationScoped
2626
public class BasePushNotificationSender implements PushNotificationSender {
2727

@@ -80,7 +80,7 @@ private boolean dispatchNotification(Task task, PushNotificationConfig pushInfo)
8080

8181
String body;
8282
try {
83-
body = Utils.OBJECT_MAPPER.writeValueAsString(task);
83+
body = Utils.marshalFrom(task);
8484
} catch (JsonProcessingException e) {
8585
LOGGER.debug("Error writing value as string: {}", e.getMessage(), e);
8686
return false;

server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.util.ArrayList;
77
import java.util.Collections;
88
import java.util.List;
9+
import java.util.Map;
910
import java.util.Properties;
1011
import java.util.concurrent.CompletableFuture;
1112
import java.util.concurrent.CountDownLatch;
@@ -15,6 +16,12 @@
1516

1617
import jakarta.enterprise.context.Dependent;
1718

19+
import com.fasterxml.jackson.databind.JsonNode;
20+
import io.quarkus.arc.profile.IfBuildProfile;
21+
import org.junit.jupiter.api.AfterEach;
22+
import org.junit.jupiter.api.Assertions;
23+
import org.junit.jupiter.api.BeforeEach;
24+
1825
import io.a2a.client.http.A2AHttpClient;
1926
import io.a2a.client.http.A2AHttpResponse;
2027
import io.a2a.server.agentexecution.AgentExecutor;
@@ -30,20 +37,14 @@
3037
import io.a2a.server.tasks.TaskStore;
3138
import io.a2a.spec.AgentCapabilities;
3239
import io.a2a.spec.AgentCard;
40+
import io.a2a.spec.Event;
3341
import io.a2a.spec.JSONRPCError;
3442
import io.a2a.spec.Message;
3543
import io.a2a.spec.Task;
3644
import io.a2a.spec.TaskState;
3745
import io.a2a.spec.TaskStatus;
38-
import io.a2a.spec.Event;
3946
import io.a2a.spec.TextPart;
4047
import io.a2a.util.Utils;
41-
import io.quarkus.arc.profile.IfBuildProfile;
42-
import java.util.Map;
43-
44-
import org.junit.jupiter.api.AfterEach;
45-
import org.junit.jupiter.api.Assertions;
46-
import org.junit.jupiter.api.BeforeEach;
4748

4849
public class AbstractA2ARequestHandlerTest {
4950

@@ -199,7 +200,11 @@ public PostBuilder body(String body) {
199200

200201
@Override
201202
public A2AHttpResponse post() throws IOException, InterruptedException {
202-
tasks.add(Utils.OBJECT_MAPPER.readValue(body, Task.TYPE_REFERENCE));
203+
JsonNode root = Utils.OBJECT_MAPPER.readTree(body);
204+
// This will need to be updated for #490 to unmarshall based on the kind of payload
205+
JsonNode taskNode = root.elements().next();
206+
Task task = Utils.OBJECT_MAPPER.treeToValue(taskNode, Task.TYPE_REFERENCE);
207+
tasks.add(task);
203208
try {
204209
return new A2AHttpResponse() {
205210
@Override

server-common/src/test/java/io/a2a/server/tasks/PushNotificationSenderTest.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.a2a.server.tasks;
22

3-
import static io.a2a.client.http.A2AHttpClient.APPLICATION_JSON;
3+
import static io.a2a.client.http.A2AHttpClient.APPLICATION_JSON;
44
import static io.a2a.client.http.A2AHttpClient.CONTENT_TYPE;
55
import static org.junit.jupiter.api.Assertions.assertEquals;
66
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -16,17 +16,18 @@
1616
import java.util.concurrent.TimeUnit;
1717
import java.util.function.Consumer;
1818

19+
import com.fasterxml.jackson.databind.JsonNode;
1920
import org.junit.jupiter.api.BeforeEach;
2021
import org.junit.jupiter.api.Test;
2122

2223
import io.a2a.client.http.A2AHttpClient;
2324
import io.a2a.client.http.A2AHttpResponse;
2425
import io.a2a.common.A2AHeaders;
25-
import io.a2a.util.Utils;
2626
import io.a2a.spec.PushNotificationConfig;
2727
import io.a2a.spec.Task;
2828
import io.a2a.spec.TaskState;
2929
import io.a2a.spec.TaskStatus;
30+
import io.a2a.util.Utils;
3031

3132
public class PushNotificationSenderTest {
3233

@@ -77,7 +78,11 @@ public A2AHttpResponse post() throws IOException, InterruptedException {
7778
}
7879

7980
try {
80-
Task task = Utils.OBJECT_MAPPER.readValue(body, Task.TYPE_REFERENCE);
81+
JsonNode root = Utils.OBJECT_MAPPER.readTree(body);
82+
// This assumes there is always one field in the outer JSON object.
83+
// This will need to be updated for #490 to unmarshall based on the kind of payload
84+
JsonNode taskNode = root.elements().next();
85+
Task task = Utils.OBJECT_MAPPER.treeToValue(taskNode, Task.TYPE_REFERENCE);
8186
tasks.add(task);
8287
urls.add(url);
8388
headers.add(new java.util.HashMap<>(requestHeaders));

spec/src/main/java/io/a2a/util/Utils.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.util.ArrayList;
44
import java.util.List;
5+
import java.util.Map;
56
import java.util.logging.Logger;
67

78
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -11,6 +12,7 @@
1112

1213
import io.a2a.spec.Artifact;
1314
import io.a2a.spec.Part;
15+
import io.a2a.spec.StreamingEventKind;
1416
import io.a2a.spec.Task;
1517
import io.a2a.spec.TaskArtifactUpdateEvent;
1618

@@ -69,6 +71,21 @@ public static <T> T unmarshalFrom(String data, TypeReference<T> typeRef) throws
6971
return OBJECT_MAPPER.readValue(data, typeRef);
7072
}
7173

74+
/**
75+
* Serializes a StreamingEventKind in a JSON string
76+
* <p>
77+
* The StreamingEventKind object is wrapped in a JSON field named from its kind (e.g. "task") before
78+
* it is serialized
79+
*
80+
* @param kind the StreamingEventKind to deserialize
81+
* @return a JSON String
82+
* @throws JsonProcessingException if JSON parsing fails
83+
*/
84+
public static String marshalFrom(StreamingEventKind kind) throws JsonProcessingException {
85+
Map<String, StreamingEventKind> wrapper = Map.of(kind.getKind(), kind);
86+
return OBJECT_MAPPER.writeValueAsString(wrapper);
87+
}
88+
7289
/**
7390
* Returns the provided value if non-null, otherwise returns the default value.
7491
* <p>

0 commit comments

Comments
 (0)