88
99from oauth2cli import Client
1010from .authority import Authority
11- from .assertion import create_jwt_assertion
11+ from oauth2cli .assertion import JwtSigner
1212import mex
1313import wstrust_request
1414from .wstrust_response import SAML_TOKEN_TYPE_V1 , SAML_TOKEN_TYPE_V2
@@ -55,7 +55,7 @@ def __init__(
5555 or an X509 certificate container in this form:
5656
5757 {
58- "certificate ": "-----BEGIN PRIVATE KEY-----...",
58+ "private_key ": "... -----BEGIN PRIVATE KEY-----...",
5959 "thumbprint": "A1B2C3D4E5F6...",
6060 }
6161 """
@@ -66,36 +66,36 @@ def __init__(
6666 validate_authority )
6767 # Here the self.authority is not the same type as authority in input
6868 self .token_cache = token_cache or TokenCache ()
69- default_body = self ._build_auth_parameters (
70- self .client_credential ,
71- self .authority .token_endpoint , self .client_id )
72- default_body ["client_info" ] = 1
73- self .client = Client (
69+ self .client = self ._build_client (client_credential , self .authority )
70+
71+ def _build_client (self , client_credential , authority ):
72+ client_assertion = None
73+ default_body = {"client_info" : 1 }
74+ if isinstance (client_credential , dict ):
75+ assert ("private_key" in client_credential
76+ and "thumbprint" in client_credential )
77+ signer = JwtSigner (
78+ client_credential ["private_key" ], algorithm = "RS256" ,
79+ sha1_thumbprint = client_credential .get ("thumbprint" ))
80+ client_assertion = signer .sign_assertion (
81+ audience = authority .token_endpoint , issuer = self .client_id )
82+ else :
83+ default_body ['client_secret' ] = client_credential
84+ return Client (
7485 self .client_id ,
7586 configuration = {
76- "token_endpoint" : self .authority .token_endpoint ,
87+ "authorization_endpoint" : authority .authorization_endpoint ,
88+ "token_endpoint" : authority .token_endpoint ,
7789 "device_authorization_endpoint" : urljoin (
78- self . authority .token_endpoint , "devicecode" ),
90+ authority .token_endpoint , "devicecode" ),
7991 },
8092 default_body = default_body ,
93+ client_assertion = client_assertion ,
8194 on_obtaining_tokens = self .token_cache .add ,
8295 on_removing_rt = self .token_cache .remove_rt ,
8396 on_updating_rt = self .token_cache .update_rt ,
8497 )
8598
86- @staticmethod
87- def _build_auth_parameters (client_credential , token_endpoint , client_id ):
88- if isinstance (client_credential , dict ):
89- type_ = 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer'
90- assertion = create_jwt_assertion (
91- client_credential ['certificate' ],
92- client_credential ['thumbprint' ],
93- audience = token_endpoint , issuer = client_id )
94- return {
95- 'client_assertion_type' : type_ , 'client_assertion' : assertion }
96- else :
97- return {'client_secret' : client_credential }
98-
9999 def get_authorization_request_url (
100100 self ,
101101 scope ,
@@ -218,16 +218,7 @@ def acquire_token_silent(
218218 "home_account_id" : (account or {}).get ("home_account_id" ),
219219 # "realm": the_authority.tenant, # AAD RTs are tenant-independent
220220 })
221- client = Client (
222- self .client_id ,
223- configuration = {"token_endpoint" : the_authority .token_endpoint },
224- default_body = self ._build_auth_parameters (
225- self .client_credential ,
226- the_authority .token_endpoint , self .client_id ),
227- on_obtaining_tokens = self .token_cache .add ,
228- on_removing_rt = self .token_cache .remove_rt ,
229- on_updating_rt = self .token_cache .update_rt ,
230- )
221+ client = self ._build_client (self .client_credential , the_authority )
231222 for entry in matches :
232223 response = client .obtain_token_with_refresh_token (
233224 entry , rt_getter = lambda token_item : token_item ["secret" ],
0 commit comments