@@ -185,35 +185,42 @@ async def stream_events(self) -> AsyncIterator[StreamEvent]:
185185 - A MaxTurnsExceeded exception if the agent exceeds the max_turns limit.
186186 - A GuardrailTripwireTriggered exception if a guardrail is tripped.
187187 """
188- while True :
189- self ._check_errors ()
190- if self ._stored_exception :
191- logger .debug ("Breaking due to stored exception" )
192- self .is_complete = True
193- break
188+ try :
189+ while True :
190+ self ._check_errors ()
191+ if self ._stored_exception :
192+ logger .debug ("Breaking due to stored exception" )
193+ self .is_complete = True
194+ break
194195
195- if self .is_complete and self ._event_queue .empty ():
196- break
196+ if self .is_complete and self ._event_queue .empty ():
197+ break
197198
198- try :
199- item = await self ._event_queue .get ()
200- except asyncio .CancelledError :
201- break
199+ try :
200+ item = await self ._event_queue .get ()
201+ except asyncio .CancelledError :
202+ break
202203
203- if isinstance (item , QueueCompleteSentinel ):
204- # Await input guardrails if they are still running, so late exceptions are captured.
205- await self ._await_task_safely (self ._input_guardrails_task )
204+ if isinstance (item , QueueCompleteSentinel ):
205+ # Await input guardrails if they are still running, so late
206+ # exceptions are captured.
207+ await self ._await_task_safely (self ._input_guardrails_task )
206208
207- self ._event_queue .task_done ()
209+ self ._event_queue .task_done ()
208210
209- # Check for errors, in case the queue was completed due to an exception
210- self ._check_errors ()
211- break
212-
213- yield item
214- self ._event_queue .task_done ()
211+ # Check for errors, in case the queue was completed
212+ # due to an exception
213+ self ._check_errors ()
214+ break
215215
216- self ._cleanup_tasks ()
216+ yield item
217+ self ._event_queue .task_done ()
218+ finally :
219+ # Ensure main execution completes before cleanup to avoid race conditions
220+ # with session operations
221+ await self ._await_task_safely (self ._run_impl_task )
222+ # Safely terminate all background tasks after main execution has finished
223+ self ._cleanup_tasks ()
217224
218225 if self ._stored_exception :
219226 raise self ._stored_exception
0 commit comments