diff --git a/src/a2a/server/events/event_consumer.py b/src/a2a/server/events/event_consumer.py index 23ab9487..200e8019 100644 --- a/src/a2a/server/events/event_consumer.py +++ b/src/a2a/server/events/event_consumer.py @@ -135,7 +135,7 @@ async def consume_all(self) -> AsyncGenerator[Event]: except asyncio.TimeoutError: # pyright: ignore [reportUnusedExcept] # This class was made an alias of build-in TimeoutError after 3.11 continue - except QueueClosed: + except (QueueClosed, asyncio.QueueEmpty): # Confirm that the queue is closed, e.g. we aren't on # python 3.12 and get a queue empty error on an open queue if self.queue.is_closed(): diff --git a/src/a2a/server/events/event_queue.py b/src/a2a/server/events/event_queue.py index 6bf9650c..bcb02424 100644 --- a/src/a2a/server/events/event_queue.py +++ b/src/a2a/server/events/event_queue.py @@ -90,7 +90,12 @@ async def dequeue_event(self, no_wait: bool = False) -> Event: asyncio.QueueShutDown: If the queue has been closed and is empty. """ async with self._lock: - if self._is_closed and self.queue.empty(): + if ( + sys.version_info < (3, 13) + and self._is_closed + and self.queue.empty() + ): + # On 3.13+, skip early raise; await self.queue.get() will raise QueueShutDown after shutdown() logger.warning('Queue is closed. Event will not be dequeued.') raise asyncio.QueueEmpty('Queue is closed.') diff --git a/tests/server/events/test_event_consumer.py b/tests/server/events/test_event_consumer.py index 4116fabd..d306418e 100644 --- a/tests/server/events/test_event_consumer.py +++ b/tests/server/events/test_event_consumer.py @@ -324,6 +324,59 @@ async def test_consume_all_continues_on_queue_empty_if_not_really_closed( assert mock_event_queue.is_closed.call_count == 1 +@pytest.mark.asyncio +async def test_consume_all_handles_queue_empty_when_closed_python_version_agnostic( + event_consumer: EventConsumer, mock_event_queue: AsyncMock, monkeypatch +): + """Ensure consume_all stops with no events when queue is closed and dequeue_event raises asyncio.QueueEmpty (Python version-agnostic).""" + # Make QueueClosed a distinct exception (not QueueEmpty) to emulate py3.13 semantics + from a2a.server.events import event_consumer as ec + + class QueueShutDown(Exception): + pass + + monkeypatch.setattr(ec, 'QueueClosed', QueueShutDown, raising=True) + + # Simulate queue reporting closed while dequeue raises QueueEmpty + mock_event_queue.dequeue_event.side_effect = asyncio.QueueEmpty( + 'closed/empty' + ) + mock_event_queue.is_closed.return_value = True + + consumed_events = [] + async for event in event_consumer.consume_all(): + consumed_events.append(event) + + assert consumed_events == [] + mock_event_queue.dequeue_event.assert_called_once() + mock_event_queue.is_closed.assert_called_once() + + +@pytest.mark.asyncio +async def test_consume_all_continues_on_queue_empty_when_not_closed( + event_consumer: EventConsumer, mock_event_queue: AsyncMock, monkeypatch +): + """Ensure consume_all continues after asyncio.QueueEmpty when queue is open, yielding the next (final) event.""" + # First dequeue raises QueueEmpty (transient empty), then a final Message arrives + final = Message(role='agent', parts=[{'text': 'done'}], message_id='final') + mock_event_queue.dequeue_event.side_effect = [ + asyncio.QueueEmpty('temporarily empty'), + final, + ] + mock_event_queue.is_closed.return_value = False + + # Make the polling responsive in tests + event_consumer._timeout = 0.001 + + consumed = [] + async for ev in event_consumer.consume_all(): + consumed.append(ev) + + assert consumed == [final] + assert mock_event_queue.dequeue_event.call_count == 2 + mock_event_queue.is_closed.assert_called_once() + + def test_agent_task_callback_sets_exception(event_consumer: EventConsumer): """Test that agent_task_callback sets _exception if the task had one.""" mock_task = MagicMock(spec=asyncio.Task) diff --git a/tests/server/events/test_event_queue.py b/tests/server/events/test_event_queue.py index ecb7d814..7befcd39 100644 --- a/tests/server/events/test_event_queue.py +++ b/tests/server/events/test_event_queue.py @@ -169,7 +169,9 @@ async def test_enqueue_event_propagates_to_children( @pytest.mark.asyncio -async def test_enqueue_event_when_closed(event_queue: EventQueue) -> None: +async def test_enqueue_event_when_closed( + event_queue: EventQueue, expected_queue_closed_exception +) -> None: """Test that no event is enqueued if the parent queue is closed.""" await event_queue.close() # Close the queue first @@ -178,7 +180,7 @@ async def test_enqueue_event_when_closed(event_queue: EventQueue) -> None: await event_queue.enqueue_event(event) # Verify the queue is still empty - with pytest.raises(asyncio.QueueEmpty): + with pytest.raises(expected_queue_closed_exception): await event_queue.dequeue_event(no_wait=True) # Also verify child queues are not affected directly by parent's enqueue attempt when closed @@ -192,7 +194,7 @@ async def test_enqueue_event_when_closed(event_queue: EventQueue) -> None: await ( child_queue.close() ) # ensure child is also seen as closed for this test's purpose - with pytest.raises(asyncio.QueueEmpty): + with pytest.raises(expected_queue_closed_exception): await child_queue.dequeue_event(no_wait=True) @@ -214,7 +216,7 @@ async def test_dequeue_event_closed_and_empty_no_wait( with pytest.raises(expected_queue_closed_exception): event_queue.queue.get_nowait() - with pytest.raises(asyncio.QueueEmpty, match='Queue is closed.'): + with pytest.raises(expected_queue_closed_exception): await event_queue.dequeue_event(no_wait=True) @@ -230,7 +232,8 @@ async def test_dequeue_event_closed_and_empty_waits_then_raises( # This test is tricky because await event_queue.dequeue_event() would hang if not for the close check. # The current implementation's dequeue_event checks `is_closed` first. - # If closed and empty, it raises QueueEmpty immediately. + # If closed and empty, it raises QueueEmpty immediately (on Python <= 3.12). + # On Python 3.13+, this check is skipped and asyncio.Queue.get() raises QueueShutDown instead. # The "waits_then_raises" scenario described in the subtask implies the `get()` might wait. # However, the current code: # async with self._lock: @@ -240,7 +243,7 @@ async def test_dequeue_event_closed_and_empty_waits_then_raises( # event = await self.queue.get() -> this line is not reached if closed and empty. # So, for the current implementation, it will raise QueueEmpty immediately. - with pytest.raises(asyncio.QueueEmpty, match='Queue is closed.'): + with pytest.raises(expected_queue_closed_exception): await event_queue.dequeue_event(no_wait=False) # If the implementation were to change to allow `await self.queue.get()`