33import json
44import time
55import unittest
6+ import sys
67
78import requests
89
1112from msal .oauth2cli import AuthCodeReceiver
1213
1314logger = logging .getLogger (__name__ )
14- logging .basicConfig (level = logging .INFO )
15+ logging .basicConfig (level = logging .DEBUG if "-v" in sys . argv else logging . INFO )
1516
1617
1718def _get_app_and_auth_code (
@@ -49,7 +50,8 @@ def assertLoosely(self, response, assertion=None,
4950 error_description = response .get ("error_description" )))
5051 assertion ()
5152
52- def assertCacheWorksForUser (self , result_from_wire , scope , username = None ):
53+ def assertCacheWorksForUser (
54+ self , result_from_wire , scope , username = None , data = None ):
5355 # You can filter by predefined username, or let end user to choose one
5456 accounts = self .app .get_accounts (username = username )
5557 self .assertNotEqual (0 , len (accounts ))
@@ -59,7 +61,8 @@ def assertCacheWorksForUser(self, result_from_wire, scope, username=None):
5961 set (scope ) <= set (result_from_wire ["scope" ].split (" " ))
6062 ):
6163 # Going to test acquire_token_silent(...) to locate an AT from cache
62- result_from_cache = self .app .acquire_token_silent (scope , account = account )
64+ result_from_cache = self .app .acquire_token_silent (
65+ scope , account = account , data = data or {})
6366 self .assertIsNotNone (result_from_cache )
6467 self .assertIsNone (
6568 result_from_cache .get ("refresh_token" ), "A cache hit returns no RT" )
@@ -69,7 +72,8 @@ def assertCacheWorksForUser(self, result_from_wire, scope, username=None):
6972
7073 # Going to test acquire_token_silent(...) to obtain an AT by a RT from cache
7174 self .app .token_cache ._cache ["AccessToken" ] = {} # A hacky way to clear ATs
72- result_from_cache = self .app .acquire_token_silent (scope , account = account )
75+ result_from_cache = self .app .acquire_token_silent (
76+ scope , account = account , data = data or {})
7377 self .assertIsNotNone (result_from_cache ,
7478 "We should get a result from acquire_token_silent(...) call" )
7579 self .assertIsNotNone (
@@ -131,6 +135,84 @@ def _test_device_flow(
131135 logger .info (
132136 "%s obtained tokens: %s" , self .id (), json .dumps (result , indent = 4 ))
133137
138+ def _test_acquire_token_interactive (
139+ self , client_id = None , authority = None , scope = None , port = None ,
140+ username_uri = "" , # But you would want to provide one
141+ data = None , # Needed by ssh-cert feature
142+ ** ignored ):
143+ assert client_id and authority and scope
144+ self .app = msal .PublicClientApplication (
145+ client_id , authority = authority , http_client = MinimalHttpClient ())
146+ result = self .app .acquire_token_interactive (
147+ scope ,
148+ timeout = 120 ,
149+ port = port ,
150+ welcome_template = # This is an undocumented feature for testing
151+ """<html><body><h1>{id}</h1><ol>
152+ <li>Get a username from the upn shown at <a href="{username_uri}">here</a></li>
153+ <li>Get its password from https://aka.ms/GetLabUserSecret?Secret=msidlabXYZ
154+ (replace the lab name with the labName from the link above).</li>
155+ <li><a href="$auth_uri">Sign In</a> or <a href="$abort_uri">Abort</a></li>
156+ </ol></body></html>""" .format (id = self .id (), username_uri = username_uri ),
157+ data = data or {},
158+ )
159+ logger .debug (
160+ "%s: cache = %s, id_token_claims = %s" ,
161+ self .id (),
162+ json .dumps (self .app .token_cache ._cache , indent = 4 ),
163+ json .dumps (result .get ("id_token_claims" ), indent = 4 ),
164+ )
165+ self .assertIn (
166+ "access_token" , result ,
167+ "{error}: {error_description}" .format (
168+ # Note: No interpolation here, cause error won't always present
169+ error = result .get ("error" ),
170+ error_description = result .get ("error_description" )))
171+ self .assertCacheWorksForUser (result , scope , username = None , data = data or {})
172+ return result # For further testing
173+
174+
175+ class SshCertTestCase (E2eTestCase ):
176+ _JWK1 = """{"kty":"RSA", "n":"2tNr73xwcj6lH7bqRZrFzgSLj7OeLfbn8216uOMDHuaZ6TEUBDN8Uz0ve8jAlKsP9CQFCSVoSNovdE-fs7c15MxEGHjDcNKLWonznximj8pDGZQjVdfK-7mG6P6z-lgVcLuYu5JcWU_PeEqIKg5llOaz-qeQ4LEDS4T1D2qWRGpAra4rJX1-kmrWmX_XIamq30C9EIO0gGuT4rc2hJBWQ-4-FnE1NXmy125wfT3NdotAJGq5lMIfhjfglDbJCwhc8Oe17ORjO3FsB5CLuBRpYmP7Nzn66lRY3Fe11Xz8AEBl3anKFSJcTvlMnFtu3EpD-eiaHfTgRBU7CztGQqVbiQ", "e":"AQAB"}"""
177+ _JWK2 = """{"kty":"RSA", "n":"72u07mew8rw-ssw3tUs9clKstGO2lvD7ZNxJU7OPNKz5PGYx3gjkhUmtNah4I4FP0DuF1ogb_qSS5eD86w10Wb1ftjWcoY8zjNO9V3ph-Q2tMQWdDW5kLdeU3-EDzc0HQeou9E0udqmfQoPbuXFQcOkdcbh3eeYejs8sWn3TQprXRwGh_TRYi-CAurXXLxQ8rp-pltUVRIr1B63fXmXhMeCAGwCPEFX9FRRs-YHUszUJl9F9-E0nmdOitiAkKfCC9LhwB9_xKtjmHUM9VaEC9jWOcdvXZutwEoW2XPMOg0Ky-s197F9rfpgHle2gBrXsbvVMvS0D-wXg6vsq6BAHzQ", "e":"AQAB"}"""
178+ DATA1 = {"token_type" : "ssh-cert" , "key_id" : "key1" , "req_cnf" : _JWK1 }
179+ DATA2 = {"token_type" : "ssh-cert" , "key_id" : "key2" , "req_cnf" : _JWK2 }
180+ _SCOPE_USER = ["https://pas.windows.net/CheckMyAccess/Linux/user_impersonation" ]
181+ _SCOPE_SP = ["https://pas.windows.net/CheckMyAccess/Linux/.default" ]
182+ SCOPE = _SCOPE_SP # Historically there was a separation, at 2021 it is unified
183+
184+ def test_ssh_cert_for_service_principal (self ):
185+ # Any SP can obtain an ssh-cert. Here we use the lab app.
186+ result = get_lab_app ().acquire_token_for_client (self .SCOPE , data = self .DATA1 )
187+ self .assertIsNotNone (result .get ("access_token" ), "Encountered {}: {}" .format (
188+ result .get ("error" ), result .get ("error_description" )))
189+ self .assertEqual ("ssh-cert" , result ["token_type" ])
190+
191+ @unittest .skipIf (os .getenv ("TRAVIS" ), "Browser automation is not yet implemented" )
192+ def test_ssh_cert_for_user (self ):
193+ result = self ._test_acquire_token_interactive (
194+ client_id = "04b07795-8ddb-461a-bbee-02f9e1bf7b46" , # Azure CLI is one
195+ # of the only 2 clients that are PreAuthz to use ssh cert feature
196+ authority = "https://login.microsoftonline.com/common" ,
197+ scope = self .SCOPE ,
198+ data = self .DATA1 ,
199+ username_uri = "https://msidlab.com/api/user?usertype=cloud" ,
200+ ) # It already tests reading AT from cache, and using RT to refresh
201+ # acquire_token_silent() would work because we pass in the same key
202+ self .assertIsNotNone (result .get ("access_token" ), "Encountered {}: {}" .format (
203+ result .get ("error" ), result .get ("error_description" )))
204+ self .assertEqual ("ssh-cert" , result ["token_type" ])
205+ logger .debug ("%s.cache = %s" ,
206+ self .id (), json .dumps (self .app .token_cache ._cache , indent = 4 ))
207+
208+ # refresh_token grant can fetch an ssh-cert bound to a different key
209+ account = self .app .get_accounts ()[0 ]
210+ refreshed_ssh_cert = self .app .acquire_token_silent (
211+ self .SCOPE , account = account , data = self .DATA2 )
212+ self .assertIsNotNone (refreshed_ssh_cert )
213+ self .assertEqual (refreshed_ssh_cert ["token_type" ], "ssh-cert" )
214+ self .assertNotEqual (result ["access_token" ], refreshed_ssh_cert ['access_token' ])
215+
134216
135217THIS_FOLDER = os .path .dirname (__file__ )
136218CONFIG = os .path .join (THIS_FOLDER , "config.json" )
@@ -190,48 +272,6 @@ def test_auth_code_with_mismatching_nonce(self):
190272 self .app .acquire_token_by_authorization_code (
191273 ac , self .config ["scope" ], redirect_uri = redirect_uri , nonce = "bar" )
192274
193- def test_ssh_cert (self ):
194- self .skipUnlessWithConfig (["client_id" , "scope" ])
195-
196- JWK1 = """{"kty":"RSA", "n":"2tNr73xwcj6lH7bqRZrFzgSLj7OeLfbn8216uOMDHuaZ6TEUBDN8Uz0ve8jAlKsP9CQFCSVoSNovdE-fs7c15MxEGHjDcNKLWonznximj8pDGZQjVdfK-7mG6P6z-lgVcLuYu5JcWU_PeEqIKg5llOaz-qeQ4LEDS4T1D2qWRGpAra4rJX1-kmrWmX_XIamq30C9EIO0gGuT4rc2hJBWQ-4-FnE1NXmy125wfT3NdotAJGq5lMIfhjfglDbJCwhc8Oe17ORjO3FsB5CLuBRpYmP7Nzn66lRY3Fe11Xz8AEBl3anKFSJcTvlMnFtu3EpD-eiaHfTgRBU7CztGQqVbiQ", "e":"AQAB"}"""
197- JWK2 = """{"kty":"RSA", "n":"72u07mew8rw-ssw3tUs9clKstGO2lvD7ZNxJU7OPNKz5PGYx3gjkhUmtNah4I4FP0DuF1ogb_qSS5eD86w10Wb1ftjWcoY8zjNO9V3ph-Q2tMQWdDW5kLdeU3-EDzc0HQeou9E0udqmfQoPbuXFQcOkdcbh3eeYejs8sWn3TQprXRwGh_TRYi-CAurXXLxQ8rp-pltUVRIr1B63fXmXhMeCAGwCPEFX9FRRs-YHUszUJl9F9-E0nmdOitiAkKfCC9LhwB9_xKtjmHUM9VaEC9jWOcdvXZutwEoW2XPMOg0Ky-s197F9rfpgHle2gBrXsbvVMvS0D-wXg6vsq6BAHzQ", "e":"AQAB"}"""
198- data1 = {"token_type" : "ssh-cert" , "key_id" : "key1" , "req_cnf" : JWK1 }
199- ssh_test_slice = {
200- "dc" : "prod-wst-test1" ,
201- "slice" : "test" ,
202- "sshcrt" : "true" ,
203- }
204-
205- scopes = [ # Only this scope would result in an SSH-Cert
206- "https://pas.windows.net/CheckMyAccess/Linux/user_impersonation" ]
207- (self .app , ac , redirect_uri ) = self ._get_app_and_auth_code (scopes = scopes )
208-
209- result = self .app .acquire_token_by_authorization_code (
210- ac , scopes , redirect_uri = redirect_uri , data = data1 ,
211- params = ssh_test_slice )
212- self .assertIsNotNone (result .get ("access_token" ), "Encountered {}: {}" .format (
213- result .get ("error" ), result .get ("error_description" )))
214- self .assertEqual ("ssh-cert" , result ["token_type" ])
215- logger .debug ("%s.cache = %s" ,
216- self .id (), json .dumps (self .app .token_cache ._cache , indent = 4 ))
217-
218- # acquire_token_silent() needs to be passed the same key to work
219- account = self .app .get_accounts ()[0 ]
220- result_from_cache = self .app .acquire_token_silent (
221- scopes , account = account , data = data1 )
222- self .assertIsNotNone (result_from_cache )
223- self .assertEqual (
224- result ['access_token' ], result_from_cache ['access_token' ],
225- "We should get the cached SSH-cert" )
226-
227- # refresh_token grant can fetch an ssh-cert bound to a different key
228- refreshed_ssh_cert = self .app .acquire_token_silent (
229- scopes , account = account , params = ssh_test_slice ,
230- data = {"token_type" : "ssh-cert" , "key_id" : "key2" , "req_cnf" : JWK2 })
231- self .assertIsNotNone (refreshed_ssh_cert )
232- self .assertEqual (refreshed_ssh_cert ["token_type" ], "ssh-cert" )
233- self .assertNotEqual (result ["access_token" ], refreshed_ssh_cert ['access_token' ])
234-
235275 def test_client_secret (self ):
236276 self .skipUnlessWithConfig (["client_id" , "client_secret" ])
237277 self .app = msal .ConfidentialClientApplication (
@@ -445,39 +485,6 @@ def _test_acquire_token_by_auth_code_flow(
445485 error_description = result .get ("error_description" )))
446486 self .assertCacheWorksForUser (result , scope , username = None )
447487
448- def _test_acquire_token_interactive (
449- self , client_id = None , authority = None , scope = None , port = None ,
450- username_uri = "" , # But you would want to provide one
451- ** ignored ):
452- assert client_id and authority and scope
453- self .app = msal .PublicClientApplication (
454- client_id , authority = authority , http_client = MinimalHttpClient ())
455- result = self .app .acquire_token_interactive (
456- scope ,
457- timeout = 60 ,
458- port = port ,
459- welcome_template = # This is an undocumented feature for testing
460- """<html><body><h1>{id}</h1><ol>
461- <li>Get a username from the upn shown at <a href="{username_uri}">here</a></li>
462- <li>Get its password from https://aka.ms/GetLabUserSecret?Secret=msidlabXYZ
463- (replace the lab name with the labName from the link above).</li>
464- <li><a href="$auth_uri">Sign In</a> or <a href="$abort_uri">Abort</a></li>
465- </ol></body></html>""" .format (id = self .id (), username_uri = username_uri ),
466- )
467- logger .debug (
468- "%s: cache = %s, id_token_claims = %s" ,
469- self .id (),
470- json .dumps (self .app .token_cache ._cache , indent = 4 ),
471- json .dumps (result .get ("id_token_claims" ), indent = 4 ),
472- )
473- self .assertIn (
474- "access_token" , result ,
475- "{error}: {error_description}" .format (
476- # Note: No interpolation here, cause error won't always present
477- error = result .get ("error" ),
478- error_description = result .get ("error_description" )))
479- self .assertCacheWorksForUser (result , scope , username = None )
480-
481488 def _test_acquire_token_obo (self , config_pca , config_cca ):
482489 # 1. An app obtains a token representing a user, for our mid-tier service
483490 pca = msal .PublicClientApplication (
0 commit comments