Skip to content

Commit 42e83cb

Browse files
author
严骏驰
committed
The main updates in this pull request are as follows:
Updated and improved code comments/docstrings. Updated the format
1 parent a5446e1 commit 42e83cb

File tree

1 file changed

+23
-8
lines changed

1 file changed

+23
-8
lines changed

src/a2a/server/events/event_queue.py

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,18 @@ def tap(self) -> 'EventQueue':
128128
return queue
129129

130130
async def close(self, immediate: bool = False) -> None:
131-
"""Closes the queue for future push events.
131+
"""
132+
Closes the queue for future push events and also closes all child queues.
133+
134+
Once closed, no new events can be enqueued. For Python 3.13+, this will trigger
135+
`asyncio.QueueShutDown` when the queue is empty and a consumer tries to dequeue.
136+
For lower versions, the queue will be marked as closed and optionally cleared.
137+
138+
Args:
139+
immediate (bool):
140+
- True: Immediately closes the queue and clears all unprocessed events without waiting for them to be consumed. This is suitable for scenarios where you need to forcefully interrupt and quickly release resources.
141+
- False (default): Gracefully closes the queue, waiting for all queued events to be processed (i.e., the queue is drained) before closing. This is suitable when you want to ensure all events are handled.
132142
133-
Once closed, `dequeue_event` will eventually raise `asyncio.QueueShutDown`
134-
when the queue is empty. Also closes all child queues.
135143
"""
136144
logger.debug('Closing EventQueue.')
137145
async with self._lock:
@@ -176,21 +184,28 @@ async def clear_events(self, clear_child_queues: bool = True) -> None:
176184
while not self.queue.empty():
177185
try:
178186
event = self.queue.get_nowait()
179-
logger.debug(f'Discarding unprocessed event of type: {type(event)}, content: {event}')
187+
logger.debug(
188+
f'Discarding unprocessed event of type: {type(event)}, content: {event}'
189+
)
180190
self.queue.task_done()
181191
cleared_count += 1
182192
except asyncio.QueueEmpty:
183193
break
184194

185195
if cleared_count > 0:
186-
logger.debug(f'Cleared {cleared_count} unprocessed events from EventQueue.')
196+
logger.debug(
197+
f'Cleared {cleared_count} unprocessed events from EventQueue.'
198+
)
187199

188200
# Clear all child queues
189201
if clear_child_queues:
190202
child_tasks = []
191203
for child in self._children:
192-
child_tasks.append(asyncio.create_task(child.clear_events()))
204+
child_tasks.append(
205+
asyncio.create_task(child.clear_events())
206+
)
193207

194208
if child_tasks:
195-
await asyncio.wait(child_tasks, return_when=asyncio.ALL_COMPLETED)
196-
209+
await asyncio.wait(
210+
child_tasks, return_when=asyncio.ALL_COMPLETED
211+
)

0 commit comments

Comments
 (0)