Skip to content

Commit 46b1b0a

Browse files
authored
chore: Update resubscribe and sendMessage without consumers will use … (#528)
…the client's consumers and its streamingErrorHandler The code was already doing that down the path but this simplifies the code as the consumers and streamingErrorhandler can be validated from the top of the API Add a bunch of @nullable & @nonnull to clarify the API contract This fixes #526 Signed-off-by: Jeff Mesnil <[email protected]>
1 parent b70b2bf commit 46b1b0a

File tree

3 files changed

+177
-84
lines changed

3 files changed

+177
-84
lines changed

client/base/src/main/java/io/a2a/client/AbstractClient.java

Lines changed: 39 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@
3232
*/
3333
public abstract class AbstractClient {
3434

35-
private final List<BiConsumer<ClientEvent, AgentCard>> consumers;
36-
private final @Nullable Consumer<Throwable> streamingErrorHandler;
35+
protected final @NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers;
36+
protected final @Nullable Consumer<Throwable> streamingErrorHandler;
3737

38-
public AbstractClient(List<BiConsumer<ClientEvent, AgentCard>> consumers) {
38+
public AbstractClient(@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers) {
3939
this(consumers, null);
4040
}
4141

@@ -57,7 +57,7 @@ public AbstractClient(@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumer
5757
* @param request the message
5858
* @throws A2AClientException if sending the message fails for any reason
5959
*/
60-
public void sendMessage(Message request) throws A2AClientException {
60+
public void sendMessage(@NonNull Message request) throws A2AClientException {
6161
sendMessage(request, null);
6262
}
6363

@@ -71,10 +71,13 @@ public void sendMessage(Message request) throws A2AClientException {
7171
* configuration will get used for streaming.
7272
*
7373
* @param request the message
74-
* @param context optional client call context for the request (may be {@code null})
74+
* @param context optional client call context for the request
7575
* @throws A2AClientException if sending the message fails for any reason
7676
*/
77-
public abstract void sendMessage(Message request, @Nullable ClientCallContext context) throws A2AClientException;
77+
public void sendMessage(@NonNull Message request,
78+
@Nullable ClientCallContext context) throws A2AClientException {
79+
sendMessage(request, consumers, streamingErrorHandler, context);
80+
}
7881

7982
/**
8083
* Send a message to the remote agent. This method will automatically use
@@ -90,9 +93,9 @@ public void sendMessage(Message request) throws A2AClientException {
9093
* @param streamingErrorHandler an error handler that should be used for the streaming case if an error occurs
9194
* @throws A2AClientException if sending the message fails for any reason
9295
*/
93-
public void sendMessage(Message request,
94-
List<BiConsumer<ClientEvent, AgentCard>> consumers,
95-
Consumer<Throwable> streamingErrorHandler) throws A2AClientException {
96+
public void sendMessage(@NonNull Message request,
97+
@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
98+
@Nullable Consumer<Throwable> streamingErrorHandler) throws A2AClientException {
9699
sendMessage(request, consumers, streamingErrorHandler, null);
97100
}
98101

@@ -108,12 +111,12 @@ public void sendMessage(Message request,
108111
* @param request the message
109112
* @param consumers a list of consumers to pass responses from the remote agent to
110113
* @param streamingErrorHandler an error handler that should be used for the streaming case if an error occurs
111-
* @param context optional client call context for the request (may be {@code null})
114+
* @param context optional client call context for the request
112115
* @throws A2AClientException if sending the message fails for any reason
113116
*/
114-
public abstract void sendMessage(Message request,
115-
List<BiConsumer<ClientEvent, AgentCard>> consumers,
116-
Consumer<Throwable> streamingErrorHandler,
117+
public abstract void sendMessage(@NonNull Message request,
118+
@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
119+
@Nullable Consumer<Throwable> streamingErrorHandler,
117120
@Nullable ClientCallContext context) throws A2AClientException;
118121

119122
/**
@@ -130,8 +133,9 @@ public abstract void sendMessage(Message request,
130133
* @param metadata the optional metadata to include when sending the message
131134
* @throws A2AClientException if sending the message fails for any reason
132135
*/
133-
public void sendMessage(Message request, PushNotificationConfig pushNotificationConfiguration,
134-
Map<String, Object> metadata) throws A2AClientException {
136+
public void sendMessage(@NonNull Message request,
137+
@Nullable PushNotificationConfig pushNotificationConfiguration,
138+
@Nullable Map<String, Object> metadata) throws A2AClientException {
135139
sendMessage(request, pushNotificationConfiguration, metadata, null);
136140
}
137141

@@ -147,11 +151,13 @@ public void sendMessage(Message request, PushNotificationConfig pushNotification
147151
* @param pushNotificationConfiguration the push notification configuration that should be
148152
* used if the streaming approach is used
149153
* @param metadata the optional metadata to include when sending the message
150-
* @param context optional client call context for the request (may be {@code null})
154+
* @param context optional client call context for the request
151155
* @throws A2AClientException if sending the message fails for any reason
152156
*/
153-
public abstract void sendMessage(Message request, PushNotificationConfig pushNotificationConfiguration,
154-
Map<String, Object> metadata, @Nullable ClientCallContext context) throws A2AClientException;
157+
public abstract void sendMessage(@NonNull Message request,
158+
@Nullable PushNotificationConfig pushNotificationConfiguration,
159+
@Nullable Map<String, Object> metadata,
160+
@Nullable ClientCallContext context) throws A2AClientException;
155161

156162
/**
157163
* Retrieve the current state and history of a specific task.
@@ -320,8 +326,8 @@ public abstract void deleteTaskPushNotificationConfigurations(
320326
* @param request the parameters specifying which task's notification configs to delete
321327
* @throws A2AClientException if resubscribing fails for any reason
322328
*/
323-
public void resubscribe(TaskIdParams request) throws A2AClientException {
324-
resubscribe(request, null);
329+
public void resubscribe(@NonNull TaskIdParams request) throws A2AClientException {
330+
resubscribe(request, consumers, streamingErrorHandler, null);
325331
}
326332

327333
/**
@@ -332,10 +338,13 @@ public void resubscribe(TaskIdParams request) throws A2AClientException {
332338
* error handler will be used if an error occurs during streaming.
333339
*
334340
* @param request the parameters specifying which task's notification configs to delete
335-
* @param context optional client call context for the request (may be {@code null})
341+
* @param context optional client call context for the request
336342
* @throws A2AClientException if resubscribing fails for any reason
337343
*/
338-
public abstract void resubscribe(TaskIdParams request, @Nullable ClientCallContext context) throws A2AClientException;
344+
public void resubscribe(@NonNull TaskIdParams request,
345+
@Nullable ClientCallContext context) throws A2AClientException {
346+
resubscribe(request, consumers, streamingErrorHandler, context);
347+
}
339348

340349
/**
341350
* Resubscribe to a task's event stream.
@@ -349,8 +358,9 @@ public void resubscribe(TaskIdParams request) throws A2AClientException {
349358
* @param streamingErrorHandler an error handler that should be used for the streaming case if an error occurs
350359
* @throws A2AClientException if resubscribing fails for any reason
351360
*/
352-
public void resubscribe(TaskIdParams request, List<BiConsumer<ClientEvent, AgentCard>> consumers,
353-
Consumer<Throwable> streamingErrorHandler) throws A2AClientException {
361+
public void resubscribe(@NonNull TaskIdParams request,
362+
@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
363+
@Nullable Consumer<Throwable> streamingErrorHandler) throws A2AClientException {
354364
resubscribe(request, consumers, streamingErrorHandler, null);
355365
}
356366

@@ -364,11 +374,13 @@ public void resubscribe(TaskIdParams request, List<BiConsumer<ClientEvent, Agent
364374
* @param request the parameters specifying which task's notification configs to delete
365375
* @param consumers a list of consumers to pass responses from the remote agent to
366376
* @param streamingErrorHandler an error handler that should be used for the streaming case if an error occurs
367-
* @param context optional client call context for the request (may be {@code null})
377+
* @param context optional client call context for the request
368378
* @throws A2AClientException if resubscribing fails for any reason
369379
*/
370-
public abstract void resubscribe(TaskIdParams request, List<BiConsumer<ClientEvent, AgentCard>> consumers,
371-
Consumer<Throwable> streamingErrorHandler, @Nullable ClientCallContext context) throws A2AClientException;
380+
public abstract void resubscribe(@NonNull TaskIdParams request,
381+
@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
382+
@Nullable Consumer<Throwable> streamingErrorHandler,
383+
@Nullable ClientCallContext context) throws A2AClientException;
372384

373385
/**
374386
* Retrieve the AgentCard.
@@ -394,15 +406,6 @@ public AgentCard getAgentCard() throws A2AClientException {
394406
*/
395407
public abstract void close();
396408

397-
/**
398-
* Process the event using all configured consumers.
399-
*/
400-
void consume(ClientEvent clientEventOrMessage, AgentCard agentCard) {
401-
for (BiConsumer<ClientEvent, AgentCard> consumer : consumers) {
402-
consumer.accept(clientEventOrMessage, agentCard);
403-
}
404-
}
405-
406409
/**
407410
* Get the error handler that should be used during streaming.
408411
*

client/base/src/main/java/io/a2a/client/Client.java

Lines changed: 32 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -56,30 +56,28 @@ public static ClientBuilder builder(AgentCard agentCard) {
5656
}
5757

5858
@Override
59-
public void sendMessage(Message request, @Nullable ClientCallContext context) throws A2AClientException {
60-
MessageSendParams messageSendParams = getMessageSendParams(request, clientConfig);
61-
sendMessage(messageSendParams, null, null, context);
62-
}
63-
64-
@Override
65-
public void sendMessage(Message request, List<BiConsumer<ClientEvent, AgentCard>> consumers,
66-
Consumer<Throwable> streamingErrorHandler, @Nullable ClientCallContext context) throws A2AClientException {
59+
public void sendMessage(@NonNull Message request,
60+
@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
61+
@Nullable Consumer<Throwable> streamingErrorHandler,
62+
@Nullable ClientCallContext context) throws A2AClientException {
6763
MessageSendParams messageSendParams = getMessageSendParams(request, clientConfig);
6864
sendMessage(messageSendParams, consumers, streamingErrorHandler, context);
6965
}
7066

7167
@Override
72-
public void sendMessage(Message request, PushNotificationConfig pushNotificationConfiguration,
73-
Map<String, Object> metatadata, @Nullable ClientCallContext context) throws A2AClientException {
68+
public void sendMessage(@NonNull Message request,
69+
@Nullable PushNotificationConfig pushNotificationConfiguration,
70+
@Nullable Map<String, Object> metadata,
71+
@Nullable ClientCallContext context) throws A2AClientException {
7472
MessageSendConfiguration messageSendConfiguration = createMessageSendConfiguration(pushNotificationConfiguration);
7573

7674
MessageSendParams messageSendParams = new MessageSendParams.Builder()
7775
.message(request)
7876
.configuration(messageSendConfiguration)
79-
.metadata(metatadata)
77+
.metadata(metadata)
8078
.build();
8179

82-
sendMessage(messageSendParams, null, null, context);
80+
sendMessage(messageSendParams, consumers, streamingErrorHandler, context);
8381
}
8482

8583
@Override
@@ -122,14 +120,24 @@ public void deleteTaskPushNotificationConfigurations(
122120
}
123121

124122
@Override
125-
public void resubscribe(TaskIdParams request, @Nullable ClientCallContext context) throws A2AClientException {
126-
resubscribeToTask(request, null, null, context);
127-
}
128-
129-
@Override
130-
public void resubscribe(TaskIdParams request, @Nullable List<BiConsumer<ClientEvent, AgentCard>> consumers,
131-
@Nullable Consumer<Throwable> streamingErrorHandler, @Nullable ClientCallContext context) throws A2AClientException {
132-
resubscribeToTask(request, consumers, streamingErrorHandler, context);
123+
public void resubscribe(@NonNull TaskIdParams request,
124+
@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
125+
@Nullable Consumer<Throwable> streamingErrorHandler,
126+
@Nullable ClientCallContext context) throws A2AClientException {
127+
if (! clientConfig.isStreaming() || ! agentCard.capabilities().streaming()) {
128+
throw new A2AClientException("Client and/or server does not support resubscription");
129+
}
130+
ClientTaskManager tracker = new ClientTaskManager();
131+
Consumer<Throwable> overriddenErrorHandler = getOverriddenErrorHandler(streamingErrorHandler);
132+
Consumer<StreamingEventKind> eventHandler = event -> {
133+
try {
134+
ClientEvent clientEvent = getClientEvent(event, tracker);
135+
consume(clientEvent, agentCard, consumers);
136+
} catch (A2AClientError e) {
137+
overriddenErrorHandler.accept(e);
138+
}
139+
};
140+
clientTransport.resubscribe(request, eventHandler, overriddenErrorHandler, context);
133141
}
134142

135143
@Override
@@ -169,7 +177,7 @@ private MessageSendConfiguration createMessageSendConfiguration(@Nullable PushNo
169177
.build();
170178
}
171179

172-
private void sendMessage(MessageSendParams messageSendParams, @Nullable List<BiConsumer<ClientEvent, AgentCard>> consumers,
180+
private void sendMessage(@NonNull MessageSendParams messageSendParams, @NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
173181
@Nullable Consumer<Throwable> errorHandler, @Nullable ClientCallContext context) throws A2AClientException {
174182
if (! clientConfig.isStreaming() || ! agentCard.capabilities().streaming()) {
175183
EventKind eventKind = clientTransport.sendMessage(messageSendParams, context);
@@ -196,24 +204,6 @@ private void sendMessage(MessageSendParams messageSendParams, @Nullable List<BiC
196204
}
197205
}
198206

199-
private void resubscribeToTask(TaskIdParams request, @Nullable List<BiConsumer<ClientEvent, AgentCard>> consumers,
200-
@Nullable Consumer<Throwable> errorHandler, @Nullable ClientCallContext context) throws A2AClientException {
201-
if (! clientConfig.isStreaming() || ! agentCard.capabilities().streaming()) {
202-
throw new A2AClientException("Client and/or server does not support resubscription");
203-
}
204-
ClientTaskManager tracker = new ClientTaskManager();
205-
Consumer<Throwable> overriddenErrorHandler = getOverriddenErrorHandler(errorHandler);
206-
Consumer<StreamingEventKind> eventHandler = event -> {
207-
try {
208-
ClientEvent clientEvent = getClientEvent(event, tracker);
209-
consume(clientEvent, agentCard, consumers);
210-
} catch (A2AClientError e) {
211-
overriddenErrorHandler.accept(e);
212-
}
213-
};
214-
clientTransport.resubscribe(request, eventHandler, overriddenErrorHandler, context);
215-
}
216-
217207
private @NonNull Consumer<Throwable> getOverriddenErrorHandler(@Nullable Consumer<Throwable> errorHandler) {
218208
return e -> {
219209
if (errorHandler != null) {
@@ -226,15 +216,9 @@ private void resubscribeToTask(TaskIdParams request, @Nullable List<BiConsumer<C
226216
};
227217
}
228218

229-
private void consume(ClientEvent clientEvent, AgentCard agentCard, @Nullable List<BiConsumer<ClientEvent, AgentCard>> consumers) {
230-
if (consumers != null) {
231-
// use specified consumers
232-
for (BiConsumer<ClientEvent, AgentCard> consumer : consumers) {
233-
consumer.accept(clientEvent, agentCard);
234-
}
235-
} else {
236-
// use configured consumers
237-
consume(clientEvent, agentCard);
219+
private void consume(ClientEvent clientEvent, AgentCard agentCard, @NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers) {
220+
for (BiConsumer<ClientEvent, AgentCard> consumer : consumers) {
221+
consumer.accept(clientEvent, agentCard);
238222
}
239223
}
240224

0 commit comments

Comments
 (0)