Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/linter.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ jobs:
run: uv sync --dev
- name: Run Ruff Linter
run: uv run ruff check .
- name: Run Ruff Format Check
run: uv run ruff format --check .
- name: Run MyPy Type Checker
run: uv run mypy src
- name: Run Pyright (Pylance equivalent)
Expand Down
6 changes: 2 additions & 4 deletions src/a2a/server/request_handlers/jsonrpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,10 +347,8 @@ async def list_push_notification_config(
A `ListTaskPushNotificationConfigResponse` object containing the config or a JSON-RPC error.
"""
try:
config = (
await self.request_handler.on_list_task_push_notification_config(
request.params, context
)
config = await self.request_handler.on_list_task_push_notification_config(
request.params, context
)
return prepare_response_object(
request.id,
Expand Down
4 changes: 2 additions & 2 deletions src/a2a/server/request_handlers/response_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
GetTaskPushNotificationConfigResponse,
SendStreamingMessageResponse,
ListTaskPushNotificationConfigResponse,
DeleteTaskPushNotificationConfigResponse
DeleteTaskPushNotificationConfigResponse,
)
"""Type variable for RootModel response types."""

Expand All @@ -55,7 +55,7 @@
GetTaskPushNotificationConfigSuccessResponse,
SendStreamingMessageSuccessResponse,
ListTaskPushNotificationConfigSuccessResponse,
DeleteTaskPushNotificationConfigSuccessResponse
DeleteTaskPushNotificationConfigSuccessResponse,
)
"""Type variable for SuccessResponse types."""

Expand Down
8 changes: 6 additions & 2 deletions src/a2a/server/tasks/push_notification_config_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@ class PushNotificationConfigStore(ABC):
"""Interface for storing and retrieving push notification configurations for tasks."""

@abstractmethod
async def set_info(self, task_id: str, notification_config: PushNotificationConfig) -> None:
async def set_info(
self, task_id: str, notification_config: PushNotificationConfig
) -> None:
"""Sets or updates the push notification configuration for a task."""

@abstractmethod
async def get_info(self, task_id: str) -> list[PushNotificationConfig]:
"""Retrieves the push notification configuration for a task."""

@abstractmethod
async def delete_info(self, task_id: str, config_id: str | None = None) -> None:
async def delete_info(
self, task_id: str, config_id: str | None = None
) -> None:
"""Deletes the push notification configuration for a task."""
12 changes: 9 additions & 3 deletions src/a2a/server/tasks/task_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,18 @@ async def update_status(
"""
async with self._lock:
if self._terminal_state_reached:
raise RuntimeError(f"Task {self.task_id} is already in a terminal state.")
raise RuntimeError(
f'Task {self.task_id} is already in a terminal state.'
)
if state in self._terminal_states:
self._terminal_state_reached = True
final = True

current_timestamp = timestamp if timestamp else datetime.now(timezone.utc).isoformat()
current_timestamp = (
timestamp
if timestamp
else datetime.now(timezone.utc).isoformat()
)
await self.event_queue.enqueue_event(
TaskStatusUpdateEvent(
taskId=self.task_id,
Expand Down Expand Up @@ -112,7 +118,7 @@ async def add_artifact( # noqa: PLR0913
metadata=metadata,
),
append=append,
lastChunk=last_chunk
lastChunk=last_chunk,
)
)

Expand Down
129 changes: 89 additions & 40 deletions tests/server/request_handlers/test_jsonrpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@
from a2a.server.events import QueueManager
from a2a.server.events.event_queue import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler, JSONRPCHandler
from a2a.server.tasks import TaskStore, InMemoryPushNotificationConfigStore, BasePushNotificationSender, PushNotificationConfigStore, PushNotificationSender
from a2a.server.tasks import (
TaskStore,
InMemoryPushNotificationConfigStore,
BasePushNotificationSender,
PushNotificationConfigStore,
PushNotificationSender,
)
from a2a.types import (
AgentCapabilities,
AgentCard,
Expand Down Expand Up @@ -436,8 +442,10 @@ async def streaming_coro():
async def test_set_push_notification_success(self) -> None:
mock_agent_executor = AsyncMock(spec=AgentExecutor)
mock_task_store = AsyncMock(spec=TaskStore)
mock_push_notification_store = AsyncMock(spec=PushNotificationConfigStore)

mock_push_notification_store = AsyncMock(
spec=PushNotificationConfigStore
)

request_handler = DefaultRequestHandler(
mock_agent_executor,
mock_task_store,
Expand Down Expand Up @@ -471,10 +479,12 @@ async def test_set_push_notification_success(self) -> None:

async def test_get_push_notification_success(self) -> None:
mock_agent_executor = AsyncMock(spec=AgentExecutor)
mock_task_store = AsyncMock(spec=TaskStore)
mock_task_store = AsyncMock(spec=TaskStore)
push_notification_store = InMemoryPushNotificationConfigStore()
request_handler = DefaultRequestHandler(
mock_agent_executor, mock_task_store, push_config_store=push_notification_store
mock_agent_executor,
mock_task_store,
push_config_store=push_notification_store,
)
self.mock_agent_card.capabilities = AgentCapabilities(
streaming=True, pushNotifications=True
Expand Down Expand Up @@ -516,9 +526,14 @@ async def test_on_message_stream_new_message_send_push_notification_success(
mock_task_store = AsyncMock(spec=TaskStore)
mock_httpx_client = AsyncMock(spec=httpx.AsyncClient)
push_notification_store = InMemoryPushNotificationConfigStore()
push_notification_sender = BasePushNotificationSender(mock_httpx_client, push_notification_store)
push_notification_sender = BasePushNotificationSender(
mock_httpx_client, push_notification_store
)
request_handler = DefaultRequestHandler(
mock_agent_executor, mock_task_store, push_config_store=push_notification_store, push_sender=push_notification_sender
mock_agent_executor,
mock_task_store,
push_config_store=push_notification_store,
push_sender=push_notification_sender,
)
self.mock_agent_card.capabilities = AgentCapabilities(
streaming=True, pushNotifications=True
Expand Down Expand Up @@ -585,7 +600,7 @@ async def streaming_coro():
'kind': 'task',
'status': {'state': 'submitted'},
},
headers=None
headers=None,
),
call(
'http://example.com',
Expand All @@ -606,7 +621,7 @@ async def streaming_coro():
'kind': 'task',
'status': {'state': 'submitted'},
},
headers=None
headers=None,
),
call(
'http://example.com',
Expand All @@ -627,7 +642,7 @@ async def streaming_coro():
'kind': 'task',
'status': {'state': 'completed'},
},
headers=None
headers=None,
),
]
mock_httpx_client.post.assert_has_calls(calls)
Expand Down Expand Up @@ -727,7 +742,7 @@ async def test_streaming_not_supported_error(
pass

self.assertEqual(
str(context.exception.error.message), # type: ignore
str(context.exception.error.message), # type: ignore
'Streaming is not supported by the agent',
)

Expand Down Expand Up @@ -761,7 +776,7 @@ async def test_push_notifications_not_supported_error(self) -> None:
await handler.set_push_notification_config(request)

self.assertEqual(
str(context.exception.error.message), # type: ignore
str(context.exception.error.message), # type: ignore
'Push notifications are not supported by the agent',
)

Expand Down Expand Up @@ -960,7 +975,7 @@ async def consume_raises_error(*args, **kwargs) -> NoReturn:

# Assert
self.assertIsInstance(response.root, JSONRPCErrorResponse)
self.assertEqual(response.root.error, UnsupportedOperationError()) # type: ignore
self.assertEqual(response.root.error, UnsupportedOperationError()) # type: ignore

async def test_on_message_send_task_id_mismatch(self) -> None:
mock_agent_executor = AsyncMock(spec=AgentExecutor)
Expand Down Expand Up @@ -1031,37 +1046,54 @@ async def test_on_get_push_notification(self) -> None:

mock_task = Task(**MINIMAL_TASK)
mock_task_store.get.return_value = mock_task


# Create request handler without a push notifier
request_handler = AsyncMock(spec=DefaultRequestHandler)
task_push_config = TaskPushNotificationConfig(taskId=mock_task.id, pushNotificationConfig=PushNotificationConfig(id="config1", url='http://example.com'))
request_handler.on_get_task_push_notification_config.return_value = task_push_config
task_push_config = TaskPushNotificationConfig(
taskId=mock_task.id,
pushNotificationConfig=PushNotificationConfig(
id='config1', url='http://example.com'
),
)
request_handler.on_get_task_push_notification_config.return_value = (
task_push_config
)

self.mock_agent_card.capabilities = AgentCapabilities(
pushNotifications=True
)
handler = JSONRPCHandler(self.mock_agent_card, request_handler)
list_request = GetTaskPushNotificationConfigRequest(
id='1', params=GetTaskPushNotificationConfigParams(id=mock_task.id, pushNotificationConfigId="config1")
id='1',
params=GetTaskPushNotificationConfigParams(
id=mock_task.id, pushNotificationConfigId='config1'
),
)
response = await handler.get_push_notification_config(list_request)
# Assert
self.assertIsInstance(response.root, GetTaskPushNotificationConfigSuccessResponse)
self.assertEqual(response.root.result, task_push_config) # type: ignore
self.assertIsInstance(
response.root, GetTaskPushNotificationConfigSuccessResponse
)
self.assertEqual(response.root.result, task_push_config) # type: ignore

async def test_on_list_push_notification(self) -> None:
"""Test list_push_notification_config handling"""
mock_task_store = AsyncMock(spec=TaskStore)

mock_task = Task(**MINIMAL_TASK)
mock_task_store.get.return_value = mock_task


# Create request handler without a push notifier
request_handler = AsyncMock(spec=DefaultRequestHandler)
task_push_config = TaskPushNotificationConfig(taskId=mock_task.id, pushNotificationConfig=PushNotificationConfig(url='http://example.com'))
request_handler.on_list_task_push_notification_config.return_value = [task_push_config]
task_push_config = TaskPushNotificationConfig(
taskId=mock_task.id,
pushNotificationConfig=PushNotificationConfig(
url='http://example.com'
),
)
request_handler.on_list_task_push_notification_config.return_value = [
task_push_config
]

self.mock_agent_card.capabilities = AgentCapabilities(
pushNotifications=True
Expand All @@ -1072,23 +1104,30 @@ async def test_on_list_push_notification(self) -> None:
)
response = await handler.list_push_notification_config(list_request)
# Assert
self.assertIsInstance(response.root, ListTaskPushNotificationConfigSuccessResponse)
self.assertEqual(response.root.result, [task_push_config]) # type: ignore
self.assertIsInstance(
response.root, ListTaskPushNotificationConfigSuccessResponse
)
self.assertEqual(response.root.result, [task_push_config]) # type: ignore

async def test_on_list_push_notification_error(self) -> None:
"""Test list_push_notification_config handling"""
mock_task_store = AsyncMock(spec=TaskStore)

mock_task = Task(**MINIMAL_TASK)
mock_task_store.get.return_value = mock_task


# Create request handler without a push notifier
request_handler = AsyncMock(spec=DefaultRequestHandler)
task_push_config = TaskPushNotificationConfig(taskId=mock_task.id, pushNotificationConfig=PushNotificationConfig(url='http://example.com'))
task_push_config = TaskPushNotificationConfig(
taskId=mock_task.id,
pushNotificationConfig=PushNotificationConfig(
url='http://example.com'
),
)
# throw server error
request_handler.on_list_task_push_notification_config.side_effect = ServerError(InternalError())

request_handler.on_list_task_push_notification_config.side_effect = (
ServerError(InternalError())
)

self.mock_agent_card.capabilities = AgentCapabilities(
pushNotifications=True
Expand All @@ -1100,45 +1139,55 @@ async def test_on_list_push_notification_error(self) -> None:
response = await handler.list_push_notification_config(list_request)
# Assert
self.assertIsInstance(response.root, JSONRPCErrorResponse)
self.assertEqual(response.root.error, InternalError()) # type: ignore
self.assertEqual(response.root.error, InternalError()) # type: ignore

async def test_on_delete_push_notification(self) -> None:
"""Test delete_push_notification_config handling"""

# Create request handler without a push notifier
request_handler = AsyncMock(spec=DefaultRequestHandler)
request_handler.on_delete_task_push_notification_config.return_value = None
request_handler = AsyncMock(spec=DefaultRequestHandler)
request_handler.on_delete_task_push_notification_config.return_value = (
None
)

self.mock_agent_card.capabilities = AgentCapabilities(
pushNotifications=True
)
handler = JSONRPCHandler(self.mock_agent_card, request_handler)
delete_request = DeleteTaskPushNotificationConfigRequest(
id='1', params=DeleteTaskPushNotificationConfigParams(id="task1", pushNotificationConfigId="config1")
id='1',
params=DeleteTaskPushNotificationConfigParams(
id='task1', pushNotificationConfigId='config1'
),
)
response = await handler.delete_push_notification_config(delete_request)
# Assert
self.assertIsInstance(response.root, DeleteTaskPushNotificationConfigSuccessResponse)
self.assertEqual(response.root.result, None) # type: ignore
self.assertIsInstance(
response.root, DeleteTaskPushNotificationConfigSuccessResponse
)
self.assertEqual(response.root.result, None) # type: ignore

async def test_on_delete_push_notification_error(self) -> None:
"""Test delete_push_notification_config error handling"""


# Create request handler without a push notifier
request_handler = AsyncMock(spec=DefaultRequestHandler)
# throw server error
request_handler.on_delete_task_push_notification_config.side_effect = ServerError(UnsupportedOperationError())

request_handler.on_delete_task_push_notification_config.side_effect = (
ServerError(UnsupportedOperationError())
)

self.mock_agent_card.capabilities = AgentCapabilities(
pushNotifications=True
)
handler = JSONRPCHandler(self.mock_agent_card, request_handler)
delete_request = DeleteTaskPushNotificationConfigRequest(
id='1', params=DeleteTaskPushNotificationConfigParams(id="task1", pushNotificationConfigId="config1")
id='1',
params=DeleteTaskPushNotificationConfigParams(
id='task1', pushNotificationConfigId='config1'
),
)
response = await handler.delete_push_notification_config(delete_request)
# Assert
self.assertIsInstance(response.root, JSONRPCErrorResponse)
self.assertEqual(response.root.error, UnsupportedOperationError()) # type: ignore
self.assertEqual(response.root.error, UnsupportedOperationError()) # type: ignore
6 changes: 4 additions & 2 deletions tests/server/tasks/test_inmemory_push_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ async def test_send_notification_success(self):
async def test_send_notification_with_token_success(self):
task_id = 'task_send_success'
task_data = create_sample_task(task_id=task_id)
config = create_sample_push_config(url='http://notify.me/here', token='unique_token')
config = create_sample_push_config(
url='http://notify.me/here', token='unique_token'
)
await self.config_store.set_info(task_id, config)

# Mock the post call to simulate success
Expand All @@ -180,7 +182,7 @@ async def test_send_notification_with_token_success(self):
)
self.assertEqual(
called_kwargs['headers'],
{"X-A2A-Notification-Token": "unique_token"},
{'X-A2A-Notification-Token': 'unique_token'},
)
self.assertNotIn(
'auth', called_kwargs
Expand Down
Loading
Loading