diff --git a/src/a2a/server/apps/jsonrpc/jsonrpc_app.py b/src/a2a/server/apps/jsonrpc/jsonrpc_app.py index 0b78cc1c..37d28d53 100644 --- a/src/a2a/server/apps/jsonrpc/jsonrpc_app.py +++ b/src/a2a/server/apps/jsonrpc/jsonrpc_app.py @@ -25,6 +25,7 @@ A2ARequest, AgentCard, CancelTaskRequest, + DeleteTaskPushNotificationConfigRequest, GetTaskPushNotificationConfigRequest, GetTaskRequest, InternalError, @@ -33,6 +34,7 @@ JSONRPCError, JSONRPCErrorResponse, JSONRPCResponse, + ListTaskPushNotificationConfigRequest, SendMessageRequest, SendStreamingMessageRequest, SendStreamingMessageResponse, @@ -297,12 +299,22 @@ async def _process_non_streaming_request( request_obj, context ) case SetTaskPushNotificationConfigRequest(): - handler_result = await self.handler.set_push_notification( + handler_result = await self.handler.set_push_notification_config( request_obj, context, ) case GetTaskPushNotificationConfigRequest(): - handler_result = await self.handler.get_push_notification( + handler_result = await self.handler.get_push_notification_config( + request_obj, + context, + ) + case ListTaskPushNotificationConfigRequest(): + handler_result = await self.handler.list_push_notification_config( + request_obj, + context, + ) + case DeleteTaskPushNotificationConfigRequest(): + handler_result = await self.handler.delete_push_notification_config( request_obj, context, ) diff --git a/src/a2a/server/request_handlers/default_request_handler.py b/src/a2a/server/request_handlers/default_request_handler.py index ff86a069..f3b584d4 100644 --- a/src/a2a/server/request_handlers/default_request_handler.py +++ b/src/a2a/server/request_handlers/default_request_handler.py @@ -1,6 +1,5 @@ import asyncio import logging -import uuid from collections.abc import AsyncGenerator from typing import cast @@ -21,15 +20,18 @@ ) from a2a.server.request_handlers.request_handler import RequestHandler from a2a.server.tasks import ( - PushNotifier, + PushNotificationConfigStore, + PushNotificationSender, ResultAggregator, TaskManager, TaskStore, ) from a2a.types import ( + DeleteTaskPushNotificationConfigParams, GetTaskPushNotificationConfigParams, InternalError, InvalidParamsError, + ListTaskPushNotificationConfigParams, Message, MessageSendConfiguration, MessageSendParams, @@ -67,12 +69,13 @@ class DefaultRequestHandler(RequestHandler): _running_agents: dict[str, asyncio.Task] - def __init__( + def __init__( # noqa: PLR0913 self, agent_executor: AgentExecutor, task_store: TaskStore, queue_manager: QueueManager | None = None, - push_notifier: PushNotifier | None = None, + push_config_store: PushNotificationConfigStore | None = None, + push_sender: PushNotificationSender | None = None, request_context_builder: RequestContextBuilder | None = None, ) -> None: """Initializes the DefaultRequestHandler. @@ -81,14 +84,16 @@ def __init__( agent_executor: The `AgentExecutor` instance to run agent logic. task_store: The `TaskStore` instance to manage task persistence. queue_manager: The `QueueManager` instance to manage event queues. Defaults to `InMemoryQueueManager`. - push_notifier: The `PushNotifier` instance for sending push notifications. Defaults to None. + push_config_store: The `PushNotificationConfigStore` instance for managing push notification configurations. Defaults to None. + push_sender: The `PushNotificationSender` instance for sending push notifications. Defaults to None. request_context_builder: The `RequestContextBuilder` instance used to build request contexts. Defaults to `SimpleRequestContextBuilder`. """ self.agent_executor = agent_executor self.task_store = task_store self._queue_manager = queue_manager or InMemoryQueueManager() - self._push_notifier = push_notifier + self._push_config_store = push_config_store + self._push_sender = push_sender self._request_context_builder = ( request_context_builder or SimpleRequestContextBuilder( @@ -198,7 +203,7 @@ async def _setup_message_execution( task = task_manager.update_with_message(params.message, task) if self.should_add_push_info(params): - assert isinstance(self._push_notifier, PushNotifier) + assert self._push_config_store is not None assert isinstance( params.configuration, MessageSendConfiguration ) @@ -206,7 +211,7 @@ async def _setup_message_execution( params.configuration.pushNotificationConfig, PushNotificationConfig, ) - await self._push_notifier.set_info( + await self._push_config_store.set_info( task.id, params.configuration.pushNotificationConfig ) @@ -247,10 +252,10 @@ async def _send_push_notification_if_needed( self, task_id: str, result_aggregator: ResultAggregator ) -> None: """Sends push notification if configured and task is available.""" - if self._push_notifier and task_id: + if self._push_sender and task_id: latest_task = await result_aggregator.current_result if isinstance(latest_task, Task): - await self._push_notifier.send_notification(latest_task) + await self._push_sender.send_notification(latest_task) async def on_message_send( self, @@ -329,11 +334,11 @@ async def on_message_send_stream( self._validate_task_id_match(task_id, event.id) if ( - self._push_notifier + self._push_config_store and params.configuration and params.configuration.pushNotificationConfig ): - await self._push_notifier.set_info( + await self._push_config_store.set_info( task_id, params.configuration.pushNotificationConfig, ) @@ -372,16 +377,14 @@ async def on_set_task_push_notification_config( Requires a `PushNotifier` to be configured. """ - if not self._push_notifier: + if not self._push_config_store: raise ServerError(error=UnsupportedOperationError()) task: Task | None = await self.task_store.get(params.taskId) if not task: raise ServerError(error=TaskNotFoundError()) - # Generate a unique id for the notification - params.pushNotificationConfig.id = str(uuid.uuid4()) - await self._push_notifier.set_info( + await self._push_config_store.set_info( params.taskId, params.pushNotificationConfig, ) @@ -395,21 +398,27 @@ async def on_get_task_push_notification_config( ) -> TaskPushNotificationConfig: """Default handler for 'tasks/pushNotificationConfig/get'. - Requires a `PushNotifier` to be configured. + Requires a `PushConfigStore` to be configured. """ - if not self._push_notifier: + if not self._push_config_store: raise ServerError(error=UnsupportedOperationError()) task: Task | None = await self.task_store.get(params.id) if not task: raise ServerError(error=TaskNotFoundError()) - push_notification_config = await self._push_notifier.get_info(params.id) - if not push_notification_config: - raise ServerError(error=InternalError()) + push_notification_config = await self._push_config_store.get_info( + params.id + ) + if not push_notification_config or not push_notification_config[0]: + raise ServerError( + error=InternalError( + message='Push notification config not found' + ) + ) return TaskPushNotificationConfig( - taskId=params.id, pushNotificationConfig=push_notification_config + taskId=params.id, pushNotificationConfig=push_notification_config[0] ) async def on_resubscribe_to_task( @@ -450,10 +459,61 @@ async def on_resubscribe_to_task( async for event in result_aggregator.consume_and_emit(consumer): yield event + async def on_list_task_push_notification_config( + self, + params: ListTaskPushNotificationConfigParams, + context: ServerCallContext | None = None, + ) -> list[TaskPushNotificationConfig]: + """Default handler for 'tasks/pushNotificationConfig/list'. + + Requires a `PushConfigStore` to be configured. + """ + if not self._push_config_store: + raise ServerError(error=UnsupportedOperationError()) + + task: Task | None = await self.task_store.get(params.id) + if not task: + raise ServerError(error=TaskNotFoundError()) + + push_notification_config_list = await self._push_config_store.get_info( + params.id + ) + + task_push_notification_config = [] + if push_notification_config_list: + for config in push_notification_config_list: + task_push_notification_config.append( + TaskPushNotificationConfig( + taskId=params.id, pushNotificationConfig=config + ) + ) + + return task_push_notification_config + + async def on_delete_task_push_notification_config( + self, + params: DeleteTaskPushNotificationConfigParams, + context: ServerCallContext | None = None, + ) -> None: + """Default handler for 'tasks/pushNotificationConfig/delete'. + + Requires a `PushConfigStore` to be configured. + """ + if not self._push_config_store: + raise ServerError(error=UnsupportedOperationError()) + + task: Task | None = await self.task_store.get(params.id) + if not task: + raise ServerError(error=TaskNotFoundError()) + + await self._push_config_store.delete_info( + params.id, params.pushNotificationConfigId + ) + def should_add_push_info(self, params: MessageSendParams) -> bool: """Determines if push notification info should be set for a task.""" return bool( - self._push_notifier + self._push_config_store and params.configuration and params.configuration.pushNotificationConfig ) diff --git a/src/a2a/server/request_handlers/jsonrpc_handler.py b/src/a2a/server/request_handlers/jsonrpc_handler.py index 13d2854b..3e79bc09 100644 --- a/src/a2a/server/request_handlers/jsonrpc_handler.py +++ b/src/a2a/server/request_handlers/jsonrpc_handler.py @@ -10,6 +10,9 @@ CancelTaskRequest, CancelTaskResponse, CancelTaskSuccessResponse, + DeleteTaskPushNotificationConfigRequest, + DeleteTaskPushNotificationConfigResponse, + DeleteTaskPushNotificationConfigSuccessResponse, GetTaskPushNotificationConfigRequest, GetTaskPushNotificationConfigResponse, GetTaskPushNotificationConfigSuccessResponse, @@ -18,6 +21,9 @@ GetTaskSuccessResponse, InternalError, JSONRPCErrorResponse, + ListTaskPushNotificationConfigRequest, + ListTaskPushNotificationConfigResponse, + ListTaskPushNotificationConfigSuccessResponse, Message, SendMessageRequest, SendMessageResponse, @@ -214,7 +220,7 @@ async def on_resubscribe_to_task( ) ) - async def get_push_notification( + async def get_push_notification_config( self, request: GetTaskPushNotificationConfigRequest, context: ServerCallContext | None = None, @@ -252,7 +258,7 @@ async def get_push_notification( lambda self: self.agent_card.capabilities.pushNotifications, 'Push notifications are not supported by the agent', ) - async def set_push_notification( + async def set_push_notification_config( self, request: SetTaskPushNotificationConfigRequest, context: ServerCallContext | None = None, @@ -325,3 +331,69 @@ async def on_get_task( id=request.id, error=e.error if e.error else InternalError() ) ) + + async def list_push_notification_config( + self, + request: ListTaskPushNotificationConfigRequest, + context: ServerCallContext | None = None, + ) -> ListTaskPushNotificationConfigResponse: + """Handles the 'tasks/pushNotificationConfig/list' JSON-RPC method. + + Args: + request: The incoming `ListTaskPushNotificationConfigRequest` object. + context: Context provided by the server. + + Returns: + A `ListTaskPushNotificationConfigResponse` object containing the config or a JSON-RPC error. + """ + try: + config = ( + await self.request_handler.on_list_task_push_notification_config( + request.params, context + ) + ) + return prepare_response_object( + request.id, + config, + (list,), + ListTaskPushNotificationConfigSuccessResponse, + ListTaskPushNotificationConfigResponse, + ) + except ServerError as e: + return ListTaskPushNotificationConfigResponse( + root=JSONRPCErrorResponse( + id=request.id, error=e.error if e.error else InternalError() + ) + ) + + async def delete_push_notification_config( + self, + request: DeleteTaskPushNotificationConfigRequest, + context: ServerCallContext | None = None, + ) -> DeleteTaskPushNotificationConfigResponse: + """Handles the 'tasks/pushNotificationConfig/list' JSON-RPC method. + + Args: + request: The incoming `DeleteTaskPushNotificationConfigRequest` object. + context: Context provided by the server. + + Returns: + A `DeleteTaskPushNotificationConfigResponse` object containing the config or a JSON-RPC error. + """ + try: + ( + await self.request_handler.on_delete_task_push_notification_config( + request.params, context + ) + ) + return DeleteTaskPushNotificationConfigResponse( + root=DeleteTaskPushNotificationConfigSuccessResponse( + id=request.id, result=None + ) + ) + except ServerError as e: + return DeleteTaskPushNotificationConfigResponse( + root=JSONRPCErrorResponse( + id=request.id, error=e.error if e.error else InternalError() + ) + ) diff --git a/src/a2a/server/request_handlers/request_handler.py b/src/a2a/server/request_handlers/request_handler.py index 3693d8b6..7ce76cc9 100644 --- a/src/a2a/server/request_handlers/request_handler.py +++ b/src/a2a/server/request_handlers/request_handler.py @@ -4,7 +4,9 @@ from a2a.server.context import ServerCallContext from a2a.server.events.event_queue import Event from a2a.types import ( + DeleteTaskPushNotificationConfigParams, GetTaskPushNotificationConfigParams, + ListTaskPushNotificationConfigParams, Message, MessageSendParams, Task, @@ -160,3 +162,39 @@ async def on_resubscribe_to_task( """ raise ServerError(error=UnsupportedOperationError()) yield + + @abstractmethod + async def on_list_task_push_notification_config( + self, + params: ListTaskPushNotificationConfigParams, + context: ServerCallContext | None = None, + ) -> list[TaskPushNotificationConfig]: + """Handles the 'tasks/pushNotificationConfig/list' method. + + Retrieves the current push notification configurations for a task. + + Args: + params: Parameters including the task ID. + context: Context provided by the server. + + Returns: + The `list[TaskPushNotificationConfig]` for the task. + """ + + @abstractmethod + async def on_delete_task_push_notification_config( + self, + params: DeleteTaskPushNotificationConfigParams, + context: ServerCallContext | None = None, + ) -> None: + """Handles the 'tasks/pushNotificationConfig/delete' method. + + Deletes a push notification configuration associated with a task. + + Args: + params: Parameters including the task ID. + context: Context provided by the server. + + Returns: + None + """ diff --git a/src/a2a/server/request_handlers/response_helpers.py b/src/a2a/server/request_handlers/response_helpers.py index b4e48ad9..7f005099 100644 --- a/src/a2a/server/request_handlers/response_helpers.py +++ b/src/a2a/server/request_handlers/response_helpers.py @@ -7,6 +7,8 @@ A2AError, CancelTaskResponse, CancelTaskSuccessResponse, + DeleteTaskPushNotificationConfigResponse, + DeleteTaskPushNotificationConfigSuccessResponse, GetTaskPushNotificationConfigResponse, GetTaskPushNotificationConfigSuccessResponse, GetTaskResponse, @@ -14,6 +16,8 @@ InvalidAgentResponseError, JSONRPCError, JSONRPCErrorResponse, + ListTaskPushNotificationConfigResponse, + ListTaskPushNotificationConfigSuccessResponse, Message, SendMessageResponse, SendMessageSuccessResponse, @@ -36,6 +40,8 @@ SetTaskPushNotificationConfigResponse, GetTaskPushNotificationConfigResponse, SendStreamingMessageResponse, + ListTaskPushNotificationConfigResponse, + DeleteTaskPushNotificationConfigResponse ) """Type variable for RootModel response types.""" @@ -48,6 +54,8 @@ SetTaskPushNotificationConfigSuccessResponse, GetTaskPushNotificationConfigSuccessResponse, SendStreamingMessageSuccessResponse, + ListTaskPushNotificationConfigSuccessResponse, + DeleteTaskPushNotificationConfigSuccessResponse ) """Type variable for SuccessResponse types.""" @@ -60,6 +68,7 @@ | TaskPushNotificationConfig | A2AError | JSONRPCError + | list[TaskPushNotificationConfig] ) """Type alias for possible event types produced by handlers.""" diff --git a/src/a2a/server/tasks/__init__.py b/src/a2a/server/tasks/__init__.py index ab8f52f0..6116bfce 100644 --- a/src/a2a/server/tasks/__init__.py +++ b/src/a2a/server/tasks/__init__.py @@ -1,8 +1,16 @@ """Components for managing tasks within the A2A server.""" -from a2a.server.tasks.inmemory_push_notifier import InMemoryPushNotifier +from a2a.server.tasks.base_push_notification_sender import ( + BasePushNotificationSender, +) +from a2a.server.tasks.inmemory_push_notification_config_store import ( + InMemoryPushNotificationConfigStore, +) from a2a.server.tasks.inmemory_task_store import InMemoryTaskStore -from a2a.server.tasks.push_notifier import PushNotifier +from a2a.server.tasks.push_notification_config_store import ( + PushNotificationConfigStore, +) +from a2a.server.tasks.push_notification_sender import PushNotificationSender from a2a.server.tasks.result_aggregator import ResultAggregator from a2a.server.tasks.task_manager import TaskManager from a2a.server.tasks.task_store import TaskStore @@ -10,9 +18,11 @@ __all__ = [ - 'InMemoryPushNotifier', + 'BasePushNotificationSender', + 'InMemoryPushNotificationConfigStore', 'InMemoryTaskStore', - 'PushNotifier', + 'PushNotificationConfigStore', + 'PushNotificationSender', 'ResultAggregator', 'TaskManager', 'TaskStore', diff --git a/src/a2a/server/tasks/base_push_notification_sender.py b/src/a2a/server/tasks/base_push_notification_sender.py new file mode 100644 index 00000000..308ed978 --- /dev/null +++ b/src/a2a/server/tasks/base_push_notification_sender.py @@ -0,0 +1,67 @@ +import asyncio +import logging + +import httpx + +from a2a.server.tasks.push_notification_config_store import ( + PushNotificationConfigStore, +) +from a2a.server.tasks.push_notification_sender import PushNotificationSender +from a2a.types import PushNotificationConfig, Task + + +logger = logging.getLogger(__name__) + + +class BasePushNotificationSender(PushNotificationSender): + """Base implementation of PushNotificationSender interface.""" + + def __init__( + self, + httpx_client: httpx.AsyncClient, + config_store: PushNotificationConfigStore, + ) -> None: + """Initializes the BasePushNotificationSender. + + Args: + httpx_client: An async HTTP client instance to send notifications. + config_store: A PushNotificationConfigStore instance to retrieve configurations. + """ + self._client = httpx_client + self._config_store = config_store + + async def send_notification(self, task: Task) -> None: + """Sends a push notification for a task if configuration exists.""" + push_configs = await self._config_store.get_info(task.id) + if not push_configs: + return + + awaitables = [ + self._dispatch_notification(task, push_info) + for push_info in push_configs + ] + results = await asyncio.gather(*awaitables) + + if not all(results): + logger.warning( + f'Some push notifications failed to send for task_id={task.id}' + ) + + async def _dispatch_notification( + self, task: Task, push_info: PushNotificationConfig + ) -> bool: + url = push_info.url + try: + response = await self._client.post( + url, json=task.model_dump(mode='json', exclude_none=True) + ) + response.raise_for_status() + logger.info( + f'Push-notification sent for task_id={task.id} to URL: {url}' + ) + return True + except Exception as e: + logger.error( + f'Error sending push-notification for task_id={task.id} to URL: {url}. Error: {e}' + ) + return False diff --git a/src/a2a/server/tasks/inmemory_push_notification_config_store.py b/src/a2a/server/tasks/inmemory_push_notification_config_store.py new file mode 100644 index 00000000..c5bc5dbe --- /dev/null +++ b/src/a2a/server/tasks/inmemory_push_notification_config_store.py @@ -0,0 +1,68 @@ +import asyncio +import logging + +from a2a.server.tasks.push_notification_config_store import ( + PushNotificationConfigStore, +) +from a2a.types import PushNotificationConfig + + +logger = logging.getLogger(__name__) + + +class InMemoryPushNotificationConfigStore(PushNotificationConfigStore): + """In-memory implementation of PushNotificationConfigStore interface. + + Stores push notification configurations in memory + """ + + def __init__(self) -> None: + """Initializes the InMemoryPushNotificationConfigStore.""" + self.lock = asyncio.Lock() + self._push_notification_infos: dict[ + str, list[PushNotificationConfig] + ] = {} + + async def set_info( + self, task_id: str, notification_config: PushNotificationConfig + ) -> None: + """Sets or updates the push notification configuration for a task in memory.""" + async with self.lock: + if task_id not in self._push_notification_infos: + self._push_notification_infos[task_id] = [] + + if notification_config.id is None: + notification_config.id = task_id + + for config in self._push_notification_infos[task_id]: + if config.id == notification_config.id: + self._push_notification_infos[task_id].remove(config) + break + + self._push_notification_infos[task_id].append(notification_config) + + async def get_info(self, task_id: str) -> list[PushNotificationConfig]: + """Retrieves the push notification configuration for a task from memory.""" + async with self.lock: + return self._push_notification_infos.get(task_id) or [] + + async def delete_info( + self, task_id: str, config_id: str | None = None + ) -> None: + """Deletes the push notification configuration for a task from memory.""" + async with self.lock: + if config_id is None: + config_id = task_id + + if task_id in self._push_notification_infos: + configurations = self._push_notification_infos[task_id] + if not configurations: + return + + for config in configurations: + if config.id == config_id: + configurations.remove(config) + break + + if len(configurations) == 0: + del self._push_notification_infos[task_id] diff --git a/src/a2a/server/tasks/inmemory_push_notifier.py b/src/a2a/server/tasks/inmemory_push_notifier.py deleted file mode 100644 index 058b18c0..00000000 --- a/src/a2a/server/tasks/inmemory_push_notifier.py +++ /dev/null @@ -1,62 +0,0 @@ -import asyncio -import logging - -import httpx - -from a2a.server.tasks.push_notifier import PushNotifier -from a2a.types import PushNotificationConfig, Task - - -logger = logging.getLogger(__name__) - - -class InMemoryPushNotifier(PushNotifier): - """In-memory implementation of PushNotifier interface. - - Stores push notification configurations in memory and uses an httpx client - to send notifications. - """ - - def __init__(self, httpx_client: httpx.AsyncClient) -> None: - """Initializes the InMemoryPushNotifier. - - Args: - httpx_client: An async HTTP client instance to send notifications. - """ - self._client = httpx_client - self.lock = asyncio.Lock() - self._push_notification_infos: dict[str, PushNotificationConfig] = {} - - async def set_info( - self, task_id: str, notification_config: PushNotificationConfig - ) -> None: - """Sets or updates the push notification configuration for a task in memory.""" - async with self.lock: - self._push_notification_infos[task_id] = notification_config - - async def get_info(self, task_id: str) -> PushNotificationConfig | None: - """Retrieves the push notification configuration for a task from memory.""" - async with self.lock: - return self._push_notification_infos.get(task_id) - - async def delete_info(self, task_id: str) -> None: - """Deletes the push notification configuration for a task from memory.""" - async with self.lock: - if task_id in self._push_notification_infos: - del self._push_notification_infos[task_id] - - async def send_notification(self, task: Task) -> None: - """Sends a push notification for a task if configuration exists.""" - push_info = await self.get_info(task.id) - if not push_info: - return - url = push_info.url - - try: - response = await self._client.post( - url, json=task.model_dump(mode='json', exclude_none=True) - ) - response.raise_for_status() - logger.info(f'Push-notification sent for URL: {url}') - except Exception as e: - logger.error(f'Error sending push-notification: {e}') diff --git a/src/a2a/server/tasks/push_notification_config_store.py b/src/a2a/server/tasks/push_notification_config_store.py new file mode 100644 index 00000000..dd93791f --- /dev/null +++ b/src/a2a/server/tasks/push_notification_config_store.py @@ -0,0 +1,19 @@ +from abc import ABC, abstractmethod + +from a2a.types import PushNotificationConfig + + +class PushNotificationConfigStore(ABC): + """Interface for storing and retrieving push notification configurations for tasks.""" + + @abstractmethod + async def set_info(self, task_id: str, notification_config: PushNotificationConfig) -> None: + """Sets or updates the push notification configuration for a task.""" + + @abstractmethod + async def get_info(self, task_id: str) -> list[PushNotificationConfig]: + """Retrieves the push notification configuration for a task.""" + + @abstractmethod + async def delete_info(self, task_id: str, config_id: str | None = None) -> None: + """Deletes the push notification configuration for a task.""" diff --git a/src/a2a/server/tasks/push_notification_sender.py b/src/a2a/server/tasks/push_notification_sender.py new file mode 100644 index 00000000..d9389d4a --- /dev/null +++ b/src/a2a/server/tasks/push_notification_sender.py @@ -0,0 +1,11 @@ +from abc import ABC, abstractmethod + +from a2a.types import Task + + +class PushNotificationSender(ABC): + """Interface for sending push notifications for tasks.""" + + @abstractmethod + async def send_notification(self, task: Task) -> None: + """Sends a push notification containing the latest task state.""" diff --git a/src/a2a/server/tasks/push_notifier.py b/src/a2a/server/tasks/push_notifier.py deleted file mode 100644 index 6a9040fd..00000000 --- a/src/a2a/server/tasks/push_notifier.py +++ /dev/null @@ -1,25 +0,0 @@ -from abc import ABC, abstractmethod - -from a2a.types import PushNotificationConfig, Task - - -class PushNotifier(ABC): - """PushNotifier interface to store, retrieve push notification for tasks and send push notifications.""" - - @abstractmethod - async def set_info( - self, task_id: str, notification_config: PushNotificationConfig - ) -> None: - """Sets or updates the push notification configuration for a task.""" - - @abstractmethod - async def get_info(self, task_id: str) -> PushNotificationConfig | None: - """Retrieves the push notification configuration for a task.""" - - @abstractmethod - async def delete_info(self, task_id: str) -> None: - """Deletes the push notification configuration for a task.""" - - @abstractmethod - async def send_notification(self, task: Task) -> None: - """Sends a push notification containing the latest task state.""" diff --git a/tests/server/request_handlers/test_default_request_handler.py b/tests/server/request_handlers/test_default_request_handler.py index dd713752..fdf100f7 100644 --- a/tests/server/request_handlers/test_default_request_handler.py +++ b/tests/server/request_handlers/test_default_request_handler.py @@ -21,10 +21,12 @@ from a2a.server.request_handlers import DefaultRequestHandler from a2a.server.tasks import ( InMemoryTaskStore, - PushNotifier, ResultAggregator, TaskStore, TaskUpdater, + PushNotificationConfigStore, + PushNotificationSender, + InMemoryPushNotificationConfigStore, ) from a2a.types import ( InternalError, @@ -44,6 +46,9 @@ TaskStatus, TextPart, UnsupportedOperationError, + GetTaskPushNotificationConfigParams, + ListTaskPushNotificationConfigParams, + DeleteTaskPushNotificationConfigParams, ) @@ -103,7 +108,8 @@ def test_init_default_dependencies(): assert isinstance( handler._request_context_builder, SimpleRequestContextBuilder ) - assert handler._push_notifier is None + assert handler._push_config_store is None + assert handler._push_sender is None assert ( handler._request_context_builder._should_populate_referred_tasks is False @@ -295,14 +301,14 @@ async def test_on_cancel_task_invalid_result_type(): assert ( 'Agent did not return valid response for cancel' in exc_info.value.error.message - ) + ) # type: ignore @pytest.mark.asyncio async def test_on_message_send_with_push_notification(): """Test on_message_send sets push notification info if provided.""" mock_task_store = AsyncMock(spec=TaskStore) - mock_push_notifier = AsyncMock(spec=PushNotifier) + mock_push_notification_store = AsyncMock(spec=PushNotificationConfigStore) mock_agent_executor = AsyncMock(spec=AgentExecutor) mock_request_context_builder = AsyncMock(spec=RequestContextBuilder) @@ -331,7 +337,7 @@ async def test_on_message_send_with_push_notification(): request_handler = DefaultRequestHandler( agent_executor=mock_agent_executor, task_store=mock_task_store, - push_notifier=mock_push_notifier, + push_config_store=mock_push_notification_store, request_context_builder=mock_request_context_builder, ) @@ -388,9 +394,8 @@ async def get_current_result(): params, create_server_call_context() ) - mock_push_notifier.set_info.assert_awaited_once_with(task_id, push_config) - mock_push_notifier.send_notification.assert_awaited_once_with( - final_task_result + mock_push_notification_store.set_info.assert_awaited_once_with( + task_id, push_config ) # Other assertions for full flow if needed (e.g., agent execution) mock_agent_executor.execute.assert_awaited_once() @@ -493,7 +498,7 @@ async def test_on_message_send_task_id_mismatch(): ) assert isinstance(exc_info.value.error, InternalError) - assert 'Task ID mismatch' in exc_info.value.error.message + assert 'Task ID mismatch' in exc_info.value.error.message # type: ignore @pytest.mark.asyncio @@ -567,7 +572,8 @@ async def test_on_message_send_interrupted_flow(): async def test_on_message_send_stream_with_push_notification(): """Test on_message_send_stream sets and uses push notification info.""" mock_task_store = AsyncMock(spec=TaskStore) - mock_push_notifier = AsyncMock(spec=PushNotifier) + mock_push_config_store = AsyncMock(spec=PushNotificationConfigStore) + mock_push_sender = AsyncMock(spec=PushNotificationSender) mock_agent_executor = AsyncMock(spec=AgentExecutor) mock_request_context_builder = AsyncMock(spec=RequestContextBuilder) @@ -594,7 +600,8 @@ async def test_on_message_send_stream_with_push_notification(): request_handler = DefaultRequestHandler( agent_executor=mock_agent_executor, task_store=mock_task_store, - push_notifier=mock_push_notifier, + push_config_store=mock_push_config_store, + push_sender=mock_push_sender, request_context_builder=mock_request_context_builder, ) @@ -829,12 +836,12 @@ def sync_get_event_stream_gen_for_prop_test(*args, **kwargs): # Assertions # 1. set_info called once at the beginning if task exists (or after task is created from message) - mock_push_notifier.set_info.assert_any_call(task_id, push_config) + mock_push_config_store.set_info.assert_any_call(task_id, push_config) # 2. send_notification called for each task event yielded by aggregator - assert mock_push_notifier.send_notification.await_count == 2 - mock_push_notifier.send_notification.assert_any_await(event1_task_update) - mock_push_notifier.send_notification.assert_any_await(event2_final_task) + assert mock_push_sender.send_notification.await_count == 2 + mock_push_sender.send_notification.assert_any_await(event1_task_update) + mock_push_sender.send_notification.assert_any_await(event2_final_task) mock_agent_executor.execute.assert_awaited_once() @@ -897,7 +904,7 @@ async def event_stream_gen_mismatch(): pass # Consume the stream to trigger the error assert isinstance(exc_info.value.error, InternalError) - assert 'Task ID mismatch' in exc_info.value.error.message + assert 'Task ID mismatch' in exc_info.value.error.message # type: ignore @pytest.mark.asyncio @@ -939,11 +946,11 @@ async def dummy_coro_for_task(): @pytest.mark.asyncio async def test_set_task_push_notification_config_no_notifier(): - """Test on_set_task_push_notification_config when _push_notifier is None.""" + """Test on_set_task_push_notification_config when _push_config_store is None.""" request_handler = DefaultRequestHandler( agent_executor=DummyAgentExecutor(), task_store=AsyncMock(spec=TaskStore), - push_notifier=None, # Explicitly None + push_config_store=None, # Explicitly None ) params = TaskPushNotificationConfig( taskId='task1', @@ -963,12 +970,14 @@ async def test_set_task_push_notification_config_task_not_found(): """Test on_set_task_push_notification_config when task is not found.""" mock_task_store = AsyncMock(spec=TaskStore) mock_task_store.get.return_value = None # Task not found - mock_push_notifier = AsyncMock(spec=PushNotifier) + mock_push_store = AsyncMock(spec=PushNotificationConfigStore) + mock_push_sender = AsyncMock(spec=PushNotificationSender) request_handler = DefaultRequestHandler( agent_executor=DummyAgentExecutor(), task_store=mock_task_store, - push_notifier=mock_push_notifier, + push_config_store=mock_push_store, + push_sender=mock_push_sender, ) params = TaskPushNotificationConfig( taskId='non_existent_task', @@ -983,18 +992,18 @@ async def test_set_task_push_notification_config_task_not_found(): assert isinstance(exc_info.value.error, TaskNotFoundError) mock_task_store.get.assert_awaited_once_with('non_existent_task') - mock_push_notifier.set_info.assert_not_awaited() + mock_push_store.set_info.assert_not_awaited() @pytest.mark.asyncio -async def test_get_task_push_notification_config_no_notifier(): - """Test on_get_task_push_notification_config when _push_notifier is None.""" +async def test_get_task_push_notification_config_no_store(): + """Test on_get_task_push_notification_config when _push_config_store is None.""" request_handler = DefaultRequestHandler( agent_executor=DummyAgentExecutor(), task_store=AsyncMock(spec=TaskStore), - push_notifier=None, # Explicitly None + push_config_store=None, # Explicitly None ) - params = TaskIdParams(id='task1') + params = GetTaskPushNotificationConfigParams(id='task1') from a2a.utils.errors import ServerError # Local import with pytest.raises(ServerError) as exc_info: @@ -1009,14 +1018,14 @@ async def test_get_task_push_notification_config_task_not_found(): """Test on_get_task_push_notification_config when task is not found.""" mock_task_store = AsyncMock(spec=TaskStore) mock_task_store.get.return_value = None # Task not found - mock_push_notifier = AsyncMock(spec=PushNotifier) + mock_push_store = AsyncMock(spec=PushNotificationConfigStore) request_handler = DefaultRequestHandler( agent_executor=DummyAgentExecutor(), task_store=mock_task_store, - push_notifier=mock_push_notifier, + push_config_store=mock_push_store, ) - params = TaskIdParams(id='non_existent_task') + params = GetTaskPushNotificationConfigParams(id='non_existent_task') from a2a.utils.errors import ServerError # Local import with pytest.raises(ServerError) as exc_info: @@ -1026,25 +1035,26 @@ async def test_get_task_push_notification_config_task_not_found(): assert isinstance(exc_info.value.error, TaskNotFoundError) mock_task_store.get.assert_awaited_once_with('non_existent_task') - mock_push_notifier.get_info.assert_not_awaited() + mock_push_store.get_info.assert_not_awaited() @pytest.mark.asyncio async def test_get_task_push_notification_config_info_not_found(): - """Test on_get_task_push_notification_config when push_notifier.get_info returns None.""" + """Test on_get_task_push_notification_config when push_config_store.get_info returns None.""" mock_task_store = AsyncMock(spec=TaskStore) - sample_task = create_sample_task(task_id='task_info_not_found') + + sample_task = create_sample_task(task_id='non_existent_task') mock_task_store.get.return_value = sample_task - mock_push_notifier = AsyncMock(spec=PushNotifier) - mock_push_notifier.get_info.return_value = None # Info not found + mock_push_store = AsyncMock(spec=PushNotificationConfigStore) + mock_push_store.get_info.return_value = None # Info not found request_handler = DefaultRequestHandler( agent_executor=DummyAgentExecutor(), task_store=mock_task_store, - push_notifier=mock_push_notifier, + push_config_store=mock_push_store, ) - params = TaskIdParams(id='task_info_not_found') + params = GetTaskPushNotificationConfigParams(id='non_existent_task') from a2a.utils.errors import ServerError # Local import with pytest.raises(ServerError) as exc_info: @@ -1055,8 +1065,90 @@ async def test_get_task_push_notification_config_info_not_found(): assert isinstance( exc_info.value.error, InternalError ) # Current code raises InternalError - mock_task_store.get.assert_awaited_once_with('task_info_not_found') - mock_push_notifier.get_info.assert_awaited_once_with('task_info_not_found') + mock_task_store.get.assert_awaited_once_with('non_existent_task') + mock_push_store.get_info.assert_awaited_once_with('non_existent_task') + + +@pytest.mark.asyncio +async def test_get_task_push_notification_config_info_with_config(): + """Test on_get_task_push_notification_config with valid push config id""" + mock_task_store = AsyncMock(spec=TaskStore) + + push_store = InMemoryPushNotificationConfigStore() + + request_handler = DefaultRequestHandler( + agent_executor=DummyAgentExecutor(), + task_store=mock_task_store, + push_config_store=push_store, + ) + + set_config_params = TaskPushNotificationConfig( + taskId='task_1', + pushNotificationConfig=PushNotificationConfig( + id='config_id', url='http://1.example.com' + ), + ) + await request_handler.on_set_task_push_notification_config( + set_config_params, create_server_call_context() + ) + + params = GetTaskPushNotificationConfigParams( + id='task_1', pushNotificationConfigId='config_id' + ) + + result: TaskPushNotificationConfig = ( + await request_handler.on_get_task_push_notification_config( + params, create_server_call_context() + ) + ) + + assert result is not None + assert result.taskId == 'task_1' + assert ( + result.pushNotificationConfig.url + == set_config_params.pushNotificationConfig.url + ) + assert result.pushNotificationConfig.id == 'config_id' + + +@pytest.mark.asyncio +async def test_get_task_push_notification_config_info_with_config_no_id(): + """Test on_get_task_push_notification_config with no push config id""" + mock_task_store = AsyncMock(spec=TaskStore) + + push_store = InMemoryPushNotificationConfigStore() + + request_handler = DefaultRequestHandler( + agent_executor=DummyAgentExecutor(), + task_store=mock_task_store, + push_config_store=push_store, + ) + + set_config_params = TaskPushNotificationConfig( + taskId='task_1', + pushNotificationConfig=PushNotificationConfig( + url='http://1.example.com' + ), + ) + await request_handler.on_set_task_push_notification_config( + set_config_params, create_server_call_context() + ) + + params = TaskIdParams(id='task_1') + + result: TaskPushNotificationConfig = ( + await request_handler.on_get_task_push_notification_config( + params, create_server_call_context() + ) + ) + + assert result is not None + assert result.taskId == 'task_1' + assert ( + result.pushNotificationConfig.url + == set_config_params.pushNotificationConfig.url + ) + assert result.pushNotificationConfig.id == 'task_1' @pytest.mark.asyncio @@ -1152,6 +1244,330 @@ async def consume_stream(): assert texts == ['Event 0', 'Event 1', 'Event 2'] +@pytest.mark.asyncio +async def test_list_task_push_notification_config_no_store(): + """Test on_list_task_push_notification_config when _push_config_store is None.""" + request_handler = DefaultRequestHandler( + agent_executor=DummyAgentExecutor(), + task_store=AsyncMock(spec=TaskStore), + push_config_store=None, # Explicitly None + ) + params = ListTaskPushNotificationConfigParams(id='task1') + from a2a.utils.errors import ServerError # Local import + + with pytest.raises(ServerError) as exc_info: + await request_handler.on_list_task_push_notification_config( + params, create_server_call_context() + ) + assert isinstance(exc_info.value.error, UnsupportedOperationError) + + +@pytest.mark.asyncio +async def test_list_task_push_notification_config_task_not_found(): + """Test on_list_task_push_notification_config when task is not found.""" + mock_task_store = AsyncMock(spec=TaskStore) + mock_task_store.get.return_value = None # Task not found + mock_push_store = AsyncMock(spec=PushNotificationConfigStore) + + request_handler = DefaultRequestHandler( + agent_executor=DummyAgentExecutor(), + task_store=mock_task_store, + push_config_store=mock_push_store, + ) + params = ListTaskPushNotificationConfigParams(id='non_existent_task') + from a2a.utils.errors import ServerError # Local import + + with pytest.raises(ServerError) as exc_info: + await request_handler.on_list_task_push_notification_config( + params, create_server_call_context() + ) + + assert isinstance(exc_info.value.error, TaskNotFoundError) + mock_task_store.get.assert_awaited_once_with('non_existent_task') + mock_push_store.get_info.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_list_no_task_push_notification_config_info(): + """Test on_get_task_push_notification_config when push_config_store.get_info returns []""" + mock_task_store = AsyncMock(spec=TaskStore) + + sample_task = create_sample_task(task_id='non_existent_task') + mock_task_store.get.return_value = sample_task + + push_store = InMemoryPushNotificationConfigStore() + + request_handler = DefaultRequestHandler( + agent_executor=DummyAgentExecutor(), + task_store=mock_task_store, + push_config_store=push_store, + ) + params = ListTaskPushNotificationConfigParams(id='non_existent_task') + + result = await request_handler.on_list_task_push_notification_config( + params, create_server_call_context() + ) + assert result == [] + + +@pytest.mark.asyncio +async def test_list_task_push_notification_config_info_with_config(): + """Test on_list_task_push_notification_config with push config+id""" + mock_task_store = AsyncMock(spec=TaskStore) + + sample_task = create_sample_task(task_id='non_existent_task') + mock_task_store.get.return_value = sample_task + + push_config1 = PushNotificationConfig( + id='config_1', url='http://example.com' + ) + push_config2 = PushNotificationConfig( + id='config_2', url='http://example.com' + ) + + push_store = InMemoryPushNotificationConfigStore() + await push_store.set_info('task_1', push_config1) + await push_store.set_info('task_1', push_config2) + + request_handler = DefaultRequestHandler( + agent_executor=DummyAgentExecutor(), + task_store=mock_task_store, + push_config_store=push_store, + ) + params = ListTaskPushNotificationConfigParams(id='task_1') + + result: list[ + TaskPushNotificationConfig + ] = await request_handler.on_list_task_push_notification_config( + params, create_server_call_context() + ) + + assert len(result) == 2 + assert result[0].taskId == 'task_1' + assert result[0].pushNotificationConfig == push_config1 + assert result[1].taskId == 'task_1' + assert result[1].pushNotificationConfig == push_config2 + + +@pytest.mark.asyncio +async def test_list_task_push_notification_config_info_with_config_and_no_id(): + """Test on_list_task_push_notification_config with no push config id""" + mock_task_store = AsyncMock(spec=TaskStore) + + push_store = InMemoryPushNotificationConfigStore() + + request_handler = DefaultRequestHandler( + agent_executor=DummyAgentExecutor(), + task_store=mock_task_store, + push_config_store=push_store, + ) + + # multiple calls without config id should replace the existing + set_config_params1 = TaskPushNotificationConfig( + taskId='task_1', + pushNotificationConfig=PushNotificationConfig( + url='http://1.example.com' + ), + ) + await request_handler.on_set_task_push_notification_config( + set_config_params1, create_server_call_context() + ) + + set_config_params2 = TaskPushNotificationConfig( + taskId='task_1', + pushNotificationConfig=PushNotificationConfig( + url='http://2.example.com' + ), + ) + await request_handler.on_set_task_push_notification_config( + set_config_params2, create_server_call_context() + ) + + params = ListTaskPushNotificationConfigParams(id='task_1') + + result: list[ + TaskPushNotificationConfig + ] = await request_handler.on_list_task_push_notification_config( + params, create_server_call_context() + ) + + assert len(result) == 1 + assert result[0].taskId == 'task_1' + assert ( + result[0].pushNotificationConfig.url + == set_config_params2.pushNotificationConfig.url + ) + assert result[0].pushNotificationConfig.id == 'task_1' + + +@pytest.mark.asyncio +async def test_delete_task_push_notification_config_no_store(): + """Test on_delete_task_push_notification_config when _push_config_store is None.""" + request_handler = DefaultRequestHandler( + agent_executor=DummyAgentExecutor(), + task_store=AsyncMock(spec=TaskStore), + push_config_store=None, # Explicitly None + ) + params = DeleteTaskPushNotificationConfigParams( + id='task1', pushNotificationConfigId='config1' + ) + from a2a.utils.errors import ServerError # Local import + + with pytest.raises(ServerError) as exc_info: + await request_handler.on_delete_task_push_notification_config( + params, create_server_call_context() + ) + assert isinstance(exc_info.value.error, UnsupportedOperationError) + + +@pytest.mark.asyncio +async def test_delete_task_push_notification_config_task_not_found(): + """Test on_delete_task_push_notification_config when task is not found.""" + mock_task_store = AsyncMock(spec=TaskStore) + mock_task_store.get.return_value = None # Task not found + mock_push_store = AsyncMock(spec=PushNotificationConfigStore) + + request_handler = DefaultRequestHandler( + agent_executor=DummyAgentExecutor(), + task_store=mock_task_store, + push_config_store=mock_push_store, + ) + params = DeleteTaskPushNotificationConfigParams( + id='non_existent_task', pushNotificationConfigId='config1' + ) + from a2a.utils.errors import ServerError # Local import + + with pytest.raises(ServerError) as exc_info: + await request_handler.on_delete_task_push_notification_config( + params, create_server_call_context() + ) + + assert isinstance(exc_info.value.error, TaskNotFoundError) + mock_task_store.get.assert_awaited_once_with('non_existent_task') + mock_push_store.get_info.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_delete_no_task_push_notification_config_info(): + """Test on_delete_task_push_notification_config without config info""" + mock_task_store = AsyncMock(spec=TaskStore) + + sample_task = create_sample_task(task_id='task_1') + mock_task_store.get.return_value = sample_task + + push_store = InMemoryPushNotificationConfigStore() + await push_store.set_info( + 'task_2', + PushNotificationConfig(id='config_1', url='http://example.com'), + ) + + request_handler = DefaultRequestHandler( + agent_executor=DummyAgentExecutor(), + task_store=mock_task_store, + push_config_store=push_store, + ) + params = DeleteTaskPushNotificationConfigParams( + id='task1', pushNotificationConfigId='config_non_existant' + ) + + result = await request_handler.on_delete_task_push_notification_config( + params, create_server_call_context() + ) + assert result == None + + params = DeleteTaskPushNotificationConfigParams( + id='task2', pushNotificationConfigId='config_non_existant' + ) + + result = await request_handler.on_delete_task_push_notification_config( + params, create_server_call_context() + ) + assert result == None + + +@pytest.mark.asyncio +async def test_delete_task_push_notification_config_info_with_config(): + """Test on_list_task_push_notification_config with push config+id""" + mock_task_store = AsyncMock(spec=TaskStore) + + sample_task = create_sample_task(task_id='non_existent_task') + mock_task_store.get.return_value = sample_task + + push_config1 = PushNotificationConfig( + id='config_1', url='http://example.com' + ) + push_config2 = PushNotificationConfig( + id='config_2', url='http://example.com' + ) + + push_store = InMemoryPushNotificationConfigStore() + await push_store.set_info('task_1', push_config1) + await push_store.set_info('task_1', push_config2) + await push_store.set_info('task_2', push_config1) + + request_handler = DefaultRequestHandler( + agent_executor=DummyAgentExecutor(), + task_store=mock_task_store, + push_config_store=push_store, + ) + params = DeleteTaskPushNotificationConfigParams( + id='task_1', pushNotificationConfigId='config_1' + ) + + result1 = await request_handler.on_delete_task_push_notification_config( + params, create_server_call_context() + ) + + assert result1 == None + + result2 = await request_handler.on_list_task_push_notification_config( + ListTaskPushNotificationConfigParams(id='task_1'), + create_server_call_context(), + ) + + assert len(result2) == 1 + assert result2[0].taskId == 'task_1' + assert result2[0].pushNotificationConfig == push_config2 + + +@pytest.mark.asyncio +async def test_delete_task_push_notification_config_info_with_config_and_no_id(): + """Test on_list_task_push_notification_config with no push config id""" + mock_task_store = AsyncMock(spec=TaskStore) + + sample_task = create_sample_task(task_id='non_existent_task') + mock_task_store.get.return_value = sample_task + + push_config = PushNotificationConfig(url='http://example.com') + + # insertion without id should replace the existing config + push_store = InMemoryPushNotificationConfigStore() + await push_store.set_info('task_1', push_config) + await push_store.set_info('task_1', push_config) + + request_handler = DefaultRequestHandler( + agent_executor=DummyAgentExecutor(), + task_store=mock_task_store, + push_config_store=push_store, + ) + params = DeleteTaskPushNotificationConfigParams( + id='task_1', pushNotificationConfigId='task_1' + ) + + result = await request_handler.on_delete_task_push_notification_config( + params, create_server_call_context() + ) + + assert result == None + + result2 = await request_handler.on_list_task_push_notification_config( + ListTaskPushNotificationConfigParams(id='task_1'), + create_server_call_context(), + ) + + assert len(result2) == 0 + + TERMINAL_TASK_STATES = { TaskState.completed, TaskState.canceled, diff --git a/tests/server/request_handlers/test_jsonrpc_handler.py b/tests/server/request_handlers/test_jsonrpc_handler.py index e3402648..de6c1453 100644 --- a/tests/server/request_handlers/test_jsonrpc_handler.py +++ b/tests/server/request_handlers/test_jsonrpc_handler.py @@ -16,7 +16,7 @@ from a2a.server.events import QueueManager from a2a.server.events.event_queue import EventQueue from a2a.server.request_handlers import DefaultRequestHandler, JSONRPCHandler -from a2a.server.tasks import InMemoryPushNotifier, PushNotifier, TaskStore +from a2a.server.tasks import TaskStore, InMemoryPushNotificationConfigStore, BasePushNotificationSender, PushNotificationConfigStore, PushNotificationSender from a2a.types import ( AgentCapabilities, AgentCard, @@ -55,6 +55,15 @@ TaskStatusUpdateEvent, TextPart, UnsupportedOperationError, + GetTaskPushNotificationConfigParams, + ListTaskPushNotificationConfigRequest, + ListTaskPushNotificationConfigResponse, + ListTaskPushNotificationConfigSuccessResponse, + ListTaskPushNotificationConfigParams, + DeleteTaskPushNotificationConfigParams, + DeleteTaskPushNotificationConfigRequest, + DeleteTaskPushNotificationConfigResponse, + DeleteTaskPushNotificationConfigSuccessResponse, ) from a2a.utils.errors import ServerError @@ -427,11 +436,12 @@ async def streaming_coro(): async def test_set_push_notification_success(self) -> None: mock_agent_executor = AsyncMock(spec=AgentExecutor) mock_task_store = AsyncMock(spec=TaskStore) - mock_push_notifier = AsyncMock(spec=PushNotifier) + mock_push_notification_store = AsyncMock(spec=PushNotificationConfigStore) + request_handler = DefaultRequestHandler( mock_agent_executor, mock_task_store, - push_notifier=mock_push_notifier, + push_config_store=mock_push_notification_store, ) self.mock_agent_card.capabilities = AgentCapabilities( streaming=True, pushNotifications=True @@ -449,23 +459,22 @@ async def test_set_push_notification_success(self) -> None: id='1', params=task_push_config ) response: SetTaskPushNotificationConfigResponse = ( - await handler.set_push_notification(request) + await handler.set_push_notification_config(request) ) self.assertIsInstance( response.root, SetTaskPushNotificationConfigSuccessResponse ) assert response.root.result == task_push_config # type: ignore - mock_push_notifier.set_info.assert_called_once_with( + mock_push_notification_store.set_info.assert_called_once_with( mock_task.id, task_push_config.pushNotificationConfig ) async def test_get_push_notification_success(self) -> None: mock_agent_executor = AsyncMock(spec=AgentExecutor) - mock_task_store = AsyncMock(spec=TaskStore) - mock_httpx_client = AsyncMock(spec=httpx.AsyncClient) - push_notifier = InMemoryPushNotifier(httpx_client=mock_httpx_client) + mock_task_store = AsyncMock(spec=TaskStore) + push_notification_store = InMemoryPushNotificationConfigStore() request_handler = DefaultRequestHandler( - mock_agent_executor, mock_task_store, push_notifier=push_notifier + mock_agent_executor, mock_task_store, push_config_store=push_notification_store ) self.mock_agent_card.capabilities = AgentCapabilities( streaming=True, pushNotifications=True @@ -482,7 +491,7 @@ async def test_get_push_notification_success(self) -> None: request = SetTaskPushNotificationConfigRequest( id='1', params=task_push_config ) - await handler.set_push_notification(request) + await handler.set_push_notification_config(request) get_request: GetTaskPushNotificationConfigRequest = ( GetTaskPushNotificationConfigRequest( @@ -490,7 +499,7 @@ async def test_get_push_notification_success(self) -> None: ) ) get_response: GetTaskPushNotificationConfigResponse = ( - await handler.get_push_notification(get_request) + await handler.get_push_notification_config(get_request) ) self.assertIsInstance( get_response.root, GetTaskPushNotificationConfigSuccessResponse @@ -506,9 +515,10 @@ async def test_on_message_stream_new_message_send_push_notification_success( mock_agent_executor = AsyncMock(spec=AgentExecutor) mock_task_store = AsyncMock(spec=TaskStore) mock_httpx_client = AsyncMock(spec=httpx.AsyncClient) - push_notifier = InMemoryPushNotifier(httpx_client=mock_httpx_client) + push_notification_store = InMemoryPushNotificationConfigStore() + push_notification_sender = BasePushNotificationSender(mock_httpx_client, push_notification_store) request_handler = DefaultRequestHandler( - mock_agent_executor, mock_task_store, push_notifier=push_notifier + mock_agent_executor, mock_task_store, push_config_store=push_notification_store, push_sender=push_notification_sender ) self.mock_agent_card.capabilities = AgentCapabilities( streaming=True, pushNotifications=True @@ -714,7 +724,7 @@ async def test_streaming_not_supported_error( pass self.assertEqual( - str(context.exception.error.message), + str(context.exception.error.message), # type: ignore 'Streaming is not supported by the agent', ) @@ -745,14 +755,14 @@ async def test_push_notifications_not_supported_error(self) -> None: # Should raise ServerError about push notifications not supported with self.assertRaises(ServerError) as context: - await handler.set_push_notification(request) + await handler.set_push_notification_config(request) self.assertEqual( - str(context.exception.error.message), + str(context.exception.error.message), # type: ignore 'Push notifications are not supported by the agent', ) - async def test_on_get_push_notification_no_push_notifier(self) -> None: + async def test_on_get_push_notification_no_push_config_store(self) -> None: """Test get_push_notification with no push notifier configured.""" # Arrange mock_agent_executor = AsyncMock(spec=AgentExecutor) @@ -773,13 +783,13 @@ async def test_on_get_push_notification_no_push_notifier(self) -> None: get_request = GetTaskPushNotificationConfigRequest( id='1', params=TaskIdParams(id=mock_task.id) ) - response = await handler.get_push_notification(get_request) + response = await handler.get_push_notification_config(get_request) # Assert self.assertIsInstance(response.root, JSONRPCErrorResponse) self.assertEqual(response.root.error, UnsupportedOperationError()) # type: ignore - async def test_on_set_push_notification_no_push_notifier(self) -> None: + async def test_on_set_push_notification_no_push_config_store(self) -> None: """Test set_push_notification with no push notifier configured.""" # Arrange mock_agent_executor = AsyncMock(spec=AgentExecutor) @@ -806,7 +816,7 @@ async def test_on_set_push_notification_no_push_notifier(self) -> None: request = SetTaskPushNotificationConfigRequest( id='1', params=task_push_config ) - response = await handler.set_push_notification(request) + response = await handler.set_push_notification_config(request) # Assert self.assertIsInstance(response.root, JSONRPCErrorResponse) @@ -885,7 +895,8 @@ async def test_default_request_handler_with_custom_components(self) -> None: mock_agent_executor = AsyncMock(spec=AgentExecutor) mock_task_store = AsyncMock(spec=TaskStore) mock_queue_manager = AsyncMock(spec=QueueManager) - mock_push_notifier = AsyncMock(spec=PushNotifier) + mock_push_config_store = AsyncMock(spec=PushNotificationConfigStore) + mock_push_sender = AsyncMock(spec=PushNotificationSender) mock_request_context_builder = AsyncMock(spec=RequestContextBuilder) # Act @@ -893,7 +904,8 @@ async def test_default_request_handler_with_custom_components(self) -> None: agent_executor=mock_agent_executor, task_store=mock_task_store, queue_manager=mock_queue_manager, - push_notifier=mock_push_notifier, + push_config_store=mock_push_config_store, + push_sender=mock_push_sender, request_context_builder=mock_request_context_builder, ) @@ -901,7 +913,8 @@ async def test_default_request_handler_with_custom_components(self) -> None: self.assertEqual(handler.agent_executor, mock_agent_executor) self.assertEqual(handler.task_store, mock_task_store) self.assertEqual(handler._queue_manager, mock_queue_manager) - self.assertEqual(handler._push_notifier, mock_push_notifier) + self.assertEqual(handler._push_config_store, mock_push_config_store) + self.assertEqual(handler._push_sender, mock_push_sender) self.assertEqual( handler._request_context_builder, mock_request_context_builder ) @@ -944,7 +957,7 @@ async def consume_raises_error(*args, **kwargs) -> NoReturn: # Assert self.assertIsInstance(response.root, JSONRPCErrorResponse) - self.assertEqual(response.root.error, UnsupportedOperationError()) + self.assertEqual(response.root.error, UnsupportedOperationError()) # type: ignore async def test_on_message_send_task_id_mismatch(self) -> None: mock_agent_executor = AsyncMock(spec=AgentExecutor) @@ -1008,3 +1021,121 @@ async def streaming_coro(): collected_events[0].root, JSONRPCErrorResponse ) self.assertIsInstance(collected_events[0].root.error, InternalError) + + async def test_on_get_push_notification(self) -> None: + """Test get_push_notification_config handling""" + mock_task_store = AsyncMock(spec=TaskStore) + + mock_task = Task(**MINIMAL_TASK) + mock_task_store.get.return_value = mock_task + + + # Create request handler without a push notifier + request_handler = AsyncMock(spec=DefaultRequestHandler) + task_push_config = TaskPushNotificationConfig(taskId=mock_task.id, pushNotificationConfig=PushNotificationConfig(id="config1", url='http://example.com')) + request_handler.on_get_task_push_notification_config.return_value = task_push_config + + self.mock_agent_card.capabilities = AgentCapabilities( + pushNotifications=True + ) + handler = JSONRPCHandler(self.mock_agent_card, request_handler) + list_request = GetTaskPushNotificationConfigRequest( + id='1', params=GetTaskPushNotificationConfigParams(id=mock_task.id, pushNotificationConfigId="config1") + ) + response = await handler.get_push_notification_config(list_request) + # Assert + self.assertIsInstance(response.root, GetTaskPushNotificationConfigSuccessResponse) + self.assertEqual(response.root.result, task_push_config) # type: ignore + + async def test_on_list_push_notification(self) -> None: + """Test list_push_notification_config handling""" + mock_task_store = AsyncMock(spec=TaskStore) + + mock_task = Task(**MINIMAL_TASK) + mock_task_store.get.return_value = mock_task + + + # Create request handler without a push notifier + request_handler = AsyncMock(spec=DefaultRequestHandler) + task_push_config = TaskPushNotificationConfig(taskId=mock_task.id, pushNotificationConfig=PushNotificationConfig(url='http://example.com')) + request_handler.on_list_task_push_notification_config.return_value = [task_push_config] + + self.mock_agent_card.capabilities = AgentCapabilities( + pushNotifications=True + ) + handler = JSONRPCHandler(self.mock_agent_card, request_handler) + list_request = ListTaskPushNotificationConfigRequest( + id='1', params=ListTaskPushNotificationConfigParams(id=mock_task.id) + ) + response = await handler.list_push_notification_config(list_request) + # Assert + self.assertIsInstance(response.root, ListTaskPushNotificationConfigSuccessResponse) + self.assertEqual(response.root.result, [task_push_config]) # type: ignore + + async def test_on_list_push_notification_error(self) -> None: + """Test list_push_notification_config handling""" + mock_task_store = AsyncMock(spec=TaskStore) + + mock_task = Task(**MINIMAL_TASK) + mock_task_store.get.return_value = mock_task + + + # Create request handler without a push notifier + request_handler = AsyncMock(spec=DefaultRequestHandler) + task_push_config = TaskPushNotificationConfig(taskId=mock_task.id, pushNotificationConfig=PushNotificationConfig(url='http://example.com')) + # throw server error + request_handler.on_list_task_push_notification_config.side_effect = ServerError(InternalError()) + + + self.mock_agent_card.capabilities = AgentCapabilities( + pushNotifications=True + ) + handler = JSONRPCHandler(self.mock_agent_card, request_handler) + list_request = ListTaskPushNotificationConfigRequest( + id='1', params=ListTaskPushNotificationConfigParams(id=mock_task.id) + ) + response = await handler.list_push_notification_config(list_request) + # Assert + self.assertIsInstance(response.root, JSONRPCErrorResponse) + self.assertEqual(response.root.error, InternalError()) # type: ignore + + async def test_on_delete_push_notification(self) -> None: + """Test delete_push_notification_config handling""" + + # Create request handler without a push notifier + request_handler = AsyncMock(spec=DefaultRequestHandler) + request_handler.on_delete_task_push_notification_config.return_value = None + + self.mock_agent_card.capabilities = AgentCapabilities( + pushNotifications=True + ) + handler = JSONRPCHandler(self.mock_agent_card, request_handler) + delete_request = DeleteTaskPushNotificationConfigRequest( + id='1', params=DeleteTaskPushNotificationConfigParams(id="task1", pushNotificationConfigId="config1") + ) + response = await handler.delete_push_notification_config(delete_request) + # Assert + self.assertIsInstance(response.root, DeleteTaskPushNotificationConfigSuccessResponse) + self.assertEqual(response.root.result, None) # type: ignore + + async def test_on_delete_push_notification_error(self) -> None: + """Test delete_push_notification_config error handling""" + + + # Create request handler without a push notifier + request_handler = AsyncMock(spec=DefaultRequestHandler) + # throw server error + request_handler.on_delete_task_push_notification_config.side_effect = ServerError(UnsupportedOperationError()) + + + self.mock_agent_card.capabilities = AgentCapabilities( + pushNotifications=True + ) + handler = JSONRPCHandler(self.mock_agent_card, request_handler) + delete_request = DeleteTaskPushNotificationConfigRequest( + id='1', params=DeleteTaskPushNotificationConfigParams(id="task1", pushNotificationConfigId="config1") + ) + response = await handler.delete_push_notification_config(delete_request) + # Assert + self.assertIsInstance(response.root, JSONRPCErrorResponse) + self.assertEqual(response.root.error, UnsupportedOperationError()) # type: ignore \ No newline at end of file diff --git a/tests/server/tasks/test_inmemory_push_notifier.py b/tests/server/tasks/test_inmemory_push_notifications.py similarity index 70% rename from tests/server/tasks/test_inmemory_push_notifier.py rename to tests/server/tasks/test_inmemory_push_notifications.py index 9c43df5f..586c7863 100644 --- a/tests/server/tasks/test_inmemory_push_notifier.py +++ b/tests/server/tasks/test_inmemory_push_notifications.py @@ -4,7 +4,12 @@ import httpx -from a2a.server.tasks.inmemory_push_notifier import InMemoryPushNotifier +from a2a.server.tasks.inmemory_push_notification_config_store import ( + InMemoryPushNotificationConfigStore, +) +from a2a.server.tasks.base_push_notification_sender import ( + BasePushNotificationSender, +) from a2a.types import PushNotificationConfig, Task, TaskState, TaskStatus @@ -29,8 +34,9 @@ def create_sample_push_config( class TestInMemoryPushNotifier(unittest.IsolatedAsyncioTestCase): def setUp(self): self.mock_httpx_client = AsyncMock(spec=httpx.AsyncClient) - self.notifier = InMemoryPushNotifier( - httpx_client=self.mock_httpx_client + self.config_store = InMemoryPushNotificationConfigStore() + self.notifier = BasePushNotificationSender( + httpx_client=self.mock_httpx_client, config_store=self.config_store ) # Corrected argument name def test_constructor_stores_client(self): @@ -40,73 +46,98 @@ async def test_set_info_adds_new_config(self): task_id = 'task_new' config = create_sample_push_config(url='http://new.url/callback') - await self.notifier.set_info(task_id, config) + await self.config_store.set_info(task_id, config) - self.assertIn(task_id, self.notifier._push_notification_infos) + self.assertIn(task_id, self.config_store._push_notification_infos) self.assertEqual( - self.notifier._push_notification_infos[task_id], config + self.config_store._push_notification_infos[task_id], [config] ) - async def test_set_info_updates_existing_config(self): + async def test_set_info_appends_to_existing_config(self): task_id = 'task_update' initial_config = create_sample_push_config( url='http://initial.url/callback', config_id='cfg_initial' ) - await self.notifier.set_info(task_id, initial_config) + await self.config_store.set_info(task_id, initial_config) updated_config = create_sample_push_config( url='http://updated.url/callback', config_id='cfg_updated' ) - await self.notifier.set_info(task_id, updated_config) + await self.config_store.set_info(task_id, updated_config) - self.assertIn(task_id, self.notifier._push_notification_infos) + self.assertIn(task_id, self.config_store._push_notification_infos) self.assertEqual( - self.notifier._push_notification_infos[task_id], updated_config + self.config_store._push_notification_infos[task_id][0], + initial_config, ) - self.assertNotEqual( - self.notifier._push_notification_infos[task_id], initial_config + self.assertEqual( + self.config_store._push_notification_infos[task_id][1], + updated_config, + ) + + async def test_set_info_without_config_id(self): + task_id = 'task1' + initial_config = PushNotificationConfig( + url='http://initial.url/callback' + ) + await self.config_store.set_info(task_id, initial_config) + + assert ( + self.config_store._push_notification_infos[task_id][0].id == task_id + ) + + updated_config = PushNotificationConfig( + url='http://initial.url/callback_new' + ) + await self.config_store.set_info(task_id, updated_config) + + self.assertIn(task_id, self.config_store._push_notification_infos) + assert len(self.config_store._push_notification_infos[task_id]) == 1 + self.assertEqual( + self.config_store._push_notification_infos[task_id][0].url, + updated_config.url, ) async def test_get_info_existing_config(self): task_id = 'task_get_exist' config = create_sample_push_config(url='http://get.this/callback') - await self.notifier.set_info(task_id, config) + await self.config_store.set_info(task_id, config) - retrieved_config = await self.notifier.get_info(task_id) - self.assertEqual(retrieved_config, config) + retrieved_config = await self.config_store.get_info(task_id) + self.assertEqual(retrieved_config, [config]) async def test_get_info_non_existent_config(self): task_id = 'task_get_non_exist' - retrieved_config = await self.notifier.get_info(task_id) - self.assertIsNone(retrieved_config) + retrieved_config = await self.config_store.get_info(task_id) + assert retrieved_config == [] async def test_delete_info_existing_config(self): task_id = 'task_delete_exist' config = create_sample_push_config(url='http://delete.this/callback') - await self.notifier.set_info(task_id, config) + await self.config_store.set_info(task_id, config) - self.assertIn(task_id, self.notifier._push_notification_infos) - await self.notifier.delete_info(task_id) - self.assertNotIn(task_id, self.notifier._push_notification_infos) + self.assertIn(task_id, self.config_store._push_notification_infos) + await self.config_store.delete_info(task_id, config_id=config.id) + self.assertNotIn(task_id, self.config_store._push_notification_infos) async def test_delete_info_non_existent_config(self): task_id = 'task_delete_non_exist' # Ensure it doesn't raise an error try: - await self.notifier.delete_info(task_id) + await self.config_store.delete_info(task_id) except Exception as e: self.fail( f'delete_info raised {e} unexpectedly for nonexistent task_id' ) self.assertNotIn( - task_id, self.notifier._push_notification_infos + task_id, self.config_store._push_notification_infos ) # Should still not be there async def test_send_notification_success(self): task_id = 'task_send_success' task_data = create_sample_task(task_id=task_id) config = create_sample_push_config(url='http://notify.me/here') - await self.notifier.set_info(task_id, config) + await self.config_store.set_info(task_id, config) # Mock the post call to simulate success mock_response = AsyncMock(spec=httpx.Response) @@ -135,14 +166,14 @@ async def test_send_notification_no_config(self): self.mock_httpx_client.post.assert_not_called() - @patch('a2a.server.tasks.inmemory_push_notifier.logger') + @patch('a2a.server.tasks.base_push_notification_sender.logger') async def test_send_notification_http_status_error( self, mock_logger: MagicMock ): task_id = 'task_send_http_err' task_data = create_sample_task(task_id=task_id) config = create_sample_push_config(url='http://notify.me/http_error') - await self.notifier.set_info(task_id, config) + await self.config_store.set_info(task_id, config) mock_response = MagicMock( spec=httpx.Response @@ -165,14 +196,14 @@ async def test_send_notification_http_status_error( ) self.assertIn(str(http_error), mock_logger.error.call_args[0][0]) - @patch('a2a.server.tasks.inmemory_push_notifier.logger') + @patch('a2a.server.tasks.base_push_notification_sender.logger') async def test_send_notification_request_error( self, mock_logger: MagicMock ): task_id = 'task_send_req_err' task_data = create_sample_task(task_id=task_id) config = create_sample_push_config(url='http://notify.me/req_error') - await self.notifier.set_info(task_id, config) + await self.config_store.set_info(task_id, config) request_error = httpx.RequestError('Network issue', request=MagicMock()) self.mock_httpx_client.post.side_effect = request_error @@ -186,7 +217,7 @@ async def test_send_notification_request_error( ) self.assertIn(str(request_error), mock_logger.error.call_args[0][0]) - @patch('a2a.server.tasks.inmemory_push_notifier.logger') + @patch('a2a.server.tasks.base_push_notification_sender.logger') async def test_send_notification_with_auth(self, mock_logger: MagicMock): task_id = 'task_send_auth' task_data = create_sample_task(task_id=task_id) @@ -205,7 +236,7 @@ async def test_send_notification_with_auth(self, mock_logger: MagicMock): # Given the current implementation of InMemoryPushNotifier, # it only supports basic auth via tuple. - await self.notifier.set_info(task_id, config) + await self.config_store.set_info(task_id, config) mock_response = AsyncMock(spec=httpx.Response) mock_response.status_code = 200 diff --git a/tests/server/tasks/test_push_notification_sender.py b/tests/server/tasks/test_push_notification_sender.py new file mode 100644 index 00000000..4a11680d --- /dev/null +++ b/tests/server/tasks/test_push_notification_sender.py @@ -0,0 +1,133 @@ +from a2a.server.tasks.base_push_notification_sender import ( + BasePushNotificationSender, +) + +import unittest + +from unittest.mock import AsyncMock, MagicMock, patch + +import httpx + +from a2a.types import ( + PushNotificationConfig, + Task, + TaskState, + TaskStatus, +) + + +def create_sample_task(task_id='task123', status_state=TaskState.completed): + return Task( + id=task_id, + contextId='ctx456', + status=TaskStatus(state=status_state), + ) + + +def create_sample_push_config( + url='http://example.com/callback', config_id='cfg1' +): + return PushNotificationConfig(id=config_id, url=url) + + +class TestBasePushNotificationSender(unittest.IsolatedAsyncioTestCase): + def setUp(self): + self.mock_httpx_client = AsyncMock(spec=httpx.AsyncClient) + self.mock_config_store = AsyncMock() + self.sender = BasePushNotificationSender( + httpx_client=self.mock_httpx_client, + config_store=self.mock_config_store, + ) + + def test_constructor_stores_client_and_config_store(self): + self.assertEqual(self.sender._client, self.mock_httpx_client) + self.assertEqual(self.sender._config_store, self.mock_config_store) + + async def test_send_notification_success(self): + task_id = 'task_send_success' + task_data = create_sample_task(task_id=task_id) + config = create_sample_push_config(url='http://notify.me/here') + self.mock_config_store.get_info.return_value = [config] + + mock_response = AsyncMock(spec=httpx.Response) + mock_response.status_code = 200 + self.mock_httpx_client.post.return_value = mock_response + + await self.sender.send_notification(task_data) + + self.mock_config_store.get_info.assert_awaited_once_with + + # assert httpx_client post method got invoked with right parameters + self.mock_httpx_client.post.assert_awaited_once_with( + config.url, + json=task_data.model_dump(mode='json', exclude_none=True), + ) + mock_response.raise_for_status.assert_called_once() + + async def test_send_notification_no_config(self): + task_id = 'task_send_no_config' + task_data = create_sample_task(task_id=task_id) + self.mock_config_store.get_info.return_value = [] + + await self.sender.send_notification(task_data) + + self.mock_config_store.get_info.assert_awaited_once_with(task_id) + self.mock_httpx_client.post.assert_not_called() + + @patch('a2a.server.tasks.base_push_notification_sender.logger') + async def test_send_notification_http_status_error( + self, mock_logger: MagicMock + ): + task_id = 'task_send_http_err' + task_data = create_sample_task(task_id=task_id) + config = create_sample_push_config(url='http://notify.me/http_error') + self.mock_config_store.get_info.return_value = [config] + + mock_response = MagicMock(spec=httpx.Response) + mock_response.status_code = 404 + mock_response.text = 'Not Found' + http_error = httpx.HTTPStatusError( + 'Not Found', request=MagicMock(), response=mock_response + ) + self.mock_httpx_client.post.side_effect = http_error + + await self.sender.send_notification(task_data) + + self.mock_config_store.get_info.assert_awaited_once_with(task_id) + self.mock_httpx_client.post.assert_awaited_once_with( + config.url, + json=task_data.model_dump(mode='json', exclude_none=True), + ) + mock_logger.error.assert_called_once() + + async def test_send_notification_multiple_configs(self): + task_id = 'task_multiple_configs' + task_data = create_sample_task(task_id=task_id) + config1 = create_sample_push_config( + url='http://notify.me/cfg1', config_id='cfg1' + ) + config2 = create_sample_push_config( + url='http://notify.me/cfg2', config_id='cfg2' + ) + self.mock_config_store.get_info.return_value = [config1, config2] + + mock_response = AsyncMock(spec=httpx.Response) + mock_response.status_code = 200 + self.mock_httpx_client.post.return_value = mock_response + + await self.sender.send_notification(task_data) + + self.mock_config_store.get_info.assert_awaited_once_with(task_id) + self.assertEqual(self.mock_httpx_client.post.call_count, 2) + + # Check calls for config1 + self.mock_httpx_client.post.assert_any_call( + config1.url, + json=task_data.model_dump(mode='json', exclude_none=True), + ) + # Check calls for config2 + self.mock_httpx_client.post.assert_any_call( + config2.url, + json=task_data.model_dump(mode='json', exclude_none=True), + ) + mock_response.raise_for_status.call_count = 2 diff --git a/tests/test_types.py b/tests/test_types.py index 2c0843e7..138bd927 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -71,6 +71,7 @@ TaskStatusUpdateEvent, TextPart, UnsupportedOperationError, + GetTaskPushNotificationConfigParams ) @@ -1496,3 +1497,33 @@ def test_subclass_enums() -> None: assert Role.user == 'user' assert TaskState.working == 'working' + + +def test_get_task_push_config_params() -> None: + """Tests successful validation of GetTaskPushNotificationConfigParams.""" + # Minimal valid data + params = { + "id":"task-1234" + } + TaskIdParams.model_validate(params) + GetTaskPushNotificationConfigParams.model_validate(params) + +def test_use_get_task_push_notification_params_for_request() -> None: + # GetTaskPushNotificationConfigRequest + get_push_notif_req_data: dict[str, Any] = { + 'id': 1, + 'jsonrpc': '2.0', + 'method': 'tasks/pushNotificationConfig/get', + 'params': { + "id":"task-1234", + "pushNotificationConfigId":"c1" + } + } + a2a_req_get_push_req = A2ARequest.model_validate(get_push_notif_req_data) + assert isinstance( + a2a_req_get_push_req.root, GetTaskPushNotificationConfigRequest + ) + assert isinstance(a2a_req_get_push_req.root.params, GetTaskPushNotificationConfigParams) + assert ( + a2a_req_get_push_req.root.method == 'tasks/pushNotificationConfig/get' + ) \ No newline at end of file