-
Notifications
You must be signed in to change notification settings - Fork 319
feat: Implement Agent Card Signing and Verification per Spec #581
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
1a973f0
81e354e
a55e4f7
2621476
f915ff2
634710e
c5f6971
57e4a68
701eb39
257e4dd
900f0ee
b680757
054d187
f1c9dab
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,161 @@ | ||
| import json | ||
|
|
||
| from collections.abc import Callable | ||
| from typing import Any | ||
|
|
||
|
|
||
| try: | ||
| from jose import jws | ||
sokoliva marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| from jose.backends.base import Key | ||
| from jose.exceptions import JOSEError | ||
| from jose.utils import base64url_decode, base64url_encode | ||
| except ImportError as e: | ||
| raise ImportError( | ||
| 'A2AUtilsSigning requires python-jose to be installed. ' | ||
| 'Install with: ' | ||
| "'pip install a2a-sdk[signing]'" | ||
| ) from e | ||
|
|
||
| from a2a.types import AgentCard, AgentCardSignature | ||
|
|
||
|
|
||
| def clean_empty(d: Any) -> Any: | ||
sokoliva marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| """Recursively remove empty lists, dicts, strings, and None values from a dictionary.""" | ||
| if isinstance(d, dict): | ||
| cleaned = {k: clean_empty(v) for k, v in d.items()} | ||
| return { | ||
| k: v | ||
| for k, v in cleaned.items() | ||
| if v is not None and (isinstance(v, (bool, int, float)) or v) | ||
| } | ||
| if isinstance(d, list): | ||
| cleaned = [clean_empty(v) for v in d] | ||
| return [ | ||
| v | ||
| for v in cleaned | ||
| if v is not None and (isinstance(v, (bool, int, float)) or v) | ||
| ] | ||
| return d if d not in [None, '', [], {}] else None | ||
|
|
||
|
|
||
| def canonicalize_agent_card(agent_card: AgentCard) -> str: | ||
sokoliva marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| """Canonicalizes the Agent Card JSON according to RFC 8785 (JCS).""" | ||
| card_dict = agent_card.model_dump( | ||
| exclude={'signatures'}, | ||
| exclude_defaults=True, | ||
| by_alias=True, | ||
| ) | ||
| # Ensure 'protocol_version' is always included | ||
| protocol_version_alias = ( | ||
| AgentCard.model_fields['protocol_version'].alias or 'protocol_version' | ||
| ) | ||
| if protocol_version_alias not in card_dict: | ||
| card_dict[protocol_version_alias] = agent_card.protocol_version | ||
sokoliva marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| # Recursively remove empty/None values | ||
| cleaned_dict = clean_empty(card_dict) | ||
|
|
||
| return json.dumps(cleaned_dict, separators=(',', ':'), sort_keys=True) | ||
|
|
||
|
|
||
| def create_agent_card_signer( | ||
| signing_key: str | bytes | dict[str, Any] | Key, | ||
| kid: str, | ||
| alg: str = 'HS256', | ||
| jku: str | None = None, | ||
|
Check failure on line 65 in src/a2a/utils/signing.py
|
||
| ) -> Callable[[AgentCard], AgentCard]: | ||
| """Creates a function that signs an AgentCard and adds the signature. | ||
| Args: | ||
| signing_key: The private key for signing. | ||
| kid: Key ID for the signing key. | ||
| alg: The algorithm to use (e.g., "ES256", "RS256"). | ||
| jku: Optional URL to the JWKS. | ||
|
Check failure on line 73 in src/a2a/utils/signing.py
|
||
| Returns: | ||
| A callable that takes an AgentCard and returns the modified AgentCard with a signature. | ||
| """ | ||
|
|
||
| def agent_card_signer(agent_card: AgentCard) -> AgentCard: | ||
| """The actual card_modifier function.""" | ||
| canonical_payload = canonicalize_agent_card(agent_card) | ||
|
|
||
| headers = {'kid': kid, 'typ': 'JOSE'} | ||
sokoliva marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if jku: | ||
|
Check failure on line 84 in src/a2a/utils/signing.py
|
||
| headers['jku'] = jku | ||
|
Check failure on line 85 in src/a2a/utils/signing.py
|
||
|
|
||
| jws_string = jws.sign( | ||
|
Check failure on line 87 in src/a2a/utils/signing.py
|
||
| payload=canonical_payload.encode('utf-8'), | ||
| key=signing_key, | ||
| headers=headers, | ||
| algorithm=alg, | ||
| ) | ||
|
|
||
| # The result of jws.sign is a compact serialization: HEADER.PAYLOAD.SIGNATURE | ||
| protected_header, _, signature = jws_string.split('.') | ||
|
|
||
| agent_card_signature = AgentCardSignature( | ||
lkawka marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| protected=protected_header, | ||
| signature=signature, | ||
| ) | ||
|
|
||
| agent_card.signatures = (agent_card.signatures or []) + [ | ||
| agent_card_signature | ||
| ] | ||
| return agent_card | ||
|
|
||
| return agent_card_signer | ||
|
|
||
|
|
||
| def create_signature_verifier( | ||
| key_provider: Callable[ | ||
| [str | None, str | None], str | bytes | dict[str, Any] | Key | ||
| ], | ||
| ) -> Callable[[AgentCard], None]: | ||
sokoliva marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| """Creates a function that verifies AgentCard signatures. | ||
sokoliva marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Args: | ||
| key_provider: A callable that takes key-id (kid) and JSON web key url (jku) and returns the verification key. | ||
sokoliva marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Returns: | ||
| A callable that takes an AgentCard, and raises an error if none of the signatures are valid. | ||
| """ | ||
|
|
||
| def signature_verifier( | ||
| agent_card: AgentCard, | ||
| ) -> None: | ||
| """The actual signature_verifier function.""" | ||
sokoliva marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if not agent_card.signatures: | ||
| raise JOSEError('No signatures found on AgentCard') | ||
sokoliva marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| last_error = None | ||
| for agent_card_signature in agent_card.signatures: | ||
| try: | ||
| # fetch kid and jku from protected header | ||
| protected_header_json = base64url_decode( | ||
| agent_card_signature.protected.encode('utf-8') | ||
| ).decode('utf-8') | ||
| protected_header = json.loads(protected_header_json) | ||
| kid = protected_header.get('kid') | ||
| jku = protected_header.get('jku') | ||
| verification_key = key_provider(kid, jku) | ||
|
|
||
| canonical_payload = canonicalize_agent_card(agent_card) | ||
| encoded_payload = base64url_encode( | ||
| canonical_payload.encode('utf-8') | ||
| ).decode('utf-8') | ||
| token = f'{agent_card_signature.protected}.{encoded_payload}.{agent_card_signature.signature}' | ||
|
|
||
| jws.verify( | ||
| token=token, | ||
| key=verification_key, | ||
| algorithms=None, | ||
sokoliva marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| ) | ||
| return # Found a valid signature | ||
|
|
||
| except JOSEError as e: | ||
| last_error = e | ||
lkawka marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| continue | ||
| raise JOSEError('No valid signature found') from last_error | ||
|
|
||
| return signature_verifier | ||
Uh oh!
There was an error while loading. Please reload this page.