Skip to content

Commit fbe67be

Browse files
committed
reduce lock contention in EventQueue.clear_events
1 parent fbdc76f commit fbe67be

File tree

1 file changed

+12
-14
lines changed

1 file changed

+12
-14
lines changed

src/a2a/server/events/event_queue.py

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -177,9 +177,10 @@ async def clear_events(self, clear_child_queues: bool = True) -> None:
177177
If False, only clear the current queue, leaving child queues untouched.
178178
"""
179179
logger.debug('Clearing all events from EventQueue and child queues.')
180+
181+
# Clear all events from the queue, even if closed
182+
cleared_count = 0
180183
async with self._lock:
181-
# Clear all events from the queue, even if closed
182-
cleared_count = 0
183184
while not self.queue.empty():
184185
try:
185186
event = self.queue.get_nowait()
@@ -196,15 +197,12 @@ async def clear_events(self, clear_child_queues: bool = True) -> None:
196197
f'Cleared {cleared_count} unprocessed events from EventQueue.'
197198
)
198199

199-
# Clear all child queues
200-
if clear_child_queues:
201-
child_tasks = []
202-
for child in self._children:
203-
child_tasks.append(
204-
asyncio.create_task(child.clear_events())
205-
)
206-
207-
if child_tasks:
208-
await asyncio.wait(
209-
child_tasks, return_when=asyncio.ALL_COMPLETED
210-
)
200+
# Clear all child queues (lock released before awaiting child tasks)
201+
if clear_child_queues and self._children:
202+
child_tasks = [
203+
asyncio.create_task(child.clear_events())
204+
for child in self._children
205+
]
206+
207+
if child_tasks:
208+
await asyncio.gather(*child_tasks, return_exceptions=True)

0 commit comments

Comments
 (0)