Skip to content
Merged
Show file tree
Hide file tree
Changes from 67 commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
2fa0964
feat: Adding stand-alone support for RESTful API serving (#297)
pstephengoogle Jul 24, 2025
8bfa846
refactor: Refactor client code to support multi-transport interaction…
pstephengoogle Jul 24, 2025
f2c4085
Formatting/linting
holtskinner Jul 24, 2025
1fa0ec7
Update AgentCard path to `AGENT_CARD_WELL_KNOWN_PATH`
holtskinner Jul 24, 2025
98e9fe1
Lint errors
holtskinner Jul 24, 2025
d1d9e49
Lint - Construct AgentInterfaces for minimal_agent_card
holtskinner Jul 24, 2025
68ce59b
Update src/a2a/server/apps/rest/rest_app.py
pstephengoogle Jul 25, 2025
30bcaef
Merge branch 'main' into restful
holtskinner Jul 25, 2025
2a24b5f
Merge branch 'main' into restful
holtskinner Jul 25, 2025
33ba15e
Refactor to use `TransportProtocol` class from `types.py`
holtskinner Jul 25, 2025
11bb306
Fix lint errors in rest_handler
holtskinner Jul 25, 2025
ab7ab08
Fixed lint errors in rest_app
holtskinner Jul 25, 2025
78d03ef
Merge branch 'main' into restful
holtskinner Jul 25, 2025
c3f1bbb
chore: update lock file (#353)
kthota-g Jul 25, 2025
da44c94
chore: Update vscode PyTest launch.json to use workspace's venv inste…
aneeshgarg Jul 25, 2025
0c3d717
fix: Fix all the failing unit tests due to cyclic dependency, snake c…
aneeshgarg Jul 26, 2025
bcd7ee4
Merge branch 'main' into restful
aneeshgarg Jul 26, 2025
3e2c623
Merge branch 'main' into restful
holtskinner Jul 28, 2025
d46c44a
Fix lint error in rest/fastapi_app
holtskinner Jul 28, 2025
7009324
Fix lint errors in rest_client
holtskinner Jul 28, 2025
680f2d3
Fix lint errors in jsonrpc_client
holtskinner Jul 28, 2025
9a3da57
Fix lint errors in grpc_client
holtskinner Jul 28, 2025
9ee1d37
Fix lint errors in client.py
holtskinner Jul 28, 2025
8fbc1e1
Fix type warning for callable in client_factory
aneeshgarg Jul 28, 2025
addb1e2
Apply suggestions from code review
holtskinner Jul 28, 2025
434be28
fix gemini identified error in exception message
aneeshgarg Jul 28, 2025
39d8b13
Add protobuf libraries as default dependency since long term goal is …
aneeshgarg Jul 28, 2025
6bc2481
Rename test_client to test_jsonrpc_client since it tests jsonrpc cleint
aneeshgarg Jul 28, 2025
95dc15e
Fix some of the mypy errors
holtskinner Jul 28, 2025
d3e1027
Moves types around to avoid circular dependencies and make grpc modul…
aneeshgarg Jul 28, 2025
1a6bee9
Implement resubscribe across transport clients
mikeas1 Jul 28, 2025
22d5bbd
Fix some proto<->types conversion code
mikeas1 Jul 28, 2025
df3c0e7
fix broken test_proto_utils test
aneeshgarg Jul 29, 2025
f547260
Fix ruff linter errors
aneeshgarg Jul 29, 2025
4f2d121
Fix task_push_notification_config name generation when converting to …
aneeshgarg Jul 29, 2025
842e192
Create a wrapper exception for A2AError so that it can be raised in r…
aneeshgarg Jul 29, 2025
d2720a6
Fix snake case variables names and type error in test_gerpc_handler
aneeshgarg Jul 29, 2025
7d74660
Fix _generate_error_response argument type errors
aneeshgarg Jul 29, 2025
c48604d
Fix more mypy and gemini detected errors
aneeshgarg Jul 29, 2025
bbcfabf
AddJSONRpcError as A2AError since it is a superset of all JSONRpcErrors
aneeshgarg Jul 29, 2025
5c5b816
Revert "AddJSONRpcError as A2AError since it is a superset of all JSO…
aneeshgarg Jul 29, 2025
95de596
Spelling/formatting
holtskinner Jul 29, 2025
3cf61b4
Merge branch 'main' into restful
holtskinner Jul 29, 2025
0891716
Merge branch 'main' into restful
holtskinner Jul 29, 2025
39a430d
Merge branch 'main' into restful
mikeas1 Jul 30, 2025
6f592bb
Add unit tests for fastapi_app sendMessage api
aneeshgarg Jul 30, 2025
a063a8e
Centralize error handling and status code mapping for rest interface
aneeshgarg Jul 30, 2025
c2f4f2f
refactor: Refactor client into BaseClient + ClientTransport (#363)
mikeas1 Jul 30, 2025
210476b
Merge branch 'main' into restful
holtskinner Jul 30, 2025
a16e40f
Fix ruff lint error TC001
holtskinner Jul 30, 2025
447d1b0
Run formatting
holtskinner Jul 30, 2025
8adba29
Merge remote-tracking branch 'origin/main' into restful
aneeshgarg Jul 30, 2025
c73080d
Update test_jsonrpc_client.py
aneeshgarg Jul 30, 2025
bf21978
Fix set_task_push_notification_config integration test
aneeshgarg Jul 30, 2025
44ffd7f
Fix mypy errors from error_handlers.py
aneeshgarg Jul 30, 2025
7c2dfcf
Rename RESTApplication to RestAdapter since its purpose is to connect…
aneeshgarg Jul 30, 2025
087ddc1
Merge branch 'main' into restful
aneeshgarg Jul 30, 2025
3600f13
chore: Fix uv lock file
aneeshgarg Jul 30, 2025
8d252bb
Address mypy errors
mikeas1 Jul 30, 2025
8665809
Fix tests
mikeas1 Jul 30, 2025
2e0e526
Add type ignore to unrelated stuff to get a clean mypy
mikeas1 Jul 30, 2025
0e67148
Change pytest coverage to print to term
holtskinner Jul 30, 2025
068ee35
Formatting
holtskinner Jul 30, 2025
68e2df1
Refactor rest transport to reduce duplication, fix pyright errors
mikeas1 Jul 30, 2025
8a81b93
Add missing return type annotation
mikeas1 Jul 30, 2025
019ded4
Add tests for BaseClient and ClientTaskManager
mikeas1 Jul 30, 2025
1cfd28e
Formatting
mikeas1 Jul 30, 2025
b892a14
Increase visibility of handle_get_agent_card
mikeas1 Jul 30, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/actions/spelling/allow.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ lifecycles
linting
Llm
lstrips
mikeas
mockurl
notif
oauthoidc
Expand All @@ -67,6 +68,7 @@ pyi
pypistats
pyupgrade
pyversions
redef
respx
resub
RUF
Expand All @@ -76,5 +78,6 @@ sse
tagwords
taskupdate
testuuid
Tful
typeerror
vulnz
2 changes: 1 addition & 1 deletion .github/workflows/unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,6 @@ jobs:
- name: Install dependencies
run: uv sync --dev --extra sql --extra encryption --extra grpc --extra telemetry
- name: Run tests and check coverage
run: uv run pytest --cov=a2a --cov-report=xml --cov-fail-under=89
run: uv run pytest --cov=a2a --cov-report term --cov-fail-under=89
- name: Show coverage summary in log
run: uv run coverage report
5 changes: 3 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@
"-s"
],
"console": "integratedTerminal",
"justMyCode": true
"justMyCode": true,
"python": "${workspaceFolder}/.venv/bin/python",
}
]
}
}
27 changes: 27 additions & 0 deletions Gemini.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
**A2A specification:** https://a2a-protocol.org/latest/specification/

## Project frameworks
- uv as package manager

## How to run all tests
1. If dependencies are not installed install them using following command
```
uv sync --all-extras
```

2. Run tests
```
uv run pytest
```

## Other instructions
1. Whenever writing python code, write types as well.
2. After making the changes run ruff to check and fix the formatting issues
```
uv run ruff check --fix
```
3. Run mypy type checkers to check for type errors
```
uv run mypy
```
4. Run the unit tests to make sure that none of the unit tests are broken.
Empty file added error_handlers.py
Empty file.
9 changes: 6 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ authors = [{ name = "Google LLC", email = "[email protected]" }]
requires-python = ">=3.10"
keywords = ["A2A", "A2A SDK", "A2A Protocol", "Agent2Agent", "Agent 2 Agent"]
dependencies = [
"fastapi>=0.115.2",
"fastapi>=0.116.1",
"httpx>=0.28.1",
"httpx-sse>=0.4.0",
"pydantic>=2.11.3",
"sse-starlette",
"starlette"
"starlette",
"protobuf==5.29.5",
"google-api-core>=1.26.0",
]

classifiers = [
Expand All @@ -35,7 +37,7 @@ mysql = ["sqlalchemy[asyncio,aiomysql]>=2.0.0"]
sqlite = ["sqlalchemy[asyncio,aiosqlite]>=2.0.0"]
sql = ["sqlalchemy[asyncio,postgresql-asyncpg,aiomysql,aiosqlite]>=2.0.0"]
encryption = ["cryptography>=43.0.0"]
grpc = ["grpcio>=1.60", "grpcio-tools>=1.60", "grpcio_reflection>=1.7.0", "protobuf==5.29.5", "google-api-core>=1.26.0"]
grpc = ["grpcio>=1.60", "grpcio-tools>=1.60", "grpcio_reflection>=1.7.0"]
telemetry = ["opentelemetry-api>=1.33.0", "opentelemetry-sdk>=1.33.0"]

[project.urls]
Expand Down Expand Up @@ -90,6 +92,7 @@ dev = [
"pyupgrade",
"autoflake",
"no_implicit_optional",
"trio",
]

[[tool.uv.index]]
Expand Down
13 changes: 11 additions & 2 deletions src/a2a/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,24 @@
CredentialService,
InMemoryContextCredentialStore,
)
from a2a.client.client import A2ACardResolver, A2AClient
from a2a.client.card_resolver import A2ACardResolver
from a2a.client.client import Client, ClientConfig, ClientEvent, Consumer
from a2a.client.client_factory import ClientFactory, minimal_agent_card
from a2a.client.errors import (
A2AClientError,
A2AClientHTTPError,
A2AClientJSONError,
A2AClientTimeoutError,
)
from a2a.client.helpers import create_text_message_object
from a2a.client.legacy import A2AClient
from a2a.client.middleware import ClientCallContext, ClientCallInterceptor


logger = logging.getLogger(__name__)

try:
from a2a.client.grpc_client import A2AGrpcClient # type: ignore
from a2a.client.legacy_grpc import A2AGrpcClient # type: ignore
except ImportError as e:
_original_error = e
logger.debug(
Expand All @@ -48,9 +51,15 @@ def __init__(self, *args, **kwargs):
'A2AClientTimeoutError',
'A2AGrpcClient',
'AuthInterceptor',
'Client',
'ClientCallContext',
'ClientCallInterceptor',
'ClientConfig',
'ClientEvent',
'ClientFactory',
'Consumer',
'CredentialService',
'InMemoryContextCredentialStore',
'create_text_message_object',
'minimal_agent_card',
]
241 changes: 241 additions & 0 deletions src/a2a/client/base_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
from collections.abc import AsyncIterator

from a2a.client.client import (
Client,
ClientCallContext,
ClientConfig,
ClientEvent,
Consumer,
)
from a2a.client.client_task_manager import ClientTaskManager
from a2a.client.errors import A2AClientInvalidStateError
from a2a.client.middleware import ClientCallInterceptor
from a2a.client.transports.base import ClientTransport
from a2a.types import (
AgentCard,
GetTaskPushNotificationConfigParams,
Message,
MessageSendConfiguration,
MessageSendParams,
Task,
TaskArtifactUpdateEvent,
TaskIdParams,
TaskPushNotificationConfig,
TaskQueryParams,
TaskStatusUpdateEvent,
)


class BaseClient(Client):
"""Base implementation of the A2A client, containing transport-independent logic."""

def __init__(
self,
card: AgentCard,
config: ClientConfig,
transport: ClientTransport,
consumers: list[Consumer],
middleware: list[ClientCallInterceptor],
):
super().__init__(consumers, middleware)
self._card = card
self._config = config
self._transport = transport

async def send_message(
self,
request: Message,
*,
context: ClientCallContext | None = None,
) -> AsyncIterator[ClientEvent | Message]:
"""Sends a message to the agent.

This method handles both streaming and non-streaming (polling) interactions
based on the client configuration and agent capabilities. It will yield
events as they are received from the agent.

Args:
request: The message to send to the agent.
context: The client call context.

Yields:
An async iterator of `ClientEvent` or a final `Message` response.
"""
config = MessageSendConfiguration(
accepted_output_modes=self._config.accepted_output_modes,
blocking=not self._config.polling,
push_notification_config=(
self._config.push_notification_configs[0]
if self._config.push_notification_configs
else None
),
)
params = MessageSendParams(message=request, configuration=config)

if not self._config.streaming or not self._card.capabilities.streaming:
response = await self._transport.send_message(
params, context=context
)
result = (
(response, None) if isinstance(response, Task) else response
)
await self.consume(result, self._card)
yield result
return

tracker = ClientTaskManager()
stream = self._transport.send_message_streaming(params, context=context)

first_event = await anext(stream)
# The response from a server may be either exactly one Message or a
# series of Task updates. Separate out the first message for special
# case handling, which allows us to simplify further stream processing.
if isinstance(first_event, Message):
await self.consume(first_event, self._card)
yield first_event
return

yield await self._process_response(tracker, first_event)

async for event in stream:
yield await self._process_response(tracker, event)

async def _process_response(
self,
tracker: ClientTaskManager,
event: Task | Message | TaskStatusUpdateEvent | TaskArtifactUpdateEvent,
) -> ClientEvent:
if isinstance(event, Message):
raise A2AClientInvalidStateError(
'received a streamed Message from server after first response; this is not supported'
)
await tracker.process(event)
task = tracker.get_task_or_raise()
update = None if isinstance(event, Task) else event
client_event = (task, update)
await self.consume(client_event, self._card)
return client_event

async def get_task(
self,
request: TaskQueryParams,
*,
context: ClientCallContext | None = None,
) -> Task:
"""Retrieves the current state and history of a specific task.

Args:
request: The `TaskQueryParams` object specifying the task ID.
context: The client call context.

Returns:
A `Task` object representing the current state of the task.
"""
return await self._transport.get_task(request, context=context)

async def cancel_task(
self,
request: TaskIdParams,
*,
context: ClientCallContext | None = None,
) -> Task:
"""Requests the agent to cancel a specific task.

Args:
request: The `TaskIdParams` object specifying the task ID.
context: The client call context.

Returns:
A `Task` object containing the updated task status.
"""
return await self._transport.cancel_task(request, context=context)

async def set_task_callback(
self,
request: TaskPushNotificationConfig,
*,
context: ClientCallContext | None = None,
) -> TaskPushNotificationConfig:
"""Sets or updates the push notification configuration for a specific task.

Args:
request: The `TaskPushNotificationConfig` object with the new configuration.
context: The client call context.

Returns:
The created or updated `TaskPushNotificationConfig` object.
"""
return await self._transport.set_task_callback(request, context=context)

async def get_task_callback(
self,
request: GetTaskPushNotificationConfigParams,
*,
context: ClientCallContext | None = None,
) -> TaskPushNotificationConfig:
"""Retrieves the push notification configuration for a specific task.

Args:
request: The `GetTaskPushNotificationConfigParams` object specifying the task.
context: The client call context.

Returns:
A `TaskPushNotificationConfig` object containing the configuration.
"""
return await self._transport.get_task_callback(request, context=context)

async def resubscribe(
self,
request: TaskIdParams,
*,
context: ClientCallContext | None = None,
) -> AsyncIterator[ClientEvent]:
"""Resubscribes to a task's event stream.

This is only available if both the client and server support streaming.

Args:
request: Parameters to identify the task to resubscribe to.
context: The client call context.

Yields:
An async iterator of `ClientEvent` objects.

Raises:
NotImplementedError: If streaming is not supported by the client or server.
"""
if not self._config.streaming or not self._card.capabilities.streaming:
raise NotImplementedError(
'client and/or server do not support resubscription.'
)

tracker = ClientTaskManager()
# Note: resubscribe can only be called on an existing task. As such,
# we should never see Message updates, despite the typing of the service
# definition indicating it may be possible.
async for event in self._transport.resubscribe(
request, context=context
):
yield await self._process_response(tracker, event)

async def get_card(
self, *, context: ClientCallContext | None = None
) -> AgentCard:
"""Retrieves the agent's card.

This will fetch the authenticated card if necessary and update the
client's internal state with the new card.

Args:
context: The client call context.

Returns:
The `AgentCard` for the agent.
"""
card = await self._transport.get_card(context=context)
self._card = card
return card

async def close(self) -> None:
"""Closes the underlying transport."""
await self._transport.close()
Loading
Loading