Skip to content

Commit 5eb03ff

Browse files
author
taralesc
committed
fix:“Queue is closed” on Python 3.13
1 parent 1dbe33d commit 5eb03ff

File tree

3 files changed

+54
-2
lines changed

3 files changed

+54
-2
lines changed

src/a2a/server/events/event_consumer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ async def consume_all(self) -> AsyncGenerator[Event]:
135135
except asyncio.TimeoutError: # pyright: ignore [reportUnusedExcept]
136136
# This class was made an alias of build-in TimeoutError after 3.11
137137
continue
138-
except QueueClosed:
138+
except (QueueClosed, asyncio.QueueEmpty):
139139
# Confirm that the queue is closed, e.g. we aren't on
140140
# python 3.12 and get a queue empty error on an open queue
141141
if self.queue.is_closed():

src/a2a/server/events/event_queue.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ async def dequeue_event(self, no_wait: bool = False) -> Event:
9090
asyncio.QueueShutDown: If the queue has been closed and is empty.
9191
"""
9292
async with self._lock:
93-
if self._is_closed and self.queue.empty():
93+
if sys.version_info < (3, 13) and self._is_closed and self.queue.empty():
94+
# On 3.13+, skip early raise; await self.queue.get() will raise QueueShutDown after shutdown()
9495
logger.warning('Queue is closed. Event will not be dequeued.')
9596
raise asyncio.QueueEmpty('Queue is closed.')
9697

tests/server/events/test_event_consumer.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,57 @@ async def test_consume_all_continues_on_queue_empty_if_not_really_closed(
323323
# The second QueueClosed is not reached because Message breaks the loop.
324324
assert mock_event_queue.is_closed.call_count == 1
325325

326+
@pytest.mark.asyncio
327+
async def test_consume_all_handles_queue_empty_when_closed_python_version_agnostic(
328+
event_consumer: EventConsumer, mock_event_queue: AsyncMock, monkeypatch
329+
):
330+
"""Ensure consume_all stops with no events when queue is closed and dequeue_event raises asyncio.QueueEmpty (Python version-agnostic)."""
331+
# Make QueueClosed a distinct exception (not QueueEmpty) to emulate py3.13 semantics
332+
from a2a.server.events import event_consumer as ec
333+
334+
class QueueShutDown(Exception):
335+
pass
336+
337+
monkeypatch.setattr(ec, "QueueClosed", QueueShutDown, raising=True)
338+
339+
# Simulate queue reporting closed while dequeue raises QueueEmpty
340+
mock_event_queue.dequeue_event.side_effect = asyncio.QueueEmpty("closed/empty")
341+
mock_event_queue.is_closed.return_value = True
342+
343+
344+
consumed_events = []
345+
async for event in event_consumer.consume_all():
346+
consumed_events.append(event)
347+
348+
assert consumed_events == []
349+
mock_event_queue.dequeue_event.assert_called_once()
350+
mock_event_queue.is_closed.assert_called_once()
351+
352+
353+
@pytest.mark.asyncio
354+
async def test_consume_all_continues_on_queue_empty_when_not_closed(
355+
event_consumer: EventConsumer, mock_event_queue: AsyncMock, monkeypatch
356+
):
357+
"""Ensure consume_all continues after asyncio.QueueEmpty when queue is open, yielding the next (final) event."""
358+
# First dequeue raises QueueEmpty (transient empty), then a final Message arrives
359+
final = Message(role='agent', parts=[{'text': 'done'}], message_id='final')
360+
mock_event_queue.dequeue_event.side_effect = [
361+
asyncio.QueueEmpty('temporarily empty'),
362+
final,
363+
]
364+
mock_event_queue.is_closed.return_value = False
365+
366+
# Make the polling responsive in tests
367+
event_consumer._timeout = 0.001
368+
369+
consumed = []
370+
async for ev in event_consumer.consume_all():
371+
consumed.append(ev)
372+
373+
assert consumed == [final]
374+
assert mock_event_queue.dequeue_event.call_count == 2
375+
mock_event_queue.is_closed.assert_called_once()
376+
326377

327378
def test_agent_task_callback_sets_exception(event_consumer: EventConsumer):
328379
"""Test that agent_task_callback sets _exception if the task had one."""

0 commit comments

Comments
 (0)