Skip to content

Commit a5446e1

Browse files
author
严骏驰
committed
fix: #367 add immediate parameter to EventQueue.close() for forced queue shutdown
- Add immediate parameter to EventQueue.close() method to allow immediate queue closure by discarding all pending events - Update EventConsumer.consume_all() to use close(immediate=True) when encountering the first final event to prevent blocking on subsequent final events - This resolves the issue where multiple final events in queue would cause the consumer to block after processing the first final event The immediate parameter provides a way to force close the queue without waiting for all events to be processed, which is useful in scenarios where the consumer needs to exit immediately upon encountering a final event, regardless of remaining events in the queue.
1 parent 66526b9 commit a5446e1

File tree

2 files changed

+10
-7
lines changed

2 files changed

+10
-7
lines changed

src/a2a/server/events/event_consumer.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,7 @@ async def consume_all(self) -> AsyncGenerator[Event]:
125125
# other part is waiting for an event or a closed queue.
126126
if is_final_event:
127127
logger.debug('Stopping event consumption in consume_all.')
128-
await self.queue.clear_events()
129-
await self.queue.close()
128+
await self.queue.close(True)
130129
yield event
131130
break
132131
yield event

src/a2a/server/events/event_queue.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ def tap(self) -> 'EventQueue':
127127
self._children.append(queue)
128128
return queue
129129

130-
async def close(self) -> None:
130+
async def close(self, immediate: bool = False) -> None:
131131
"""Closes the queue for future push events.
132132
133133
Once closed, `dequeue_event` will eventually raise `asyncio.QueueShutDown`
@@ -136,16 +136,20 @@ async def close(self) -> None:
136136
logger.debug('Closing EventQueue.')
137137
async with self._lock:
138138
# If already closed, just return.
139-
if self._is_closed:
139+
if self._is_closed and not immediate:
140140
return
141-
self._is_closed = True
141+
if not self._is_closed:
142+
self._is_closed = True
142143
# If using python 3.13 or higher, use the shutdown method
143144
if sys.version_info >= (3, 13):
144-
self.queue.shutdown()
145+
self.queue.shutdown(immediate)
145146
for child in self._children:
146-
await child.close()
147+
await child.close(immediate)
147148
# Otherwise, join the queue
148149
else:
150+
if immediate:
151+
await self.clear_events(True)
152+
return
149153
tasks = [asyncio.create_task(self.queue.join())]
150154
for child in self._children:
151155
tasks.append(asyncio.create_task(child.close()))

0 commit comments

Comments
 (0)