
@ovidiutaralesca
Contributor

Issue

It's described well in #464:

When the client-side connection is terminated, the EventConsumer stops processing. As a result, any changes to the task state after the disconnection are not persisted to the TaskStore. The task itself continues running in the background, but its updated state is no longer reflected in the TaskStore.

This PR addresses it by catching (asyncio.CancelledError, GeneratorExit) in the on_message_send_stream method and letting a background task keep consuming and persisting the remaining events.
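For readers unfamiliar with the pattern, here is a minimal sketch of the idea. It assumes a plain asyncio.Queue of events, a dict standing in for the TaskStore, and a `_DONE` sentinel that ends the stream. The names `stream_events`, `persist_remaining` and `_DONE` are hypothetical and are not the SDK's API; only the `background_consume:{task_id}` task name mirrors the one mentioned in the Tests section.

```python
import asyncio
from collections.abc import AsyncIterator

# Hypothetical sentinel marking the end of the event stream.
_DONE = object()


async def persist_remaining(queue: asyncio.Queue, store: dict, task_id: str) -> None:
    """Drain the rest of the queue, persisting each event (dict stands in for a TaskStore)."""
    while True:
        event = await queue.get()
        if event is _DONE:
            return
        store[task_id] = event  # last event wins: the latest task state


async def stream_events(
    queue: asyncio.Queue,
    store: dict,
    task_id: str,
    background_tasks: set[asyncio.Task],
) -> AsyncIterator[object]:
    """Yield events to the client; keep persisting if the client goes away."""
    try:
        while True:
            event = await queue.get()
            if event is _DONE:
                return
            store[task_id] = event
            yield event
    except (asyncio.CancelledError, GeneratorExit):
        # Client disconnected (task cancelled or generator closed): hand the
        # remaining events to a background task, then re-raise.
        bg = asyncio.create_task(
            persist_remaining(queue, store, task_id),
            name=f'background_consume:{task_id}',
        )
        background_tasks.add(bg)  # keep a strong reference until it finishes
        bg.add_done_callback(background_tasks.discard)
        raise


async def demo() -> dict:
    queue: asyncio.Queue = asyncio.Queue()
    store: dict = {}
    background: set[asyncio.Task] = set()
    for state in ('submitted', 'working', 'completed'):
        queue.put_nowait(state)
    queue.put_nowait(_DONE)

    gen = stream_events(queue, store, 't1', background)
    first = await gen.__anext__()  # client receives the first event
    await gen.aclose()  # client disconnects
    await asyncio.gather(*background)  # background consumer finishes the stream
    return {'first': first, 'stored': store['t1']}
```

Keeping a strong reference in `background_tasks` and discarding it on completion prevents the fire-and-forget task from being garbage-collected mid-flight, which is the pattern the asyncio documentation recommends.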

However, adding that catch revealed a difference in EventQueue.close() semantics between Python 3.13+ and earlier versions. I have addressed that as well.
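One way to picture the two close modes is the hypothetical `ClosableQueue` wrapper below; it is not the SDK's `EventQueue`. A graceful close waits for consumers to drain the queue via `join()`, an immediate close discards pending items, and both propagate to child queues with `asyncio.gather`. On Python 3.13+ it additionally calls the native `asyncio.Queue.shutdown()`; older versions have no equivalent, so in this sketch a consumer blocked on `get()` is not woken there.

```python
import asyncio
import sys


class ClosableQueue:
    """Hypothetical queue wrapper illustrating graceful vs. immediate close."""

    def __init__(self) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()
        self.children: list['ClosableQueue'] = []
        self.closed = False

    def put(self, item: object) -> None:
        if self.closed:
            raise RuntimeError('queue is closed')
        self.queue.put_nowait(item)
        for child in self.children:  # fan the event out to child queues
            child.put(item)

    async def close(self, immediate: bool = False) -> None:
        self.closed = True  # reject new events from now on
        if immediate:
            # Immediate: drop pending items, marking each done so join() unblocks.
            while not self.queue.empty():
                self.queue.get_nowait()
                self.queue.task_done()
        else:
            # Graceful: wait until consumers have processed every queued item.
            await self.queue.join()
        # Propagate the same close mode to all children; gather re-raises errors.
        await asyncio.gather(*(c.close(immediate) for c in self.children))
        if sys.version_info >= (3, 13):
            # Native shutdown also wakes any consumer blocked in get().
            self.queue.shutdown(immediate=immediate)


async def demo() -> tuple[list[int], int]:
    parent, child = ClosableQueue(), ClosableQueue()
    parent.children.append(child)
    for i in range(3):
        parent.put(i)  # also forwarded to the child

    processed: list[int] = []

    async def consume(q: ClosableQueue, n: int) -> None:
        for _ in range(n):
            processed.append(await q.queue.get())
            q.queue.task_done()

    consumers = [asyncio.create_task(consume(q, 3)) for q in (parent, child)]
    await parent.close()  # graceful: returns once both queues are drained
    await asyncio.gather(*consumers)

    other = ClosableQueue()
    other.put('dropped')
    await other.close(immediate=True)  # pending item is discarded
    return processed, other.queue.qsize()
```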

How it's reproduced

@azyobuzin provided a detailed reproduction guide in #464. My only addition would be to enable logging for the a2a.server.events.event_queue and a2a.server.events.event_consumer loggers to get a better view of what's happening under the hood.
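For example, with only the standard `logging` module (a sketch that assumes both modules emit their useful messages at DEBUG level):

```python
import logging

# Print each record with its logger name so queue and consumer lines are distinguishable.
logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s %(message)s')

for name in ('a2a.server.events.event_queue', 'a2a.server.events.event_consumer'):
    # Lower only these two loggers to DEBUG; everything else stays at the default level.
    logging.getLogger(name).setLevel(logging.DEBUG)
```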

Fix

Code

  • Ensure streaming keeps persisting events after a client disconnect: on_message_send_stream now catches (asyncio.CancelledError, GeneratorExit) and continues consuming events in the background.
  • Align EventQueue.close() behavior between Python ≥3.13 and ≤3.12 for both graceful and immediate close.

Tests

Event queue tests (tests/server/events/test_event_queue.py)

Added/updated tests to verify:

  • Graceful close on ≥3.13 waits for drain and children.
  • Immediate close clears queues and propagates.
  • To support Python 3.10, tests that simulate ≥3.13 by patching sys.version_info inject a dummy shutdown method on the asyncio.Queue, so they don't fail on runtimes where Queue.shutdown doesn't exist. Existing tests use the same pattern.
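A sketch of that testing pattern using `unittest.mock`, with a hypothetical `close_queue` helper standing in for the version-dependent code under test. `create=True` lets `patch.object` attach a `shutdown` attribute that `asyncio.Queue` lacks before 3.13:

```python
import asyncio
import sys
from unittest import mock


def close_queue(queue: asyncio.Queue, immediate: bool = False) -> str:
    """Hypothetical code under test: branches on the interpreter version."""
    if sys.version_info >= (3, 13):
        queue.shutdown(immediate=immediate)
        return 'shutdown'
    return 'legacy'


def test_close_uses_shutdown_on_simulated_313() -> None:
    queue = asyncio.Queue()
    # Attach a dummy shutdown (create=True permits a missing attribute),
    # then pretend to run on 3.13 while the code under test executes.
    with mock.patch.object(queue, 'shutdown', create=True) as fake_shutdown, \
            mock.patch.object(sys, 'version_info', (3, 13, 0)):
        result = close_queue(queue, immediate=True)
    assert result == 'shutdown'
    fake_shutdown.assert_called_once_with(immediate=True)
```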

Request handler tests (tests/server/request_handlers/test_default_request_handler.py)

Added test_disconnect_persists_final_task_to_store, which tests the flow described in issue #464:

  • Starts streaming, yields first event, then simulates client disconnect.
  • Background consumer persists the final Task to InMemoryTaskStore.
  • Uses wait_until to await disappearance of the specific background_consume:{task_id} task, then asserts TaskState.completed.
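The PR does not show `wait_until` itself, so the version below is a hypothetical polling sketch, paired with a `task_is_running` helper (also hypothetical) that looks the task up by name via `asyncio.all_tasks()`:

```python
import asyncio
from collections.abc import Callable


async def wait_until(
    predicate: Callable[[], bool], timeout: float = 1.0, interval: float = 0.01
) -> None:
    """Poll predicate until it returns True or timeout seconds elapse."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while not predicate():
        if loop.time() >= deadline:
            raise TimeoutError('condition not met before timeout')
        await asyncio.sleep(interval)


def task_is_running(name: str) -> bool:
    """True while a task with the given name is still pending."""
    return any(t.get_name() == name and not t.done() for t in asyncio.all_tasks())


async def demo() -> str:
    store: dict[str, str] = {}

    async def background_consume(task_id: str) -> None:
        await asyncio.sleep(0.05)  # simulate draining the remaining events
        store[task_id] = 'completed'  # final state persisted

    task_id = 't1'
    bg = asyncio.create_task(
        background_consume(task_id), name=f'background_consume:{task_id}'
    )
    await wait_until(lambda: not task_is_running(bg.get_name()))
    return store[task_id]
```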

General

Added cleanup for lingering background tasks. I think this improves on my earlier PR, which introduced tracking of background tasks.
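A typical shape for such cleanup, sketched with a hypothetical `cancel_lingering` helper (in a pytest suite this would usually sit in a fixture's teardown): cancel whatever is still pending, then wait for the cancellations to finish unwinding.

```python
import asyncio


async def cancel_lingering(tasks: set[asyncio.Task]) -> None:
    """Cancel still-running tasks and wait until they have actually stopped."""
    pending = [t for t in tasks if not t.done()]
    for t in pending:
        t.cancel()
    # return_exceptions=True collects the CancelledErrors instead of raising them.
    await asyncio.gather(*pending, return_exceptions=True)
    tasks.clear()


async def demo() -> tuple[list[bool], int]:
    tasks: set[asyncio.Task] = set()
    tasks.add(asyncio.create_task(asyncio.sleep(3600)))  # would linger
    tasks.add(asyncio.create_task(asyncio.sleep(0)))  # finishes on its own
    await asyncio.sleep(0.01)  # let the short task complete
    snapshot = list(tasks)
    await cancel_lingering(tasks)
    return sorted(t.cancelled() for t in snapshot), len(tasks)
```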

Misc

Ruff 0.13.0 now fails the check for unused variables, so I made some minimal changes suggested by the linter that are unrelated to the issue: renaming payload and task_manager to _payload and _task_manager.

Fixes #464

ovidiutaralesca requested a review from a team as a code owner September 13, 2025 08:41
Contributor

gemini-code-assist bot left a comment

Summary of Changes

Hello @ovidiutaralesca, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request resolves a critical issue where task state updates were not reliably persisted to the TaskStore if a client disconnected during an ongoing streaming operation. The changes ensure that tasks continue to be processed and their final states are saved, preventing data loss. Additionally, it standardizes the behavior of the EventQueue.close() method across different Python versions, providing predictable queue shutdown semantics, and includes comprehensive test coverage for these fixes.

Highlights

  • Client Disconnect Task Persistence: Implemented a fix to ensure that task state is properly persisted to the TaskStore even if a client disconnects prematurely during a streaming operation. This is achieved by catching asyncio.CancelledError and GeneratorExit in on_message_send_stream and initiating a background task to consume and persist remaining events.
  • Consistent EventQueue.close() Behavior: Addressed differences in EventQueue.close() semantics between Python 3.13+ and earlier versions. The close method now provides consistent graceful (wait for drain) and immediate (clear pending events) shutdown behavior across all supported Python versions.
  • New Test Coverage: Introduced new tests for EventQueue to verify the consistent close() behavior on Python 3.13+ and added a dedicated test (test_disconnect_persists_final_task_to_store) to confirm that task states are correctly persisted after a client disconnect.
  • Background Task Cleanup: Improved test stability and resource management by adding cleanup logic for lingering background tasks in relevant test cases.
  • Linter-Suggested Changes: Applied minor code changes, primarily renaming unused variables (e.g., payload to _payload, task_manager to _task_manager), as suggested by Ruff linter version 0.13.0.

Contributor

gemini-code-assist bot left a comment

Code Review

This pull request addresses a critical issue where task state was not persisted after a client disconnect. The fix in on_message_send_stream to catch disconnection errors and spawn a background task for continued event consumption is well-implemented and robust. The associated refactoring of EventQueue.close() to align behavior across Python versions is a great improvement for consistency. The new tests, particularly test_disconnect_persists_final_task_to_store, are thorough and effectively validate the fix.

I have a couple of suggestions in src/a2a/server/events/event_queue.py to use asyncio.gather instead of asyncio.wait for better exception propagation and code clarity. Overall, this is an excellent contribution that significantly improves the reliability of the server.
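To illustrate the reviewer's point with hypothetical coroutines (not the code in event_queue.py): `asyncio.wait()` returns the finished tasks and leaves any exception stored on them, whereas `asyncio.gather()` re-raises the first exception to the caller.

```python
import asyncio


async def ok() -> str:
    return 'ok'


async def boom() -> str:
    raise ValueError('child close failed')


async def with_wait() -> str:
    tasks = [asyncio.create_task(ok()), asyncio.create_task(boom())]
    done, _pending = await asyncio.wait(tasks)
    # wait() does not raise: the error stays on the task until someone asks for it.
    errors = [t.exception() for t in done if t.exception() is not None]
    return f'wait returned, {len(errors)} error(s) hidden in tasks'


async def with_gather() -> str:
    try:
        await asyncio.gather(ok(), boom())
    except ValueError as exc:  # gather propagates the first failure directly
        return f'gather raised: {exc}'
    return 'no error'
```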

ovidiutaralesca force-pushed the fix_task_state_not_persisted branch from a9b3010 to 1da4886 September 13, 2025 08:52
@ovidiutaralesca
Contributor Author

/gemini review

Contributor

gemini-code-assist bot left a comment

Code Review

This pull request effectively resolves a critical issue where task state was not persisted if a client disconnected during a streaming response. The approach of catching the disconnection errors and spawning a background task to continue event processing is robust and well-implemented. The accompanying refactoring to standardize EventQueue.close() behavior across Python versions is a valuable improvement for code maintainability. The new tests are comprehensive and provide strong confidence in the fix. I have a few minor suggestions to strengthen some test assertions and to simplify a piece of logic in EventQueue.

Contributor

mikeas1 left a comment

Looks great, thanks for the contribution. Continuing processing upon client disconnect is definitely the behavior we want, so ensuring this works in all cases makes sense. Tests look great.

mikeas1 merged commit 5342ca4 into a2aproject:main Sep 17, 2025
6 checks passed
holtskinner pushed a commit that referenced this pull request Sep 23, 2025
🤖 I have created a release *beep* *boop*
---


## [0.3.7](v0.3.6...v0.3.7) (2025-09-22)


### Bug Fixes

* jsonrpc client send streaming request header and timeout field ([#475](#475)) ([675354a](675354a))
* Task state is not persisted to task store after client disconnect ([#472](#472)) ([5342ca4](5342ca4)), closes [#464](#464)

---
This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).

Development

Successfully merging this pull request may close these issues.

[Bug]: Task state is not persisted to TaskStore after client disconnect

4 participants