Skip to content
47 changes: 24 additions & 23 deletions src/a2a/server/request_handlers/default_request_handler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import logging
import uuid

from collections.abc import AsyncGenerator
from typing import cast
Expand All @@ -21,7 +20,8 @@
)
from a2a.server.request_handlers.request_handler import RequestHandler
from a2a.server.tasks import (
PushNotifier,
PushNotificationConfigStore,
PushNotificationSender,
ResultAggregator,
TaskManager,
TaskStore,
Expand Down Expand Up @@ -58,12 +58,13 @@

_running_agents: dict[str, asyncio.Task]

def __init__(
def __init__( # noqa: PLR0913
self,
agent_executor: AgentExecutor,
task_store: TaskStore,
queue_manager: QueueManager | None = None,
push_notifier: PushNotifier | None = None,
push_config_store: PushNotificationConfigStore | None = None,
push_sender: PushNotificationSender | None = None,
request_context_builder: RequestContextBuilder | None = None,
) -> None:
"""Initializes the DefaultRequestHandler.
Expand All @@ -72,14 +73,16 @@
agent_executor: The `AgentExecutor` instance to run agent logic.
task_store: The `TaskStore` instance to manage task persistence.
queue_manager: The `QueueManager` instance to manage event queues. Defaults to `InMemoryQueueManager`.
push_notifier: The `PushNotifier` instance for sending push notifications. Defaults to None.
push_config_store: The `PushNotificationConfigStore` instance for managing push notification configurations. Defaults to None.
push_sender: The `PushNotificationSender` instance for sending push notifications. Defaults to None.
request_context_builder: The `RequestContextBuilder` instance used
to build request contexts. Defaults to `SimpleRequestContextBuilder`.
"""
self.agent_executor = agent_executor
self.task_store = task_store
self._queue_manager = queue_manager or InMemoryQueueManager()
self._push_notifier = push_notifier
self._push_config_store = push_config_store
self._push_sender = push_sender
self._request_context_builder = (
request_context_builder
or SimpleRequestContextBuilder(
Expand Down Expand Up @@ -170,37 +173,37 @@
Starts the agent execution for the message and waits for the final
result (Task or Message).
"""
task_manager = TaskManager(
task_id=params.message.taskId,
context_id=params.message.contextId,
task_store=self.task_store,
initial_message=params.message,
)
task: Task | None = await task_manager.get_task()
if task:
task = task_manager.update_with_message(params.message, task)
if self.should_add_push_info(params):
assert isinstance(self._push_notifier, PushNotifier)
assert self._push_config_store is not None
assert isinstance(
params.configuration, MessageSendConfiguration
)
assert isinstance(
params.configuration.pushNotificationConfig,
PushNotificationConfig,
)
await self._push_notifier.set_info(
await self._push_config_store.set_info(
task.id, params.configuration.pushNotificationConfig
)
request_context = await self._request_context_builder.build(

Check notice on line 197 in src/a2a/server/request_handlers/default_request_handler.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/server/request_handlers/default_request_handler.py (261-284)
params=params,
task_id=task.id if task else None,
context_id=params.message.contextId,
task=task,
context=context,
)

task_id = cast('str', request_context.task_id)
# Always assign a task ID. We may not actually upgrade to a task, but

Check notice on line 206 in src/a2a/server/request_handlers/default_request_handler.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/server/request_handlers/default_request_handler.py (286-296)
# dictating the task ID at this layer is useful for tracking running
# agents.
queue = await self._queue_manager.create_or_tap(task_id)
Expand Down Expand Up @@ -237,7 +240,7 @@
finally:
if interrupted:
# TODO: Track this disconnected cleanup task.
asyncio.create_task( # noqa: RUF006
asyncio.create_task( # noqa: RUF006
self._cleanup_producer(producer_task, task_id)
)
else:
Expand All @@ -255,30 +258,30 @@
Starts the agent execution and yields events as they are produced
by the agent.
"""
task_manager = TaskManager(
task_id=params.message.taskId,
context_id=params.message.contextId,
task_store=self.task_store,
initial_message=params.message,
)
task: Task | None = await task_manager.get_task()

if task:
task = task_manager.update_with_message(params.message, task)

if self.should_add_push_info(params):
assert isinstance(self._push_notifier, PushNotifier)
assert self._push_config_store is not None
assert isinstance(
params.configuration, MessageSendConfiguration
)
assert isinstance(
params.configuration.pushNotificationConfig,
PushNotificationConfig,
)
await self._push_notifier.set_info(
await self._push_config_store.set_info(
task.id, params.configuration.pushNotificationConfig
)
else:

Check notice on line 284 in src/a2a/server/request_handlers/default_request_handler.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/server/request_handlers/default_request_handler.py (176-197)
queue = EventQueue()
result_aggregator = ResultAggregator(task_manager)
request_context = await self._request_context_builder.build(
Expand Down Expand Up @@ -315,19 +318,19 @@
)

if (
self._push_notifier
self._push_config_store # Check if store is available for config
and params.configuration
and params.configuration.pushNotificationConfig
):
await self._push_notifier.set_info(
await self._push_config_store.set_info(
task_id,
params.configuration.pushNotificationConfig,
)

if self._push_notifier and task_id:
if self._push_sender and task_id: # Check if sender is available
latest_task = await result_aggregator.current_result
if isinstance(latest_task, Task):
await self._push_notifier.send_notification(latest_task)
await self._push_sender.send_notification(latest_task)
yield event
finally:
await self._cleanup_producer(producer_task, task_id)
Expand Down Expand Up @@ -359,16 +362,14 @@

Requires a `PushNotifier` to be configured.
"""
if not self._push_notifier:
if not self._push_config_store:
raise ServerError(error=UnsupportedOperationError())

task: Task | None = await self.task_store.get(params.taskId)
if not task:
raise ServerError(error=TaskNotFoundError())

# Generate a unique id for the notification
params.pushNotificationConfig.id = str(uuid.uuid4())
await self._push_notifier.set_info(
await self._push_config_store.set_info(
params.taskId,
params.pushNotificationConfig,
)
Expand All @@ -382,16 +383,16 @@
) -> TaskPushNotificationConfig:
"""Default handler for 'tasks/pushNotificationConfig/get'.

Requires a `PushNotifier` to be configured.
Requires a `PushConfigStore` to be configured.
"""
if not self._push_notifier:
if not self._push_config_store:
raise ServerError(error=UnsupportedOperationError())

task: Task | None = await self.task_store.get(params.id)
if not task:
raise ServerError(error=TaskNotFoundError())

push_notification_config = await self._push_notifier.get_info(params.id)
push_notification_config = await self._push_config_store.get_info(params.id)
if not push_notification_config:
raise ServerError(error=InternalError())

Expand Down Expand Up @@ -433,7 +434,7 @@
def should_add_push_info(self, params: MessageSendParams) -> bool:
"""Determines if push notification info should be set for a task."""
return bool(
self._push_notifier
self._push_config_store
and params.configuration
and params.configuration.pushNotificationConfig
)
18 changes: 14 additions & 4 deletions src/a2a/server/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
"""Components for managing tasks within the A2A server."""

from a2a.server.tasks.inmemory_push_notifier import InMemoryPushNotifier
from a2a.server.tasks.base_push_notification_sender import (
BasePushNotificationSender,
)
from a2a.server.tasks.inmemory_push_notification_config_store import (
InMemoryPushNotificationConfigStore,
)
from a2a.server.tasks.inmemory_task_store import InMemoryTaskStore
from a2a.server.tasks.push_notifier import PushNotifier
from a2a.server.tasks.push_notification_config_store import (
PushNotificationConfigStore,
)
from a2a.server.tasks.push_notification_sender import PushNotificationSender
from a2a.server.tasks.result_aggregator import ResultAggregator
from a2a.server.tasks.task_manager import TaskManager
from a2a.server.tasks.task_store import TaskStore
from a2a.server.tasks.task_updater import TaskUpdater


__all__ = [
'InMemoryPushNotifier',
'BasePushNotificationSender',
'InMemoryPushNotificationConfigStore',
'InMemoryTaskStore',
'PushNotifier',
'PushNotificationConfigStore',
'PushNotificationSender',
'ResultAggregator',
'TaskManager',
'TaskStore',
Expand Down
42 changes: 42 additions & 0 deletions src/a2a/server/tasks/base_push_notification_sender.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import logging

import httpx

from a2a.server.tasks.push_notification_config_store import (
PushNotificationConfigStore,
)
from a2a.server.tasks.push_notification_sender import PushNotificationSender
from a2a.types import Task


logger = logging.getLogger(__name__)


class BasePushNotificationSender(PushNotificationSender):
"""Base implementation of PushNotificationSender interface."""

def __init__(self, httpx_client: httpx.AsyncClient, config_store: PushNotificationConfigStore) -> None:
"""Initializes the BasePushNotificationSender.

Args:
httpx_client: An async HTTP client instance to send notifications.
config_store: A PushNotificationConfigStore instance to retrieve configurations.
"""
self._client = httpx_client
self._config_store = config_store

async def send_notification(self, task: Task) -> None:
"""Sends a push notification for a task if configuration exists."""
push_info = await self._config_store.get_info(task.id)
if not push_info:
return
url = push_info.url

try:
response = await self._client.post(
url, json=task.model_dump(mode='json', exclude_none=True)
)
response.raise_for_status()
logger.info(f'Push-notification sent for URL: {url}')
except Exception as e:
logger.error(f'Error sending push-notification: {e}')

Check notice on line 42 in src/a2a/server/tasks/base_push_notification_sender.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/server/tasks/inmemory_push_notifier.py (50-62)
46 changes: 46 additions & 0 deletions src/a2a/server/tasks/inmemory_push_notification_config_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import asyncio
import logging

from a2a.server.tasks.push_notification_config_store import (
PushNotificationConfigStore,
)
from a2a.types import PushNotificationConfig


logger = logging.getLogger(__name__)


class InMemoryPushNotificationConfigStore(PushNotificationConfigStore):
"""In-memory implementation of PushNotificationConfigStore interface.

Stores push notification configurations in memory and uses an httpx client
to send notifications.
"""
def __init__(self) -> None:
"""Initializes the InMemoryPushNotifier.

Args:
httpx_client: An async HTTP client instance to send notifications.
"""
self.lock = asyncio.Lock()
self._push_notification_infos: dict[str, PushNotificationConfig] = {}

async def set_info(
self, task_id: str, notification_config: PushNotificationConfig
) -> None:
"""Sets or updates the push notification configuration for a task in memory."""
async with self.lock:
self._push_notification_infos[task_id] = notification_config

async def get_info(self, task_id: str) -> PushNotificationConfig | None:
"""Retrieves the push notification configuration for a task from memory."""
async with self.lock:
return self._push_notification_infos.get(task_id)



async def delete_info(self, task_id: str) -> None:
"""Deletes the push notification configuration for a task from memory."""
async with self.lock:
if task_id in self._push_notification_infos:
del self._push_notification_infos[task_id]

Check notice on line 46 in src/a2a/server/tasks/inmemory_push_notification_config_store.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/server/tasks/inmemory_push_notifier.py (27-46)
19 changes: 19 additions & 0 deletions src/a2a/server/tasks/push_notification_config_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from abc import ABC, abstractmethod

from a2a.types import PushNotificationConfig


class PushNotificationConfigStore(ABC):
"""Interface for storing and retrieving push notification configurations for tasks."""

@abstractmethod
async def set_info(self, task_id: str, notification_config: PushNotificationConfig) -> None:
"""Sets or updates the push notification configuration for a task."""

@abstractmethod
async def get_info(self, task_id: str) -> PushNotificationConfig | None:
"""Retrieves the push notification configuration for a task."""

@abstractmethod
async def delete_info(self, task_id: str) -> None:
"""Deletes the push notification configuration for a task."""

Check notice on line 19 in src/a2a/server/tasks/push_notification_config_store.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/server/tasks/push_notifier.py (9-21)
11 changes: 11 additions & 0 deletions src/a2a/server/tasks/push_notification_sender.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from abc import ABC, abstractmethod

from a2a.types import Task


class PushNotificationSender(ABC):
"""Interface for sending push notifications for tasks."""

@abstractmethod
async def send_notification(self, task: Task) -> None:
"""Sends a push notification containing the latest task state."""
Loading
Loading