Skip to content

Conversation

@fjuma
Copy link
Collaborator

@fjuma fjuma commented Jul 31, 2025

This PR contains:

  • Utility classes for converting from spec classes and gRPC classes and vice versa
  • An initial gRPC client implementation
  • An initial gRPC request handler implementation
  • Tests for the gRPC request handler implementation
  • gRPC server integration
  • Follow the CONTRIBUTING Guide.
  • Make your Pull Request title in the https://www.conventionalcommits.org/ specification.
    • Important Prefixes for release-please:
      • fix: which represents bug fixes, and correlates to a SemVer patch.
      • feat: represents a new feature, and correlates to a SemVer minor.
      • feat!:, or fix!:, refactor!:, etc., which represent a breaking change (indicated by the !) and will result in a SemVer major.
  • Ensure the tests pass
  • Appropriate READMEs were updated (if necessary)

Fixes #210 🦕

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary of Changes

Hello @fjuma, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

I've started implementing a partial gRPC transport layer for the A2A Java SDK. This work introduces the necessary client-side components to communicate via gRPC, including a new gRPC client, utility classes for data conversion between A2A's internal specification and gRPC's Protocol Buffers, and a mechanism for handling streaming responses. This foundational work aims to provide an alternative, high-performance communication protocol for the SDK.

Highlights

  • New gRPC Module and Integration: I've introduced a new a2a-java-sdk-grpc module and integrated it into the existing client, enabling gRPC as a transport option.
  • Initial gRPC Client Implementation: I've implemented the core A2AGrpcClient which provides client-side gRPC functionalities for sending messages, managing tasks, and handling push notifications.
  • Data Conversion Utilities: I've added utility classes for seamless conversion between A2A specification classes and their gRPC Protocol Buffer counterparts, ensuring data compatibility across the new transport layer.
  • Streaming Event Handling: I've included an SSEStreamObserver to manage streaming responses from the gRPC server, allowing for real-time updates on tasks and other events.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in issue comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments or fill out our survey to provide feedback.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a partial implementation of a gRPC transport. The changes include a new grpc module with generated protobuf and gRPC classes, a gRPC client, and a stream observer. My review focuses on improving correctness, maintainability, and error handling in the new client code. Key feedback points include correcting exception chaining to preserve gRPC status details, improving error reporting in the stream observer, and addressing maintainability issues like unused fields and hardcoded strings. I've also suggested using SLF4J for logging, which is a best practice for libraries.

throw new A2AServerException("Failed to send message: ");
}
} catch (StatusRuntimeException e) {
throw new A2AServerException("Failed to send message: " + e, e.getCause());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

When catching a StatusRuntimeException, it's better to wrap the original exception e directly as the cause, rather than e.getCause(). StatusRuntimeException contains valuable gRPC status information (like status code and trailers) that is lost if you only pass its cause. This will significantly aid in debugging network and server-side issues.

This issue is present in all catch (StatusRuntimeException e) blocks within this class.

            throw new A2AServerException("Failed to send message: " + e, e);

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

Comment on lines 36 to 37
log.warning("Invalid stream response " + response);
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

When an invalid stream response is received, a warning is logged, but the errorHandler is not invoked. This can silently hide problems from the client application, which likely expects all errors to be reported through the provided handler. You should call the errorHandler with an appropriate exception in this case.

            log.warning("Invalid stream response " + response);
            errorHandler.accept(new IllegalStateException("Invalid stream response from server: " + response.getPayloadCase()));
            return;

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

Comment on lines +50 to +56
public A2AGrpcClient(Channel channel, AgentCard agentCard) {
checkNotNullParam("channel", channel);
checkNotNullParam("agentCard", agentCard);
this.asyncStub = A2AServiceGrpc.newStub(channel);
this.blockingStub = A2AServiceGrpc.newBlockingV2Stub(channel);
this.agentCard = agentCard;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The agentCard field is initialized in the constructor but its value is never used throughout the class. To improve code clarity and remove dead code, this field and its corresponding constructor parameter and assignments should be removed.

    public A2AGrpcClient(Channel channel) {
        checkNotNullParam("channel", channel);
        this.asyncStub = A2AServiceGrpc.newStub(channel);
        this.blockingStub = A2AServiceGrpc.newBlockingV2Stub(channel);
    }

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping it to be consistent with the python implementation

} else if (response.hasTask()) {
return FromProto.task(response.getTask());
} else {
throw new A2AServerException("Failed to send message: ");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The exception message here is not very descriptive. If the server sends a response that doesn't contain a message or a task, this exception will be thrown with a generic message. It would be more helpful for debugging to include information about the invalid response, such as its payload case.

                throw new A2AServerException("Invalid response from server: no message or task received. Payload case was: " + response.getPayloadCase());

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

*/
public Task getTask(TaskQueryParams taskQueryParams) throws A2AServerException {
GetTaskRequest.Builder requestBuilder = GetTaskRequest.newBuilder();
requestBuilder.setName("tasks/" + taskQueryParams.id());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The resource name prefix "tasks/" is hardcoded here and in other places in this class. It's a good practice to define such strings as constants to improve maintainability and prevent typos. Consider creating a constant for this prefix.


public class SSEStreamObserver implements StreamObserver<StreamResponse> {

private static final Logger log = Logger.getLogger(SSEStreamObserver.class.getName());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using java.util.logging.Logger directly couples this library to a specific logging implementation. It is a best practice for libraries to use a logging facade like SLF4J. This allows the application using this library to choose its own logging framework (e.g., Logback, Log4j2) without conflicts.

Comment on lines +48 to +50
public void onCompleted() {
// done
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The onCompleted method is currently a no-op. It would be beneficial to provide a mechanism for the client to be notified when the stream completes successfully. Consider adding an onCompletedHandler Runnable to the constructor and invoking it here to signal the end of the stream to the consumer.

grpc/pom.xml Outdated
</dependency>
</dependencies>

</project> No newline at end of file
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This file does not end with a newline character. It's a common convention and a POSIX standard to end files with a newline. This can prevent issues with some development tools and source control systems.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@fjuma fjuma changed the title [WIP][DO NOT MERGE] Partial gRPC transport implementation [WIP][DO NOT MERGE] feat: Partial gRPC transport implementation Jul 31, 2025
@fjuma fjuma changed the title [WIP][DO NOT MERGE] feat: Partial gRPC transport implementation feat: [WIP][DO NOT MERGE] Partial gRPC transport implementation Jul 31, 2025
@fjuma fjuma force-pushed the grpc branch 2 times, most recently from 955d58b to 56f6706 Compare July 31, 2025 20:33
@fjuma fjuma changed the title feat: [WIP][DO NOT MERGE] Partial gRPC transport implementation feat: gRPC transport implementation Aug 1, 2025
@fjuma fjuma marked this pull request as ready for review August 1, 2025 16:36
@kabir
Copy link
Collaborator

kabir commented Aug 1, 2025

/gemini review

@gemini-code-assist
Copy link
Contributor

Warning

Gemini encountered an error creating the review. You can try again by commenting /gemini review.

@fjuma
Copy link
Collaborator Author

fjuma commented Aug 1, 2025

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a gRPC transport implementation for the A2A Java SDK. The changes include a new gRPC client, request handlers, and updates to the build configuration. My review focuses on improving correctness, readability, and maintainability. I've identified a few issues, including a bug in request creation where required fields were missing, a typo in a resource path, and an unused field. I've also suggested refactoring opportunities to make the code more concise and idiomatic.

Comment on lines 128 to 130
CreateTaskPushNotificationConfigRequest request = CreateTaskPushNotificationConfigRequest.newBuilder()
.setConfig(ToProto.taskPushNotificationConfig(taskPushNotificationConfig))
.build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The CreateTaskPushNotificationConfigRequest is missing the required parent and config_id fields. According to the protobuf definition, these fields are mandatory for this request.

        CreateTaskPushNotificationConfigRequest request = CreateTaskPushNotificationConfigRequest.newBuilder()
                .setParent("tasks/" + taskPushNotificationConfig.taskId())
                .setConfigId(taskPushNotificationConfig.pushNotificationConfig().id())
                .setConfig(ToProto.taskPushNotificationConfig(taskPushNotificationConfig))
                .build();

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

name.append("tasks/");
name.append(getTaskPushNotificationConfigParams.id());
if (getTaskPushNotificationConfigParams.pushNotificationConfigId() != null) {
name.append("pushNotification/");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There's a typo in the path segment for push notifications. It should be pushNotificationConfigs/ instead of pushNotification/ to match the API definition.

            name.append("/pushNotificationConfigs/");

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed


private A2AServiceBlockingV2Stub blockingStub;
private A2AServiceStub asyncStub;
private AgentCard agentCard;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The agentCard field is initialized in the constructor but is never used within the class. It should be removed to avoid dead code.

} else if (response.hasTask()) {
return FromProto.task(response.getTask());
} else {
throw new A2AServerException("Failed to send message: ");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The exception message is not very informative. It would be better to specify that the response from the server was invalid because it was empty.

Suggested change
throw new A2AServerException("Failed to send message: ");
throw new A2AServerException("Invalid response from server: no message or task received.");

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

Comment on lines 25 to 40
public void onNext(StreamResponse response) {
StreamingEventKind event;
if (response.hasMsg()) {
event = FromProto.message(response.getMsg());
} else if (response.hasTask()) {
event = FromProto.task(response.getTask());
} else if (response.hasStatusUpdate()) {
event = FromProto.taskStatusUpdateEvent(response.getStatusUpdate());
} else if (response.hasArtifactUpdate()) {
event = FromProto.taskArtifactUpdateEvent(response.getArtifactUpdate());
} else {
log.warning("Invalid stream response " + response);
return;
}
eventHandler.accept(event);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The if-else if chain can be replaced with a switch statement on response.getPayloadCase() for better readability and maintainability. This is the standard way to handle oneof fields in protobuf.

    public void onNext(StreamResponse response) {
        StreamingEventKind event;
        switch (response.getPayloadCase()) {
            case MSG:
                event = FromProto.message(response.getMsg());
                break;
            case TASK:
                event = FromProto.task(response.getTask());
                break;
            case STATUS_UPDATE:
                event = FromProto.taskStatusUpdateEvent(response.getStatusUpdate());
                break;
            case ARTIFACT_UPDATE:
                event = FromProto.taskArtifactUpdateEvent(response.getArtifactUpdate());
                break;
            default:
                log.warning("Invalid stream response " + response);
                return;
        }
        eventHandler.accept(event);
    }

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@kabir
Copy link
Collaborator

kabir commented Aug 3, 2025

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a gRPC transport implementation, including a client, request handler, and necessary utility classes. The changes are well-structured, adding a new grpc module and integrating it into the existing client module. My review focuses on improving code consistency and maintainability in the new A2AGrpcClient and SSEStreamObserver classes. I've suggested using consistent string formatting for resource names and modernizing a switch statement for better readability. Additionally, I've pointed out an unused field that could be removed to simplify the code.


private A2AServiceBlockingV2Stub blockingStub;
private A2AServiceStub asyncStub;
private AgentCard agentCard;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The agentCard field is initialized in the constructor but is never used within the A2AGrpcClient class. This could indicate dead code or a feature that is not yet fully implemented. If it's not needed, it should be removed to simplify the code.

*/
public Task getTask(TaskQueryParams taskQueryParams) throws A2AServerException {
GetTaskRequest.Builder requestBuilder = GetTaskRequest.newBuilder();
requestBuilder.setName("tasks/" + taskQueryParams.id());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For consistency and better readability when constructing resource names, consider using String.format(). This approach is generally preferred over string concatenation, especially as resource paths become more complex. The same applies to the cancelTask and getTaskPushNotificationConfigName methods.

Suggested change
requestBuilder.setName("tasks/" + taskQueryParams.id());
requestBuilder.setName(String.format("tasks/%s", taskQueryParams.id()));

Comment on lines +25 to +46
public void onNext(StreamResponse response) {
StreamingEventKind event;
switch (response.getPayloadCase()) {
case MSG:
event = FromProto.message(response.getMsg());
break;
case TASK:
event = FromProto.task(response.getTask());
break;
case STATUS_UPDATE:
event = FromProto.taskStatusUpdateEvent(response.getStatusUpdate());
break;
case ARTIFACT_UPDATE:
event = FromProto.taskArtifactUpdateEvent(response.getArtifactUpdate());
break;
default:
log.warning("Invalid stream response " + response.getPayloadCase());
errorHandler.accept(new IllegalStateException("Invalid stream response from server: " + response.getPayloadCase()));
return;
}
eventHandler.accept(event);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This onNext method can be simplified by using a switch expression (available since Java 14) to make the code more concise and readable.

    public void onNext(StreamResponse response) {
        switch (response.getPayloadCase()) {
            case MSG -> eventHandler.accept(FromProto.message(response.getMsg()));
            case TASK -> eventHandler.accept(FromProto.task(response.getTask()));
            case STATUS_UPDATE -> eventHandler.accept(FromProto.taskStatusUpdateEvent(response.getStatusUpdate()));
            case ARTIFACT_UPDATE -> eventHandler.accept(FromProto.taskArtifactUpdateEvent(response.getArtifactUpdate()));
            default -> {
                log.warning("Invalid stream response " + response.getPayloadCase());
                errorHandler.accept(new IllegalStateException("Invalid stream response from server: " + response.getPayloadCase()));
            }
        }
    }

@kabir kabir merged commit 4b11c00 into a2aproject:main Aug 4, 2025
3 checks passed
@@ -0,0 +1,5 @@
quarkus.grpc.clients.a2-a-service.host=localhost
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kabir Just to check, is a2-a intentional here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll fix this in #214

kabir added a commit to kabir/a2a-java that referenced this pull request Dec 23, 2025
This PR contains:

* Utility classes for converting from spec classes and gRPC classes and
vice versa
* An initial gRPC client implementation
* An initial gRPC request handler implementation
* Tests for the gRPC request handler implementation
* gRPC server integration

- [X] Follow the [`CONTRIBUTING` Guide](../CONTRIBUTING.md).
- [X] Make your Pull Request title in the
<https://www.conventionalcommits.org/> specification.
- Important Prefixes for
[release-please](https://github.com/googleapis/release-please):
- `fix:` which represents bug fixes, and correlates to a
[SemVer](https://semver.org/) patch.
- `feat:` represents a new feature, and correlates to a SemVer minor.
- `feat!:`, or `fix!:`, `refactor!:`, etc., which represent a breaking
change (indicated by the `!`) and will result in a SemVer major.
- [X] Ensure the tests pass
- [X] Appropriate READMEs were updated (if necessary)

Fixes a2aproject#210 🦕

---------

Co-authored-by: Kabir Khan <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feat]: Add support for gRPC

2 participants