Skip to content

Commit 848cb83

Browse files
fjumaehsavoie
authored andcommitted
feat: Add a base ClientTransport class, create a new JSONRPC and gRPC transport implementations, and introduce a new transport-independent Client implementation
1 parent 950e310 commit 848cb83

19 files changed

+1236
-31
lines changed

client/base/pom.xml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,10 @@
4141
<dependency>
4242
<groupId>${project.groupId}</groupId>
4343
<artifactId>a2a-java-sdk-common</artifactId>
44-
<version>${project.version}</version>
4544
</dependency>
4645
<dependency>
4746
<groupId>${project.groupId}</groupId>
4847
<artifactId>a2a-java-sdk-spec</artifactId>
49-
<version>${project.version}</version>
5048
</dependency>
5149
<dependency>
5250
<groupId>org.junit.jupiter</groupId>
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
package io.a2a.client;
2+
3+
import static io.a2a.util.Assert.checkNotNullParam;
4+
5+
import java.util.List;
6+
import java.util.Map;
7+
import java.util.function.BiConsumer;
8+
import java.util.function.Consumer;
9+
10+
import io.a2a.spec.A2AClientException;
11+
import io.a2a.spec.AgentCard;
12+
import io.a2a.spec.DeleteTaskPushNotificationConfigParams;
13+
import io.a2a.spec.GetTaskPushNotificationConfigParams;
14+
import io.a2a.spec.ListTaskPushNotificationConfigParams;
15+
import io.a2a.spec.Message;
16+
import io.a2a.spec.PushNotificationConfig;
17+
import io.a2a.spec.Task;
18+
import io.a2a.spec.TaskIdParams;
19+
import io.a2a.spec.TaskPushNotificationConfig;
20+
import io.a2a.spec.TaskQueryParams;
21+
22+
/**
23+
* Abstract class representing an A2A client. Provides a standard set
24+
* of methods for interacting with an A2A agent, regardless of the underlying
25+
* transport protocol. It supports sending messages, managing tasks, and
26+
* handling event streams.
27+
*/
28+
public abstract class AbstractClient {
29+
30+
private final List<BiConsumer<ClientEvent, AgentCard>> consumers;
31+
private final Consumer<Throwable> streamingErrorHandler;
32+
33+
public AbstractClient(List<BiConsumer<ClientEvent, AgentCard>> consumers) {
34+
this(consumers, null);
35+
}
36+
37+
public AbstractClient(List<BiConsumer<ClientEvent, AgentCard>> consumers, Consumer<Throwable> streamingErrorHandler) {
38+
checkNotNullParam("consumers", consumers);
39+
this.consumers = consumers;
40+
this.streamingErrorHandler = streamingErrorHandler;
41+
}
42+
43+
/**
44+
* Send a message to the remote agent. This method will automatically use
45+
* the streaming or non-streaming approach as determined by the server's
46+
* agent card and the client configuration. The configured client consumers
47+
* and will be used to handle messages, tasks, and update events received
48+
* from the remote agent. The configured streaming error handler will be used
49+
* if an error occurs during streaming. The configured client push notification
50+
* configuration will get used for streaming.
51+
*
52+
* @param request the message
53+
* @param context optional client call context for the request (may be {@code null})
54+
* @throws A2AClientException if sending the message fails for any reason
55+
*/
56+
public abstract void sendMessage(Message request, ClientCallContext context) throws A2AClientException;
57+
58+
/**
59+
* Send a message to the remote agent. This method will automatically use
60+
* the streaming or non-streaming approach as determined by the server's
61+
* agent card and the client configuration. The configured client consumers
62+
* will be used to handle messages, tasks, and update events received from
63+
* the remote agent. The configured streaming error handler will be used
64+
* if an error occurs during streaming.
65+
*
66+
* @param request the message
67+
* @param pushNotificationConfiguration the push notification configuration that should be
68+
* used if the streaming approach is used
69+
* @param metadata the optional metadata to include when sending the message
70+
* @throws A2AClientException if sending the message fails for any reason
71+
*/
72+
public abstract void sendMessage(Message request, PushNotificationConfig pushNotificationConfiguration,
73+
Map<String, Object> metadata, ClientCallContext context) throws A2AClientException;
74+
75+
/**
76+
* Retrieve the current state and history of a specific task.
77+
*
78+
* @param request the task query parameters specifying which task to retrieve
79+
* @param context optional client call context for the request (may be {@code null})
80+
* @return the task
81+
* @throws A2AClientException if retrieving the task fails for any reason
82+
*/
83+
public abstract Task getTask(TaskQueryParams request, ClientCallContext context) throws A2AClientException;
84+
85+
/**
86+
* Request the agent to cancel a specific task.
87+
*
88+
* @param request the task ID parameters specifying which task to cancel
89+
* @param context optional client call context for the request (may be {@code null})
90+
* @return the cancelled task
91+
* @throws A2AClientException if cancelling the task fails for any reason
92+
*/
93+
public abstract Task cancelTask(TaskIdParams request, ClientCallContext context) throws A2AClientException;
94+
95+
/**
96+
* Set or update the push notification configuration for a specific task.
97+
*
98+
* @param request the push notification configuration to set for the task
99+
* @param context optional client call context for the request (may be {@code null})
100+
* @return the configured TaskPushNotificationConfig
101+
* @throws A2AClientException if setting the task push notification configuration fails for any reason
102+
*/
103+
public abstract TaskPushNotificationConfig setTaskPushNotificationConfiguration(
104+
TaskPushNotificationConfig request,
105+
ClientCallContext context) throws A2AClientException;
106+
107+
/**
108+
* Retrieve the push notification configuration for a specific task.
109+
*
110+
* @param request the parameters specifying which task's notification config to retrieve
111+
* @param context optional client call context for the request (may be {@code null})
112+
* @return the task push notification config
113+
* @throws A2AClientException if getting the task push notification config fails for any reason
114+
*/
115+
public abstract TaskPushNotificationConfig getTaskPushNotificationConfiguration(
116+
GetTaskPushNotificationConfigParams request,
117+
ClientCallContext context) throws A2AClientException;
118+
119+
/**
120+
* Retrieve the list of push notification configurations for a specific task.
121+
*
122+
* @param request the parameters specifying which task's notification configs to retrieve
123+
* @param context optional client call context for the request (may be {@code null})
124+
* @return the list of task push notification configs
125+
* @throws A2AClientException if getting the task push notification configs fails for any reason
126+
*/
127+
public abstract List<TaskPushNotificationConfig> listTaskPushNotificationConfigurations(
128+
ListTaskPushNotificationConfigParams request,
129+
ClientCallContext context) throws A2AClientException;
130+
131+
/**
132+
* Delete the list of push notification configurations for a specific task.
133+
*
134+
* @param request the parameters specifying which task's notification configs to delete
135+
* @param context optional client call context for the request (may be {@code null})
136+
* @throws A2AClientException if deleting the task push notification configs fails for any reason
137+
*/
138+
public abstract void deleteTaskPushNotificationConfigurations(
139+
DeleteTaskPushNotificationConfigParams request,
140+
ClientCallContext context) throws A2AClientException;
141+
142+
/**
143+
* Resubscribe to a task's event stream.
144+
* This is only available if both the client and server support streaming.
145+
*
146+
* @param request the parameters specifying which task's notification configs to delete
147+
* @param context optional client call context for the request (may be {@code null})
148+
* @throws A2AClientException if resubscribing fails for any reason
149+
*/
150+
public abstract void resubscribe(TaskIdParams request, ClientCallContext context) throws A2AClientException;
151+
152+
/**
153+
* Retrieve the AgentCard.
154+
*
155+
* @param context optional client call context for the request (may be {@code null})
156+
* @return the AgentCard
157+
* @throws A2AClientException if retrieving the agent card fails for any reason
158+
*/
159+
public abstract AgentCard getAgentCard(ClientCallContext context) throws A2AClientException;
160+
161+
/**
162+
* Close the transport and release any associated resources.
163+
*/
164+
public abstract void close();
165+
166+
/**
167+
* Process the event using all configured consumers.
168+
*/
169+
public void consume(ClientEvent clientEventOrMessage, AgentCard agentCard) {
170+
for (BiConsumer<ClientEvent, AgentCard> consumer : consumers) {
171+
consumer.accept(clientEventOrMessage, agentCard);
172+
}
173+
}
174+
175+
/**
176+
* Get the error handler that should be used during streaming.
177+
*
178+
* @return the streaming error handler
179+
*/
180+
public Consumer<Throwable> getStreamingErrorHandler() {
181+
return streamingErrorHandler;
182+
}
183+
184+
}
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
package io.a2a.client;
2+
3+
import java.util.List;
4+
import java.util.Map;
5+
import java.util.function.BiConsumer;
6+
import java.util.function.Consumer;
7+
8+
import io.a2a.client.transport.ClientTransport;
9+
import io.a2a.spec.A2AClientError;
10+
import io.a2a.spec.A2AClientException;
11+
import io.a2a.spec.A2AClientInvalidStateError;
12+
import io.a2a.spec.AgentCard;
13+
import io.a2a.spec.DeleteTaskPushNotificationConfigParams;
14+
import io.a2a.spec.EventKind;
15+
import io.a2a.spec.GetTaskPushNotificationConfigParams;
16+
import io.a2a.spec.ListTaskPushNotificationConfigParams;
17+
import io.a2a.spec.Message;
18+
import io.a2a.spec.MessageSendConfiguration;
19+
import io.a2a.spec.MessageSendParams;
20+
import io.a2a.spec.PushNotificationConfig;
21+
import io.a2a.spec.StreamingEventKind;
22+
import io.a2a.spec.Task;
23+
import io.a2a.spec.TaskArtifactUpdateEvent;
24+
import io.a2a.spec.TaskIdParams;
25+
import io.a2a.spec.TaskPushNotificationConfig;
26+
import io.a2a.spec.TaskQueryParams;
27+
import io.a2a.spec.TaskStatusUpdateEvent;
28+
29+
public class Client extends AbstractClient {
30+
31+
private final ClientConfig clientConfig;
32+
private final ClientTransport clientTransport;
33+
private AgentCard agentCard;
34+
35+
public Client(AgentCard agentCard, ClientConfig clientConfig, ClientTransport clientTransport,
36+
List<BiConsumer<ClientEvent, AgentCard>> consumers, Consumer<Throwable> streamingErrorHandler) {
37+
super(consumers, streamingErrorHandler);
38+
this.agentCard = agentCard;
39+
this.clientConfig = clientConfig;
40+
this.clientTransport = clientTransport;
41+
}
42+
43+
44+
@Override
45+
public void sendMessage(Message request, ClientCallContext context) throws A2AClientException {
46+
MessageSendConfiguration messageSendConfiguration = new MessageSendConfiguration.Builder()
47+
.acceptedOutputModes(clientConfig.getAcceptedOutputModes())
48+
.blocking(clientConfig.isPolling())
49+
.historyLength(clientConfig.getHistoryLength())
50+
.pushNotification(clientConfig.getPushNotificationConfig())
51+
.build();
52+
53+
MessageSendParams messageSendParams = new MessageSendParams.Builder()
54+
.message(request)
55+
.configuration(messageSendConfiguration)
56+
.metadata(clientConfig.getMetadata())
57+
.build();
58+
59+
sendMessage(messageSendParams, context);
60+
}
61+
62+
@Override
63+
public void sendMessage(Message request, PushNotificationConfig pushNotificationConfiguration,
64+
Map<String, Object> metatadata, ClientCallContext context) throws A2AClientException {
65+
MessageSendConfiguration messageSendConfiguration = new MessageSendConfiguration.Builder()
66+
.acceptedOutputModes(clientConfig.getAcceptedOutputModes())
67+
.blocking(clientConfig.isPolling())
68+
.historyLength(clientConfig.getHistoryLength())
69+
.pushNotification(pushNotificationConfiguration)
70+
.build();
71+
72+
MessageSendParams messageSendParams = new MessageSendParams.Builder()
73+
.message(request)
74+
.configuration(messageSendConfiguration)
75+
.metadata(metatadata)
76+
.build();
77+
78+
sendMessage(messageSendParams, context);
79+
}
80+
81+
@Override
82+
public Task getTask(TaskQueryParams request, ClientCallContext context) throws A2AClientException {
83+
return clientTransport.getTask(request, context);
84+
}
85+
86+
@Override
87+
public Task cancelTask(TaskIdParams request, ClientCallContext context) throws A2AClientException {
88+
return clientTransport.cancelTask(request, context);
89+
}
90+
91+
@Override
92+
public TaskPushNotificationConfig setTaskPushNotificationConfiguration(
93+
TaskPushNotificationConfig request, ClientCallContext context) throws A2AClientException {
94+
return clientTransport.setTaskPushNotificationConfiguration(request, context);
95+
}
96+
97+
@Override
98+
public TaskPushNotificationConfig getTaskPushNotificationConfiguration(
99+
GetTaskPushNotificationConfigParams request, ClientCallContext context) throws A2AClientException {
100+
return clientTransport.getTaskPushNotificationConfiguration(request, context);
101+
}
102+
103+
@Override
104+
public List<TaskPushNotificationConfig> listTaskPushNotificationConfigurations(
105+
ListTaskPushNotificationConfigParams request, ClientCallContext context) throws A2AClientException {
106+
return clientTransport.listTaskPushNotificationConfigurations(request, context);
107+
}
108+
109+
@Override
110+
public void deleteTaskPushNotificationConfigurations(
111+
DeleteTaskPushNotificationConfigParams request, ClientCallContext context) throws A2AClientException {
112+
clientTransport.deleteTaskPushNotificationConfigurations(request, context);
113+
}
114+
115+
@Override
116+
public void resubscribe(TaskIdParams request, ClientCallContext context) throws A2AClientException {
117+
if (! clientConfig.isStreaming() || ! agentCard.capabilities().streaming()) {
118+
throw new A2AClientException("Client and/or server does not support resubscription");
119+
}
120+
ClientTaskManager tracker = new ClientTaskManager();
121+
Consumer<StreamingEventKind> eventHandler = event -> {
122+
try {
123+
ClientEvent clientEvent = getClientEvent(event, tracker);
124+
consume(clientEvent, agentCard);
125+
} catch (A2AClientError e) {
126+
getStreamingErrorHandler().accept(e);
127+
}
128+
};
129+
clientTransport.resubscribe(request, eventHandler, getStreamingErrorHandler(), context);
130+
}
131+
132+
@Override
133+
public AgentCard getAgentCard(ClientCallContext context) throws A2AClientException {
134+
agentCard = clientTransport.getAgentCard(context);
135+
return agentCard;
136+
}
137+
138+
@Override
139+
public void close() {
140+
clientTransport.close();
141+
}
142+
143+
private ClientEvent getClientEvent(StreamingEventKind event, ClientTaskManager taskManager) throws A2AClientError {
144+
if (event instanceof Message message) {
145+
return new MessageEvent(message);
146+
} else if (event instanceof Task task) {
147+
taskManager.saveTaskEvent(task);
148+
return new TaskEvent(taskManager.getCurrentTask());
149+
} else if (event instanceof TaskStatusUpdateEvent updateEvent) {
150+
taskManager.saveTaskEvent(updateEvent);
151+
return new TaskUpdateEvent(taskManager.getCurrentTask(), updateEvent);
152+
} else if (event instanceof TaskArtifactUpdateEvent updateEvent) {
153+
taskManager.saveTaskEvent(updateEvent);
154+
return new TaskUpdateEvent(taskManager.getCurrentTask(), updateEvent);
155+
} else {
156+
throw new A2AClientInvalidStateError("Invalid client event");
157+
}
158+
}
159+
160+
private void sendMessage(MessageSendParams messageSendParams, ClientCallContext context) throws A2AClientException {
161+
if (! clientConfig.isStreaming() || ! agentCard.capabilities().streaming()) {
162+
EventKind eventKind = clientTransport.sendMessage(messageSendParams, context);
163+
ClientEvent clientEvent;
164+
if (eventKind instanceof Task task) {
165+
clientEvent = new TaskEvent(task);
166+
} else {
167+
// must be a message
168+
clientEvent = new MessageEvent((Message) eventKind);
169+
}
170+
consume(clientEvent, agentCard);
171+
} else {
172+
ClientTaskManager tracker = new ClientTaskManager();
173+
Consumer<StreamingEventKind> eventHandler = event -> {
174+
try {
175+
ClientEvent clientEvent = getClientEvent(event, tracker);
176+
consume(clientEvent, agentCard);
177+
} catch (A2AClientError e) {
178+
getStreamingErrorHandler().accept(e);
179+
}
180+
};
181+
clientTransport.sendMessageStreaming(messageSendParams, eventHandler, getStreamingErrorHandler(), context);
182+
}
183+
}
184+
}

0 commit comments

Comments
 (0)