1+ # a2a/client/auth/interceptor.py
2+
3+ import logging
4+ from typing import Any
5+
6+ from a2a .client .auth .credentials import CredentialService
7+ from a2a .client .middleware import ClientCallContext , ClientCallInterceptor
8+ from a2a .types import AgentCard , APIKeySecurityScheme , HTTPAuthSecurityScheme
9+
10+ logger = logging .getLogger (__name__ )
11+
12+
13+ class AuthInterceptor (ClientCallInterceptor ):
14+ """
15+ An interceptor that automatically adds authentication details to requests
16+ based on the agent's security schemes.
17+ """
18+
19+ def __init__ (self , credential_service : CredentialService ):
20+ self ._credential_service = credential_service
21+
22+ async def intercept (
23+ self ,
24+ method_name : str ,
25+ request_payload : dict [str , Any ],
26+ http_kwargs : dict [str , Any ],
27+ agent_card : AgentCard | None ,
28+ context : ClientCallContext | None ,
29+ ) -> tuple [dict [str , Any ], dict [str , Any ]]:
30+ """
31+ Adds authentication headers to the request if credentials can be found.
32+ """
33+ if not agent_card or not agent_card .security or not agent_card .securitySchemes :
34+ return request_payload , http_kwargs
35+
36+ for requirement in agent_card .security :
37+ for scheme_name in requirement :
38+ credential = await self ._credential_service .get_credentials (
39+ scheme_name , context
40+ )
41+ if credential and scheme_name in agent_card .securitySchemes :
42+ scheme_def = agent_card .securitySchemes [scheme_name ].root
43+ headers = http_kwargs .get ('headers' , {})
44+
45+ if isinstance (scheme_def , HTTPAuthSecurityScheme ):
46+ headers ['Authorization' ] = f"{ scheme_def .scheme } { credential } "
47+ http_kwargs ['headers' ] = headers
48+ logger .debug (f"Added HTTP Auth for scheme '{ scheme_name } '." )
49+ return request_payload , http_kwargs
50+ elif isinstance (scheme_def , APIKeySecurityScheme ):
51+ if scheme_def .in_ == 'header' :
52+ headers [scheme_def .name ] = credential
53+ http_kwargs ['headers' ] = headers
54+ logger .debug (f"Added API Key Header for scheme '{ scheme_name } '." )
55+ return request_payload , http_kwargs
56+ else :
57+ logger .warning (
58+ f"API Key in '{ scheme_def .in_ } ' not supported by this interceptor."
59+ )
60+
61+ return request_payload , http_kwargs
0 commit comments