Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions src/a2a/server/apps/jsonrpc/jsonrpc_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
SetTaskPushNotificationConfigRequest,
TaskResubscriptionRequest,
UnsupportedOperationError,
ListTaskPushNotificationConfigRequest,
DeleteTaskPushNotificationConfigRequest
)
from a2a.utils.errors import MethodNotImplementedError

Expand Down Expand Up @@ -297,12 +299,22 @@ async def _process_non_streaming_request(
request_obj, context
)
case SetTaskPushNotificationConfigRequest():
handler_result = await self.handler.set_push_notification(
handler_result = await self.handler.set_push_notification_config(
request_obj,
context,
)
case GetTaskPushNotificationConfigRequest():
handler_result = await self.handler.get_push_notification(
handler_result = await self.handler.get_push_notification_config(
request_obj,
context,
)
case ListTaskPushNotificationConfigRequest():
handler_result = await self.handler.list_push_notification_config(
request_obj,
context,
)
case DeleteTaskPushNotificationConfigRequest():
handler_result = await self.handler.delete_push_notification_config(
request_obj,
context,
)
Expand Down
53 changes: 51 additions & 2 deletions src/a2a/server/request_handlers/default_request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
TaskPushNotificationConfig,
TaskQueryParams,
UnsupportedOperationError,
ListTaskPushNotificationConfigParams,
DeleteTaskPushNotificationConfigParams,
DeleteTaskPushNotificationConfigResponse,
DeleteTaskPushNotificationConfigSuccessResponse
)
from a2a.utils.errors import ServerError
from a2a.utils.telemetry import SpanKind, trace_class
Expand Down Expand Up @@ -393,11 +397,11 @@ async def on_get_task_push_notification_config(
raise ServerError(error=TaskNotFoundError())

push_notification_config = await self._push_config_store.get_info(params.id)
if not push_notification_config:
if not push_notification_config or not push_notification_config[0]:
raise ServerError(error=InternalError())

return TaskPushNotificationConfig(
taskId=params.id, pushNotificationConfig=push_notification_config
taskId=params.id, pushNotificationConfig=push_notification_config[0]
)

async def on_resubscribe_to_task(
Expand Down Expand Up @@ -431,6 +435,51 @@ async def on_resubscribe_to_task(
async for event in result_aggregator.consume_and_emit(consumer):
yield event

async def on_list_task_push_notification_config(
self,
params: ListTaskPushNotificationConfigParams,
context: ServerCallContext | None = None,
) -> list[TaskPushNotificationConfig]:
"""Default handler for 'tasks/pushNotificationConfig/list'.

Requires a `PushConfigStore` to be configured.
"""
if not self._push_config_store:
raise ServerError(error=UnsupportedOperationError())

task: Task | None = await self.task_store.get(params.id)
if not task:
raise ServerError(error=TaskNotFoundError())

push_notification_config_list = await self._push_config_store.get_info(params.id)

task_push_notification_config = []
if push_notification_config_list:
for config in push_notification_config_list:
task_push_notification_config.append(TaskPushNotificationConfig(
taskId=params.id, pushNotificationConfig=config
))

return task_push_notification_config

async def on_delete_task_push_notification_config(
self,
params: DeleteTaskPushNotificationConfigParams,
context: ServerCallContext | None = None,
) -> None:
"""Default handler for 'tasks/pushNotificationConfig/delete'.

Requires a `PushConfigStore` to be configured.
"""
if not self._push_config_store:
raise ServerError(error=UnsupportedOperationError())

task: Task | None = await self.task_store.get(params.id)
if not task:
raise ServerError(error=TaskNotFoundError())

await self._push_config_store.delete_info(params.id, params.pushNotificationConfigId)

def should_add_push_info(self, params: MessageSendParams) -> bool:
"""Determines if push notification info should be set for a task."""
return bool(
Expand Down
76 changes: 74 additions & 2 deletions src/a2a/server/request_handlers/jsonrpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@
TaskPushNotificationConfig,
TaskResubscriptionRequest,
TaskStatusUpdateEvent,
ListTaskPushNotificationConfigRequest,
ListTaskPushNotificationConfigResponse,
ListTaskPushNotificationConfigSuccessResponse,
DeleteTaskPushNotificationConfigRequest,
DeleteTaskPushNotificationConfigResponse,
DeleteTaskPushNotificationConfigSuccessResponse
)
from a2a.utils.errors import ServerError
from a2a.utils.helpers import validate
Expand Down Expand Up @@ -214,7 +220,7 @@ async def on_resubscribe_to_task(
)
)

async def get_push_notification(
async def get_push_notification_config(
self,
request: GetTaskPushNotificationConfigRequest,
context: ServerCallContext | None = None,
Expand Down Expand Up @@ -252,7 +258,7 @@ async def get_push_notification(
lambda self: self.agent_card.capabilities.pushNotifications,
'Push notifications are not supported by the agent',
)
async def set_push_notification(
async def set_push_notification_config(
self,
request: SetTaskPushNotificationConfigRequest,
context: ServerCallContext | None = None,
Expand Down Expand Up @@ -325,3 +331,69 @@ async def on_get_task(
id=request.id, error=e.error if e.error else InternalError()
)
)

async def list_push_notification_config(
self,
request: ListTaskPushNotificationConfigRequest,
context: ServerCallContext | None = None,
) -> ListTaskPushNotificationConfigResponse:
"""Handles the 'tasks/pushNotificationConfig/list' JSON-RPC method.

Args:
request: The incoming `ListTaskPushNotificationConfigRequest` object.
context: Context provided by the server.

Returns:
A `ListTaskPushNotificationConfigResponse` object containing the config or a JSON-RPC error.
"""
try:
config = (
await self.request_handler.on_list_task_push_notification_config(
request.params, context
)
)
return prepare_response_object(
request.id,
config,
(list,),
ListTaskPushNotificationConfigSuccessResponse,
ListTaskPushNotificationConfigResponse,
)
except ServerError as e:
return ListTaskPushNotificationConfigResponse(
root=JSONRPCErrorResponse(
id=request.id, error=e.error if e.error else InternalError()
)
)

async def delete_push_notification_config(
self,
request: DeleteTaskPushNotificationConfigRequest,
context: ServerCallContext | None = None,
) -> DeleteTaskPushNotificationConfigResponse:
"""Handles the 'tasks/pushNotificationConfig/list' JSON-RPC method.

Args:
request: The incoming `DeleteTaskPushNotificationConfigRequest` object.
context: Context provided by the server.

Returns:
A `DeleteTaskPushNotificationConfigResponse` object containing the config or a JSON-RPC error.
"""
try:
(
await self.request_handler.on_delete_task_push_notification_config(
request.params, context
)
)
return DeleteTaskPushNotificationConfigResponse(
root=DeleteTaskPushNotificationConfigSuccessResponse(
id=request.id, result=None
)
)
except ServerError as e:
return DeleteTaskPushNotificationConfigResponse(
root=JSONRPCErrorResponse(
id=request.id, error=e.error if e.error else InternalError()
)
)
38 changes: 38 additions & 0 deletions src/a2a/server/request_handlers/request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
TaskPushNotificationConfig,
TaskQueryParams,
UnsupportedOperationError,
ListTaskPushNotificationConfigParams,
DeleteTaskPushNotificationConfigParams
)
from a2a.utils.errors import ServerError

Expand Down Expand Up @@ -160,3 +162,39 @@ async def on_resubscribe_to_task(
"""
raise ServerError(error=UnsupportedOperationError())
yield

@abstractmethod
async def on_list_task_push_notification_config(
self,
params: ListTaskPushNotificationConfigParams,
context: ServerCallContext | None = None,
) -> list[TaskPushNotificationConfig]:
"""Handles the 'tasks/pushNotificationConfig/list' method.

Retrieves the current push notification configurations for a task.

Args:
params: Parameters including the task ID.
context: Context provided by the server.

Returns:
The `list[TaskPushNotificationConfig]` for the task.
"""

@abstractmethod
async def on_delete_task_push_notification_config(
self,
params: DeleteTaskPushNotificationConfigParams,
context: ServerCallContext | None = None,
) -> None:
"""Handles the 'tasks/pushNotificationConfig/delete' method.

Deletes a push notification configuration associated with a task.

Args:
params: Parameters including the task ID.
context: Context provided by the server.

Returns:
None
"""
9 changes: 9 additions & 0 deletions src/a2a/server/request_handlers/response_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
TaskArtifactUpdateEvent,
TaskPushNotificationConfig,
TaskStatusUpdateEvent,
ListTaskPushNotificationConfigResponse,
DeleteTaskPushNotificationConfigResponse,
ListTaskPushNotificationConfigSuccessResponse,
DeleteTaskPushNotificationConfigSuccessResponse
)


Expand All @@ -36,6 +40,8 @@
SetTaskPushNotificationConfigResponse,
GetTaskPushNotificationConfigResponse,
SendStreamingMessageResponse,
ListTaskPushNotificationConfigResponse,
DeleteTaskPushNotificationConfigResponse
)
"""Type variable for RootModel response types."""

Expand All @@ -48,6 +54,8 @@
SetTaskPushNotificationConfigSuccessResponse,
GetTaskPushNotificationConfigSuccessResponse,
SendStreamingMessageSuccessResponse,
ListTaskPushNotificationConfigSuccessResponse,
DeleteTaskPushNotificationConfigSuccessResponse
)
"""Type variable for SuccessResponse types."""

Expand All @@ -60,6 +68,7 @@
| TaskPushNotificationConfig
| A2AError
| JSONRPCError
| list[TaskPushNotificationConfig]
)
"""Type alias for possible event types produced by handlers."""

Expand Down
12 changes: 8 additions & 4 deletions src/a2a/server/tasks/base_push_notification_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
PushNotificationConfigStore,
)
from a2a.server.tasks.push_notification_sender import PushNotificationSender
from a2a.types import Task
from a2a.types import Task, PushNotificationConfig


logger = logging.getLogger(__name__)
Expand All @@ -27,11 +27,15 @@ def __init__(self, httpx_client: httpx.AsyncClient, config_store: PushNotificati

async def send_notification(self, task: Task) -> None:
"""Sends a push notification for a task if configuration exists."""
push_info = await self._config_store.get_info(task.id)
if not push_info:
push_configs = await self._config_store.get_info(task.id)
if not push_configs:
return
url = push_info.url

for push_info in push_configs:
await self._dispatch_notification(task, push_info)

async def _dispatch_notification(self, task: Task, push_info: PushNotificationConfig) -> None:
url = push_info.url
try:
response = await self._client.post(
url, json=task.model_dump(mode='json', exclude_none=True)
Expand Down
45 changes: 30 additions & 15 deletions src/a2a/server/tasks/inmemory_push_notification_config_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,34 +13,49 @@
class InMemoryPushNotificationConfigStore(PushNotificationConfigStore):
"""In-memory implementation of PushNotificationConfigStore interface.

Stores push notification configurations in memory and uses an httpx client
to send notifications.
Stores push notification configurations in memory
"""
def __init__(self) -> None:
"""Initializes the InMemoryPushNotifier.

Args:
httpx_client: An async HTTP client instance to send notifications.
"""
"""Initializes the InMemoryPushNotificationConfigStore."""
self.lock = asyncio.Lock()
self._push_notification_infos: dict[str, PushNotificationConfig] = {}
self._push_notification_infos: dict[str, list[PushNotificationConfig]] = {}

async def set_info(
self, task_id: str, notification_config: PushNotificationConfig
) -> None:
"""Sets or updates the push notification configuration for a task in memory."""
async with self.lock:
self._push_notification_infos[task_id] = notification_config
if task_id not in self._push_notification_infos:
self._push_notification_infos[task_id] = []

if notification_config.id is None:
notification_config.id = task_id

for config in self._push_notification_infos[task_id]:
if config.id == notification_config.id:
self._push_notification_infos[task_id].remove(config)
break

async def get_info(self, task_id: str) -> PushNotificationConfig | None:
"""Retrieves the push notification configuration for a task from memory."""
async with self.lock:
return self._push_notification_infos.get(task_id)
self._push_notification_infos[task_id].append(notification_config)


async def get_info(self, task_id: str) -> list[PushNotificationConfig]:
"""Retrieves the push notification configuration for a task from memory."""
async with self.lock:
return self._push_notification_infos.get(task_id) or []

async def delete_info(self, task_id: str) -> None:
async def delete_info(self, task_id: str, config_id: str | None = None) -> None:
"""Deletes the push notification configuration for a task from memory."""
async with self.lock:
if config_id is None:
config_id = task_id

if task_id in self._push_notification_infos:
del self._push_notification_infos[task_id]
configurations = self._push_notification_infos[task_id]
if not configurations:
return

for config in configurations:
if config.id == config_id:
configurations.remove(config)
break
Loading