Skip to content

Commit 9b0f264

Browse files
Update the system to refactor for multi transport support
- Introduced some new abstraction layers to separate the logic - Simplified the AgentExecutor interface to allow for simpler support - Created some convenience methods for the agent builders to simplify the creation of the Task and Messages events. Change-Id: I6b0f2295fcf5045e12099030be80d92f287d62db
2 parents 58a3aa2 + 58a3aa2 commit 9b0f264

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1934
-602
lines changed

.coveragerc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
[run]
2+
branch = True
3+
4+
[report]
5+
exclude_also =
6+
pass
7+
import
8+
@abstractmethod

.python-version

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
3.10
1+
3.13

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# A2A SDK
22

3+
## SDK
4+
5+
This is the python SDK for A2A compatible agents.
6+
37
## Type generation from spec
48

59
<!-- TODO replace spec.json with the public url so we always get the latest version-->
@@ -12,4 +16,4 @@ uv run datamodel-codegen --input ./spec.json --input-file-type jsonschema --outp
1216

1317
```bash
1418
uv run pytest
15-
```
19+
```

examples/helloworld/__main__.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
from agent_executor import HelloWorldAgentExecutor
22

3-
from a2a.server import A2AServer
4-
from a2a.server.request_handlers import DefaultA2ARequestHandler
3+
from a2a.server.request_handlers import DefaultRequestHandler
4+
from a2a.server.tasks import InMemoryTaskStore
5+
from a2a.server.apps import A2AStarletteApplication
56
from a2a.types import (
67
AgentAuthentication,
78
AgentCapabilities,
@@ -31,9 +32,12 @@
3132
authentication=AgentAuthentication(schemes=['public']),
3233
)
3334

34-
request_handler = DefaultA2ARequestHandler(
35-
agent_executor=HelloWorldAgentExecutor()
35+
request_handler = DefaultRequestHandler(
36+
agent_executor=HelloWorldAgentExecutor(),
37+
task_store=InMemoryTaskStore(),
3638
)
3739

38-
server = A2AServer(agent_card=agent_card, request_handler=request_handler)
39-
server.start(host='0.0.0.0', port=9999)
40+
server = A2AStarletteApplication(agent_card=agent_card, http_handler=request_handler)
41+
import uvicorn
42+
43+
uvicorn.run(server.build(), host='0.0.0.0', port=9999)

examples/helloworld/agent_executor.py

Lines changed: 11 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
from uuid import uuid4
66

77
from typing_extensions import override
8-
9-
from a2a.server.agent_execution import BaseAgentExecutor
10-
from a2a.server.events import EventQueue
8+
from a2a.server.agent_execution import AgentExecutor, RequestContext
9+
from a2a.server.events import Event, EventQueue
10+
from a2a.utils import new_agent_text_message
1111
from a2a.types import (
1212
Message,
1313
Part,
@@ -18,53 +18,28 @@
1818
TextPart,
1919
)
2020

21-
2221
class HelloWorldAgent:
2322
"""Hello World Agent."""
2423

25-
async def invoke(self):
26-
return 'Hello World'
24+
async def invoke(self) -> str:
25+
return 'Hello World'
2726

28-
async def stream(self) -> AsyncGenerator[dict[str, Any], None]:
29-
yield {'content': 'Hello ', 'done': False}
30-
await asyncio.sleep(2)
31-
yield {'content': 'World', 'done': True}
3227

33-
34-
class HelloWorldAgentExecutor(BaseAgentExecutor):
28+
class HelloWorldAgentExecutor(AgentExecutor):
3529
"""Test AgentProxy Implementation."""
3630

3731
def __init__(self):
3832
self.agent = HelloWorldAgent()
3933

4034
@override
41-
async def on_message_send(
35+
async def execute(
4236
self,
43-
request: SendMessageRequest,
37+
request: RequestContext,
4438
event_queue: EventQueue,
45-
task: Task | None,
4639
) -> None:
4740
result = await self.agent.invoke()
48-
49-
message: Message = Message(
50-
role=Role.agent,
51-
parts=[Part(root=TextPart(text=result))],
52-
messageId=str(uuid4()),
53-
)
54-
event_queue.enqueue_event(message)
41+
event_queue.enqueue_event(new_agent_text_message(result))
5542

5643
@override
57-
async def on_message_stream(
58-
self,
59-
request: SendStreamingMessageRequest,
60-
event_queue: EventQueue,
61-
task: Task | None,
62-
) -> None:
63-
async for chunk in self.agent.stream():
64-
message: Message = Message(
65-
role=Role.agent,
66-
parts=[Part(root=TextPart(text=chunk['content']))],
67-
messageId=str(uuid4()),
68-
final=chunk['done'],
69-
)
70-
event_queue.enqueue_event(message)
44+
async def cancel(self, request: RequestContext, event_queue: EventQueue) -> Task | None:
45+
raise Exception("cancel not supported")

examples/helloworld/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name = "helloworld"
33
version = "0.1.0"
44
description = "HelloWorld agent example that only returns Messages"
55
readme = "README.md"
6-
requires-python = ">=3.10"
6+
requires-python = ">=3.13"
77
dependencies = [
88
"a2a",
99
"click>=8.1.8",

examples/langgraph/__main__.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@
88
from dotenv import load_dotenv
99

1010

11-
from a2a.server.request_handlers import DefaultA2ARequestHandler
12-
from a2a.server import A2AServer
11+
from a2a.server.request_handlers import DefaultRequestHandler
12+
from a2a.server.tasks import InMemoryTaskStore
13+
from a2a.server.apps import A2AStarletteApplication
1314
from a2a.types import (
1415
AgentAuthentication,
1516
AgentCapabilities,
@@ -29,14 +30,17 @@ def main(host: str, port: int):
2930
print('GOOGLE_API_KEY environment variable not set.')
3031
sys.exit(1)
3132

32-
request_handler = DefaultA2ARequestHandler(
33-
agent_executor=CurrencyAgentExecutor()
33+
request_handler = DefaultRequestHandler(
34+
agent_executor=CurrencyAgentExecutor(),
35+
task_store=InMemoryTaskStore(),
3436
)
3537

36-
server = A2AServer(
37-
agent_card=get_agent_card(host, port), request_handler=request_handler
38-
)
39-
server.start(host=host, port=port)
38+
server = A2AStarletteApplication(
39+
agent_card=get_agent_card(host, port),
40+
http_handler=request_handler)
41+
import uvicorn
42+
43+
uvicorn.run(server.build(), host=host, port=port)
4044

4145

4246
def get_agent_card(host: str, port: int):

examples/langgraph/agent.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,6 @@ def __init__(self):
9494
response_format=(self.RESPONSE_FORMAT_INSTRUCTION, ResponseFormat),
9595
)
9696

97-
def invoke(self, query: str, sessionId: str) -> dict[str, Any]:
98-
config: RunnableConfig = {'configurable': {'thread_id': sessionId}}
99-
self.graph.invoke({'messages': [('user', query)]}, config)
100-
return self.get_agent_response(config)
101-
10297
async def stream(
10398
self, query: str, sessionId: str
10499
) -> AsyncIterable[dict[str, Any]]:

examples/langgraph/agent_executor.py

Lines changed: 72 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,81 +1,97 @@
11
from typing import Any
22

33
from agent import CurrencyAgent
4-
from helpers import (
5-
process_streaming_agent_response,
6-
update_task_with_agent_response,
7-
)
84
from typing_extensions import override
95

10-
from a2a.server.agent_execution import BaseAgentExecutor
11-
from a2a.server.events.event_queue import EventQueue
6+
from a2a.server.agent_execution import AgentExecutor, RequestContext
7+
from a2a.server.events.event_queue import EventQueue, Event
8+
from a2a.utils import new_agent_text_message, new_text_artifact, new_task
129
from a2a.types import (
13-
MessageSendParams,
14-
SendMessageRequest,
15-
SendStreamingMessageRequest,
1610
Task,
1711
TextPart,
12+
TaskStatusUpdateEvent,
13+
TaskArtifactUpdateEvent,
14+
TaskStatus,
15+
TaskState,
1816
)
19-
from a2a.utils import create_task_obj
2017

2118

22-
class CurrencyAgentExecutor(BaseAgentExecutor):
19+
class CurrencyAgentExecutor(AgentExecutor):
2320
"""Currency AgentExecutor Example."""
2421

2522
def __init__(self):
2623
self.agent = CurrencyAgent()
2724

2825
@override
29-
async def on_message_send(
30-
self,
31-
request: SendMessageRequest,
32-
event_queue: EventQueue,
33-
task: Task | None,
34-
) -> None:
35-
"""Handler for 'message/send' requests."""
36-
params: MessageSendParams = request.params
37-
query = self._get_user_query(params)
38-
39-
if not task:
40-
task = create_task_obj(params)
41-
42-
# invoke the underlying agent
43-
agent_response: dict[str, Any] = self.agent.invoke(
44-
query, task.contextId
45-
)
46-
update_task_with_agent_response(task, agent_response)
47-
event_queue.enqueue_event(task)
48-
49-
@override
50-
async def on_message_stream(
26+
async def execute(
5127
self,
52-
request: SendStreamingMessageRequest,
28+
context: RequestContext,
5329
event_queue: EventQueue,
54-
task: Task | None,
5530
) -> None:
56-
"""Handler for 'message/stream' requests."""
57-
params: MessageSendParams = request.params
58-
query = self._get_user_query(params)
31+
query = context.get_user_input()
32+
task = context.current_task
5933

6034
if not task:
61-
task = create_task_obj(params)
62-
# emit the initial task so it is persisted to TaskStore
35+
task = new_task(context.message)
6336
event_queue.enqueue_event(task)
37+
# invoke the underlying agent, using streaming results
38+
async for event in self.agent.stream(query, task.contextId):
39+
if event['is_task_complete']:
40+
event_queue.enqueue_event(
41+
TaskArtifactUpdateEvent(
42+
append=False,
43+
contextId=task.contextId,
44+
taskId=task.id,
45+
lastChunk=True,
46+
artifact=new_text_artifact(
47+
name='current_result',
48+
description='Result of request to agent.',
49+
text=event['content'],
50+
),
51+
)
52+
)
53+
event_queue.enqueue_event(
54+
TaskStatusUpdateEvent(
55+
status=TaskStatus(state=TaskState.completed),
56+
final=True,
57+
contextId=task.contextId,
58+
taskId=task.id,
59+
)
60+
)
61+
elif event['require_user_input']:
62+
event_queue.enqueue_event(
63+
TaskStatusUpdateEvent(
64+
status=TaskStatus(
65+
state=TaskState.input_required,
66+
message=new_agent_text_message(
67+
event['content'],
68+
task.contextId,
69+
task.id,
70+
),
71+
),
72+
final=True,
73+
contextId=task.contextId,
74+
taskId=task.id,
75+
)
76+
)
77+
else:
78+
event_queue.enqueue_event(
79+
TaskStatusUpdateEvent(
80+
status=TaskStatus(
81+
state=TaskState.working,
82+
message=new_agent_text_message(
83+
event['content'],
84+
task.contextId,
85+
task.id,
86+
),
87+
),
88+
final=False,
89+
contextId=task.contextId,
90+
taskId=task.id,
91+
)
92+
)
6493

65-
# kickoff the streaming agent and process responses
66-
async for item in self.agent.stream(query, task.contextId):
67-
task_artifact_update_event, task_status_event = (
68-
process_streaming_agent_response(task, item)
69-
)
70-
71-
if task_artifact_update_event:
72-
event_queue.enqueue_event(task_artifact_update_event)
73-
74-
event_queue.enqueue_event(task_status_event)
94+
@override
95+
async def cancel(self, request: RequestContext, event_queue: EventQueue) -> Task | None:
96+
raise Exception("cancel not supported")
7597

76-
def _get_user_query(self, task_send_params: MessageSendParams) -> str:
77-
"""Helper to get user query from task send params."""
78-
part = task_send_params.message.parts[0].root
79-
if not isinstance(part, TextPart):
80-
raise ValueError('Only text parts are supported')
81-
return part.text

examples/langgraph/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name = "langgraph-example"
33
version = "0.1.0"
44
description = "Currency conversion agent example"
55
readme = "README.md"
6-
requires-python = ">=3.10"
6+
requires-python = ">=3.13"
77
dependencies = [
88
"a2a",
99
"click>=8.1.8",

0 commit comments

Comments
 (0)