Skip to content

Commit d6484b6

Browse files
feat: Support for list and delete push notification config (#222)
Support for list and delete push notification config. Removes the push_notifier from the SDK. This will be merged into push_notif_refactoring branch (#212) and then merged into main along with refactoring changes. --------- Co-authored-by: Holt Skinner <[email protected]>
1 parent 1ad6703 commit d6484b6

13 files changed

+648
-359
lines changed

src/a2a/server/apps/jsonrpc/jsonrpc_app.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
SetTaskPushNotificationConfigRequest,
4040
TaskResubscriptionRequest,
4141
UnsupportedOperationError,
42+
ListTaskPushNotificationConfigRequest,
43+
DeleteTaskPushNotificationConfigRequest
4244
)
4345
from a2a.utils.errors import MethodNotImplementedError
4446

@@ -297,12 +299,22 @@ async def _process_non_streaming_request(
297299
request_obj, context
298300
)
299301
case SetTaskPushNotificationConfigRequest():
300-
handler_result = await self.handler.set_push_notification(
302+
handler_result = await self.handler.set_push_notification_config(
301303
request_obj,
302304
context,
303305
)
304306
case GetTaskPushNotificationConfigRequest():
305-
handler_result = await self.handler.get_push_notification(
307+
handler_result = await self.handler.get_push_notification_config(
308+
request_obj,
309+
context,
310+
)
311+
case ListTaskPushNotificationConfigRequest():
312+
handler_result = await self.handler.list_push_notification_config(
313+
request_obj,
314+
context,
315+
)
316+
case DeleteTaskPushNotificationConfigRequest():
317+
handler_result = await self.handler.delete_push_notification_config(
306318
request_obj,
307319
context,
308320
)

src/a2a/server/request_handlers/default_request_handler.py

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@
3939
TaskPushNotificationConfig,
4040
TaskQueryParams,
4141
UnsupportedOperationError,
42+
ListTaskPushNotificationConfigParams,
43+
DeleteTaskPushNotificationConfigParams,
44+
DeleteTaskPushNotificationConfigResponse,
45+
DeleteTaskPushNotificationConfigSuccessResponse
4246
)
4347
from a2a.utils.errors import ServerError
4448
from a2a.utils.telemetry import SpanKind, trace_class
@@ -393,11 +397,11 @@ async def on_get_task_push_notification_config(
393397
raise ServerError(error=TaskNotFoundError())
394398

395399
push_notification_config = await self._push_config_store.get_info(params.id)
396-
if not push_notification_config:
400+
if not push_notification_config or not push_notification_config[0]:
397401
raise ServerError(error=InternalError())
398402

399403
return TaskPushNotificationConfig(
400-
taskId=params.id, pushNotificationConfig=push_notification_config
404+
taskId=params.id, pushNotificationConfig=push_notification_config[0]
401405
)
402406

403407
async def on_resubscribe_to_task(
@@ -431,6 +435,51 @@ async def on_resubscribe_to_task(
431435
async for event in result_aggregator.consume_and_emit(consumer):
432436
yield event
433437

438+
async def on_list_task_push_notification_config(
439+
self,
440+
params: ListTaskPushNotificationConfigParams,
441+
context: ServerCallContext | None = None,
442+
) -> list[TaskPushNotificationConfig]:
443+
"""Default handler for 'tasks/pushNotificationConfig/list'.
444+
445+
Requires a `PushConfigStore` to be configured.
446+
"""
447+
if not self._push_config_store:
448+
raise ServerError(error=UnsupportedOperationError())
449+
450+
task: Task | None = await self.task_store.get(params.id)
451+
if not task:
452+
raise ServerError(error=TaskNotFoundError())
453+
454+
push_notification_config_list = await self._push_config_store.get_info(params.id)
455+
456+
task_push_notification_config = []
457+
if push_notification_config_list:
458+
for config in push_notification_config_list:
459+
task_push_notification_config.append(TaskPushNotificationConfig(
460+
taskId=params.id, pushNotificationConfig=config
461+
))
462+
463+
return task_push_notification_config
464+
465+
async def on_delete_task_push_notification_config(
466+
self,
467+
params: DeleteTaskPushNotificationConfigParams,
468+
context: ServerCallContext | None = None,
469+
) -> None:
470+
"""Default handler for 'tasks/pushNotificationConfig/delete'.
471+
472+
Requires a `PushConfigStore` to be configured.
473+
"""
474+
if not self._push_config_store:
475+
raise ServerError(error=UnsupportedOperationError())
476+
477+
task: Task | None = await self.task_store.get(params.id)
478+
if not task:
479+
raise ServerError(error=TaskNotFoundError())
480+
481+
await self._push_config_store.delete_info(params.id, params.pushNotificationConfigId)
482+
434483
def should_add_push_info(self, params: MessageSendParams) -> bool:
435484
"""Determines if push notification info should be set for a task."""
436485
return bool(

src/a2a/server/request_handlers/jsonrpc_handler.py

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@
3434
TaskPushNotificationConfig,
3535
TaskResubscriptionRequest,
3636
TaskStatusUpdateEvent,
37+
ListTaskPushNotificationConfigRequest,
38+
ListTaskPushNotificationConfigResponse,
39+
ListTaskPushNotificationConfigSuccessResponse,
40+
DeleteTaskPushNotificationConfigRequest,
41+
DeleteTaskPushNotificationConfigResponse,
42+
DeleteTaskPushNotificationConfigSuccessResponse
3743
)
3844
from a2a.utils.errors import ServerError
3945
from a2a.utils.helpers import validate
@@ -214,7 +220,7 @@ async def on_resubscribe_to_task(
214220
)
215221
)
216222

217-
async def get_push_notification(
223+
async def get_push_notification_config(
218224
self,
219225
request: GetTaskPushNotificationConfigRequest,
220226
context: ServerCallContext | None = None,
@@ -252,7 +258,7 @@ async def get_push_notification(
252258
lambda self: self.agent_card.capabilities.pushNotifications,
253259
'Push notifications are not supported by the agent',
254260
)
255-
async def set_push_notification(
261+
async def set_push_notification_config(
256262
self,
257263
request: SetTaskPushNotificationConfigRequest,
258264
context: ServerCallContext | None = None,
@@ -325,3 +331,69 @@ async def on_get_task(
325331
id=request.id, error=e.error if e.error else InternalError()
326332
)
327333
)
334+
335+
async def list_push_notification_config(
336+
self,
337+
request: ListTaskPushNotificationConfigRequest,
338+
context: ServerCallContext | None = None,
339+
) -> ListTaskPushNotificationConfigResponse:
340+
"""Handles the 'tasks/pushNotificationConfig/list' JSON-RPC method.
341+
342+
Args:
343+
request: The incoming `ListTaskPushNotificationConfigRequest` object.
344+
context: Context provided by the server.
345+
346+
Returns:
347+
A `ListTaskPushNotificationConfigResponse` object containing the config or a JSON-RPC error.
348+
"""
349+
try:
350+
config = (
351+
await self.request_handler.on_list_task_push_notification_config(
352+
request.params, context
353+
)
354+
)
355+
return prepare_response_object(
356+
request.id,
357+
config,
358+
(list,),
359+
ListTaskPushNotificationConfigSuccessResponse,
360+
ListTaskPushNotificationConfigResponse,
361+
)
362+
except ServerError as e:
363+
return ListTaskPushNotificationConfigResponse(
364+
root=JSONRPCErrorResponse(
365+
id=request.id, error=e.error if e.error else InternalError()
366+
)
367+
)
368+
369+
async def delete_push_notification_config(
370+
self,
371+
request: DeleteTaskPushNotificationConfigRequest,
372+
context: ServerCallContext | None = None,
373+
) -> DeleteTaskPushNotificationConfigResponse:
374+
"""Handles the 'tasks/pushNotificationConfig/list' JSON-RPC method.
375+
376+
Args:
377+
request: The incoming `DeleteTaskPushNotificationConfigRequest` object.
378+
context: Context provided by the server.
379+
380+
Returns:
381+
A `DeleteTaskPushNotificationConfigResponse` object containing the config or a JSON-RPC error.
382+
"""
383+
try:
384+
(
385+
await self.request_handler.on_delete_task_push_notification_config(
386+
request.params, context
387+
)
388+
)
389+
return DeleteTaskPushNotificationConfigResponse(
390+
root=DeleteTaskPushNotificationConfigSuccessResponse(
391+
id=request.id, result=None
392+
)
393+
)
394+
except ServerError as e:
395+
return DeleteTaskPushNotificationConfigResponse(
396+
root=JSONRPCErrorResponse(
397+
id=request.id, error=e.error if e.error else InternalError()
398+
)
399+
)

src/a2a/server/request_handlers/request_handler.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
TaskPushNotificationConfig,
1313
TaskQueryParams,
1414
UnsupportedOperationError,
15+
ListTaskPushNotificationConfigParams,
16+
DeleteTaskPushNotificationConfigParams
1517
)
1618
from a2a.utils.errors import ServerError
1719

@@ -160,3 +162,39 @@ async def on_resubscribe_to_task(
160162
"""
161163
raise ServerError(error=UnsupportedOperationError())
162164
yield
165+
166+
@abstractmethod
167+
async def on_list_task_push_notification_config(
168+
self,
169+
params: ListTaskPushNotificationConfigParams,
170+
context: ServerCallContext | None = None,
171+
) -> list[TaskPushNotificationConfig]:
172+
"""Handles the 'tasks/pushNotificationConfig/list' method.
173+
174+
Retrieves the current push notification configurations for a task.
175+
176+
Args:
177+
params: Parameters including the task ID.
178+
context: Context provided by the server.
179+
180+
Returns:
181+
The `list[TaskPushNotificationConfig]` for the task.
182+
"""
183+
184+
@abstractmethod
185+
async def on_delete_task_push_notification_config(
186+
self,
187+
params: DeleteTaskPushNotificationConfigParams,
188+
context: ServerCallContext | None = None,
189+
) -> None:
190+
"""Handles the 'tasks/pushNotificationConfig/delete' method.
191+
192+
Deletes a push notification configuration associated with a task.
193+
194+
Args:
195+
params: Parameters including the task ID.
196+
context: Context provided by the server.
197+
198+
Returns:
199+
None
200+
"""

src/a2a/server/request_handlers/response_helpers.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525
TaskArtifactUpdateEvent,
2626
TaskPushNotificationConfig,
2727
TaskStatusUpdateEvent,
28+
ListTaskPushNotificationConfigResponse,
29+
DeleteTaskPushNotificationConfigResponse,
30+
ListTaskPushNotificationConfigSuccessResponse,
31+
DeleteTaskPushNotificationConfigSuccessResponse
2832
)
2933

3034

@@ -36,6 +40,8 @@
3640
SetTaskPushNotificationConfigResponse,
3741
GetTaskPushNotificationConfigResponse,
3842
SendStreamingMessageResponse,
43+
ListTaskPushNotificationConfigResponse,
44+
DeleteTaskPushNotificationConfigResponse
3945
)
4046
"""Type variable for RootModel response types."""
4147

@@ -48,6 +54,8 @@
4854
SetTaskPushNotificationConfigSuccessResponse,
4955
GetTaskPushNotificationConfigSuccessResponse,
5056
SendStreamingMessageSuccessResponse,
57+
ListTaskPushNotificationConfigSuccessResponse,
58+
DeleteTaskPushNotificationConfigSuccessResponse
5159
)
5260
"""Type variable for SuccessResponse types."""
5361

@@ -60,6 +68,7 @@
6068
| TaskPushNotificationConfig
6169
| A2AError
6270
| JSONRPCError
71+
| list[TaskPushNotificationConfig]
6372
)
6473
"""Type alias for possible event types produced by handlers."""
6574

src/a2a/server/tasks/base_push_notification_sender.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
PushNotificationConfigStore,
77
)
88
from a2a.server.tasks.push_notification_sender import PushNotificationSender
9-
from a2a.types import Task
9+
from a2a.types import Task, PushNotificationConfig
1010

1111

1212
logger = logging.getLogger(__name__)
@@ -27,11 +27,15 @@ def __init__(self, httpx_client: httpx.AsyncClient, config_store: PushNotificati
2727

2828
async def send_notification(self, task: Task) -> None:
2929
"""Sends a push notification for a task if configuration exists."""
30-
push_info = await self._config_store.get_info(task.id)
31-
if not push_info:
30+
push_configs = await self._config_store.get_info(task.id)
31+
if not push_configs:
3232
return
33-
url = push_info.url
33+
34+
for push_info in push_configs:
35+
await self._dispatch_notification(task, push_info)
3436

37+
async def _dispatch_notification(self, task: Task, push_info: PushNotificationConfig) -> None:
38+
url = push_info.url
3539
try:
3640
response = await self._client.post(
3741
url, json=task.model_dump(mode='json', exclude_none=True)

src/a2a/server/tasks/inmemory_push_notification_config_store.py

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,34 +13,49 @@
1313
class InMemoryPushNotificationConfigStore(PushNotificationConfigStore):
1414
"""In-memory implementation of PushNotificationConfigStore interface.
1515
16-
Stores push notification configurations in memory and uses an httpx client
17-
to send notifications.
16+
Stores push notification configurations in memory
1817
"""
1918
def __init__(self) -> None:
20-
"""Initializes the InMemoryPushNotifier.
21-
22-
Args:
23-
httpx_client: An async HTTP client instance to send notifications.
24-
"""
19+
"""Initializes the InMemoryPushNotificationConfigStore."""
2520
self.lock = asyncio.Lock()
26-
self._push_notification_infos: dict[str, PushNotificationConfig] = {}
21+
self._push_notification_infos: dict[str, list[PushNotificationConfig]] = {}
2722

2823
async def set_info(
2924
self, task_id: str, notification_config: PushNotificationConfig
3025
) -> None:
3126
"""Sets or updates the push notification configuration for a task in memory."""
3227
async with self.lock:
33-
self._push_notification_infos[task_id] = notification_config
28+
if task_id not in self._push_notification_infos:
29+
self._push_notification_infos[task_id] = []
30+
31+
if notification_config.id is None:
32+
notification_config.id = task_id
33+
34+
for config in self._push_notification_infos[task_id]:
35+
if config.id == notification_config.id:
36+
self._push_notification_infos[task_id].remove(config)
37+
break
3438

35-
async def get_info(self, task_id: str) -> PushNotificationConfig | None:
36-
"""Retrieves the push notification configuration for a task from memory."""
37-
async with self.lock:
38-
return self._push_notification_infos.get(task_id)
39+
self._push_notification_infos[task_id].append(notification_config)
3940

4041

42+
async def get_info(self, task_id: str) -> list[PushNotificationConfig]:
43+
"""Retrieves the push notification configuration for a task from memory."""
44+
async with self.lock:
45+
return self._push_notification_infos.get(task_id) or []
4146

42-
async def delete_info(self, task_id: str) -> None:
47+
async def delete_info(self, task_id: str, config_id: str | None = None) -> None:
4348
"""Deletes the push notification configuration for a task from memory."""
4449
async with self.lock:
50+
if config_id is None:
51+
config_id = task_id
52+
4553
if task_id in self._push_notification_infos:
46-
del self._push_notification_infos[task_id]
54+
configurations = self._push_notification_infos[task_id]
55+
if not configurations:
56+
return
57+
58+
for config in configurations:
59+
if config.id == config_id:
60+
configurations.remove(config)
61+
break

0 commit comments

Comments
 (0)