Skip to content

Commit d83a7fc

Browse files
author
Lukasz Kawka
committed
feat: push notifications e2e tests
1 parent 8dbc78a commit d83a7fc

File tree

4 files changed

+458
-0
lines changed

4 files changed

+458
-0
lines changed
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
import httpx
2+
3+
from fastapi import FastAPI
4+
5+
from a2a.server.agent_execution import AgentExecutor, RequestContext
6+
from a2a.server.apps import A2ARESTFastAPIApplication
7+
from a2a.server.events import EventQueue
8+
from a2a.server.request_handlers import DefaultRequestHandler
9+
from a2a.server.tasks import (
10+
BasePushNotificationSender,
11+
InMemoryPushNotificationConfigStore,
12+
InMemoryTaskStore,
13+
TaskUpdater,
14+
)
15+
from a2a.types import (
16+
AgentCapabilities,
17+
AgentCard,
18+
AgentSkill,
19+
InvalidParamsError,
20+
Message,
21+
Task,
22+
)
23+
from a2a.utils import (
24+
new_agent_text_message,
25+
new_task,
26+
)
27+
from a2a.utils.errors import ServerError
28+
29+
30+
def test_agent_card(url: str) -> AgentCard:
31+
"""Returns an agent card for the test agent."""
32+
return AgentCard(
33+
name='Test Agent',
34+
description='Just a test agent',
35+
url=url,
36+
version='1.0.0',
37+
default_input_modes=['text'],
38+
default_output_modes=['text'],
39+
capabilities=AgentCapabilities(streaming=True, push_notifications=True),
40+
skills=[
41+
AgentSkill(
42+
id='greeting',
43+
name='Greeting Agent',
44+
description='just greats the user',
45+
tags=['greeting'],
46+
examples=['Hello Agent!', 'How are you?'],
47+
)
48+
],
49+
supports_authenticated_extended_card=True,
50+
)
51+
52+
53+
class TestAgent:
54+
"""Agent for push notification testing."""
55+
56+
async def invoke(
57+
self, updater: TaskUpdater, msg: Message, task: Task
58+
) -> None:
59+
# Fail for unsupported messages.
60+
if (
61+
not msg.parts
62+
or len(msg.parts) != 1
63+
or msg.parts[0].root.kind != 'text'
64+
):
65+
await updater.failed(
66+
new_agent_text_message(
67+
'Unsupported mesesage.', task.context_id, task.id
68+
)
69+
)
70+
return
71+
text_message = msg.parts[0].root.text
72+
73+
# Simple request-response flow.
74+
if text_message == 'Hello Agent!':
75+
await updater.complete(
76+
new_agent_text_message('Hello User!', task.context_id, task.id)
77+
)
78+
79+
# Flow with user input required: "How are you?" -> "Good! How are you?" -> "Good" -> "Amazing".
80+
elif text_message == 'How are you?':
81+
await updater.requires_input(
82+
new_agent_text_message(
83+
'Good! How are you?', task.context_id, task.id
84+
)
85+
)
86+
elif text_message == 'Good':
87+
await updater.complete(
88+
new_agent_text_message('Amazing', task.context_id, task.id)
89+
)
90+
91+
# Fail for unsupported messages.
92+
else:
93+
await updater.failed(
94+
new_agent_text_message(
95+
'Unsupported message.', task.context_id, task.id
96+
)
97+
)
98+
99+
100+
class TestAgentExecutor(AgentExecutor):
101+
"""Test AgentExecutor implementation."""
102+
103+
def __init__(self) -> None:
104+
self.agent = TestAgent()
105+
106+
async def execute(
107+
self,
108+
context: RequestContext,
109+
event_queue: EventQueue,
110+
) -> None:
111+
if not context.message:
112+
raise ServerError(error=InvalidParamsError(message='No message'))
113+
114+
task = context.current_task
115+
if not task:
116+
task = new_task(context.message)
117+
await event_queue.enqueue_event(task)
118+
updater = TaskUpdater(event_queue, task.id, task.context_id)
119+
120+
await self.agent.invoke(updater, context.message, task)
121+
122+
async def cancel(
123+
self, context: RequestContext, event_queue: EventQueue
124+
) -> None:
125+
raise Exception('cancel not supported')
126+
127+
128+
def create_agent_app(
129+
url: str, notification_client: httpx.AsyncClient
130+
) -> FastAPI:
131+
"""Creates a new HTTP+REST FastAPI application for the test agent."""
132+
push_config_store = InMemoryPushNotificationConfigStore()
133+
app = A2ARESTFastAPIApplication(
134+
agent_card=test_agent_card(url),
135+
http_handler=DefaultRequestHandler(
136+
agent_executor=TestAgentExecutor(),
137+
task_store=InMemoryTaskStore(),
138+
push_config_store=push_config_store,
139+
push_sender=BasePushNotificationSender(
140+
httpx_client=notification_client,
141+
config_store=push_config_store,
142+
),
143+
),
144+
)
145+
return app.build()
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import asyncio
2+
3+
from typing import Annotated
4+
5+
from fastapi import FastAPI, Path, Request
6+
7+
8+
def create_notifications_app() -> FastAPI:
9+
"""Creates a simple push notification injesting HTTP+REST application."""
10+
app = FastAPI()
11+
store_lock = asyncio.Lock()
12+
store = {}
13+
14+
@app.post('/notifications')
15+
async def add_notification(request: Request):
16+
"""Endpoint for injesting notifications from agents. It receives a JSON
17+
payload and stores it in-memory.
18+
"""
19+
if not request.headers.get('x-a2a-notification-token'):
20+
raise ValueError('Missing x-a2a-notification-token header.')
21+
payload = await request.json()
22+
task_id = payload['id']
23+
async with store_lock:
24+
if task_id not in store:
25+
store[task_id] = []
26+
store[task_id].append(payload)
27+
return {
28+
'status': 'received',
29+
}
30+
31+
@app.get('/tasks/{task_id}/notifications')
32+
async def list_notifications_by_task(
33+
task_id: Annotated[
34+
str, Path(title='The ID of the task to list the notifications for.')
35+
],
36+
):
37+
"""Helper endpoint for retrieving injested notifications for a given task."""
38+
async with store_lock:
39+
notifications = store.get(task_id, [])
40+
return {'notifications': notifications}
41+
42+
@app.get('/health')
43+
def health_check():
44+
"""Helper endpoint for checking if the server is up."""
45+
return {'status': 'ok'}
46+
47+
return app

0 commit comments

Comments
 (0)