Skip to content

Commit 1ff6e4c

Browse files
committed
fix: Adding support for Jspecify for JSONRPC
fixing issue #335 Signed-off-by: Emmanuel Hugonnet <[email protected]>
1 parent da7ee20 commit 1ff6e4c

File tree

12 files changed

+88
-47
lines changed

12 files changed

+88
-47
lines changed

client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransport.java

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import java.util.concurrent.atomic.AtomicReference;
6060

6161
import io.a2a.util.Utils;
62+
import org.jspecify.annotations.Nullable;
6263

6364
public class JSONRPCTransport implements ClientTransport {
6465

@@ -73,8 +74,8 @@ public class JSONRPCTransport implements ClientTransport {
7374

7475
private final A2AHttpClient httpClient;
7576
private final String agentUrl;
76-
private final List<ClientCallInterceptor> interceptors;
77-
private AgentCard agentCard;
77+
private final @Nullable List<ClientCallInterceptor> interceptors;
78+
private @Nullable AgentCard agentCard;
7879
private boolean needsExtendedCard = false;
7980

8081
public JSONRPCTransport(String agentUrl) {
@@ -85,8 +86,8 @@ public JSONRPCTransport(AgentCard agentCard) {
8586
this(null, agentCard, agentCard.url(), null);
8687
}
8788

88-
public JSONRPCTransport(A2AHttpClient httpClient, AgentCard agentCard,
89-
String agentUrl, List<ClientCallInterceptor> interceptors) {
89+
public JSONRPCTransport(@Nullable A2AHttpClient httpClient, @Nullable AgentCard agentCard,
90+
String agentUrl, @Nullable List<ClientCallInterceptor> interceptors) {
9091
this.httpClient = httpClient == null ? new JdkA2AHttpClient() : httpClient;
9192
this.agentCard = agentCard;
9293
this.agentUrl = agentUrl;
@@ -95,7 +96,7 @@ public JSONRPCTransport(A2AHttpClient httpClient, AgentCard agentCard,
9596
}
9697

9798
@Override
98-
public EventKind sendMessage(MessageSendParams request, ClientCallContext context) throws A2AClientException {
99+
public EventKind sendMessage(MessageSendParams request, @Nullable ClientCallContext context) throws A2AClientException {
99100
checkNotNullParam("request", request);
100101
SendMessageRequest sendMessageRequest = new SendMessageRequest.Builder()
101102
.jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
@@ -119,7 +120,7 @@ public EventKind sendMessage(MessageSendParams request, ClientCallContext contex
119120

120121
@Override
121122
public void sendMessageStreaming(MessageSendParams request, Consumer<StreamingEventKind> eventConsumer,
122-
Consumer<Throwable> errorConsumer, ClientCallContext context) throws A2AClientException {
123+
@Nullable Consumer<Throwable> errorConsumer, @Nullable ClientCallContext context) throws A2AClientException {
123124
checkNotNullParam("request", request);
124125
checkNotNullParam("eventConsumer", eventConsumer);
125126
SendStreamingMessageRequest sendStreamingMessageRequest = new SendStreamingMessageRequest.Builder()
@@ -131,7 +132,7 @@ public void sendMessageStreaming(MessageSendParams request, Consumer<StreamingEv
131132
PayloadAndHeaders payloadAndHeaders = applyInterceptors(SendStreamingMessageRequest.METHOD,
132133
sendStreamingMessageRequest, agentCard, context);
133134

134-
AtomicReference<CompletableFuture<Void>> ref = new AtomicReference<>();
135+
final AtomicReference<CompletableFuture<Void>> ref = new AtomicReference<>();
135136
SSEEventListener sseEventListener = new SSEEventListener(eventConsumer, errorConsumer);
136137

137138
try {
@@ -151,7 +152,7 @@ public void sendMessageStreaming(MessageSendParams request, Consumer<StreamingEv
151152
}
152153

153154
@Override
154-
public Task getTask(TaskQueryParams request, ClientCallContext context) throws A2AClientException {
155+
public Task getTask(TaskQueryParams request, @Nullable ClientCallContext context) throws A2AClientException {
155156
checkNotNullParam("request", request);
156157
GetTaskRequest getTaskRequest = new GetTaskRequest.Builder()
157158
.jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
@@ -174,7 +175,7 @@ public Task getTask(TaskQueryParams request, ClientCallContext context) throws A
174175
}
175176

176177
@Override
177-
public Task cancelTask(TaskIdParams request, ClientCallContext context) throws A2AClientException {
178+
public Task cancelTask(TaskIdParams request, @Nullable ClientCallContext context) throws A2AClientException {
178179
checkNotNullParam("request", request);
179180
CancelTaskRequest cancelTaskRequest = new CancelTaskRequest.Builder()
180181
.jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
@@ -198,7 +199,7 @@ public Task cancelTask(TaskIdParams request, ClientCallContext context) throws A
198199

199200
@Override
200201
public TaskPushNotificationConfig setTaskPushNotificationConfiguration(TaskPushNotificationConfig request,
201-
ClientCallContext context) throws A2AClientException {
202+
@Nullable ClientCallContext context) throws A2AClientException {
202203
checkNotNullParam("request", request);
203204
SetTaskPushNotificationConfigRequest setTaskPushNotificationRequest = new SetTaskPushNotificationConfigRequest.Builder()
204205
.jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
@@ -223,7 +224,7 @@ public TaskPushNotificationConfig setTaskPushNotificationConfiguration(TaskPushN
223224

224225
@Override
225226
public TaskPushNotificationConfig getTaskPushNotificationConfiguration(GetTaskPushNotificationConfigParams request,
226-
ClientCallContext context) throws A2AClientException {
227+
@Nullable ClientCallContext context) throws A2AClientException {
227228
checkNotNullParam("request", request);
228229
GetTaskPushNotificationConfigRequest getTaskPushNotificationRequest = new GetTaskPushNotificationConfigRequest.Builder()
229230
.jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
@@ -249,7 +250,7 @@ public TaskPushNotificationConfig getTaskPushNotificationConfiguration(GetTaskPu
249250
@Override
250251
public List<TaskPushNotificationConfig> listTaskPushNotificationConfigurations(
251252
ListTaskPushNotificationConfigParams request,
252-
ClientCallContext context) throws A2AClientException {
253+
@Nullable ClientCallContext context) throws A2AClientException {
253254
checkNotNullParam("request", request);
254255
ListTaskPushNotificationConfigRequest listTaskPushNotificationRequest = new ListTaskPushNotificationConfigRequest.Builder()
255256
.jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
@@ -274,7 +275,7 @@ public List<TaskPushNotificationConfig> listTaskPushNotificationConfigurations(
274275

275276
@Override
276277
public void deleteTaskPushNotificationConfigurations(DeleteTaskPushNotificationConfigParams request,
277-
ClientCallContext context) throws A2AClientException {
278+
@Nullable ClientCallContext context) throws A2AClientException {
278279
checkNotNullParam("request", request);
279280
DeleteTaskPushNotificationConfigRequest deleteTaskPushNotificationRequest = new DeleteTaskPushNotificationConfigRequest.Builder()
280281
.jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
@@ -297,7 +298,7 @@ public void deleteTaskPushNotificationConfigurations(DeleteTaskPushNotificationC
297298

298299
@Override
299300
public void resubscribe(TaskIdParams request, Consumer<StreamingEventKind> eventConsumer,
300-
Consumer<Throwable> errorConsumer, ClientCallContext context) throws A2AClientException {
301+
@Nullable Consumer<Throwable> errorConsumer, @Nullable ClientCallContext context) throws A2AClientException {
301302
checkNotNullParam("request", request);
302303
checkNotNullParam("eventConsumer", eventConsumer);
303304
checkNotNullParam("errorConsumer", errorConsumer);
@@ -330,7 +331,7 @@ public void resubscribe(TaskIdParams request, Consumer<StreamingEventKind> event
330331
}
331332

332333
@Override
333-
public AgentCard getAgentCard(ClientCallContext context) throws A2AClientException {
334+
public AgentCard getAgentCard(@Nullable ClientCallContext context) throws A2AClientException {
334335
A2ACardResolver resolver;
335336
try {
336337
if (agentCard == null) {
@@ -370,8 +371,8 @@ public void close() {
370371
// no-op
371372
}
372373

373-
private PayloadAndHeaders applyInterceptors(String methodName, Object payload,
374-
AgentCard agentCard, ClientCallContext clientCallContext) {
374+
private PayloadAndHeaders applyInterceptors(String methodName, @Nullable Object payload,
375+
@Nullable AgentCard agentCard, @Nullable ClientCallContext clientCallContext) {
375376
PayloadAndHeaders payloadAndHeaders = new PayloadAndHeaders(payload, getHttpHeaders(clientCallContext));
376377
if (interceptors != null && ! interceptors.isEmpty()) {
377378
for (ClientCallInterceptor interceptor : interceptors) {
@@ -416,7 +417,7 @@ private <T extends JSONRPCResponse<?>> T unmarshalResponse(String response, Type
416417
return value;
417418
}
418419

419-
private Map<String, String> getHttpHeaders(ClientCallContext context) {
420+
private @Nullable Map<String, String> getHttpHeaders(@Nullable ClientCallContext context) {
420421
return context != null ? context.getHeaders() : null;
421422
}
422423
}

client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransportConfig.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22

33
import io.a2a.client.transport.spi.ClientTransportConfig;
44
import io.a2a.client.http.A2AHttpClient;
5+
import org.jspecify.annotations.Nullable;
56

67
public class JSONRPCTransportConfig extends ClientTransportConfig<JSONRPCTransport> {
78

8-
private final A2AHttpClient httpClient;
9+
private final @Nullable A2AHttpClient httpClient;
910

1011
public JSONRPCTransportConfig() {
1112
this.httpClient = null;
@@ -15,7 +16,7 @@ public JSONRPCTransportConfig(A2AHttpClient httpClient) {
1516
this.httpClient = httpClient;
1617
}
1718

18-
public A2AHttpClient getHttpClient() {
19+
public @Nullable A2AHttpClient getHttpClient() {
1920
return httpClient;
2021
}
2122
}

client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransportConfigBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@
33
import io.a2a.client.http.A2AHttpClient;
44
import io.a2a.client.http.JdkA2AHttpClient;
55
import io.a2a.client.transport.spi.ClientTransportConfigBuilder;
6+
import org.jspecify.annotations.Nullable;
67

78
public class JSONRPCTransportConfigBuilder extends ClientTransportConfigBuilder<JSONRPCTransportConfig, JSONRPCTransportConfigBuilder> {
89

9-
private A2AHttpClient httpClient;
10+
private @Nullable A2AHttpClient httpClient;
1011

1112
public JSONRPCTransportConfigBuilder httpClient(A2AHttpClient httpClient) {
1213
this.httpClient = httpClient;
13-
1414
return this;
1515
}
1616

client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransportProvider.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,17 @@
55
import io.a2a.spec.A2AClientException;
66
import io.a2a.spec.AgentCard;
77
import io.a2a.spec.TransportProtocol;
8+
import org.jspecify.annotations.Nullable;
89

910
public class JSONRPCTransportProvider implements ClientTransportProvider<JSONRPCTransport, JSONRPCTransportConfig> {
1011

1112
@Override
12-
public JSONRPCTransport create(JSONRPCTransportConfig clientTransportConfig, AgentCard agentCard, String agentUrl) throws A2AClientException {
13-
if (clientTransportConfig == null) {
14-
clientTransportConfig = new JSONRPCTransportConfig(new JdkA2AHttpClient());
13+
public JSONRPCTransport create(@Nullable JSONRPCTransportConfig clientTransportConfig, AgentCard agentCard, String agentUrl) throws A2AClientException {
14+
JSONRPCTransportConfig currentClientTransportConfig = clientTransportConfig;
15+
if (currentClientTransportConfig == null) {
16+
currentClientTransportConfig = new JSONRPCTransportConfig(new JdkA2AHttpClient());
1517
}
16-
17-
return new JSONRPCTransport(clientTransportConfig.getHttpClient(), agentCard, agentUrl, clientTransportConfig.getInterceptors());
18+
return new JSONRPCTransport(currentClientTransportConfig.getHttpClient(), agentCard, agentUrl, currentClientTransportConfig.getInterceptors());
1819
}
1920

2021
@Override
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
@NullMarked
2+
package io.a2a.client.transport.jsonrpc;
3+
4+
import org.jspecify.annotations.NullMarked;
5+

client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/sse/SSEEventListener.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,31 +12,37 @@
1212

1313
import static io.a2a.util.Utils.OBJECT_MAPPER;
1414

15+
import org.jspecify.annotations.Nullable;
16+
1517
public class SSEEventListener {
18+
1619
private static final Logger log = Logger.getLogger(SSEEventListener.class.getName());
1720
private final Consumer<StreamingEventKind> eventHandler;
18-
private final Consumer<Throwable> errorHandler;
21+
private final @Nullable
22+
Consumer<Throwable> errorHandler;
1923
private volatile boolean completed = false;
2024

2125
public SSEEventListener(Consumer<StreamingEventKind> eventHandler,
22-
Consumer<Throwable> errorHandler) {
26+
@Nullable Consumer<Throwable> errorHandler) {
2327
this.eventHandler = eventHandler;
2428
this.errorHandler = errorHandler;
2529
}
2630

27-
public void onMessage(String message, Future<Void> completableFuture) {
31+
public void onMessage(String message, @Nullable Future<Void> completableFuture) {
2832
try {
29-
handleMessage(OBJECT_MAPPER.readTree(message),completableFuture);
33+
handleMessage(OBJECT_MAPPER.readTree(message), completableFuture);
3034
} catch (JsonProcessingException e) {
3135
log.warning("Failed to parse JSON message: " + message);
3236
}
3337
}
3438

35-
public void onError(Throwable throwable, Future<Void> future) {
39+
public void onError(Throwable throwable, @Nullable Future<Void> future) {
3640
if (errorHandler != null) {
3741
errorHandler.accept(throwable);
3842
}
39-
future.cancel(true); // close SSE channel
43+
if (future != null) {
44+
future.cancel(true); // close SSE channel
45+
}
4046
}
4147

4248
public void onComplete() {
@@ -57,7 +63,7 @@ public void onComplete() {
5763
}
5864
}
5965

60-
private void handleMessage(JsonNode jsonNode, Future<Void> future) {
66+
private void handleMessage(JsonNode jsonNode, @Nullable Future<Void> future) {
6167
try {
6268
if (jsonNode.has("error")) {
6369
JSONRPCError error = OBJECT_MAPPER.treeToValue(jsonNode.get("error"), JSONRPCError.class);
@@ -70,7 +76,9 @@ private void handleMessage(JsonNode jsonNode, Future<Void> future) {
7076
StreamingEventKind event = OBJECT_MAPPER.treeToValue(result, StreamingEventKind.class);
7177
eventHandler.accept(event);
7278
if (event instanceof TaskStatusUpdateEvent && ((TaskStatusUpdateEvent) event).isFinal()) {
73-
future.cancel(true); // close SSE channel
79+
if (future != null) {
80+
future.cancel(true); // close SSE channel
81+
}
7482
}
7583
} else {
7684
throw new IllegalArgumentException("Unknown message type");
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
@NullMarked
2+
package io.a2a.client.transport.jsonrpc.sse;
3+
4+
import org.jspecify.annotations.NullMarked;
5+

client/transport/spi/src/main/java/io/a2a/client/transport/spi/interceptors/ClientCallInterceptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,5 @@ public abstract class ClientCallInterceptor {
2323
* @return the potentially modified payload and headers
2424
*/
2525
public abstract PayloadAndHeaders intercept(String methodName, @Nullable Object payload, Map<String, String> headers,
26-
AgentCard agentCard, @Nullable ClientCallContext clientCallContext);
26+
@Nullable AgentCard agentCard, @Nullable ClientCallContext clientCallContext);
2727
}

client/transport/spi/src/main/java/io/a2a/client/transport/spi/interceptors/auth/AuthInterceptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public AuthInterceptor(final CredentialService credentialService) {
3333

3434
@Override
3535
public PayloadAndHeaders intercept(String methodName, @Nullable Object payload, Map<String, String> headers,
36-
AgentCard agentCard, @Nullable ClientCallContext clientCallContext) {
36+
@Nullable AgentCard agentCard, @Nullable ClientCallContext clientCallContext) {
3737
Map<String, String> updatedHeaders = new HashMap<>(headers == null ? new HashMap<>() : headers);
3838
if (agentCard == null || agentCard.security() == null || agentCard.securitySchemes() == null) {
3939
return new PayloadAndHeaders(payload, updatedHeaders);

pom.xml

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,9 @@
6464
<rest-assured.version>5.5.1</rest-assured.version>
6565
<slf4j.version>2.0.17</slf4j.version>
6666
<logback.version>1.5.18</logback.version>
67-
<error-prone.version>2.42.0</error-prone.version>
67+
<error-prone.version>2.43.0</error-prone.version>
68+
<nullaway.version>0.12.12</nullaway.version>
69+
<error-prone.flag>-XDaddTypeAnnotationsToSymbol=true </error-prone.flag>
6870

6971
<!-- Redirect test output to file -->
7072
<maven.test.redirectTestOutputToFile>true</maven.test.redirectTestOutputToFile>
@@ -321,7 +323,7 @@
321323
<arg>-XDcompilePolicy=simple</arg>
322324
<arg>--should-stop=ifError=FLOW</arg>
323325
<arg>-parameters</arg>
324-
<arg>-Xplugin:ErrorProne -XepDisableAllChecks -Xep:NullAway:ERROR -XepOpt:NullAway:OnlyNullMarked=true -XepOpt:NullAway:JSpecifyMode=true -XepExcludedPaths:.*/src/test/.* -XepDisableWarningsInGeneratedCode </arg>
326+
<arg>${error-prone.flag}-Xplugin:ErrorProne -XepDisableAllChecks -Xep:NullAway:ERROR -XepOpt:NullAway:OnlyNullMarked=true -XepOpt:NullAway:JSpecifyMode=true -XepExcludedPaths:.*/src/test/.* -XepDisableWarningsInGeneratedCode </arg>
325327
</compilerArgs>
326328
<annotationProcessorPaths>
327329
<path>
@@ -332,7 +334,7 @@
332334
<path>
333335
<groupId>com.uber.nullaway</groupId>
334336
<artifactId>nullaway</artifactId>
335-
<version>0.12.10</version>
337+
<version>${nullaway.version}</version>
336338
</path>
337339
<!-- Other annotation processors go here.
338340
@@ -483,6 +485,21 @@
483485
</plugins>
484486
</build>
485487
</profile>
488+
<profile>
489+
<!--
490+
This profile gis required for old JDK that don't support reading type use annotations from bytecodes.
491+
https://github.com/uber/NullAway/wiki/JSpecify-Support#supported-jdk-versions
492+
-->
493+
<id>old_jdk_support</id>
494+
<activation>
495+
<jdk>(17, 21]</jdk>
496+
</activation>
497+
<properties>
498+
<error-prone.version>2.42.0</error-prone.version>
499+
<nullaway.version>0.12.10</nullaway.version>
500+
<error-prone.flag></error-prone.flag>
501+
</properties>
502+
</profile>
486503
</profiles>
487504

488505
</project>

0 commit comments

Comments
 (0)