Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/a2a/server/events/event_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ async def consume_all(self) -> AsyncGenerator[Event]:
except asyncio.TimeoutError: # pyright: ignore [reportUnusedExcept]
# This class was made an alias of build-in TimeoutError after 3.11
continue
except QueueClosed:
except (QueueClosed, asyncio.QueueEmpty):
# Confirm that the queue is closed, e.g. we aren't on
# python 3.12 and get a queue empty error on an open queue
if self.queue.is_closed():
Expand Down
7 changes: 6 additions & 1 deletion src/a2a/server/events/event_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,12 @@ async def dequeue_event(self, no_wait: bool = False) -> Event:
asyncio.QueueShutDown: If the queue has been closed and is empty.
"""
async with self._lock:
if self._is_closed and self.queue.empty():
if (
sys.version_info < (3, 13)
and self._is_closed
and self.queue.empty()
):
# On 3.13+, skip early raise; await self.queue.get() will raise QueueShutDown after shutdown()
logger.warning('Queue is closed. Event will not be dequeued.')
raise asyncio.QueueEmpty('Queue is closed.')

Expand Down
53 changes: 53 additions & 0 deletions tests/server/events/test_event_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,59 @@ async def test_consume_all_continues_on_queue_empty_if_not_really_closed(
assert mock_event_queue.is_closed.call_count == 1


@pytest.mark.asyncio
async def test_consume_all_handles_queue_empty_when_closed_python_version_agnostic(
event_consumer: EventConsumer, mock_event_queue: AsyncMock, monkeypatch
):
"""Ensure consume_all stops with no events when queue is closed and dequeue_event raises asyncio.QueueEmpty (Python version-agnostic)."""
# Make QueueClosed a distinct exception (not QueueEmpty) to emulate py3.13 semantics
from a2a.server.events import event_consumer as ec

class QueueShutDown(Exception):
pass

monkeypatch.setattr(ec, 'QueueClosed', QueueShutDown, raising=True)

# Simulate queue reporting closed while dequeue raises QueueEmpty
mock_event_queue.dequeue_event.side_effect = asyncio.QueueEmpty(
'closed/empty'
)
mock_event_queue.is_closed.return_value = True

consumed_events = []
async for event in event_consumer.consume_all():
consumed_events.append(event)

assert consumed_events == []
mock_event_queue.dequeue_event.assert_called_once()
mock_event_queue.is_closed.assert_called_once()


@pytest.mark.asyncio
async def test_consume_all_continues_on_queue_empty_when_not_closed(
event_consumer: EventConsumer, mock_event_queue: AsyncMock, monkeypatch
):
"""Ensure consume_all continues after asyncio.QueueEmpty when queue is open, yielding the next (final) event."""
# First dequeue raises QueueEmpty (transient empty), then a final Message arrives
final = Message(role='agent', parts=[{'text': 'done'}], message_id='final')
mock_event_queue.dequeue_event.side_effect = [
asyncio.QueueEmpty('temporarily empty'),
final,
]
mock_event_queue.is_closed.return_value = False

# Make the polling responsive in tests
event_consumer._timeout = 0.001

consumed = []
async for ev in event_consumer.consume_all():
consumed.append(ev)

assert consumed == [final]
assert mock_event_queue.dequeue_event.call_count == 2
mock_event_queue.is_closed.assert_called_once()


def test_agent_task_callback_sets_exception(event_consumer: EventConsumer):
"""Test that agent_task_callback sets _exception if the task had one."""
mock_task = MagicMock(spec=asyncio.Task)
Expand Down
15 changes: 9 additions & 6 deletions tests/server/events/test_event_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,9 @@ async def test_enqueue_event_propagates_to_children(


@pytest.mark.asyncio
async def test_enqueue_event_when_closed(event_queue: EventQueue) -> None:
async def test_enqueue_event_when_closed(
event_queue: EventQueue, expected_queue_closed_exception
) -> None:
"""Test that no event is enqueued if the parent queue is closed."""
await event_queue.close() # Close the queue first

Expand All @@ -178,7 +180,7 @@ async def test_enqueue_event_when_closed(event_queue: EventQueue) -> None:
await event_queue.enqueue_event(event)

# Verify the queue is still empty
with pytest.raises(asyncio.QueueEmpty):
with pytest.raises(expected_queue_closed_exception):
await event_queue.dequeue_event(no_wait=True)

# Also verify child queues are not affected directly by parent's enqueue attempt when closed
Expand All @@ -192,7 +194,7 @@ async def test_enqueue_event_when_closed(event_queue: EventQueue) -> None:
await (
child_queue.close()
) # ensure child is also seen as closed for this test's purpose
with pytest.raises(asyncio.QueueEmpty):
with pytest.raises(expected_queue_closed_exception):
await child_queue.dequeue_event(no_wait=True)


Expand All @@ -214,7 +216,7 @@ async def test_dequeue_event_closed_and_empty_no_wait(
with pytest.raises(expected_queue_closed_exception):
event_queue.queue.get_nowait()

with pytest.raises(asyncio.QueueEmpty, match='Queue is closed.'):
with pytest.raises(expected_queue_closed_exception):
await event_queue.dequeue_event(no_wait=True)


Expand All @@ -230,7 +232,8 @@ async def test_dequeue_event_closed_and_empty_waits_then_raises(

# This test is tricky because await event_queue.dequeue_event() would hang if not for the close check.
# The current implementation's dequeue_event checks `is_closed` first.
# If closed and empty, it raises QueueEmpty immediately.
# If closed and empty, it raises QueueEmpty immediately (on Python <= 3.12).
# On Python 3.13+, this check is skipped and asyncio.Queue.get() raises QueueShutDown instead.
# The "waits_then_raises" scenario described in the subtask implies the `get()` might wait.
# However, the current code:
# async with self._lock:
Expand All @@ -240,7 +243,7 @@ async def test_dequeue_event_closed_and_empty_waits_then_raises(
# event = await self.queue.get() -> this line is not reached if closed and empty.

# So, for the current implementation, it will raise QueueEmpty immediately.
with pytest.raises(asyncio.QueueEmpty, match='Queue is closed.'):
with pytest.raises(expected_queue_closed_exception):
await event_queue.dequeue_event(no_wait=False)

# If the implementation were to change to allow `await self.queue.get()`
Expand Down
Loading