Skip to content

Conversation

@ChenKangQiang
Copy link
Contributor

Description

fix issuse #224

  • Follow the CONTRIBUTING Guide.
  • Make your Pull Request title in the https://www.conventionalcommits.org/ specification.
    • Important Prefixes for release-please:
      • fix: which represents bug fixes, and correlates to a SemVer patch.
      • feat: represents a new feature, and correlates to a SemVer minor.
      • feat!:, or fix!:, refactor!:, etc., which represent a breaking change (indicated by the !) and will result in a SemVer major.
  • Ensure the tests pass
  • Appropriate READMEs were updated (if necessary)

Fixes #<issue_number_goes_here> 🦕

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary of Changes

Hello @ChenKangQiang, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses issue #224 by modifying the JSON-RPC handler to ensure proper asynchronous processing of streaming messages and updating related tests for correct synchronization. The core change involves wrapping the Flow.Subscriber logic in a CompletableFuture.runAsync block, which ensures that message processing occurs off the main thread. Correspondingly, several unit tests have been refactored to correctly await the completion of these asynchronous operations, preventing race conditions and ensuring reliable test outcomes.

Highlights

  • Asynchronous Streaming Message Handling: The convertToSendStreamingMessageResponse method in JSONRPCHandler now executes the Flow.Subscriber logic asynchronously using CompletableFuture.runAsync. This change aims to prevent blocking and improve the responsiveness of streaming message processing.
  • Enhanced Test Synchronization for Asynchronous Flows: Multiple test methods in JSONRPCHandlerTest have been updated to correctly synchronize with asynchronous operations. This includes adding CompletableFuture and future.join() calls to ensure tests wait for all streaming events to be processed before asserting results, thereby improving test reliability.
  • Improved Error and Completion Handling in Tests: The onError and onComplete callbacks within the test subscribers now explicitly complete the CompletableFuture, allowing the main test thread to correctly handle exceptions or signal successful completion of the asynchronous stream.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in issue comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request addresses a potential blocking issue in JSONRPCHandler by executing a subscription asynchronously. It also fixes several race conditions in the test suite by ensuring tests wait for asynchronous operations to complete. My review highlights a potential issue with the asynchronous implementation in JSONRPCHandler where exceptions could be swallowed, and suggests a refactoring in the tests to reduce code duplication and improve maintainability.

Comment on lines 216 to 249
CompletableFuture.runAsync(() -> {
publisher.subscribe(new Flow.Subscriber<StreamingEventKind>() {
Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
onComplete();
}

@Override
public void onComplete() {
tube.complete();
}
@Override
public void onNext(StreamingEventKind item) {
tube.send(new SendStreamingMessageResponse(requestId, item));
subscription.request(1);
}

@Override
public void onError(Throwable throwable) {
if (throwable instanceof JSONRPCError jsonrpcError) {
tube.send(new SendStreamingMessageResponse(requestId, jsonrpcError));
} else {
tube.send(
new SendStreamingMessageResponse(
requestId, new
InternalError(throwable.getMessage())));
}
onComplete();
}

@Override
public void onComplete() {
tube.complete();
}
});
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

While using CompletableFuture.runAsync is a good way to avoid blocking the current thread, any RuntimeException thrown by publisher.subscribe() will be swallowed. This would cause the returned Publisher to never complete or emit any items, potentially causing the client to hang. It's safer to handle potential exceptions from the asynchronous operation and propagate them to the tube.

                CompletableFuture.runAsync(() -> {
                    publisher.subscribe(new Flow.Subscriber<StreamingEventKind>() {
                        Flow.Subscription subscription;
                        @Override
                        public void onSubscribe(Flow.Subscription subscription) {
                            this.subscription = subscription;
                            subscription.request(1);
                        }

                        @Override
                        public void onNext(StreamingEventKind item) {
                            tube.send(new SendStreamingMessageResponse(requestId, item));
                            subscription.request(1);
                        }

                        @Override
                        public void onError(Throwable throwable) {
                            if (throwable instanceof JSONRPCError jsonrpcError) {
                                tube.send(new SendStreamingMessageResponse(requestId, jsonrpcError));
                            } else {
                                tube.send(
                                        new SendStreamingMessageResponse(
                                                requestId, new
                                                InternalError(throwable.getMessage())));
                            }
                            onComplete();
                        }

                        @Override
                        public void onComplete() {
                            tube.complete();
                        }
                    });
                }).whenComplete((res, err) -> {
                    if (err != null) {
                        tube.fail(err);
                    }
                });

@ChenKangQiang ChenKangQiang changed the title fix issuse 224 fix: issuse 224 Aug 19, 2025
# Conflicts:
#	transport/jsonrpc/src/test/java/io/a2a/jsonrpc/handler/JSONRPCHandlerTest.java
@kabir
Copy link
Collaborator

kabir commented Aug 29, 2025

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request addresses issue #224 by wrapping a publisher subscription in CompletableFuture.runAsync, likely to resolve a blocking issue in the streaming logic. The changes also include substantial improvements to the test suite, where CountDownLatch and CompletableFuture are used to correctly handle asynchronous operations and prevent race conditions, making the tests more robust.

I've found one high-severity issue where an exception during the asynchronous subscription setup might not be propagated, potentially causing client streams to hang. I've provided a suggestion to handle this case.

On a minor note, there's a typo in the pull request title ("issuse" should be "issue"). Correcting it would improve clarity and adhere to Conventional Commits standards.

Comment on lines +246 to 279
CompletableFuture.runAsync(() -> {
publisher.subscribe(new Flow.Subscriber<StreamingEventKind>() {
Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}

@Override
public void onNext(StreamingEventKind item) {
tube.send(new SendStreamingMessageResponse(requestId, item));
subscription.request(1);
}
@Override
public void onNext(StreamingEventKind item) {
tube.send(new SendStreamingMessageResponse(requestId, item));
subscription.request(1);
}

@Override
public void onError(Throwable throwable) {
if (throwable instanceof JSONRPCError jsonrpcError) {
tube.send(new SendStreamingMessageResponse(requestId, jsonrpcError));
} else {
tube.send(
new SendStreamingMessageResponse(
requestId, new
InternalError(throwable.getMessage())));
@Override
public void onError(Throwable throwable) {
if (throwable instanceof JSONRPCError jsonrpcError) {
tube.send(new SendStreamingMessageResponse(requestId, jsonrpcError));
} else {
tube.send(
new SendStreamingMessageResponse(
requestId, new
InternalError(throwable.getMessage())));
}
onComplete();
}
onComplete();
}

@Override
public void onComplete() {
tube.complete();
}
@Override
public void onComplete() {
tube.complete();
}
});
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The CompletableFuture.runAsync call is 'fire-and-forget'. If an exception occurs during the execution of the lambda (e.g., if publisher.subscribe() throws an exception), the returned CompletableFuture will complete exceptionally, but this is not being handled. This could lead to the downstream subscriber hanging indefinitely as the tube would never be failed or completed.

It's important to handle potential exceptions from the asynchronous operation to ensure the stream is properly terminated. You can do this by chaining .exceptionally() to the CompletableFuture.

                CompletableFuture.runAsync(() -> {
                    publisher.subscribe(new Flow.Subscriber<StreamingEventKind>() {
                        Flow.Subscription subscription;
                        @Override
                        public void onSubscribe(Flow.Subscription subscription) {
                            this.subscription = subscription;
                            subscription.request(1);
                        }

                        @Override
                        public void onNext(StreamingEventKind item) {
                            tube.send(new SendStreamingMessageResponse(requestId, item));
                            subscription.request(1);
                        }

                        @Override
                        public void onError(Throwable throwable) {
                            if (throwable instanceof JSONRPCError jsonrpcError) {
                                tube.send(new SendStreamingMessageResponse(requestId, jsonrpcError));
                            } else {
                                tube.send(
                                        new SendStreamingMessageResponse(
                                                requestId, new
                                                InternalError(throwable.getMessage())));
                            }
                            onComplete();
                        }

                        @Override
                        public void onComplete() {
                            tube.complete();
                        }
                    });
                }).exceptionally(throwable -> {
                    tube.fail(throwable);
                    return null;
                });

Copy link
Collaborator

@kabir kabir left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @ChenKangQiang thanks for the contribution.

As mentioned in my comment on #224 I am not sure exactly what the issue is. Can you explain a bit better?

I've done a first pass at looking, and have made a few comments.

I think it could also be good to have a test, although looking at the original issue, I am not totally sure what that would look like :-)

Also, it needs a rebase, and Gemini seems to have found some things.

import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't use static imports

tube.send(new SendStreamingMessageResponse(requestId, item));
subscription.request(1);
}
@Override
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you using tabs here? Please replace with spaces if so

@kabir
Copy link
Collaborator

kabir commented Sep 1, 2025

@ChenKangQiang I am closing this one, and have opened #247 as a replacement. Thanks!

@kabir kabir closed this Sep 1, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants