Skip to content

Commit ee9e8e6

Browse files
authored
Merge branch 'main' into fix-bug#367-client_hangs
2 parents d3e95f2 + a371461 commit ee9e8e6

File tree

5 files changed

+81
-20
lines changed

5 files changed

+81
-20
lines changed

src/a2a/server/events/event_consumer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ async def consume_all(self) -> AsyncGenerator[Event]:
135135
except asyncio.TimeoutError: # pyright: ignore [reportUnusedExcept]
136136
# This class was made an alias of build-in TimeoutError after 3.11
137137
continue
138-
except QueueClosed:
138+
except (QueueClosed, asyncio.QueueEmpty):
139139
# Confirm that the queue is closed, e.g. we aren't on
140140
# python 3.12 and get a queue empty error on an open queue
141141
if self.queue.is_closed():

src/a2a/server/events/event_queue.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,12 @@ async def dequeue_event(self, no_wait: bool = False) -> Event:
9090
asyncio.QueueShutDown: If the queue has been closed and is empty.
9191
"""
9292
async with self._lock:
93-
if self._is_closed and self.queue.empty():
93+
if (
94+
sys.version_info < (3, 13)
95+
and self._is_closed
96+
and self.queue.empty()
97+
):
98+
# On 3.13+, skip early raise; await self.queue.get() will raise QueueShutDown after shutdown()
9499
logger.warning('Queue is closed. Event will not be dequeued.')
95100
raise asyncio.QueueEmpty('Queue is closed.')
96101

src/a2a/types.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ class DeleteTaskPushNotificationConfigParams(A2ABaseModel):
295295

296296
id: str
297297
"""
298-
The unique identifier of the task.
298+
The unique identifier (e.g. UUID) of the task.
299299
"""
300300
metadata: dict[str, Any] | None = None
301301
"""
@@ -432,7 +432,7 @@ class GetTaskPushNotificationConfigParams(A2ABaseModel):
432432

433433
id: str
434434
"""
435-
The unique identifier of the task.
435+
The unique identifier (e.g. UUID) of the task.
436436
"""
437437
metadata: dict[str, Any] | None = None
438438
"""
@@ -677,7 +677,7 @@ class ListTaskPushNotificationConfigParams(A2ABaseModel):
677677

678678
id: str
679679
"""
680-
The unique identifier of the task.
680+
The unique identifier (e.g. UUID) of the task.
681681
"""
682682
metadata: dict[str, Any] | None = None
683683
"""
@@ -830,7 +830,7 @@ class PushNotificationConfig(A2ABaseModel):
830830
"""
831831
id: str | None = None
832832
"""
833-
A unique ID for the push notification configuration, set by the client
833+
A unique identifier (e.g. UUID) for the push notification configuration, set by the client
834834
to support multiple notification callbacks.
835835
"""
836836
token: str | None = None
@@ -881,7 +881,7 @@ class TaskIdParams(A2ABaseModel):
881881

882882
id: str
883883
"""
884-
The unique identifier of the task.
884+
The unique identifier (e.g. UUID) of the task.
885885
"""
886886
metadata: dict[str, Any] | None = None
887887
"""
@@ -940,7 +940,7 @@ class TaskPushNotificationConfig(A2ABaseModel):
940940
"""
941941
task_id: str
942942
"""
943-
The ID of the task.
943+
The unique identifier (e.g. UUID) of the task.
944944
"""
945945

946946

@@ -955,7 +955,7 @@ class TaskQueryParams(A2ABaseModel):
955955
"""
956956
id: str
957957
"""
958-
The unique identifier of the task.
958+
The unique identifier (e.g. UUID) of the task.
959959
"""
960960
metadata: dict[str, Any] | None = None
961961
"""
@@ -1376,7 +1376,7 @@ class Artifact(A2ABaseModel):
13761376

13771377
artifact_id: str
13781378
"""
1379-
A unique identifier for the artifact within the scope of the task.
1379+
A unique identifier (e.g. UUID) for the artifact within the scope of the task.
13801380
"""
13811381
description: str | None = None
13821382
"""
@@ -1440,7 +1440,7 @@ class Message(A2ABaseModel):
14401440

14411441
context_id: str | None = None
14421442
"""
1443-
The context identifier for this message, used to group related interactions.
1443+
The context ID for this message, used to group related interactions.
14441444
"""
14451445
extensions: list[str] | None = None
14461446
"""
@@ -1473,7 +1473,7 @@ class Message(A2ABaseModel):
14731473
"""
14741474
task_id: str | None = None
14751475
"""
1476-
The identifier of the task this message is part of. Can be omitted for the first message of a new task.
1476+
The ID of the task this message is part of. Can be omitted for the first message of a new task.
14771477
"""
14781478

14791479

@@ -1863,15 +1863,15 @@ class Task(A2ABaseModel):
18631863
"""
18641864
context_id: str
18651865
"""
1866-
A server-generated identifier for maintaining context across multiple related tasks or interactions.
1866+
A server-generated unique identifier (e.g. UUID) for maintaining context across multiple related tasks or interactions.
18671867
"""
18681868
history: list[Message] | None = None
18691869
"""
18701870
An array of messages exchanged during the task, representing the conversation history.
18711871
"""
18721872
id: str
18731873
"""
1874-
A unique identifier for the task, generated by the server for a new task.
1874+
A unique identifier (e.g. UUID) for the task, generated by the server for a new task.
18751875
"""
18761876
kind: Literal['task'] = 'task'
18771877
"""

tests/server/events/test_event_consumer.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,59 @@ async def test_consume_all_continues_on_queue_empty_if_not_really_closed(
324324
assert mock_event_queue.is_closed.call_count == 1
325325

326326

327+
@pytest.mark.asyncio
328+
async def test_consume_all_handles_queue_empty_when_closed_python_version_agnostic(
329+
event_consumer: EventConsumer, mock_event_queue: AsyncMock, monkeypatch
330+
):
331+
"""Ensure consume_all stops with no events when queue is closed and dequeue_event raises asyncio.QueueEmpty (Python version-agnostic)."""
332+
# Make QueueClosed a distinct exception (not QueueEmpty) to emulate py3.13 semantics
333+
from a2a.server.events import event_consumer as ec
334+
335+
class QueueShutDown(Exception):
336+
pass
337+
338+
monkeypatch.setattr(ec, 'QueueClosed', QueueShutDown, raising=True)
339+
340+
# Simulate queue reporting closed while dequeue raises QueueEmpty
341+
mock_event_queue.dequeue_event.side_effect = asyncio.QueueEmpty(
342+
'closed/empty'
343+
)
344+
mock_event_queue.is_closed.return_value = True
345+
346+
consumed_events = []
347+
async for event in event_consumer.consume_all():
348+
consumed_events.append(event)
349+
350+
assert consumed_events == []
351+
mock_event_queue.dequeue_event.assert_called_once()
352+
mock_event_queue.is_closed.assert_called_once()
353+
354+
355+
@pytest.mark.asyncio
356+
async def test_consume_all_continues_on_queue_empty_when_not_closed(
357+
event_consumer: EventConsumer, mock_event_queue: AsyncMock, monkeypatch
358+
):
359+
"""Ensure consume_all continues after asyncio.QueueEmpty when queue is open, yielding the next (final) event."""
360+
# First dequeue raises QueueEmpty (transient empty), then a final Message arrives
361+
final = Message(role='agent', parts=[{'text': 'done'}], message_id='final')
362+
mock_event_queue.dequeue_event.side_effect = [
363+
asyncio.QueueEmpty('temporarily empty'),
364+
final,
365+
]
366+
mock_event_queue.is_closed.return_value = False
367+
368+
# Make the polling responsive in tests
369+
event_consumer._timeout = 0.001
370+
371+
consumed = []
372+
async for ev in event_consumer.consume_all():
373+
consumed.append(ev)
374+
375+
assert consumed == [final]
376+
assert mock_event_queue.dequeue_event.call_count == 2
377+
mock_event_queue.is_closed.assert_called_once()
378+
379+
327380
def test_agent_task_callback_sets_exception(event_consumer: EventConsumer):
328381
"""Test that agent_task_callback sets _exception if the task had one."""
329382
mock_task = MagicMock(spec=asyncio.Task)

tests/server/events/test_event_queue.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,9 @@ async def test_enqueue_event_propagates_to_children(
169169

170170

171171
@pytest.mark.asyncio
172-
async def test_enqueue_event_when_closed(event_queue: EventQueue) -> None:
172+
async def test_enqueue_event_when_closed(
173+
event_queue: EventQueue, expected_queue_closed_exception
174+
) -> None:
173175
"""Test that no event is enqueued if the parent queue is closed."""
174176
await event_queue.close() # Close the queue first
175177

@@ -178,7 +180,7 @@ async def test_enqueue_event_when_closed(event_queue: EventQueue) -> None:
178180
await event_queue.enqueue_event(event)
179181

180182
# Verify the queue is still empty
181-
with pytest.raises(asyncio.QueueEmpty):
183+
with pytest.raises(expected_queue_closed_exception):
182184
await event_queue.dequeue_event(no_wait=True)
183185

184186
# Also verify child queues are not affected directly by parent's enqueue attempt when closed
@@ -192,7 +194,7 @@ async def test_enqueue_event_when_closed(event_queue: EventQueue) -> None:
192194
await (
193195
child_queue.close()
194196
) # ensure child is also seen as closed for this test's purpose
195-
with pytest.raises(asyncio.QueueEmpty):
197+
with pytest.raises(expected_queue_closed_exception):
196198
await child_queue.dequeue_event(no_wait=True)
197199

198200

@@ -214,7 +216,7 @@ async def test_dequeue_event_closed_and_empty_no_wait(
214216
with pytest.raises(expected_queue_closed_exception):
215217
event_queue.queue.get_nowait()
216218

217-
with pytest.raises(asyncio.QueueEmpty, match='Queue is closed.'):
219+
with pytest.raises(expected_queue_closed_exception):
218220
await event_queue.dequeue_event(no_wait=True)
219221

220222

@@ -230,7 +232,8 @@ async def test_dequeue_event_closed_and_empty_waits_then_raises(
230232

231233
# This test is tricky because await event_queue.dequeue_event() would hang if not for the close check.
232234
# The current implementation's dequeue_event checks `is_closed` first.
233-
# If closed and empty, it raises QueueEmpty immediately.
235+
# If closed and empty, it raises QueueEmpty immediately (on Python <= 3.12).
236+
# On Python 3.13+, this check is skipped and asyncio.Queue.get() raises QueueShutDown instead.
234237
# The "waits_then_raises" scenario described in the subtask implies the `get()` might wait.
235238
# However, the current code:
236239
# async with self._lock:
@@ -240,7 +243,7 @@ async def test_dequeue_event_closed_and_empty_waits_then_raises(
240243
# event = await self.queue.get() -> this line is not reached if closed and empty.
241244

242245
# So, for the current implementation, it will raise QueueEmpty immediately.
243-
with pytest.raises(asyncio.QueueEmpty, match='Queue is closed.'):
246+
with pytest.raises(expected_queue_closed_exception):
244247
await event_queue.dequeue_event(no_wait=False)
245248

246249
# If the implementation were to change to allow `await self.queue.get()`

0 commit comments

Comments
 (0)