Skip to content

Commit a6d3664

Browse files
committed
chore: Update resubscribe and sendMessage without consumers will use the client's consumers
and its streamingErrorHandler The code was already doing that down the path but this simplifies the code as the consumers and streamingErrorhandler can be validated from the top of the API Add a bunch of @nullable & @nonnull to clarify the API contract This fixes #526 Signed-off-by: Jeff Mesnil <[email protected]>
1 parent 7465213 commit a6d3664

File tree

3 files changed

+172
-82
lines changed

3 files changed

+172
-82
lines changed

client/base/src/main/java/io/a2a/client/AbstractClient.java

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@
3232
*/
3333
public abstract class AbstractClient {
3434

35-
private final List<BiConsumer<ClientEvent, AgentCard>> consumers;
36-
private final @Nullable Consumer<Throwable> streamingErrorHandler;
35+
protected final @NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers;
36+
protected final @Nullable Consumer<Throwable> streamingErrorHandler;
3737

38-
public AbstractClient(List<BiConsumer<ClientEvent, AgentCard>> consumers) {
38+
public AbstractClient(@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers) {
3939
this(consumers, null);
4040
}
4141

@@ -57,7 +57,7 @@ public AbstractClient(@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumer
5757
* @param request the message
5858
* @throws A2AClientException if sending the message fails for any reason
5959
*/
60-
public void sendMessage(Message request) throws A2AClientException {
60+
public void sendMessage(@NonNull Message request) throws A2AClientException {
6161
sendMessage(request, null);
6262
}
6363

@@ -74,7 +74,10 @@ public void sendMessage(Message request) throws A2AClientException {
7474
* @param context optional client call context for the request (may be {@code null})
7575
* @throws A2AClientException if sending the message fails for any reason
7676
*/
77-
public abstract void sendMessage(Message request, @Nullable ClientCallContext context) throws A2AClientException;
77+
public void sendMessage(@NonNull Message request,
78+
@Nullable ClientCallContext context) throws A2AClientException {
79+
sendMessage(request, consumers, streamingErrorHandler, context);
80+
}
7881

7982
/**
8083
* Send a message to the remote agent. This method will automatically use
@@ -90,9 +93,9 @@ public void sendMessage(Message request) throws A2AClientException {
9093
* @param streamingErrorHandler an error handler that should be used for the streaming case if an error occurs
9194
* @throws A2AClientException if sending the message fails for any reason
9295
*/
93-
public void sendMessage(Message request,
94-
List<BiConsumer<ClientEvent, AgentCard>> consumers,
95-
Consumer<Throwable> streamingErrorHandler) throws A2AClientException {
96+
public void sendMessage(@NonNull Message request,
97+
@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
98+
@Nullable Consumer<Throwable> streamingErrorHandler) throws A2AClientException {
9699
sendMessage(request, consumers, streamingErrorHandler, null);
97100
}
98101

@@ -111,9 +114,9 @@ public void sendMessage(Message request,
111114
* @param context optional client call context for the request (may be {@code null})
112115
* @throws A2AClientException if sending the message fails for any reason
113116
*/
114-
public abstract void sendMessage(Message request,
115-
List<BiConsumer<ClientEvent, AgentCard>> consumers,
116-
Consumer<Throwable> streamingErrorHandler,
117+
public abstract void sendMessage(@NonNull Message request,
118+
@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
119+
@Nullable Consumer<Throwable> streamingErrorHandler,
117120
@Nullable ClientCallContext context) throws A2AClientException;
118121

119122
/**
@@ -130,8 +133,9 @@ public abstract void sendMessage(Message request,
130133
* @param metadata the optional metadata to include when sending the message
131134
* @throws A2AClientException if sending the message fails for any reason
132135
*/
133-
public void sendMessage(Message request, PushNotificationConfig pushNotificationConfiguration,
134-
Map<String, Object> metadata) throws A2AClientException {
136+
public void sendMessage(@NonNull Message request,
137+
@Nullable PushNotificationConfig pushNotificationConfiguration,
138+
@Nullable Map<String, Object> metadata) throws A2AClientException {
135139
sendMessage(request, pushNotificationConfiguration, metadata, null);
136140
}
137141

@@ -147,11 +151,11 @@ public void sendMessage(Message request, PushNotificationConfig pushNotification
147151
* @param pushNotificationConfiguration the push notification configuration that should be
148152
* used if the streaming approach is used
149153
* @param metadata the optional metadata to include when sending the message
150-
* @param context optional client call context for the request (may be {@code null})
154+
* @param context optional client call context for the request
151155
* @throws A2AClientException if sending the message fails for any reason
152156
*/
153-
public abstract void sendMessage(Message request, PushNotificationConfig pushNotificationConfiguration,
154-
Map<String, Object> metadata, @Nullable ClientCallContext context) throws A2AClientException;
157+
public abstract void sendMessage(@NonNull Message request, @Nullable PushNotificationConfig pushNotificationConfiguration,
158+
@Nullable Map<String, Object> metadata, @Nullable ClientCallContext context) throws A2AClientException;
155159

156160
/**
157161
* Retrieve the current state and history of a specific task.
@@ -320,8 +324,8 @@ public abstract void deleteTaskPushNotificationConfigurations(
320324
* @param request the parameters specifying which task's notification configs to delete
321325
* @throws A2AClientException if resubscribing fails for any reason
322326
*/
323-
public void resubscribe(TaskIdParams request) throws A2AClientException {
324-
resubscribe(request, null);
327+
public void resubscribe(@NonNull TaskIdParams request) throws A2AClientException {
328+
resubscribe(request, consumers, streamingErrorHandler, null);
325329
}
326330

327331
/**
@@ -332,10 +336,12 @@ public void resubscribe(TaskIdParams request) throws A2AClientException {
332336
* error handler will be used if an error occurs during streaming.
333337
*
334338
* @param request the parameters specifying which task's notification configs to delete
335-
* @param context optional client call context for the request (may be {@code null})
339+
* @param context optional client call context for the request
336340
* @throws A2AClientException if resubscribing fails for any reason
337341
*/
338-
public abstract void resubscribe(TaskIdParams request, @Nullable ClientCallContext context) throws A2AClientException;
342+
public void resubscribe(@NonNull TaskIdParams request, @Nullable ClientCallContext context) throws A2AClientException {
343+
resubscribe(request, consumers, streamingErrorHandler, context);
344+
}
339345

340346
/**
341347
* Resubscribe to a task's event stream.
@@ -349,8 +355,9 @@ public void resubscribe(TaskIdParams request) throws A2AClientException {
349355
* @param streamingErrorHandler an error handler that should be used for the streaming case if an error occurs
350356
* @throws A2AClientException if resubscribing fails for any reason
351357
*/
352-
public void resubscribe(TaskIdParams request, List<BiConsumer<ClientEvent, AgentCard>> consumers,
353-
Consumer<Throwable> streamingErrorHandler) throws A2AClientException {
358+
public void resubscribe(@NonNull TaskIdParams request,
359+
@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
360+
@Nullable Consumer<Throwable> streamingErrorHandler) throws A2AClientException {
354361
resubscribe(request, consumers, streamingErrorHandler, null);
355362
}
356363

@@ -364,11 +371,13 @@ public void resubscribe(TaskIdParams request, List<BiConsumer<ClientEvent, Agent
364371
* @param request the parameters specifying which task's notification configs to delete
365372
* @param consumers a list of consumers to pass responses from the remote agent to
366373
* @param streamingErrorHandler an error handler that should be used for the streaming case if an error occurs
367-
* @param context optional client call context for the request (may be {@code null})
374+
* @param context optional client call context for the request
368375
* @throws A2AClientException if resubscribing fails for any reason
369376
*/
370-
public abstract void resubscribe(TaskIdParams request, List<BiConsumer<ClientEvent, AgentCard>> consumers,
371-
Consumer<Throwable> streamingErrorHandler, @Nullable ClientCallContext context) throws A2AClientException;
377+
public abstract void resubscribe(@NonNull TaskIdParams request,
378+
@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
379+
@Nullable Consumer<Throwable> streamingErrorHandler,
380+
@Nullable ClientCallContext context) throws A2AClientException;
372381

373382
/**
374383
* Retrieve the AgentCard.
@@ -394,15 +403,6 @@ public AgentCard getAgentCard() throws A2AClientException {
394403
*/
395404
public abstract void close();
396405

397-
/**
398-
* Process the event using all configured consumers.
399-
*/
400-
void consume(ClientEvent clientEventOrMessage, AgentCard agentCard) {
401-
for (BiConsumer<ClientEvent, AgentCard> consumer : consumers) {
402-
consumer.accept(clientEventOrMessage, agentCard);
403-
}
404-
}
405-
406406
/**
407407
* Get the error handler that should be used during streaming.
408408
*

client/base/src/main/java/io/a2a/client/Client.java

Lines changed: 32 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -56,30 +56,28 @@ public static ClientBuilder builder(AgentCard agentCard) {
5656
}
5757

5858
@Override
59-
public void sendMessage(Message request, @Nullable ClientCallContext context) throws A2AClientException {
60-
MessageSendParams messageSendParams = getMessageSendParams(request, clientConfig);
61-
sendMessage(messageSendParams, null, null, context);
62-
}
63-
64-
@Override
65-
public void sendMessage(Message request, List<BiConsumer<ClientEvent, AgentCard>> consumers,
66-
Consumer<Throwable> streamingErrorHandler, @Nullable ClientCallContext context) throws A2AClientException {
59+
public void sendMessage(@NonNull Message request,
60+
@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
61+
@Nullable Consumer<Throwable> streamingErrorHandler,
62+
@Nullable ClientCallContext context) throws A2AClientException {
6763
MessageSendParams messageSendParams = getMessageSendParams(request, clientConfig);
6864
sendMessage(messageSendParams, consumers, streamingErrorHandler, context);
6965
}
7066

7167
@Override
72-
public void sendMessage(Message request, PushNotificationConfig pushNotificationConfiguration,
73-
Map<String, Object> metatadata, @Nullable ClientCallContext context) throws A2AClientException {
68+
public void sendMessage(@NonNull Message request,
69+
@Nullable PushNotificationConfig pushNotificationConfiguration,
70+
@Nullable Map<String, Object> metadata,
71+
@Nullable ClientCallContext context) throws A2AClientException {
7472
MessageSendConfiguration messageSendConfiguration = createMessageSendConfiguration(pushNotificationConfiguration);
7573

7674
MessageSendParams messageSendParams = new MessageSendParams.Builder()
7775
.message(request)
7876
.configuration(messageSendConfiguration)
79-
.metadata(metatadata)
77+
.metadata(metadata)
8078
.build();
8179

82-
sendMessage(messageSendParams, null, null, context);
80+
sendMessage(messageSendParams, consumers, streamingErrorHandler, context);
8381
}
8482

8583
@Override
@@ -122,14 +120,24 @@ public void deleteTaskPushNotificationConfigurations(
122120
}
123121

124122
@Override
125-
public void resubscribe(TaskIdParams request, @Nullable ClientCallContext context) throws A2AClientException {
126-
resubscribeToTask(request, null, null, context);
127-
}
128-
129-
@Override
130-
public void resubscribe(TaskIdParams request, @Nullable List<BiConsumer<ClientEvent, AgentCard>> consumers,
131-
@Nullable Consumer<Throwable> streamingErrorHandler, @Nullable ClientCallContext context) throws A2AClientException {
132-
resubscribeToTask(request, consumers, streamingErrorHandler, context);
123+
public void resubscribe(@NonNull TaskIdParams request,
124+
@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
125+
@Nullable Consumer<Throwable> streamingErrorHandler,
126+
@Nullable ClientCallContext context) throws A2AClientException {
127+
if (! clientConfig.isStreaming() || ! agentCard.capabilities().streaming()) {
128+
throw new A2AClientException("Client and/or server does not support resubscription");
129+
}
130+
ClientTaskManager tracker = new ClientTaskManager();
131+
Consumer<Throwable> overriddenErrorHandler = getOverriddenErrorHandler(streamingErrorHandler);
132+
Consumer<StreamingEventKind> eventHandler = event -> {
133+
try {
134+
ClientEvent clientEvent = getClientEvent(event, tracker);
135+
consume(clientEvent, agentCard, consumers);
136+
} catch (A2AClientError e) {
137+
overriddenErrorHandler.accept(e);
138+
}
139+
};
140+
clientTransport.resubscribe(request, eventHandler, overriddenErrorHandler, context);
133141
}
134142

135143
@Override
@@ -169,7 +177,7 @@ private MessageSendConfiguration createMessageSendConfiguration(@Nullable PushNo
169177
.build();
170178
}
171179

172-
private void sendMessage(MessageSendParams messageSendParams, @Nullable List<BiConsumer<ClientEvent, AgentCard>> consumers,
180+
private void sendMessage(@NonNull MessageSendParams messageSendParams, @NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
173181
@Nullable Consumer<Throwable> errorHandler, @Nullable ClientCallContext context) throws A2AClientException {
174182
if (! clientConfig.isStreaming() || ! agentCard.capabilities().streaming()) {
175183
EventKind eventKind = clientTransport.sendMessage(messageSendParams, context);
@@ -196,24 +204,6 @@ private void sendMessage(MessageSendParams messageSendParams, @Nullable List<BiC
196204
}
197205
}
198206

199-
private void resubscribeToTask(TaskIdParams request, @Nullable List<BiConsumer<ClientEvent, AgentCard>> consumers,
200-
@Nullable Consumer<Throwable> errorHandler, @Nullable ClientCallContext context) throws A2AClientException {
201-
if (! clientConfig.isStreaming() || ! agentCard.capabilities().streaming()) {
202-
throw new A2AClientException("Client and/or server does not support resubscription");
203-
}
204-
ClientTaskManager tracker = new ClientTaskManager();
205-
Consumer<Throwable> overriddenErrorHandler = getOverriddenErrorHandler(errorHandler);
206-
Consumer<StreamingEventKind> eventHandler = event -> {
207-
try {
208-
ClientEvent clientEvent = getClientEvent(event, tracker);
209-
consume(clientEvent, agentCard, consumers);
210-
} catch (A2AClientError e) {
211-
overriddenErrorHandler.accept(e);
212-
}
213-
};
214-
clientTransport.resubscribe(request, eventHandler, overriddenErrorHandler, context);
215-
}
216-
217207
private @NonNull Consumer<Throwable> getOverriddenErrorHandler(@Nullable Consumer<Throwable> errorHandler) {
218208
return e -> {
219209
if (errorHandler != null) {
@@ -226,15 +216,9 @@ private void resubscribeToTask(TaskIdParams request, @Nullable List<BiConsumer<C
226216
};
227217
}
228218

229-
private void consume(ClientEvent clientEvent, AgentCard agentCard, @Nullable List<BiConsumer<ClientEvent, AgentCard>> consumers) {
230-
if (consumers != null) {
231-
// use specified consumers
232-
for (BiConsumer<ClientEvent, AgentCard> consumer : consumers) {
233-
consumer.accept(clientEvent, agentCard);
234-
}
235-
} else {
236-
// use configured consumers
237-
consume(clientEvent, agentCard);
219+
private void consume(ClientEvent clientEvent, AgentCard agentCard, @NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers) {
220+
for (BiConsumer<ClientEvent, AgentCard> consumer : consumers) {
221+
consumer.accept(clientEvent, agentCard);
238222
}
239223
}
240224

0 commit comments

Comments
 (0)