Skip to content
16 changes: 14 additions & 2 deletions src/a2a/server/apps/jsonrpc/jsonrpc_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
A2ARequest,
AgentCard,
CancelTaskRequest,
DeleteTaskPushNotificationConfigRequest,
GetTaskPushNotificationConfigRequest,
GetTaskRequest,
InternalError,
Expand All @@ -33,6 +34,7 @@
JSONRPCError,
JSONRPCErrorResponse,
JSONRPCResponse,
ListTaskPushNotificationConfigRequest,
SendMessageRequest,
SendStreamingMessageRequest,
SendStreamingMessageResponse,
Expand Down Expand Up @@ -297,12 +299,22 @@ async def _process_non_streaming_request(
request_obj, context
)
case SetTaskPushNotificationConfigRequest():
handler_result = await self.handler.set_push_notification(
handler_result = await self.handler.set_push_notification_config(
request_obj,
context,
)
case GetTaskPushNotificationConfigRequest():
handler_result = await self.handler.get_push_notification(
handler_result = await self.handler.get_push_notification_config(
request_obj,
context,
)
case ListTaskPushNotificationConfigRequest():
handler_result = await self.handler.list_push_notification_config(
request_obj,
context,
)
case DeleteTaskPushNotificationConfigRequest():
handler_result = await self.handler.delete_push_notification_config(
request_obj,
context,
)
Expand Down
100 changes: 78 additions & 22 deletions src/a2a/server/request_handlers/default_request_handler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import logging
import uuid

from collections.abc import AsyncGenerator
from typing import cast
Expand All @@ -21,15 +20,18 @@
)
from a2a.server.request_handlers.request_handler import RequestHandler
from a2a.server.tasks import (
PushNotifier,
PushNotificationConfigStore,
PushNotificationSender,
ResultAggregator,
TaskManager,
TaskStore,
)
from a2a.types import (
DeleteTaskPushNotificationConfigParams,
GetTaskPushNotificationConfigParams,
InternalError,
InvalidParamsError,
ListTaskPushNotificationConfigParams,
Message,
MessageSendConfiguration,
MessageSendParams,
Expand Down Expand Up @@ -67,12 +69,13 @@ class DefaultRequestHandler(RequestHandler):

_running_agents: dict[str, asyncio.Task]

def __init__(
def __init__( # noqa: PLR0913
self,
agent_executor: AgentExecutor,
task_store: TaskStore,
queue_manager: QueueManager | None = None,
push_notifier: PushNotifier | None = None,
push_config_store: PushNotificationConfigStore | None = None,
push_sender: PushNotificationSender | None = None,
request_context_builder: RequestContextBuilder | None = None,
) -> None:
"""Initializes the DefaultRequestHandler.
Expand All @@ -81,14 +84,16 @@ def __init__(
agent_executor: The `AgentExecutor` instance to run agent logic.
task_store: The `TaskStore` instance to manage task persistence.
queue_manager: The `QueueManager` instance to manage event queues. Defaults to `InMemoryQueueManager`.
push_notifier: The `PushNotifier` instance for sending push notifications. Defaults to None.
push_config_store: The `PushNotificationConfigStore` instance for managing push notification configurations. Defaults to None.
push_sender: The `PushNotificationSender` instance for sending push notifications. Defaults to None.
request_context_builder: The `RequestContextBuilder` instance used
to build request contexts. Defaults to `SimpleRequestContextBuilder`.
"""
self.agent_executor = agent_executor
self.task_store = task_store
self._queue_manager = queue_manager or InMemoryQueueManager()
self._push_notifier = push_notifier
self._push_config_store = push_config_store
self._push_sender = push_sender
self._request_context_builder = (
request_context_builder
or SimpleRequestContextBuilder(
Expand Down Expand Up @@ -198,15 +203,15 @@ async def _setup_message_execution(

task = task_manager.update_with_message(params.message, task)
if self.should_add_push_info(params):
assert isinstance(self._push_notifier, PushNotifier)
assert self._push_config_store is not None
assert isinstance(
params.configuration, MessageSendConfiguration
)
assert isinstance(
params.configuration.pushNotificationConfig,
PushNotificationConfig,
)
await self._push_notifier.set_info(
await self._push_config_store.set_info(
task.id, params.configuration.pushNotificationConfig
)

Expand Down Expand Up @@ -247,10 +252,10 @@ async def _send_push_notification_if_needed(
self, task_id: str, result_aggregator: ResultAggregator
) -> None:
"""Sends push notification if configured and task is available."""
if self._push_notifier and task_id:
if self._push_sender and task_id:
latest_task = await result_aggregator.current_result
if isinstance(latest_task, Task):
await self._push_notifier.send_notification(latest_task)
await self._push_sender.send_notification(latest_task)

async def on_message_send(
self,
Expand Down Expand Up @@ -329,11 +334,11 @@ async def on_message_send_stream(
self._validate_task_id_match(task_id, event.id)

if (
self._push_notifier
self._push_config_store
and params.configuration
and params.configuration.pushNotificationConfig
):
await self._push_notifier.set_info(
await self._push_config_store.set_info(
task_id,
params.configuration.pushNotificationConfig,
)
Expand Down Expand Up @@ -372,16 +377,14 @@ async def on_set_task_push_notification_config(

Requires a `PushNotifier` to be configured.
"""
if not self._push_notifier:
if not self._push_config_store:
raise ServerError(error=UnsupportedOperationError())

task: Task | None = await self.task_store.get(params.taskId)
if not task:
raise ServerError(error=TaskNotFoundError())

# Generate a unique id for the notification
params.pushNotificationConfig.id = str(uuid.uuid4())
await self._push_notifier.set_info(
await self._push_config_store.set_info(
params.taskId,
params.pushNotificationConfig,
)
Expand All @@ -395,21 +398,23 @@ async def on_get_task_push_notification_config(
) -> TaskPushNotificationConfig:
"""Default handler for 'tasks/pushNotificationConfig/get'.

Requires a `PushNotifier` to be configured.
Requires a `PushConfigStore` to be configured.
"""
if not self._push_notifier:
if not self._push_config_store:
raise ServerError(error=UnsupportedOperationError())

task: Task | None = await self.task_store.get(params.id)
if not task:
raise ServerError(error=TaskNotFoundError())

push_notification_config = await self._push_notifier.get_info(params.id)
if not push_notification_config:
push_notification_config = await self._push_config_store.get_info(
params.id
)
if not push_notification_config or not push_notification_config[0]:
raise ServerError(error=InternalError())

return TaskPushNotificationConfig(
taskId=params.id, pushNotificationConfig=push_notification_config
taskId=params.id, pushNotificationConfig=push_notification_config[0]
)

async def on_resubscribe_to_task(
Expand Down Expand Up @@ -450,10 +455,61 @@ async def on_resubscribe_to_task(
async for event in result_aggregator.consume_and_emit(consumer):
yield event

async def on_list_task_push_notification_config(
self,
params: ListTaskPushNotificationConfigParams,
context: ServerCallContext | None = None,
) -> list[TaskPushNotificationConfig]:
"""Default handler for 'tasks/pushNotificationConfig/list'.

Requires a `PushConfigStore` to be configured.
"""
if not self._push_config_store:
raise ServerError(error=UnsupportedOperationError())

task: Task | None = await self.task_store.get(params.id)
if not task:
raise ServerError(error=TaskNotFoundError())

push_notification_config_list = await self._push_config_store.get_info(
params.id
)

task_push_notification_config = []
if push_notification_config_list:
for config in push_notification_config_list:
task_push_notification_config.append(
TaskPushNotificationConfig(
taskId=params.id, pushNotificationConfig=config
)
)

return task_push_notification_config

async def on_delete_task_push_notification_config(
self,
params: DeleteTaskPushNotificationConfigParams,
context: ServerCallContext | None = None,
) -> None:
"""Default handler for 'tasks/pushNotificationConfig/delete'.

Requires a `PushConfigStore` to be configured.
"""
if not self._push_config_store:
raise ServerError(error=UnsupportedOperationError())

task: Task | None = await self.task_store.get(params.id)
if not task:
raise ServerError(error=TaskNotFoundError())

await self._push_config_store.delete_info(
params.id, params.pushNotificationConfigId
)

def should_add_push_info(self, params: MessageSendParams) -> bool:
"""Determines if push notification info should be set for a task."""
return bool(
self._push_notifier
self._push_config_store
and params.configuration
and params.configuration.pushNotificationConfig
)
76 changes: 74 additions & 2 deletions src/a2a/server/request_handlers/jsonrpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
CancelTaskRequest,
CancelTaskResponse,
CancelTaskSuccessResponse,
DeleteTaskPushNotificationConfigRequest,
DeleteTaskPushNotificationConfigResponse,
DeleteTaskPushNotificationConfigSuccessResponse,
GetTaskPushNotificationConfigRequest,
GetTaskPushNotificationConfigResponse,
GetTaskPushNotificationConfigSuccessResponse,
Expand All @@ -18,6 +21,9 @@
GetTaskSuccessResponse,
InternalError,
JSONRPCErrorResponse,
ListTaskPushNotificationConfigRequest,
ListTaskPushNotificationConfigResponse,
ListTaskPushNotificationConfigSuccessResponse,
Message,
SendMessageRequest,
SendMessageResponse,
Expand Down Expand Up @@ -192,29 +198,29 @@
or JSON-RPC error responses if a `ServerError` is raised.
"""
try:
async for event in self.request_handler.on_resubscribe_to_task(
request.params, context
):
yield prepare_response_object(
request.id,
event,
(
Task,
Message,
TaskArtifactUpdateEvent,
TaskStatusUpdateEvent,
),
SendStreamingMessageSuccessResponse,
SendStreamingMessageResponse,
)
except ServerError as e:
yield SendStreamingMessageResponse(
root=JSONRPCErrorResponse(
id=request.id, error=e.error if e.error else InternalError()
)
)

async def get_push_notification(
async def get_push_notification_config(

Check notice on line 223 in src/a2a/server/request_handlers/jsonrpc_handler.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/server/request_handlers/jsonrpc_handler.py (127-149)
self,
request: GetTaskPushNotificationConfigRequest,
context: ServerCallContext | None = None,
Expand Down Expand Up @@ -252,7 +258,7 @@
lambda self: self.agent_card.capabilities.pushNotifications,
'Push notifications are not supported by the agent',
)
async def set_push_notification(
async def set_push_notification_config(
self,
request: SetTaskPushNotificationConfigRequest,
context: ServerCallContext | None = None,
Expand Down Expand Up @@ -325,3 +331,69 @@
id=request.id, error=e.error if e.error else InternalError()
)
)

async def list_push_notification_config(
self,
request: ListTaskPushNotificationConfigRequest,
context: ServerCallContext | None = None,
) -> ListTaskPushNotificationConfigResponse:
"""Handles the 'tasks/pushNotificationConfig/list' JSON-RPC method.

Args:
request: The incoming `ListTaskPushNotificationConfigRequest` object.
context: Context provided by the server.

Returns:
A `ListTaskPushNotificationConfigResponse` object containing the config or a JSON-RPC error.
"""
try:
config = (
await self.request_handler.on_list_task_push_notification_config(
request.params, context
)
)
return prepare_response_object(
request.id,
config,
(list,),
ListTaskPushNotificationConfigSuccessResponse,
ListTaskPushNotificationConfigResponse,
)
except ServerError as e:
return ListTaskPushNotificationConfigResponse(
root=JSONRPCErrorResponse(
id=request.id, error=e.error if e.error else InternalError()
)
)

async def delete_push_notification_config(
self,
request: DeleteTaskPushNotificationConfigRequest,
context: ServerCallContext | None = None,
) -> DeleteTaskPushNotificationConfigResponse:
"""Handles the 'tasks/pushNotificationConfig/list' JSON-RPC method.

Args:
request: The incoming `DeleteTaskPushNotificationConfigRequest` object.
context: Context provided by the server.

Returns:
A `DeleteTaskPushNotificationConfigResponse` object containing the config or a JSON-RPC error.
"""
try:
(
await self.request_handler.on_delete_task_push_notification_config(
request.params, context
)
)
return DeleteTaskPushNotificationConfigResponse(
root=DeleteTaskPushNotificationConfigSuccessResponse(
id=request.id, result=None
)
)
except ServerError as e:
return DeleteTaskPushNotificationConfigResponse(
root=JSONRPCErrorResponse(
id=request.id, error=e.error if e.error else InternalError()
)
)
Loading
Loading