Skip to content

Commit 8ec734c

Browse files
feat: Update to support python 3.12
- add case statement to use the asyncio.Queue.shutdown method for 3.13+ - add special handling to allow for similar semantics as asyncio.Queue.shutdown for 3.12 Tested on multiple samples in the a2a repo and some examples in this repo
1 parent 5b03148 commit 8ec734c

File tree

5 files changed

+49
-11
lines changed

5 files changed

+49
-11
lines changed

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ description = "A2A Python SDK"
55
readme = "README.md"
66
license = { file = "LICENSE" }
77
authors = [{ name = "Google LLC", email = "[email protected]" }]
8-
requires-python = ">=3.13"
8+
requires-python = ">=3.12"
99
keywords = ["A2A", "A2A SDK", "A2A Protocol", "Agent2Agent"]
1010
dependencies = [
1111
"httpx>=0.28.1",
@@ -22,7 +22,7 @@ classifiers = [
2222
"Intended Audience :: Developers",
2323
"Programming Language :: Python",
2424
"Programming Language :: Python :: 3",
25-
"Programming Language :: Python :: 3.13",
25+
"Programming Language :: Python :: 3.12",
2626
"Operating System :: OS Independent",
2727
"Topic :: Software Development :: Libraries :: Python Modules",
2828
"License :: OSI Approved :: Apache Software License",

src/a2a/server/events/event_consumer.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import logging
3+
import sys
34

45
from collections.abc import AsyncGenerator
56

@@ -14,6 +15,12 @@
1415
from a2a.utils.errors import ServerError
1516
from a2a.utils.telemetry import SpanKind, trace_class
1617

18+
# This is an alias to the execption for closed queue
19+
QueueClosed = asyncio.QueueEmpty
20+
21+
# When using python 3.13 or higher, the closed queue signal is QueueShutdown
22+
if sys.version_info >= (3, 13):
23+
QueueClosed = asyncio.QueueShutDown
1724

1825
logger = logging.getLogger(__name__)
1926

@@ -111,13 +118,16 @@ async def consume_all(self) -> AsyncGenerator[Event]:
111118

112119
if is_final_event:
113120
logger.debug('Stopping event consumption in consume_all.')
114-
self.queue.close()
121+
await self.queue.close()
115122
break
116123
except TimeoutError:
117124
# continue polling until there is a final event
118125
continue
119-
except asyncio.QueueShutDown:
120-
break
126+
except QueueClosed:
127+
# Confirm that the queue is closed, e.g. we aren't on
128+
# python 3.12 and get a queue empty error on an open queue
129+
if self.queue.is_closed():
130+
break
121131

122132
def agent_task_callback(self, agent_task: asyncio.Task[None]):
123133
"""Callback to handle exceptions from the agent's execution task.

src/a2a/server/events/event_queue.py

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import logging
3+
import sys
34

45
from a2a.types import (
56
A2AError,
@@ -39,6 +40,8 @@ def __init__(self) -> None:
3940
"""Initializes the EventQueue."""
4041
self.queue: asyncio.Queue[Event] = asyncio.Queue()
4142
self._children: list[EventQueue] = []
43+
self._is_closed = False
44+
self._lock = asyncio.Lock()
4245
logger.debug('EventQueue initialized.')
4346

4447
def enqueue_event(self, event: Event):
@@ -47,6 +50,9 @@ def enqueue_event(self, event: Event):
4750
Args:
4851
event: The event object to enqueue.
4952
"""
53+
if self._is_closed:
54+
logger.warning('Queue is closed. Event will not be enqueued.')
55+
return
5056
logger.debug(f'Enqueuing event of type: {type(event)}')
5157
self.queue.put_nowait(event)
5258
for child in self._children:
@@ -66,6 +72,11 @@ async def dequeue_event(self, no_wait: bool = False) -> Event:
6672
asyncio.QueueEmpty: If `no_wait` is True and the queue is empty.
6773
asyncio.QueueShutDown: If the queue has been closed and is empty.
6874
"""
75+
async with self._lock:
76+
if self._is_closed and self.queue.empty():
77+
logger.warning('Queue is closed. Event will not be dequeued.')
78+
raise asyncio.QueueEmpty('Queue is closed.')
79+
6980
if no_wait:
7081
logger.debug('Attempting to dequeue event (no_wait=True).')
7182
event = self.queue.get_nowait()
@@ -99,13 +110,30 @@ def tap(self) -> 'EventQueue':
99110
self._children.append(queue)
100111
return queue
101112

102-
def close(self):
113+
async def close(self):
103114
"""Closes the queue for future push events.
104115
105116
Once closed, `dequeue_event` will eventually raise `asyncio.QueueShutDown`
106117
when the queue is empty. Also closes all child queues.
107118
"""
108119
logger.debug('Closing EventQueue.')
109-
self.queue.shutdown()
110-
for child in self._children:
111-
child.close()
120+
async with self._lock:
121+
# If already closed, just return.
122+
if self._is_closed:
123+
return
124+
self._is_closed = True
125+
# If using python 3.13 or higher, use the shutdown method
126+
if sys.version_info >= (3, 13):
127+
self.queue.shutdown()
128+
for child in self._children:
129+
child.close()
130+
# Otherwise, join the queue
131+
else:
132+
tasks = [asyncio.create_task(self.queue.join())]
133+
for child in self._children:
134+
tasks.append(asyncio.create_task(child.close()))
135+
await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
136+
137+
def is_closed(self) -> bool:
138+
"""Checks if the queue is closed."""
139+
return self._is_closed

src/a2a/server/events/in_memory_queue_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ async def close(self, task_id: str):
6969
if task_id not in self._task_queue:
7070
raise NoTaskQueue()
7171
queue = self._task_queue.pop(task_id)
72-
queue.close()
72+
await queue.close()
7373

7474
async def create_or_tap(self, task_id: str) -> EventQueue:
7575
"""Creates a new event queue for a task ID if one doesn't exist, otherwise taps the existing one.

src/a2a/server/request_handlers/default_request_handler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ async def _run_event_stream(
147147
queue: The event queue for the agent to publish to.
148148
"""
149149
await self.agent_executor.execute(request, queue)
150-
queue.close()
150+
await queue.close()
151151

152152
async def on_message_send(
153153
self, params: MessageSendParams

0 commit comments

Comments
 (0)