Skip to content

Commit 6edaabf

Browse files
authored
Merge pull request #108 from kabir/http-client
Switch out OkHttpClient for HttpClient from the JDK
2 parents d8f9e4c + 336ad17 commit 6edaabf

File tree

17 files changed

+538
-250
lines changed

17 files changed

+538
-250
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.a2a.examples.helloworld.server;
2+
3+
import io.a2a.http.A2AHttpClient;
4+
import io.a2a.http.JdkA2AHttpClient;
5+
6+
import jakarta.enterprise.context.ApplicationScoped;
7+
import jakarta.enterprise.inject.Produces;
8+
9+
@ApplicationScoped
10+
public class A2AHttpClientProducer {
11+
12+
@Produces
13+
public A2AHttpClient httpClient() {
14+
return new JdkA2AHttpClient();
15+
}
16+
17+
}

examples/src/main/java/io/a2a/examples/helloworld/server/TmpA2AHttpClientProducer.java

Lines changed: 0 additions & 23 deletions
This file was deleted.

pom.xml

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
<mockserver.version>5.15.0</mockserver.version>
2626
<mutiny-zero.version>1.1.1</mutiny-zero.version>
2727
<quarkus.platform.version>3.22.3</quarkus.platform.version>
28-
<okhttp.version>4.12.0</okhttp.version>
2928
<rest-assured.version>5.5.1</rest-assured.version>
3029
<slf4j.version>2.0.17</slf4j.version>
3130

@@ -63,16 +62,6 @@
6362
<artifactId>jackson-datatype-jsr310</artifactId>
6463
<version>${jackson.version}</version>
6564
</dependency>
66-
<dependency>
67-
<groupId>com.squareup.okhttp3</groupId>
68-
<artifactId>okhttp</artifactId>
69-
<version>${okhttp.version}</version>
70-
</dependency>
71-
<dependency>
72-
<groupId>com.squareup.okhttp3</groupId>
73-
<artifactId>okhttp-sse</artifactId>
74-
<version>${okhttp.version}</version>
75-
</dependency>
7665
<dependency>
7766
<groupId>io.smallrye.reactive</groupId>
7867
<artifactId>mutiny-zero</artifactId>

src/main/java/io/a2a/client/A2AClient.java

Lines changed: 31 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,16 @@
1313

1414
import java.io.IOException;
1515
import java.util.Map;
16+
import java.util.concurrent.CompletableFuture;
17+
import java.util.concurrent.atomic.AtomicReference;
1618
import java.util.function.Consumer;
1719

1820
import com.fasterxml.jackson.core.JsonProcessingException;
1921
import com.fasterxml.jackson.core.type.TypeReference;
2022
import io.a2a.client.sse.SSEEventListener;
23+
import io.a2a.http.A2AHttpClient;
24+
import io.a2a.http.A2AHttpResponse;
25+
import io.a2a.http.JdkA2AHttpClient;
2126
import io.a2a.spec.A2A;
2227
import io.a2a.spec.A2AServerException;
2328
import io.a2a.spec.AgentCard;
@@ -40,12 +45,6 @@
4045
import io.a2a.spec.TaskIdParams;
4146
import io.a2a.spec.TaskPushNotificationConfig;
4247
import io.a2a.spec.TaskQueryParams;
43-
import okhttp3.MediaType;
44-
import okhttp3.OkHttpClient;
45-
import okhttp3.Request;
46-
import okhttp3.RequestBody;
47-
import okhttp3.Response;
48-
import okhttp3.sse.EventSources;
4948

5049
/**
5150
* An A2A client.
@@ -57,8 +56,7 @@ public class A2AClient {
5756
private static final TypeReference<CancelTaskResponse> CANCEL_TASK_RESPONSE_REFERENCE = new TypeReference<>() {};
5857
private static final TypeReference<GetTaskPushNotificationConfigResponse> GET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() {};
5958
private static final TypeReference<SetTaskPushNotificationConfigResponse> SET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() {};
60-
private static final MediaType JSON_MEDIA_TYPE = MediaType.parse("application/json; charset=utf-8");
61-
private final OkHttpClient httpClient;
59+
private final A2AHttpClient httpClient;
6260
private final String agentUrl;
6361
private AgentCard agentCard;
6462

@@ -72,7 +70,7 @@ public A2AClient(AgentCard agentCard) {
7270
checkNotNullParam("agentCard", agentCard);
7371
this.agentCard = agentCard;
7472
this.agentUrl = agentCard.url();
75-
this.httpClient = new OkHttpClient();
73+
this.httpClient = new JdkA2AHttpClient();
7674
}
7775

7876
/**
@@ -83,7 +81,7 @@ public A2AClient(AgentCard agentCard) {
8381
public A2AClient(String agentUrl) {
8482
checkNotNullParam("agentUrl", agentUrl);
8583
this.agentUrl = agentUrl;
86-
this.httpClient = new OkHttpClient();
84+
this.httpClient = new JdkA2AHttpClient();
8785
}
8886

8987
/**
@@ -349,20 +347,6 @@ public SetTaskPushNotificationConfigResponse setTaskPushNotificationConfig(Strin
349347
}
350348
}
351349

352-
/**
353-
* Send a streaming message to the remote agent.
354-
*
355-
* @param messageSendParams the parameters for the message to be sent
356-
* @param eventHandler a consumer that will be invoked for each event received from the remote agent
357-
* @param errorHandler a consumer that will be invoked if the remote agent returns an error
358-
* @param failureHandler a consumer that will be invoked if a failure occurs when processing events
359-
* @throws A2AServerException if sending the streaming message fails for any reason
360-
*/
361-
public void sendStreamingMessage(MessageSendParams messageSendParams, Consumer<StreamingEventKind> eventHandler,
362-
Consumer<JSONRPCError> errorHandler, Runnable failureHandler) throws A2AServerException {
363-
sendStreamingMessage(null, messageSendParams, eventHandler, errorHandler, failureHandler);
364-
}
365-
366350
/**
367351
* Send a streaming message to the remote agent.
368352
*
@@ -374,7 +358,7 @@ public void sendStreamingMessage(MessageSendParams messageSendParams, Consumer<S
374358
* @throws A2AServerException if sending the streaming message fails for any reason
375359
*/
376360
public void sendStreamingMessage(String requestId, MessageSendParams messageSendParams, Consumer<StreamingEventKind> eventHandler,
377-
Consumer<JSONRPCError> errorHandler, Runnable failureHandler) throws A2AServerException {
361+
Consumer<JSONRPCError> errorHandler, Runnable failureHandler) throws A2AServerException {
378362
checkNotNullParam("messageSendParams", messageSendParams);
379363
checkNotNullParam("eventHandler", eventHandler);
380364
checkNotNullParam("errorHandler", errorHandler);
@@ -389,52 +373,41 @@ public void sendStreamingMessage(String requestId, MessageSendParams messageSend
389373
sendStreamingMessageRequestBuilder.id(requestId);
390374
}
391375

376+
AtomicReference<CompletableFuture<Void>> ref = new AtomicReference<>();
377+
SSEEventListener sseEventListener = new SSEEventListener(eventHandler, errorHandler, failureHandler);
392378
SendStreamingMessageRequest sendStreamingMessageRequest = sendStreamingMessageRequestBuilder.build();
393-
SSEEventListener sseEventListener = new SSEEventListener.Builder()
394-
.eventHandler(eventHandler)
395-
.errorHandler(errorHandler)
396-
.failureHandler(failureHandler)
397-
.build();
398379
try {
399-
EventSources.createFactory(httpClient)
400-
.newEventSource(createPostRequest(sendStreamingMessageRequest,
401-
true), sseEventListener);
380+
A2AHttpClient.PostBuilder builder = createPostBuilder(sendStreamingMessageRequest);
381+
ref.set(builder.postAsyncSSE(
382+
msg -> sseEventListener.onMessage(msg, ref.get()),
383+
throwable -> sseEventListener.onError(throwable, ref.get()),
384+
() -> {
385+
// We don't need to do anything special on completion
386+
}));
387+
402388
} catch (IOException e) {
403389
throw new A2AServerException("Failed to send streaming message request: " + e);
390+
} catch (InterruptedException e) {
391+
throw new A2AServerException("Send streaming message request timed out: " + e);
404392
}
405393
}
406394

407-
private String sendPostRequest(Object value) throws IOException, InterruptedException{
408-
return sendPostRequest(value, false);
409-
}
410-
411-
412-
private String sendPostRequest(Object value, boolean addEventStreamHeader) throws IOException, InterruptedException{
413-
Request okRequest = createPostRequest(value, addEventStreamHeader);
414-
try (Response response = httpClient.newCall(okRequest).execute()) {
415-
if (! response.isSuccessful()) {
416-
throw new IOException("Request failed " + response.code());
417-
}
418-
return response.body().string();
395+
private String sendPostRequest(Object value) throws IOException, InterruptedException {
396+
A2AHttpClient.PostBuilder builder = createPostBuilder(value);
397+
A2AHttpResponse response = builder.post();
398+
if (!response.success()) {
399+
throw new IOException("Request failed " + response.status());
419400
}
420-
401+
return response.body();
421402
}
422403

423-
private Request createPostRequest(Object value) throws IOException {
424-
return createPostRequest(value, false);
425-
}
426-
427-
private Request createPostRequest(Object value, boolean addEventStreamHeader) throws IOException {
428-
Request.Builder builder = new Request.Builder()
404+
private A2AHttpClient.PostBuilder createPostBuilder(Object value) throws JsonProcessingException {
405+
return httpClient.createPost()
429406
.url(agentUrl)
430407
.addHeader("Content-Type", "application/json")
431-
.post(RequestBody.create(OBJECT_MAPPER.writeValueAsString(value), JSON_MEDIA_TYPE));
432-
if (addEventStreamHeader) {
433-
builder.addHeader("Accept", "text/event-stream");
434-
}
435-
return builder.build();
436-
}
408+
.body(OBJECT_MAPPER.writeValueAsString(value));
437409

410+
}
438411

439412
private <T extends JSONRPCResponse> T unmarshalResponse(String response, TypeReference<T> typeReference)
440413
throws A2AServerException, JsonProcessingException {
Lines changed: 12 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1,85 +1,44 @@
11
package io.a2a.client.sse;
22

3-
import static io.a2a.util.Assert.checkNotNullParam;
43
import static io.a2a.util.Utils.OBJECT_MAPPER;
54

5+
import java.util.concurrent.Future;
66
import java.util.function.Consumer;
77

88
import com.fasterxml.jackson.core.JsonProcessingException;
99
import com.fasterxml.jackson.databind.JsonNode;
1010
import io.a2a.spec.JSONRPCError;
1111
import io.a2a.spec.StreamingEventKind;
1212
import io.a2a.spec.TaskStatusUpdateEvent;
13-
import okhttp3.Response;
14-
import okhttp3.sse.EventSource;
15-
import okhttp3.sse.EventSourceListener;
1613
import org.slf4j.Logger;
1714
import org.slf4j.LoggerFactory;
1815

19-
/**
20-
* A listener for Server-Sent Events (SSE).
21-
*/
22-
public class SSEEventListener extends EventSourceListener {
23-
16+
public class SSEEventListener {
2417
private static final Logger log = LoggerFactory.getLogger(SSEEventListener.class);
25-
private final boolean logEvents;
2618
private final Consumer<StreamingEventKind> eventHandler;
2719
private final Consumer<JSONRPCError> errorHandler;
2820
private final Runnable failureHandler;
2921

30-
public SSEEventListener(Consumer<StreamingEventKind> eventHandler, Consumer<JSONRPCError> errorHandler,
31-
Runnable failureHandler) {
32-
this(false, eventHandler, errorHandler, failureHandler);
33-
}
34-
35-
public SSEEventListener(boolean logEvents, Consumer<StreamingEventKind> eventHandler,
36-
Consumer<JSONRPCError> errorHandler, Runnable failureHandler) {
37-
checkNotNullParam("eventHandler", eventHandler);
38-
checkNotNullParam("errorHandler", errorHandler);
39-
checkNotNullParam("failureHandler", failureHandler);
40-
this.logEvents = logEvents;
22+
public SSEEventListener(Consumer<StreamingEventKind> eventHandler, Consumer<JSONRPCError> errorHandler, Runnable failureHandler) {
4123
this.eventHandler = eventHandler;
4224
this.errorHandler = errorHandler;
4325
this.failureHandler = failureHandler;
4426
}
4527

46-
@Override
47-
public void onOpen(EventSource eventSource, Response response) {
48-
if (logEvents) {
49-
log.debug("onOpen()");
50-
}
51-
}
52-
53-
@Override
54-
public void onClosed(EventSource eventSource) {
55-
if (logEvents) {
56-
log.debug("onClosed()");
57-
}
58-
}
59-
60-
@Override
61-
public void onEvent(EventSource eventSource, String id, String type, String data) {
62-
if (type.equals("message")) {
63-
if (logEvents) {
64-
log.debug("onEvent() {}", data);
65-
}
66-
try {
67-
handleMessage(OBJECT_MAPPER.readTree(data), eventSource);
68-
} catch (JsonProcessingException e) {
69-
log.warn("Failed to parse JSON message: {}", data, e);
70-
}
28+
public void onMessage(String message, Future<Void> completableFuture) {
29+
try {
30+
handleMessage(OBJECT_MAPPER.readTree(message),completableFuture);
31+
} catch (JsonProcessingException e) {
32+
log.warn("Failed to parse JSON message: {}", message, e);
7133
}
7234
}
7335

74-
@Override
75-
public void onFailure(EventSource eventSource, Throwable t, Response response) {
76-
if (logEvents) {
77-
log.debug("onFailure()", t);
78-
}
36+
public void onError(Throwable throwable, Future<Void> future) {
7937
failureHandler.run();
38+
future.cancel(true); // close SSE channel
8039
}
8140

82-
private void handleMessage(JsonNode jsonNode, EventSource eventSource) {
41+
private void handleMessage(JsonNode jsonNode, Future<Void> future) {
8342
try {
8443
if (jsonNode.has("error")) {
8544
JSONRPCError error = OBJECT_MAPPER.treeToValue(jsonNode.get("error"), JSONRPCError.class);
@@ -90,7 +49,7 @@ private void handleMessage(JsonNode jsonNode, EventSource eventSource) {
9049
StreamingEventKind event = OBJECT_MAPPER.treeToValue(result, StreamingEventKind.class);
9150
eventHandler.accept(event);
9251
if (event instanceof TaskStatusUpdateEvent && ((TaskStatusUpdateEvent) event).isFinal()) {
93-
eventSource.cancel(); // close SSE channel
52+
future.cancel(true); // close SSE channel
9453
}
9554
} else {
9655
throw new IllegalArgumentException("Unknown message type");
@@ -100,34 +59,4 @@ private void handleMessage(JsonNode jsonNode, EventSource eventSource) {
10059
}
10160
}
10261

103-
public static class Builder {
104-
private boolean logEvents;
105-
private Consumer<StreamingEventKind> eventHandler;
106-
private Consumer<JSONRPCError> errorHandler;
107-
private Runnable failureHandler;
108-
109-
public Builder logEvents(boolean logEvents) {
110-
this.logEvents = logEvents;
111-
return this;
112-
}
113-
114-
public Builder eventHandler(Consumer<StreamingEventKind> eventHandler) {
115-
this.eventHandler = eventHandler;
116-
return this;
117-
}
118-
119-
public Builder errorHandler(Consumer<JSONRPCError> errorHandler) {
120-
this.errorHandler = errorHandler;
121-
return this;
122-
}
123-
124-
public Builder failureHandler(Runnable failureHandler) {
125-
this.failureHandler = failureHandler;
126-
return this;
127-
}
128-
129-
public SSEEventListener build() {
130-
return new SSEEventListener(logEvents, eventHandler, errorHandler, failureHandler);
131-
}
132-
}
13362
}

0 commit comments

Comments
 (0)