File tree Expand file tree Collapse file tree 2 files changed +37
-0
lines changed
Expand file tree Collapse file tree 2 files changed +37
-0
lines changed Original file line number Diff line number Diff line change @@ -125,6 +125,7 @@ async def consume_all(self) -> AsyncGenerator[Event]:
125125 # other part is waiting for an event or a closed queue.
126126 if is_final_event :
127127 logger .debug ('Stopping event consumption in consume_all.' )
128+ await self .queue .clear_events ()
128129 await self .queue .close ()
129130 yield event
130131 break
Original file line number Diff line number Diff line change @@ -154,3 +154,39 @@ async def close(self) -> None:
154154 def is_closed (self ) -> bool :
155155 """Checks if the queue is closed."""
156156 return self ._is_closed
157+
158+ async def clear_events (self , clear_child_queues : bool = True ) -> None :
159+ """Clears all events from the current queue and optionally all child queues.
160+
161+ This method removes all pending events from the queue without processing them.
162+ Child queues can be optionally cleared based on the clear_child_queues parameter.
163+
164+ Args:
165+ clear_child_queues: If True (default), clear all child queues as well.
166+ If False, only clear the current queue, leaving child queues untouched.
167+ """
168+ logger .debug ('Clearing all events from EventQueue and child queues.' )
169+ async with self ._lock :
170+ # Clear all events from the queue, even if closed
171+ cleared_count = 0
172+ while not self .queue .empty ():
173+ try :
174+ event = self .queue .get_nowait ()
175+ logger .debug (f'Discarding unprocessed event of type: { type (event )} , content: { event } ' )
176+ self .queue .task_done ()
177+ cleared_count += 1
178+ except asyncio .QueueEmpty :
179+ break
180+
181+ if cleared_count > 0 :
182+ logger .debug (f'Cleared { cleared_count } unprocessed events from EventQueue.' )
183+
184+ # Clear all child queues
185+ if clear_child_queues :
186+ child_tasks = []
187+ for child in self ._children :
188+ child_tasks .append (asyncio .create_task (child .clear_events ()))
189+
190+ if child_tasks :
191+ await asyncio .wait (child_tasks , return_when = asyncio .ALL_COMPLETED )
192+
You can’t perform that action at this time.
0 commit comments