Skip to content

Commit dbc4a12

Browse files
author
taralesc
committed
fix:“Queue is closed” on Python 3.13
1 parent 1dbe33d commit dbc4a12

File tree

3 files changed

+60
-2
lines changed

3 files changed

+60
-2
lines changed

src/a2a/server/events/event_consumer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ async def consume_all(self) -> AsyncGenerator[Event]:
135135
except asyncio.TimeoutError: # pyright: ignore [reportUnusedExcept]
136136
# This class was made an alias of build-in TimeoutError after 3.11
137137
continue
138-
except QueueClosed:
138+
except (QueueClosed, asyncio.QueueEmpty):
139139
# Confirm that the queue is closed, e.g. we aren't on
140140
# python 3.12 and get a queue empty error on an open queue
141141
if self.queue.is_closed():

src/a2a/server/events/event_queue.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,12 @@ async def dequeue_event(self, no_wait: bool = False) -> Event:
9090
asyncio.QueueShutDown: If the queue has been closed and is empty.
9191
"""
9292
async with self._lock:
93-
if self._is_closed and self.queue.empty():
93+
if (
94+
sys.version_info < (3, 13)
95+
and self._is_closed
96+
and self.queue.empty()
97+
):
98+
# On 3.13+, skip early raise; await self.queue.get() will raise QueueShutDown after shutdown()
9499
logger.warning('Queue is closed. Event will not be dequeued.')
95100
raise asyncio.QueueEmpty('Queue is closed.')
96101

tests/server/events/test_event_consumer.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,59 @@ async def test_consume_all_continues_on_queue_empty_if_not_really_closed(
324324
assert mock_event_queue.is_closed.call_count == 1
325325

326326

327+
@pytest.mark.asyncio
328+
async def test_consume_all_handles_queue_empty_when_closed_python_version_agnostic(
329+
event_consumer: EventConsumer, mock_event_queue: AsyncMock, monkeypatch
330+
):
331+
"""Ensure consume_all stops with no events when queue is closed and dequeue_event raises asyncio.QueueEmpty (Python version-agnostic)."""
332+
# Make QueueClosed a distinct exception (not QueueEmpty) to emulate py3.13 semantics
333+
from a2a.server.events import event_consumer as ec
334+
335+
class QueueShutDown(Exception):
336+
pass
337+
338+
monkeypatch.setattr(ec, 'QueueClosed', QueueShutDown, raising=True)
339+
340+
# Simulate queue reporting closed while dequeue raises QueueEmpty
341+
mock_event_queue.dequeue_event.side_effect = asyncio.QueueEmpty(
342+
'closed/empty'
343+
)
344+
mock_event_queue.is_closed.return_value = True
345+
346+
consumed_events = []
347+
async for event in event_consumer.consume_all():
348+
consumed_events.append(event)
349+
350+
assert consumed_events == []
351+
mock_event_queue.dequeue_event.assert_called_once()
352+
mock_event_queue.is_closed.assert_called_once()
353+
354+
355+
@pytest.mark.asyncio
356+
async def test_consume_all_continues_on_queue_empty_when_not_closed(
357+
event_consumer: EventConsumer, mock_event_queue: AsyncMock, monkeypatch
358+
):
359+
"""Ensure consume_all continues after asyncio.QueueEmpty when queue is open, yielding the next (final) event."""
360+
# First dequeue raises QueueEmpty (transient empty), then a final Message arrives
361+
final = Message(role='agent', parts=[{'text': 'done'}], message_id='final')
362+
mock_event_queue.dequeue_event.side_effect = [
363+
asyncio.QueueEmpty('temporarily empty'),
364+
final,
365+
]
366+
mock_event_queue.is_closed.return_value = False
367+
368+
# Make the polling responsive in tests
369+
event_consumer._timeout = 0.001
370+
371+
consumed = []
372+
async for ev in event_consumer.consume_all():
373+
consumed.append(ev)
374+
375+
assert consumed == [final]
376+
assert mock_event_queue.dequeue_event.call_count == 2
377+
mock_event_queue.is_closed.assert_called_once()
378+
379+
327380
def test_agent_task_callback_sets_exception(event_consumer: EventConsumer):
328381
"""Test that agent_task_callback sets _exception if the task had one."""
329382
mock_task = MagicMock(spec=asyncio.Task)

0 commit comments

Comments
 (0)