Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/a2a/server/events/event_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ async def consume_all(self) -> AsyncGenerator[Event]:
except asyncio.TimeoutError: # pyright: ignore [reportUnusedExcept]
# This class was made an alias of build-in TimeoutError after 3.11
continue
except QueueClosed:
except (QueueClosed, asyncio.QueueEmpty):
# Confirm that the queue is closed, e.g. we aren't on
# python 3.12 and get a queue empty error on an open queue
if self.queue.is_closed():
Expand Down
7 changes: 6 additions & 1 deletion src/a2a/server/events/event_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,12 @@ async def dequeue_event(self, no_wait: bool = False) -> Event:
asyncio.QueueShutDown: If the queue has been closed and is empty.
"""
async with self._lock:
if self._is_closed and self.queue.empty():
if (
sys.version_info < (3, 13)
and self._is_closed
and self.queue.empty()
):
# On 3.13+, skip early raise; await self.queue.get() will raise QueueShutDown after shutdown()
logger.warning('Queue is closed. Event will not be dequeued.')
raise asyncio.QueueEmpty('Queue is closed.')

Expand Down
53 changes: 53 additions & 0 deletions tests/server/events/test_event_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,59 @@ async def test_consume_all_continues_on_queue_empty_if_not_really_closed(
assert mock_event_queue.is_closed.call_count == 1


@pytest.mark.asyncio
async def test_consume_all_handles_queue_empty_when_closed_python_version_agnostic(
event_consumer: EventConsumer, mock_event_queue: AsyncMock, monkeypatch
):
"""Ensure consume_all stops with no events when queue is closed and dequeue_event raises asyncio.QueueEmpty (Python version-agnostic)."""
# Make QueueClosed a distinct exception (not QueueEmpty) to emulate py3.13 semantics
from a2a.server.events import event_consumer as ec

class QueueShutDown(Exception):
pass

monkeypatch.setattr(ec, 'QueueClosed', QueueShutDown, raising=True)

# Simulate queue reporting closed while dequeue raises QueueEmpty
mock_event_queue.dequeue_event.side_effect = asyncio.QueueEmpty(
'closed/empty'
)
mock_event_queue.is_closed.return_value = True

consumed_events = []
async for event in event_consumer.consume_all():
consumed_events.append(event)

assert consumed_events == []
mock_event_queue.dequeue_event.assert_called_once()
mock_event_queue.is_closed.assert_called_once()


@pytest.mark.asyncio
async def test_consume_all_continues_on_queue_empty_when_not_closed(
event_consumer: EventConsumer, mock_event_queue: AsyncMock, monkeypatch
):
"""Ensure consume_all continues after asyncio.QueueEmpty when queue is open, yielding the next (final) event."""
# First dequeue raises QueueEmpty (transient empty), then a final Message arrives
final = Message(role='agent', parts=[{'text': 'done'}], message_id='final')
mock_event_queue.dequeue_event.side_effect = [
asyncio.QueueEmpty('temporarily empty'),
final,
]
mock_event_queue.is_closed.return_value = False

# Make the polling responsive in tests
event_consumer._timeout = 0.001

consumed = []
async for ev in event_consumer.consume_all():
consumed.append(ev)

assert consumed == [final]
assert mock_event_queue.dequeue_event.call_count == 2
mock_event_queue.is_closed.assert_called_once()


def test_agent_task_callback_sets_exception(event_consumer: EventConsumer):
"""Test that agent_task_callback sets _exception if the task had one."""
mock_task = MagicMock(spec=asyncio.Task)
Expand Down
Loading