fix: _handle_async_stream_response() in OpenAIChatGenerator handles asyncio.CancelledError closing the response stream #10175
base: main
Conversation
Amnah199 left a comment:
LG to me! But I would wait for @mpangrazzi's review
mpangrazzi left a comment:
Looks good to me!
---
enhancements:
  - |
    The ``_handle_async_stream_response()`` method in ``OpenAIChatGenerator`` now handles ``asyncio.CancelledError`` exceptions. When a streaming task is cancelled mid-stream, the async for loop gracefully closes the stream using ``asyncio.shield()``to ensure the cleanup operation completes even during cancellation.
Suggested change:
- The ``_handle_async_stream_response()`` method in ``OpenAIChatGenerator`` now handles ``asyncio.CancelledError`` exceptions. When a streaming task is cancelled mid-stream, the async for loop gracefully closes the stream using ``asyncio.shield()``to ensure the cleanup operation completes even during cancellation.
+ The ``_handle_async_stream_response()`` method in ``OpenAIChatGenerator`` now handles ``asyncio.CancelledError`` exceptions. When a streaming task is cancelled mid-stream, the async for loop gracefully closes the stream using ``asyncio.shield()`` to ensure the cleanup operation completes even during cancellation.
added a missing space
Related Issues
Proposed Changes:
The `_handle_async_stream_response()` method in `OpenAIChatGenerator` did not handle `asyncio.CancelledError` exceptions. When a streaming task was cancelled mid-stream, the async for loop would exit without properly closing the OpenAI stream object, leading to tokens still being sent and charged, plus potential resource leaks and open connections.

Added exception handling in `_handle_async_stream_response()` to catch `asyncio.CancelledError` and gracefully close the stream with `asyncio.shield()`, ensuring the cleanup operation completes even during cancellation.

How did you test it?
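The cancellation-handling pattern described above, and the way the unit test exercises it, can be sketched in a self-contained way. All names below (`FakeAsyncStream`, `handle_stream`) are hypothetical stand-ins, not the actual Haystack or OpenAI client code; the point is the `except asyncio.CancelledError` + `asyncio.shield()` structure:

```python
import asyncio


class FakeAsyncStream:
    """Hypothetical stand-in for an OpenAI async streaming response."""

    def __init__(self, n_chunks: int = 100):
        self.n_chunks = n_chunks
        self.close_calls = 0

    def __aiter__(self):
        return self._gen()

    async def _gen(self):
        for i in range(self.n_chunks):
            await asyncio.sleep(0.01)  # simulate network latency between chunks
            yield f"chunk-{i}"

    async def close(self):
        self.close_calls += 1


async def handle_stream(stream: FakeAsyncStream) -> list:
    """Consume the stream; on cancellation, close it before re-raising."""
    chunks = []
    try:
        async for chunk in stream:
            chunks.append(chunk)
    except asyncio.CancelledError:
        # shield() protects the close() coroutine from being cancelled
        # itself, so cleanup completes even mid-cancellation
        await asyncio.shield(stream.close())
        raise
    return chunks


async def main():
    stream = FakeAsyncStream()
    task = asyncio.create_task(handle_stream(stream))
    await asyncio.sleep(0.05)  # let a few chunks arrive
    task.cancel()              # cancel mid-stream
    try:
        await task
    except asyncio.CancelledError:
        pass
    # the stream was closed exactly once despite the cancellation
    assert stream.close_calls == 1


asyncio.run(main())
```

Without the `except asyncio.CancelledError` branch, the `async for` would simply unwind on cancellation and `close()` would never run, which is the leak the PR description attributes to the previous behavior.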
- `test_async_stream_closes_on_cancellation()`: mocks an async stream, starts a streaming task, cancels it mid-stream, and verifies that `stream.close()` is called exactly once.
- `test_run_async_cancellation_integration()`: tests against the real OpenAI API by starting a long-running streaming task (essay generation), cancelling it after 2 seconds, and verifying graceful cancellation with partial chunks received.
- Minor fix in `test_openai_responses.py`: added the missing `OPENAI_API_KEY` environment variable setup via `monkeypatch`.

Notes for the reviewer
This PR also adds the same functionality for the following ChatGenerators (most are part of `haystack-core-integrations`; only the first is not): `AzureOpenAIChatGenerator`, `NvidiaChatGenerator`, `AIMLAPIChatGenerator`, `MistralChatGenerator`, `STACKITChatGenerator`, `OpenRouterChatGenerator`, `TogetherAIChatGenerator`, since they all inherit from `OpenAIChatGenerator`.

Checklist
`fix:`, `feat:`, `build:`, `chore:`, `ci:`, `docs:`, `style:`, `refactor:`, `perf:`, `test:`, and added `!` in case the PR includes breaking changes.