Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/a2a/server/tasks/base_push_notification_sender.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
import asyncio
import logging

Expand Down Expand Up @@ -52,13 +52,21 @@
) -> bool:
url = push_info.url
try:
headers = None
headers = {}
if push_info.token:
headers = {'X-A2A-Notification-Token': push_info.token}
headers['X-A2A-Notification-Token'] = push_info.token

Check failure on line 58 in src/a2a/server/tasks/base_push_notification_sender.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Ruff (W293)

src/a2a/server/tasks/base_push_notification_sender.py:58:1: W293 Blank line contains whitespace
# Add authentication header if configured
if push_info.authentication and push_info.authentication.schemes:
for scheme in push_info.authentication.schemes:
if scheme.lower() == 'bearer' and push_info.authentication.credentials:
headers['Authorization'] = f'Bearer {push_info.authentication.credentials}'
break
Comment on lines 60 to 69
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

For better readability and conciseness, you can use any() with a generator expression to check for the 'bearer' scheme. This achieves the same result as the for loop with a break but in a more idiomatic and efficient Python style, as it also avoids iterating if credentials are not present.

            if push_info.authentication and push_info.authentication.credentials:
                if any(scheme.lower() == 'bearer' for scheme in push_info.authentication.schemes):
                    headers['Authorization'] = f'Bearer {push_info.authentication.credentials}'

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


Check failure on line 65 in src/a2a/server/tasks/base_push_notification_sender.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Ruff (W293)

src/a2a/server/tasks/base_push_notification_sender.py:65:1: W293 Blank line contains whitespace
response = await self._client.post(
url,
json=task.model_dump(mode='json', exclude_none=True),
headers=headers,
headers=headers if headers else None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

This conditional assignment is not necessary. The httpx client accepts an empty dictionary for headers, which is what headers would be if no token or authentication is provided. You can simplify this by just passing headers directly.

                headers=headers,

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

)
response.raise_for_status()
logger.info(
Expand Down
90 changes: 89 additions & 1 deletion tests/server/tasks/test_push_notification_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
BasePushNotificationSender,
)
from a2a.types import (
PushNotificationAuthenticationInfo,
PushNotificationConfig,
Task,
TaskState,
Expand All @@ -29,8 +30,11 @@ def create_sample_push_config(
url: str = 'http://example.com/callback',
config_id: str = 'cfg1',
token: str | None = None,
authentication: PushNotificationAuthenticationInfo | None = None,
) -> PushNotificationConfig:
return PushNotificationConfig(id=config_id, url=url, token=token)
return PushNotificationConfig(
id=config_id, url=url, token=token, authentication=authentication
)


class TestBasePushNotificationSender(unittest.IsolatedAsyncioTestCase):
Expand Down Expand Up @@ -92,6 +96,90 @@ async def test_send_notification_with_token_success(self) -> None:
)
mock_response.raise_for_status.assert_called_once()

async def test_send_notification_with_bearer_authentication(self) -> None:
task_id = 'task_send_bearer_auth'
task_data = create_sample_task(task_id=task_id)
auth_info = PushNotificationAuthenticationInfo(
schemes=['Bearer'], credentials='test-jwt-token'
)
config = create_sample_push_config(
url='http://notify.me/here',
token='unique_token',
authentication=auth_info,
)
self.mock_config_store.get_info.return_value = [config]

mock_response = AsyncMock(spec=httpx.Response)
mock_response.status_code = 200
self.mock_httpx_client.post.return_value = mock_response

await self.sender.send_notification(task_data)

self.mock_config_store.get_info.assert_awaited_once_with(task_id)

# assert httpx_client post method got invoked with right parameters
self.mock_httpx_client.post.assert_awaited_once_with(
config.url,
json=task_data.model_dump(mode='json', exclude_none=True),
headers={
'X-A2A-Notification-Token': 'unique_token',
'Authorization': 'Bearer test-jwt-token',
},
)
mock_response.raise_for_status.assert_called_once()

async def test_send_notification_with_bearer_authentication_no_credentials(
self,
) -> None:
task_id = 'task_send_bearer_no_creds'
task_data = create_sample_task(task_id=task_id)
auth_info = PushNotificationAuthenticationInfo(
schemes=['Bearer'], credentials=None
)
config = create_sample_push_config(
url='http://notify.me/here', authentication=auth_info
)
self.mock_config_store.get_info.return_value = [config]

mock_response = AsyncMock(spec=httpx.Response)
mock_response.status_code = 200
self.mock_httpx_client.post.return_value = mock_response

await self.sender.send_notification(task_data)

# Should not add Authorization header when credentials are missing
self.mock_httpx_client.post.assert_awaited_once_with(
config.url,
json=task_data.model_dump(mode='json', exclude_none=True),
headers=None,
)

async def test_send_notification_with_non_bearer_authentication(
self,
) -> None:
task_id = 'task_send_non_bearer'
task_data = create_sample_task(task_id=task_id)
auth_info = PushNotificationAuthenticationInfo(
schemes=['Basic'], credentials='user:pass'
)
config = create_sample_push_config(
url='http://notify.me/here', authentication=auth_info
)
self.mock_config_store.get_info.return_value = [config]

mock_response = AsyncMock(spec=httpx.Response)
mock_response.status_code = 200
self.mock_httpx_client.post.return_value = mock_response

await self.sender.send_notification(task_data)

# Should not add Authorization header for non-Bearer schemes
self.mock_httpx_client.post.assert_awaited_once_with(
config.url,
json=task_data.model_dump(mode='json', exclude_none=True),
headers=None,
)

async def test_send_notification_no_config(self) -> None:
task_id = 'task_send_no_config'
task_data = create_sample_task(task_id=task_id)
Expand Down
Loading