Skip to content

Commit 74d31e3

Browse files
authored
feat: push notification e2e tests (#486)
# Description This PR adds two end-to-end tests for push notifications, designed to verify that the default push notification implementation functions as expected. These tests involve setting up two distinct servers: an agent server that hosts a basic test agent, and a notifications server responsible for receiving notifications from the agent server. Prerequisites: - [x] Follow the [`CONTRIBUTING` Guide](https://github.com/a2aproject/a2a-python/blob/main/CONTRIBUTING.md). - [x] Make your Pull Request title in the <https://www.conventionalcommits.org/> specification. - Important Prefixes for [release-please](https://github.com/googleapis/release-please): - `fix:` which represents bug fixes, and correlates to a [SemVer](https://semver.org/) patch. - `feat:` represents a new feature, and correlates to a SemVer minor. - `feat!:`, or `fix!:`, `refactor!:`, etc., which represent a breaking change (indicated by the `!`) and will result in a SemVer major. - [x] Ensure the tests and linter pass (Run `bash scripts/format.sh` from the repository root to format) - [x] Appropriate docs were updated (if necessary)
1 parent 2c152c0 commit 74d31e3

File tree

5 files changed

+504
-0
lines changed

5 files changed

+504
-0
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ dev = [
103103
"autoflake",
104104
"no_implicit_optional",
105105
"trio",
106+
"uvicorn>=0.35.0",
106107
]
107108

108109
[[tool.uv.index]]
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
import httpx
2+
3+
from fastapi import FastAPI
4+
5+
from a2a.server.agent_execution import AgentExecutor, RequestContext
6+
from a2a.server.apps import A2ARESTFastAPIApplication
7+
from a2a.server.events import EventQueue
8+
from a2a.server.request_handlers import DefaultRequestHandler
9+
from a2a.server.tasks import (
10+
BasePushNotificationSender,
11+
InMemoryPushNotificationConfigStore,
12+
InMemoryTaskStore,
13+
TaskUpdater,
14+
)
15+
from a2a.types import (
16+
AgentCapabilities,
17+
AgentCard,
18+
AgentSkill,
19+
InvalidParamsError,
20+
Message,
21+
Task,
22+
)
23+
from a2a.utils import (
24+
new_agent_text_message,
25+
new_task,
26+
)
27+
from a2a.utils.errors import ServerError
28+
29+
30+
def test_agent_card(url: str) -> AgentCard:
31+
"""Returns an agent card for the test agent."""
32+
return AgentCard(
33+
name='Test Agent',
34+
description='Just a test agent',
35+
url=url,
36+
version='1.0.0',
37+
default_input_modes=['text'],
38+
default_output_modes=['text'],
39+
capabilities=AgentCapabilities(streaming=True, push_notifications=True),
40+
skills=[
41+
AgentSkill(
42+
id='greeting',
43+
name='Greeting Agent',
44+
description='just greets the user',
45+
tags=['greeting'],
46+
examples=['Hello Agent!', 'How are you?'],
47+
)
48+
],
49+
supports_authenticated_extended_card=True,
50+
)
51+
52+
53+
class TestAgent:
54+
"""Agent for push notification testing."""
55+
56+
async def invoke(
57+
self, updater: TaskUpdater, msg: Message, task: Task
58+
) -> None:
59+
# Fail for unsupported messages.
60+
if (
61+
not msg.parts
62+
or len(msg.parts) != 1
63+
or msg.parts[0].root.kind != 'text'
64+
):
65+
await updater.failed(
66+
new_agent_text_message(
67+
'Unsupported message.', task.context_id, task.id
68+
)
69+
)
70+
return
71+
text_message = msg.parts[0].root.text
72+
73+
# Simple request-response flow.
74+
if text_message == 'Hello Agent!':
75+
await updater.complete(
76+
new_agent_text_message('Hello User!', task.context_id, task.id)
77+
)
78+
79+
# Flow with user input required: "How are you?" -> "Good! How are you?" -> "Good" -> "Amazing".
80+
elif text_message == 'How are you?':
81+
await updater.requires_input(
82+
new_agent_text_message(
83+
'Good! How are you?', task.context_id, task.id
84+
)
85+
)
86+
elif text_message == 'Good':
87+
await updater.complete(
88+
new_agent_text_message('Amazing', task.context_id, task.id)
89+
)
90+
91+
# Fail for unsupported messages.
92+
else:
93+
await updater.failed(
94+
new_agent_text_message(
95+
'Unsupported message.', task.context_id, task.id
96+
)
97+
)
98+
99+
100+
class TestAgentExecutor(AgentExecutor):
101+
"""Test AgentExecutor implementation."""
102+
103+
def __init__(self) -> None:
104+
self.agent = TestAgent()
105+
106+
async def execute(
107+
self,
108+
context: RequestContext,
109+
event_queue: EventQueue,
110+
) -> None:
111+
if not context.message:
112+
raise ServerError(error=InvalidParamsError(message='No message'))
113+
114+
task = context.current_task
115+
if not task:
116+
task = new_task(context.message)
117+
await event_queue.enqueue_event(task)
118+
updater = TaskUpdater(event_queue, task.id, task.context_id)
119+
120+
await self.agent.invoke(updater, context.message, task)
121+
122+
async def cancel(
123+
self, context: RequestContext, event_queue: EventQueue
124+
) -> None:
125+
raise NotImplementedError('cancel not supported')
126+
127+
128+
def create_agent_app(
129+
url: str, notification_client: httpx.AsyncClient
130+
) -> FastAPI:
131+
"""Creates a new HTTP+REST FastAPI application for the test agent."""
132+
push_config_store = InMemoryPushNotificationConfigStore()
133+
app = A2ARESTFastAPIApplication(
134+
agent_card=test_agent_card(url),
135+
http_handler=DefaultRequestHandler(
136+
agent_executor=TestAgentExecutor(),
137+
task_store=InMemoryTaskStore(),
138+
push_config_store=push_config_store,
139+
push_sender=BasePushNotificationSender(
140+
httpx_client=notification_client,
141+
config_store=push_config_store,
142+
),
143+
),
144+
)
145+
return app.build()
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import asyncio
2+
3+
from typing import Annotated
4+
5+
from fastapi import FastAPI, HTTPException, Path, Request
6+
from pydantic import BaseModel, ValidationError
7+
8+
from a2a.types import Task
9+
10+
11+
class Notification(BaseModel):
12+
"""Encapsulates default push notification data."""
13+
14+
task: Task
15+
token: str
16+
17+
18+
def create_notifications_app() -> FastAPI:
19+
"""Creates a simple push notification ingesting HTTP+REST application."""
20+
app = FastAPI()
21+
store_lock = asyncio.Lock()
22+
store: dict[str, list[Notification]] = {}
23+
24+
@app.post('/notifications')
25+
async def add_notification(request: Request):
26+
"""Endpoint for injesting notifications from agents. It receives a JSON
27+
payload and stores it in-memory.
28+
"""
29+
token = request.headers.get('x-a2a-notification-token')
30+
if not token:
31+
raise HTTPException(
32+
status_code=400,
33+
detail='Missing "x-a2a-notification-token" header.',
34+
)
35+
try:
36+
task = Task.model_validate(await request.json())
37+
except ValidationError as e:
38+
raise HTTPException(status_code=400, detail=str(e))
39+
40+
async with store_lock:
41+
if task.id not in store:
42+
store[task.id] = []
43+
store[task.id].append(
44+
Notification(
45+
task=task,
46+
token=token,
47+
)
48+
)
49+
return {
50+
'status': 'received',
51+
}
52+
53+
@app.get('/tasks/{task_id}/notifications')
54+
async def list_notifications_by_task(
55+
task_id: Annotated[
56+
str, Path(title='The ID of the task to list the notifications for.')
57+
],
58+
):
59+
"""Helper endpoint for retrieving injested notifications for a given task."""
60+
async with store_lock:
61+
notifications = store.get(task_id, [])
62+
return {'notifications': notifications}
63+
64+
@app.get('/health')
65+
def health_check():
66+
"""Helper endpoint for checking if the server is up."""
67+
return {'status': 'ok'}
68+
69+
return app

0 commit comments

Comments
 (0)