11import json
22
33from collections .abc import Callable
4- from typing import Any
4+ from typing import Any , TypedDict
55
66
77try :
8- from jose import jws
9- from jose .backends .base import Key
10- from jose .exceptions import JOSEError
11- from jose .utils import base64url_decode , base64url_encode
8+ import jwt
9+
10+ from jwt .api_jwk import PyJWK
11+ from jwt .exceptions import PyJWTError
12+ from jwt .utils import base64url_decode , base64url_encode
1213except ImportError as e :
1314 raise ImportError (
14- 'A2A Signing requires python-jose to be installed. '
15+ 'A2A Signing requires PyJWT to be installed. '
1516 'Install with: '
1617 "'pip install a2a-sdk[signing]'"
1718 ) from e
1819
1920from a2a .types import AgentCard , AgentCardSignature
2021
2122
23+ class SignatureVerificationError (Exception ):
24+ """Base exception for signature verification errors."""
25+
26+
27+ class NoSignatureError (SignatureVerificationError ):
28+ """Exception raised when no signature is found on an AgentCard."""
29+
30+
31+ class InvalidSignaturesError (SignatureVerificationError ):
32+ """Exception raised when all signatures are invalid."""
33+
34+
35+ class ProtectedHeader (TypedDict ):
36+ """Protected header parameters for JWS (JSON Web Signature)."""
37+
38+ kid : str
39+ """ Key identifier. """
40+ alg : str | None
41+ """ Algorithm used for signing. """
42+ jku : str | None
43+ """ JSON Web Key Set URL. """
44+ typ : str | None
45+ """ Token type.
46+
47+ Best practice: SHOULD be "JOSE" for JWS tokens.
48+ """
49+
50+
2251def clean_empty (d : Any ) -> Any :
23- """Recursively remove empty strings, lists, dicts, and None values from a dictionary."""
52+ """Recursively remove empty strings, lists and dicts from a dictionary."""
2453 if isinstance (d , dict ):
2554 cleaned_dict : dict [Any , Any ] = {k : clean_empty (v ) for k , v in d .items ()}
26- return {
27- k : v
28- for k , v in cleaned_dict .items ()
29- if v is not None and (isinstance (v , (bool , int , float )) or v )
30- }
55+ return {k : v for k , v in cleaned_dict .items () if v }
3156 if isinstance (d , list ):
3257 cleaned_list : list [Any ] = [clean_empty (v ) for v in d ]
33- return [
34- v
35- for v in cleaned_list
36- if v is not None and (isinstance (v , (bool , int , float )) or v )
37- ]
38- return d if d not in ['' , [], {}, None ] else None
58+ return [v for v in cleaned_list if v ]
59+ return d if d not in ['' , [], {}] else None
3960
4061
4162def canonicalize_agent_card (agent_card : AgentCard ) -> str :
4263 """Canonicalizes the Agent Card JSON according to RFC 8785 (JCS)."""
4364 card_dict = agent_card .model_dump (
4465 exclude = {'signatures' },
4566 exclude_defaults = True ,
67+ exclude_none = True ,
4668 by_alias = True ,
4769 )
48- # Ensure 'protocol_version' is always included
49- protocol_version_alias = (
50- AgentCard .model_fields ['protocol_version' ].alias or 'protocol_version'
51- )
52- if protocol_version_alias not in card_dict :
53- card_dict [protocol_version_alias ] = agent_card .protocol_version
54-
55- # Recursively remove empty/None values
70+ # Recursively remove empty values
5671 cleaned_dict = clean_empty (card_dict )
57-
5872 return json .dumps (cleaned_dict , separators = (',' , ':' ), sort_keys = True )
5973
6074
6175def create_agent_card_signer (
62- signing_key : str | bytes | dict [str , Any ] | Key ,
63- kid : str ,
64- alg : str = 'HS256' ,
65- jku : str | None = None ,
76+ signing_key : PyJWK | str | bytes ,
77+ protected_header : ProtectedHeader ,
78+ header : dict [str , Any ] | None = None ,
6679) -> Callable [[AgentCard ], AgentCard ]:
6780 """Creates a function that signs an AgentCard and adds the signature.
6881
6982 Args:
7083 signing_key: The private key for signing.
71- kid: Key ID for the signing key.
72- alg: The algorithm to use (e.g., "ES256", "RS256").
73- jku: Optional URL to the JSON Web Keys.
84+ protected_header: The protected header parameters.
85+ header: Unprotected header parameters.
7486
7587 Returns:
7688 A callable that takes an AgentCard and returns the modified AgentCard with a signature.
7789 """
7890
7991 def agent_card_signer (agent_card : AgentCard ) -> AgentCard :
80- """The actual card_modifier function ."""
92+ """Signs agent card ."""
8193 canonical_payload = canonicalize_agent_card (agent_card )
94+ payload_dict = json .loads (canonical_payload )
8295
83- headers = {'kid' : kid , 'typ' : 'JOSE' }
84- if jku :
85- headers ['jku' ] = jku
86-
87- jws_string = jws .sign (
88- payload = canonical_payload .encode ('utf-8' ),
96+ jws_string = jwt .encode (
97+ payload = payload_dict ,
8998 key = signing_key ,
90- headers = headers ,
91- algorithm = alg ,
99+ algorithm = protected_header . get ( 'alg' , 'HS256' ) ,
100+ headers = protected_header ,
92101 )
93102
94- # The result of jws.sign is a compact serialization: HEADER.PAYLOAD.SIGNATURE
95- protected_header , _ , signature = jws_string .split ('.' )
103+ # The result of jwt.encode is a compact serialization: HEADER.PAYLOAD.SIGNATURE
104+ protected , _ , signature = jws_string .split ('.' )
96105
97106 agent_card_signature = AgentCardSignature (
98- protected = protected_header ,
107+ header = header ,
108+ protected = protected ,
99109 signature = signature ,
100110 )
101111
@@ -108,9 +118,7 @@ def agent_card_signer(agent_card: AgentCard) -> AgentCard:
108118
109119
110120def create_signature_verifier (
111- key_provider : Callable [
112- [str | None , str | None ], str | bytes | dict [str , Any ] | Key
113- ],
121+ key_provider : Callable [[str | None , str | None ], PyJWK | str | bytes ],
114122 algorithms : list [str ],
115123) -> Callable [[AgentCard ], None ]:
116124 """Creates a function that verifies AgentCard signatures.
@@ -126,14 +134,17 @@ def create_signature_verifier(
126134 def signature_verifier (
127135 agent_card : AgentCard ,
128136 ) -> None :
129- """The actual signature_verifier function."""
137+ """Verifies agent card signatures.
138+
139+ Checks if at least one signature matches the key, otherwise raises an error.
140+ """
130141 if not agent_card .signatures :
131- raise JOSEError ( 'No signatures found on AgentCard ' )
142+ raise NoSignatureError ( 'AgentCard has no signatures to verify. ' )
132143
133144 last_error = None
134145 for agent_card_signature in agent_card .signatures :
135146 try :
136- # fetch kid and jku from protected header
147+ # get verification key
137148 protected_header_json = base64url_decode (
138149 agent_card_signature .protected .encode ('utf-8' )
139150 ).decode ('utf-8' )
@@ -146,20 +157,22 @@ def signature_verifier(
146157 encoded_payload = base64url_encode (
147158 canonical_payload .encode ('utf-8' )
148159 ).decode ('utf-8' )
149- token = f'{ agent_card_signature .protected } .{ encoded_payload } .{ agent_card_signature .signature } '
150160
151- jws .verify (
152- token = token ,
161+ token = f'{ agent_card_signature .protected } .{ encoded_payload } .{ agent_card_signature .signature } '
162+ jwt .decode (
163+ jwt = token ,
153164 key = verification_key ,
154165 algorithms = algorithms ,
155166 )
156167 # Found a valid signature, exit the loop and function
157168 break
158- except JOSEError as e :
169+ except PyJWTError as e :
159170 last_error = e
160171 continue
161172 else :
162173 # This block runs only if the loop completes without a break
163- raise JOSEError ('No valid signature found' ) from last_error
174+ raise InvalidSignaturesError (
175+ 'No valid signature found'
176+ ) from last_error
164177
165178 return signature_verifier
0 commit comments