Skip to content

Commit d7e3bce

Browse files
Add gRPC based support in the SDK.
- Introduces an A2AGrpcClient to talk to server over grpc - Introduces GrpcHandler to tranlate the gRPC transport to the internal python data model and back. - A set of transform operations in proto_utils.py to handle the transform This is a starting point and can be iterated and optimized as we move forward, especially trying to automate the transform code so it stays in sync.
1 parent 3526a2a commit d7e3bce

File tree

12 files changed

+2552
-2
lines changed

12 files changed

+2552
-2
lines changed

buf.gen.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@ managed:
1919
plugins:
2020
# Generate python protobuf related code
2121
# Generates *_pb2.py files, one for each .proto
22-
- remote: buf.build/protocolbuffers/python
22+
- remote: buf.build/protocolbuffers/python:v29.3
2323
out: src/a2a/grpc
2424
# Generate python service code.
2525
# Generates *_pb2_grpc.py
2626
- remote: buf.build/grpc/python
2727
out: src/a2a/grpc
28+
# Generates *_pb2.pyi files.
29+
- remote: buf.build/protocolbuffers/pyi:v29.3
30+
out: src/a2a/grpc

src/a2a/client/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Client-side components for interacting with an A2A agent."""
22

33
from a2a.client.client import A2ACardResolver, A2AClient
4+
from a2a.client.grpc_client import A2AGrpcClient
45
from a2a.client.errors import (
56
A2AClientError,
67
A2AClientHTTPError,
@@ -15,5 +16,6 @@
1516
'A2AClientError',
1617
'A2AClientHTTPError',
1718
'A2AClientJSONError',
19+
'A2AGrpcClient',
1820
'create_text_message_object',
1921
]

src/a2a/client/grpc_client.py

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
import json
2+
import logging
3+
from collections.abc import AsyncGenerator
4+
from typing import Any
5+
from uuid import uuid4
6+
import grpc
7+
8+
from a2a.client.errors import A2AClientHTTPError, A2AClientJSONError
9+
from a2a.types import (
10+
AgentCard,
11+
MessageSendParams,
12+
Task,
13+
TaskStatusUpdateEvent,
14+
TaskArtifactUpdateEvent,
15+
TaskPushNotificationConfig,
16+
TaskIdParams,
17+
TaskQueryParams,
18+
Message,
19+
)
20+
from a2a.utils.telemetry import SpanKind, trace_class
21+
from a2a.utils import proto_utils
22+
from a2a.grpc import a2a_pb2_grpc
23+
from a2a.grpc import a2a_pb2
24+
25+
logger = logging.getLogger(__name__)
26+
27+
28+
@trace_class(kind=SpanKind.CLIENT)
29+
class A2AGrpcClient:
30+
"""A2A Client for interacting with an A2A agent via gRPC."""
31+
32+
def __init__(
33+
self,
34+
grpc_stub: a2a_pb2_grpc.A2AServiceStub,
35+
agent_card: AgentCard,
36+
):
37+
"""Initializes the A2AGrpcClient.
38+
39+
Requires an `AgentCard`
40+
41+
Args:
42+
grpc_stub: A grpc client stub.
43+
agent_card: The agent card object.
44+
"""
45+
self.agent_card = agent_card
46+
self.stub = grpc_stub
47+
48+
async def send_message(
49+
self,
50+
request: MessageSendParams,
51+
) -> Task | Message :
52+
"""Sends a non-streaming message request to the agent.
53+
54+
Args:
55+
request: The `MessageSendParams` object containing the message and configuration.
56+
57+
Returns:
58+
A `Task` or `Message` object containing the agent's response.
59+
"""
60+
response = await self.stub.SendMessage(
61+
a2a_pb2.SendMessageRequest(
62+
request=proto_utils.ToProto.message(request.message),
63+
configuration=proto_utils.ToProto.message_send_configuration(
64+
request.configuration
65+
),
66+
metadata=proto_utils.ToProto.metadata(request.metadata),
67+
)
68+
)
69+
if response.task:
70+
return proto_utils.FromProto.task(response.task)
71+
return proto_utils.FromProto.message(response.msg)
72+
73+
async def send_message_streaming(
74+
self,
75+
request: MessageSendParams,
76+
) -> AsyncGenerator[
77+
Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent
78+
]:
79+
"""Sends a streaming message request to the agent and yields responses as they arrive.
80+
81+
This method uses gRPC streams to receive a stream of updates from the
82+
agent.
83+
84+
Args:
85+
request: The `MessageSendParams` object containing the message and configuration.
86+
87+
Yields:
88+
`Message` or `Task` or `TaskStatusUpdateEvent` or
89+
`TaskArtifactUpdateEvent` objects as they are received in the
90+
stream.
91+
"""
92+
stream = self.stub.SendStreamingMessage(
93+
a2a_pb2.SendMessageRequest(
94+
request=proto_utils.ToProto.message(request.message),
95+
configuration=proto_utils.ToProto.message_send_configuration(
96+
request.configuration
97+
),
98+
metadata=proto_utils.ToProto.metadata(request.metadata),
99+
)
100+
)
101+
while True:
102+
response = await stream.read()
103+
if response == grpc.aio.EOF:
104+
break
105+
if response.HasField('msg'):
106+
yield proto_utils.FromProto.message(response.msg)
107+
elif response.HasField('task'):
108+
yield proto_utils.FromProto.task(response.task)
109+
elif response.HasField('status_update'):
110+
yield proto_utils.FromProto.task_status_update_event(
111+
response.status_update
112+
)
113+
elif response.HasField('artifact_update'):
114+
yield proto_utils.FromProto.task_artifact_update_event(
115+
response.artifact_update
116+
)
117+
118+
async def get_task(
119+
self,
120+
request: TaskQueryParams,
121+
) -> Task:
122+
"""Retrieves the current state and history of a specific task.
123+
124+
Args:
125+
request: The `TaskQueryParams` object specifying the task ID
126+
127+
Returns:
128+
A `Task` object containing the Task or None.
129+
"""
130+
task = await self.stub.GetTask(
131+
a2a_pb2.GetTaskRequest(name=f'tasks/{request.id}')
132+
)
133+
return proto_utils.FromProto.task(task)
134+
135+
async def cancel_task(
136+
self,
137+
request: TaskIdParams,
138+
) -> Task:
139+
"""Requests the agent to cancel a specific task.
140+
141+
Args:
142+
request: The `TaskIdParams` object specifying the task ID.
143+
144+
Returns:
145+
A `Task` object containing the updated Task
146+
"""
147+
task = await self.stub.CancelTask(
148+
a2a_pb2.CancelTaskRequest(name=f'tasks/{request.id}')
149+
)
150+
return proto_utils.FromProto.task(task)
151+
152+
async def set_task_callback(
153+
self,
154+
request: TaskPushNotificationConfig,
155+
) -> TaskPushNotificationConfig:
156+
"""Sets or updates the push notification configuration for a specific task.
157+
158+
Args:
159+
request: The `TaskPushNotificationConfig` object specifying the task ID and configuration.
160+
161+
Returns:
162+
A `TaskPushNotificationConfig` object containing the config.
163+
"""
164+
config = await self.stub.CreateTaskPushNotification(
165+
a2a_pb2.CreateTaskPushNotificationRequest(
166+
parent='',
167+
config_id='',
168+
config=proto_utils.ToProto.task_push_notification_config(
169+
request
170+
),
171+
)
172+
)
173+
return proto_utils.FromProto.task_push_notification_config(config)
174+
175+
async def get_task_callback(
176+
self,
177+
request: TaskIdParams, # TODO: Update to a push id params
178+
) -> TaskPushNotificationConfig:
179+
"""Retrieves the push notification configuration for a specific task.
180+
181+
Args:
182+
request: The `TaskIdParams` object specifying the task ID.
183+
184+
Returns:
185+
A `TaskPushNotificationConfig` object containing the configuration.
186+
"""
187+
config = await self.stub.GetTaskPushNotification(
188+
a2a_pb2.GetTaskPushNotificationRequest(
189+
name=f'tasks/{request.id}/pushNotification/undefined',
190+
)
191+
)
192+
return proto_utils.FromProto.task_push_notification_config(config)

src/a2a/grpc/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)