Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
<artifactId>a2a-java-sdk-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>a2a-java-sdk-grpc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>a2a-java-sdk-spec</artifactId>
Expand Down
200 changes: 200 additions & 0 deletions client/src/main/java/io/a2a/client/A2AGrpcClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package io.a2a.client;

import static io.a2a.grpc.A2AServiceGrpc.A2AServiceBlockingV2Stub;
import static io.a2a.grpc.A2AServiceGrpc.A2AServiceStub;
import static io.a2a.grpc.utils.ProtoUtils.FromProto;
import static io.a2a.grpc.utils.ProtoUtils.ToProto;
import static io.a2a.util.Assert.checkNotNullParam;

import java.util.function.Consumer;

import io.a2a.client.sse.SSEStreamObserver;
import io.a2a.grpc.A2AServiceGrpc;
import io.a2a.grpc.CancelTaskRequest;
import io.a2a.grpc.CreateTaskPushNotificationConfigRequest;
import io.a2a.grpc.GetTaskPushNotificationConfigRequest;
import io.a2a.grpc.GetTaskRequest;
import io.a2a.grpc.SendMessageRequest;
import io.a2a.grpc.SendMessageResponse;
import io.a2a.grpc.StreamResponse;
import io.a2a.grpc.utils.ProtoUtils;
import io.a2a.spec.A2AServerException;
import io.a2a.spec.AgentCard;
import io.a2a.spec.EventKind;
import io.a2a.spec.GetTaskPushNotificationConfigParams;
import io.a2a.spec.MessageSendParams;
import io.a2a.spec.StreamingEventKind;
import io.a2a.spec.Task;
import io.a2a.spec.TaskIdParams;
import io.a2a.spec.TaskPushNotificationConfig;
import io.a2a.spec.TaskQueryParams;
import io.grpc.Channel;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

/**
* A2A Client for interacting with an A2A agent via gRPC.
*/
public class A2AGrpcClient {

private A2AServiceBlockingV2Stub blockingStub;
private A2AServiceStub asyncStub;
private AgentCard agentCard;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The agentCard field is initialized in the constructor but is never used within the class. It should be removed to avoid dead code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The agentCard field is initialized in the constructor but is never used within the A2AGrpcClient class. This could indicate dead code or a feature that is not yet fully implemented. If it's not needed, it should be removed to simplify the code.


/**
* Create an A2A client for interacting with an A2A agent via gRPC.
*
* @param channel the gRPC channel
* @param agentCard the agent card for the A2A server this client will be communicating with
*/
public A2AGrpcClient(Channel channel, AgentCard agentCard) {
checkNotNullParam("channel", channel);
checkNotNullParam("agentCard", agentCard);
this.asyncStub = A2AServiceGrpc.newStub(channel);
this.blockingStub = A2AServiceGrpc.newBlockingV2Stub(channel);
this.agentCard = agentCard;
}
Comment on lines +50 to +56
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The agentCard field is initialized in the constructor but its value is never used throughout the class. To improve code clarity and remove dead code, this field and its corresponding constructor parameter and assignments should be removed.

    public A2AGrpcClient(Channel channel) {
        checkNotNullParam("channel", channel);
        this.asyncStub = A2AServiceGrpc.newStub(channel);
        this.blockingStub = A2AServiceGrpc.newBlockingV2Stub(channel);
    }

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping it to be consistent with the python implementation


/**
* Send a message to the remote agent.
*
* @param messageSendParams the parameters for the message to be sent
* @return the response, may be a message or a task
* @throws A2AServerException if sending the message fails for any reason
*/
public EventKind sendMessage(MessageSendParams messageSendParams) throws A2AServerException {
SendMessageRequest request = createGrpcSendMessageRequestFromMessageSendParams(messageSendParams);
try {
SendMessageResponse response = blockingStub.sendMessage(request);
if (response.hasMsg()) {
return FromProto.message(response.getMsg());
} else if (response.hasTask()) {
return FromProto.task(response.getTask());
} else {
throw new A2AServerException("Server response did not contain a message or task");
}
} catch (StatusRuntimeException e) {
throw new A2AServerException("Failed to send message: " + e, e);
}
}

/**
* Retrieves the current state and history of a specific task.
*
* @param taskQueryParams the params for the task to be queried
* @return the task
* @throws A2AServerException if retrieving the task fails for any reason
*/
public Task getTask(TaskQueryParams taskQueryParams) throws A2AServerException {
GetTaskRequest.Builder requestBuilder = GetTaskRequest.newBuilder();
requestBuilder.setName("tasks/" + taskQueryParams.id());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The resource name prefix "tasks/" is hardcoded here and in other places in this class. It's a good practice to define such strings as constants to improve maintainability and prevent typos. Consider creating a constant for this prefix.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For consistency and better readability when constructing resource names, consider using String.format(). This approach is generally preferred over string concatenation, especially as resource paths become more complex. The same applies to the cancelTask and getTaskPushNotificationConfigName methods.

Suggested change
requestBuilder.setName("tasks/" + taskQueryParams.id());
requestBuilder.setName(String.format("tasks/%s", taskQueryParams.id()));

if (taskQueryParams.historyLength() != null) {
requestBuilder.setHistoryLength(taskQueryParams.historyLength());
}
GetTaskRequest getTaskRequest = requestBuilder.build();
try {
return FromProto.task(blockingStub.getTask(getTaskRequest));
} catch (StatusRuntimeException e) {
throw new A2AServerException("Failed to get task: " + e, e);
}
}

/**
* Cancel a task that was previously submitted to the A2A server.
*
* @param taskIdParams the params for the task to be cancelled
* @return the updated task
* @throws A2AServerException if cancelling the task fails for any reason
*/
public Task cancelTask(TaskIdParams taskIdParams) throws A2AServerException {
CancelTaskRequest cancelTaskRequest = CancelTaskRequest.newBuilder()
.setName("tasks/" + taskIdParams.id())
.build();
try {
return FromProto.task(blockingStub.cancelTask(cancelTaskRequest));
} catch (StatusRuntimeException e) {
throw new A2AServerException("Failed to cancel task: " + e, e);
}
}

/**
* Set push notification configuration for a task.
*
* @param taskPushNotificationConfig the task push notification configuration
* @return the task push notification config
* @throws A2AServerException if setting the push notification configuration fails for any reason
*/
public TaskPushNotificationConfig setTaskPushNotificationConfig(TaskPushNotificationConfig taskPushNotificationConfig) throws A2AServerException {
String configId = taskPushNotificationConfig.pushNotificationConfig().id();
CreateTaskPushNotificationConfigRequest request = CreateTaskPushNotificationConfigRequest.newBuilder()
.setParent("tasks/" + taskPushNotificationConfig.taskId())
.setConfig(ToProto.taskPushNotificationConfig(taskPushNotificationConfig))
.setConfigId(configId == null ? "" : configId)
.build();
try {
return FromProto.taskPushNotificationConfig(blockingStub.createTaskPushNotificationConfig(request));
} catch (StatusRuntimeException e) {
throw new A2AServerException("Failed to set the task push notification config: " + e, e);
}
}

/**
* Get the push notification configuration for a task.
*
* @param getTaskPushNotificationConfigParams the params for the task
* @return the push notification configuration
* @throws A2AServerException if getting the push notification configuration fails for any reason
*/
public TaskPushNotificationConfig getTaskPushNotificationConfig(GetTaskPushNotificationConfigParams getTaskPushNotificationConfigParams) throws A2AServerException {
GetTaskPushNotificationConfigRequest getTaskPushNotificationConfigRequest = GetTaskPushNotificationConfigRequest.newBuilder()
.setName(getTaskPushNotificationConfigName(getTaskPushNotificationConfigParams))
.build();
try {
return FromProto.taskPushNotificationConfig(blockingStub.getTaskPushNotificationConfig(getTaskPushNotificationConfigRequest));
} catch (StatusRuntimeException e) {
throw new A2AServerException("Failed to get the task push notification config: " + e, e);
}
}

/**
* Send a streaming message request to the remote agent.
*
* @param messageSendParams the parameters for the message to be sent
* @param eventHandler a consumer that will be invoked for each event received from the remote agent
* @param errorHandler a consumer that will be invoked if an error occurs
* @throws A2AServerException if sending the streaming message fails for any reason
*/
public void sendMessageStreaming(MessageSendParams messageSendParams, Consumer<StreamingEventKind> eventHandler,
Consumer<Throwable> errorHandler) throws A2AServerException {
SendMessageRequest request = createGrpcSendMessageRequestFromMessageSendParams(messageSendParams);
StreamObserver<StreamResponse> streamObserver = new SSEStreamObserver(eventHandler, errorHandler);
try {
asyncStub.sendStreamingMessage(request, streamObserver);
} catch (StatusRuntimeException e) {
throw new A2AServerException("Failed to send streaming message: " + e, e);
}
}

private SendMessageRequest createGrpcSendMessageRequestFromMessageSendParams(MessageSendParams messageSendParams) {
SendMessageRequest.Builder builder = SendMessageRequest.newBuilder();
builder.setRequest(ToProto.message(messageSendParams.message()));
if (messageSendParams.configuration() != null) {
builder.setConfiguration(ToProto.messageSendConfiguration(messageSendParams.configuration()));
}
if (messageSendParams.metadata() != null) {
builder.setMetadata(ToProto.struct(messageSendParams.metadata()));
}
return builder.build();
}

private String getTaskPushNotificationConfigName(GetTaskPushNotificationConfigParams getTaskPushNotificationConfigParams) {
StringBuilder name = new StringBuilder();
name.append("tasks/");
name.append(getTaskPushNotificationConfigParams.id());
if (getTaskPushNotificationConfigParams.pushNotificationConfigId() != null) {
name.append("/pushNotificationConfigs/");
name.append(getTaskPushNotificationConfigParams.pushNotificationConfigId());
}
return name.toString();
}
}
57 changes: 57 additions & 0 deletions client/src/main/java/io/a2a/client/sse/SSEStreamObserver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.a2a.client.sse;


import static io.a2a.grpc.utils.ProtoUtils.FromProto;

import java.util.function.Consumer;
import java.util.logging.Logger;

import io.a2a.grpc.StreamResponse;
import io.a2a.spec.StreamingEventKind;
import io.grpc.stub.StreamObserver;

public class SSEStreamObserver implements StreamObserver<StreamResponse> {

private static final Logger log = Logger.getLogger(SSEStreamObserver.class.getName());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using java.util.logging.Logger directly couples this library to a specific logging implementation. It is a best practice for libraries to use a logging facade like SLF4J. This allows the application using this library to choose its own logging framework (e.g., Logback, Log4j2) without conflicts.

private final Consumer<StreamingEventKind> eventHandler;
private final Consumer<Throwable> errorHandler;

public SSEStreamObserver(Consumer<StreamingEventKind> eventHandler, Consumer<Throwable> errorHandler) {
this.eventHandler = eventHandler;
this.errorHandler = errorHandler;
}

@Override
public void onNext(StreamResponse response) {
StreamingEventKind event;
switch (response.getPayloadCase()) {
case MSG:
event = FromProto.message(response.getMsg());
break;
case TASK:
event = FromProto.task(response.getTask());
break;
case STATUS_UPDATE:
event = FromProto.taskStatusUpdateEvent(response.getStatusUpdate());
break;
case ARTIFACT_UPDATE:
event = FromProto.taskArtifactUpdateEvent(response.getArtifactUpdate());
break;
default:
log.warning("Invalid stream response " + response.getPayloadCase());
errorHandler.accept(new IllegalStateException("Invalid stream response from server: " + response.getPayloadCase()));
return;
}
eventHandler.accept(event);
}
Comment on lines +25 to +46
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This onNext method can be simplified by using a switch expression (available since Java 14) to make the code more concise and readable.

    public void onNext(StreamResponse response) {
        switch (response.getPayloadCase()) {
            case MSG -> eventHandler.accept(FromProto.message(response.getMsg()));
            case TASK -> eventHandler.accept(FromProto.task(response.getTask()));
            case STATUS_UPDATE -> eventHandler.accept(FromProto.taskStatusUpdateEvent(response.getStatusUpdate()));
            case ARTIFACT_UPDATE -> eventHandler.accept(FromProto.taskArtifactUpdateEvent(response.getArtifactUpdate()));
            default -> {
                log.warning("Invalid stream response " + response.getPayloadCase());
                errorHandler.accept(new IllegalStateException("Invalid stream response from server: " + response.getPayloadCase()));
            }
        }
    }


@Override
public void onError(Throwable t) {
errorHandler.accept(t);
}

@Override
public void onCompleted() {
// done
}
Comment on lines +54 to +56
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The onCompleted method is currently a no-op. It would be beneficial to provide a mechanism for the client to be notified when the stream completes successfully. Consider adding an onCompletedHandler Runnable to the constructor and invoking it here to signal the end of the stream to the consumer.

}
39 changes: 39 additions & 0 deletions grpc/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.github.a2asdk</groupId>
<artifactId>a2a-java-sdk-parent</artifactId>
<version>0.2.6.Beta1-SNAPSHOT</version>
</parent>
<artifactId>a2a-java-sdk-grpc</artifactId>

<packaging>jar</packaging>

<name>Java SDK A2A gRPC</name>
<description>Java SDK for the Agent2Agent Protocol (A2A) - gRPC</description>

<dependencies>
<dependency>
<groupId>io.github.a2asdk</groupId>
<artifactId>a2a-java-sdk-spec</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</dependency>
</dependencies>

</project>
Loading