Skip to content

Commit 01c776d

Browse files
committed
Switch InMemoryPushNotifier to use the new client
1 parent 783f3da commit 01c776d

File tree

5 files changed

+108
-44
lines changed

5 files changed

+108
-44
lines changed

src/main/java/io/a2a/http/TempA2AHttpClient.java

Lines changed: 0 additions & 7 deletions
This file was deleted.

src/main/java/io/a2a/server/tasks/InMemoryPushNotifier.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,26 @@
11
package io.a2a.server.tasks;
22

3+
import java.io.IOException;
34
import java.util.Collections;
45
import java.util.HashMap;
56
import java.util.Map;
67

78
import jakarta.enterprise.context.ApplicationScoped;
89
import jakarta.inject.Inject;
910

10-
import io.a2a.http.TempA2AHttpClient;
11+
import com.fasterxml.jackson.core.JsonProcessingException;
12+
import io.a2a.http.A2AHttpClient;
1113
import io.a2a.spec.PushNotificationConfig;
1214
import io.a2a.spec.Task;
15+
import io.a2a.util.Utils;
1316

1417
@ApplicationScoped
1518
public class InMemoryPushNotifier implements PushNotifier {
16-
private final TempA2AHttpClient httpClient;
19+
private final A2AHttpClient httpClient;
1720
private final Map<String, PushNotificationConfig> pushNotificationInfos = Collections.synchronizedMap(new HashMap<>());
1821

1922
@Inject
20-
public InMemoryPushNotifier(TempA2AHttpClient httpClient) {
23+
public InMemoryPushNotifier(A2AHttpClient httpClient) {
2124
this.httpClient = httpClient;
2225
}
2326

@@ -43,8 +46,28 @@ public void sendNotification(Task task) {
4346
return;
4447
}
4548
String url = pushInfo.url();
46-
// TODO https://github.com/fjuma/a2a-java-sdk/issues/59 will have the real client
47-
httpClient.post(url, task);
49+
50+
// TODO auth
51+
52+
String body;
53+
try {
54+
body = Utils.OBJECT_MAPPER.writeValueAsString(task);
55+
} catch (JsonProcessingException e) {
56+
e.printStackTrace();
57+
throw new RuntimeException("Error writing value as string: " + e.getMessage(), e);
58+
} catch (Throwable throwable) {
59+
throwable.printStackTrace();
60+
throw new RuntimeException("Error writing value as string: " + throwable.getMessage(), throwable);
61+
}
62+
63+
try {
64+
httpClient.createPost()
65+
.url(url)
66+
.body(body)
67+
.post();
68+
} catch (IOException | InterruptedException e) {
69+
throw new RuntimeException("Error pushing data to " + url + ": " + e.getMessage(), e);
70+
}
4871

4972
}
5073
}

src/test/java/io/a2a/server/requesthandlers/JSONRPCHandlerTest.java

Lines changed: 63 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,22 @@
88
import static org.junit.jupiter.api.Assertions.assertTrue;
99
import static org.junit.jupiter.api.Assertions.fail;
1010

11+
import java.io.IOException;
1112
import java.util.ArrayList;
1213
import java.util.Collections;
1314
import java.util.List;
15+
import java.util.concurrent.CompletableFuture;
1416
import java.util.concurrent.CountDownLatch;
1517
import java.util.concurrent.Executors;
1618
import java.util.concurrent.Flow;
1719
import java.util.concurrent.TimeUnit;
1820
import java.util.concurrent.atomic.AtomicReference;
21+
import java.util.function.Consumer;
1922

2023
import jakarta.enterprise.context.Dependent;
2124

22-
import io.a2a.http.TempA2AHttpClient;
25+
import io.a2a.http.A2AHttpClient;
26+
import io.a2a.http.A2AHttpResponse;
2327
import io.a2a.server.agentexecution.AgentExecutor;
2428
import io.a2a.server.agentexecution.RequestContext;
2529
import io.a2a.server.events.Event;
@@ -65,6 +69,7 @@
6569
import io.a2a.spec.TaskStatusUpdateEvent;
6670
import io.a2a.spec.TextPart;
6771
import io.a2a.spec.UnsupportedOperationError;
72+
import io.a2a.util.Utils;
6873
import mutiny.zero.ZeroPublisher;
6974
import org.junit.jupiter.api.AfterEach;
7075
import org.junit.jupiter.api.BeforeEach;
@@ -698,7 +703,7 @@ public void onSubscribe(Flow.Subscription subscription) {
698703

699704
@Override
700705
public void onNext(SendStreamingMessageResponse item) {
701-
// System.out.println("-> " + item.getResult());
706+
System.out.println("-> " + item.getResult());
702707
results.add(item.getResult());
703708
System.out.println(results);
704709
subscriptionRef.get().request(1);
@@ -1260,18 +1265,67 @@ private interface AgentExecutorMethod {
12601265
}
12611266

12621267
@Dependent
1263-
private static class TestHttpClient implements TempA2AHttpClient {
1268+
private static class TestHttpClient implements A2AHttpClient {
12641269
final List<Task> tasks = Collections.synchronizedList(new ArrayList<>());
12651270
volatile CountDownLatch latch;
12661271

12671272
@Override
1268-
public int post(String url, Task task) {
1269-
// System.out.println("----> adding " + task);
1270-
tasks.add(task);
1271-
if (latch != null) {
1272-
latch.countDown();
1273+
public GetBuilder createGet() {
1274+
return null;
1275+
}
1276+
1277+
@Override
1278+
public PostBuilder createPost() {
1279+
return new TestPostBuilder();
1280+
}
1281+
1282+
class TestPostBuilder implements A2AHttpClient.PostBuilder {
1283+
private volatile String body;
1284+
@Override
1285+
public PostBuilder body(String body) {
1286+
this.body = body;
1287+
return this;
1288+
}
1289+
1290+
@Override
1291+
public A2AHttpResponse post() throws IOException, InterruptedException {
1292+
tasks.add(Utils.OBJECT_MAPPER.readValue(body, Task.TYPE_REFERENCE));
1293+
try {
1294+
return new A2AHttpResponse() {
1295+
@Override
1296+
public int status() {
1297+
return 200;
1298+
}
1299+
1300+
@Override
1301+
public boolean success() {
1302+
return true;
1303+
}
1304+
1305+
@Override
1306+
public String body() {
1307+
return "";
1308+
}
1309+
};
1310+
} finally {
1311+
latch.countDown();
1312+
}
1313+
}
1314+
1315+
@Override
1316+
public CompletableFuture<Void> postAsyncSSE(Consumer<String> messageConsumer, Consumer<Throwable> errorConsumer, Runnable completeRunnable) throws IOException, InterruptedException {
1317+
return null;
1318+
}
1319+
1320+
@Override
1321+
public PostBuilder url(String s) {
1322+
return this;
1323+
}
1324+
1325+
@Override
1326+
public PostBuilder addHeader(String name, String value) {
1327+
return this;
12731328
}
1274-
return 200;
12751329
}
12761330
}
12771331
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.a2a.examples.helloworld.server;
2+
3+
import io.a2a.http.A2AHttpClient;
4+
import io.a2a.http.JdkA2AHttpClient;
5+
6+
import jakarta.enterprise.context.ApplicationScoped;
7+
import jakarta.enterprise.inject.Produces;
8+
9+
@ApplicationScoped
10+
public class A2AHttpClientProducer {
11+
12+
@Produces
13+
public A2AHttpClient httpClient() {
14+
return new JdkA2AHttpClient();
15+
}
16+
17+
}

tck/src/main/java/io/a2a/examples/helloworld/server/TmpA2AHttpClientProducer.java

Lines changed: 0 additions & 23 deletions
This file was deleted.

0 commit comments

Comments
 (0)