Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/agents/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,17 @@ def last_agent(self) -> Agent[Any]:
"""
return self.current_agent

def cancel(self) -> None:
"""Cancels the streaming run, stopping all background tasks and marking the run as complete."""
self._cleanup_tasks() # Cancel all running tasks
self.is_complete = True # Mark the run as complete to stop event streaming

# Optionally, clear the event queue to prevent processing stale events
while not self._event_queue.empty():
self._event_queue.get_nowait()
while not self._input_guardrail_queue.empty():
self._input_guardrail_queue.get_nowait()

async def stream_events(self) -> AsyncIterator[StreamEvent]:
"""Stream deltas for new items as they are generated. We're using the types from the
OpenAI Responses API, so these are semantic events: each event has a `type` field that
Expand All @@ -174,6 +185,7 @@ async def stream_events(self) -> AsyncIterator[StreamEvent]:
try:
item = await self._event_queue.get()
except asyncio.CancelledError:
self.cancel() # Ensure tasks are cleaned up if the coroutine is cancelled
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this necessary? cleanup still occurs outside of the loop

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose not, though the AI thought it was 😅

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this

break

if isinstance(item, QueueCompleteSentinel):
Expand Down