From 9aeb460dac42a66e3eab88f39029553fbb83a127 Mon Sep 17 00:00:00 2001 From: Ankan Misra Date: Mon, 15 Sep 2025 17:24:21 +0530 Subject: [PATCH 1/3] fix: resolve race condition in RunResultStreaming.stream_events() causing premature session cleanup --- src/agents/result.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/agents/result.py b/src/agents/result.py index 9d57da13d..6a6c12f01 100644 --- a/src/agents/result.py +++ b/src/agents/result.py @@ -213,6 +213,9 @@ async def stream_events(self) -> AsyncIterator[StreamEvent]: yield item self._event_queue.task_done() + # Ensure main execution completes before cleanup to avoid race conditions with session operations + await self._await_task_safely(self._run_impl_task) + # Safely terminate all background tasks after main execution has finished self._cleanup_tasks() if self._stored_exception: From c95cbd7ff378ff409a06acfb5c982b8cacc85bab Mon Sep 17 00:00:00 2001 From: Ankan Misra Date: Mon, 15 Sep 2025 17:39:20 +0530 Subject: [PATCH 2/3] feat:fixed the lint error --- src/agents/result.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/agents/result.py b/src/agents/result.py index 6a6c12f01..b63fae24b 100644 --- a/src/agents/result.py +++ b/src/agents/result.py @@ -213,7 +213,8 @@ async def stream_events(self) -> AsyncIterator[StreamEvent]: yield item self._event_queue.task_done() - # Ensure main execution completes before cleanup to avoid race conditions with session operations + # Ensure main execution completes before cleanup to avoid race conditions + # with session operations await self._await_task_safely(self._run_impl_task) # Safely terminate all background tasks after main execution has finished self._cleanup_tasks() From b261abbc129b701f86a141d62f5240a7d8eb22f4 Mon Sep 17 00:00:00 2001 From: Ankan Misra Date: Mon, 15 Sep 2025 18:09:41 +0530 Subject: [PATCH 3/3] fix: add cleanup guarantee to stream_events --- src/agents/result.py | 57 +++++++++++++++++++++++--------------------- 1 file changed, 30 insertions(+), 27 deletions(-) diff --git a/src/agents/result.py b/src/agents/result.py index b63fae24b..26609da28 100644 --- a/src/agents/result.py +++ b/src/agents/result.py @@ -185,39 +185,42 @@ async def stream_events(self) -> AsyncIterator[StreamEvent]: - A MaxTurnsExceeded exception if the agent exceeds the max_turns limit. - A GuardrailTripwireTriggered exception if a guardrail is tripped. """ - while True: - self._check_errors() - if self._stored_exception: - logger.debug("Breaking due to stored exception") - self.is_complete = True - break + try: + while True: + self._check_errors() + if self._stored_exception: + logger.debug("Breaking due to stored exception") + self.is_complete = True + break - if self.is_complete and self._event_queue.empty(): - break + if self.is_complete and self._event_queue.empty(): + break - try: - item = await self._event_queue.get() - except asyncio.CancelledError: - break + try: + item = await self._event_queue.get() + except asyncio.CancelledError: + break - if isinstance(item, QueueCompleteSentinel): - # Await input guardrails if they are still running, so late exceptions are captured. - await self._await_task_safely(self._input_guardrails_task) + if isinstance(item, QueueCompleteSentinel): + # Await input guardrails if they are still running, so late + # exceptions are captured. + await self._await_task_safely(self._input_guardrails_task) - self._event_queue.task_done() + self._event_queue.task_done() - # Check for errors, in case the queue was completed due to an exception - self._check_errors() - break - - yield item - self._event_queue.task_done() + # Check for errors, in case the queue was completed + # due to an exception + self._check_errors() + break - # Ensure main execution completes before cleanup to avoid race conditions - # with session operations - await self._await_task_safely(self._run_impl_task) - # Safely terminate all background tasks after main execution has finished - self._cleanup_tasks() + yield item + self._event_queue.task_done() + finally: + # Ensure main execution completes before cleanup to avoid race conditions + # with session operations + await self._await_task_safely(self._run_impl_task) + # Safely terminate all background tasks after main execution has finished + self._cleanup_tasks() if self._stored_exception: raise self._stored_exception