Skip to content

Commit 2488249

Browse files
committed
Refactor SshCert e2e test to use lab user
1 parent 4dd6ce4 commit 2488249

File tree

1 file changed

+53
-42
lines changed

1 file changed

+53
-42
lines changed

tests/test_e2e.py

Lines changed: 53 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -277,48 +277,6 @@ def _test_acquire_token_interactive(
277277
return result # For further testing
278278

279279

280-
class SshCertTestCase(E2eTestCase):
281-
_JWK1 = """{"kty":"RSA", "n":"2tNr73xwcj6lH7bqRZrFzgSLj7OeLfbn8216uOMDHuaZ6TEUBDN8Uz0ve8jAlKsP9CQFCSVoSNovdE-fs7c15MxEGHjDcNKLWonznximj8pDGZQjVdfK-7mG6P6z-lgVcLuYu5JcWU_PeEqIKg5llOaz-qeQ4LEDS4T1D2qWRGpAra4rJX1-kmrWmX_XIamq30C9EIO0gGuT4rc2hJBWQ-4-FnE1NXmy125wfT3NdotAJGq5lMIfhjfglDbJCwhc8Oe17ORjO3FsB5CLuBRpYmP7Nzn66lRY3Fe11Xz8AEBl3anKFSJcTvlMnFtu3EpD-eiaHfTgRBU7CztGQqVbiQ", "e":"AQAB"}"""
282-
_JWK2 = """{"kty":"RSA", "n":"72u07mew8rw-ssw3tUs9clKstGO2lvD7ZNxJU7OPNKz5PGYx3gjkhUmtNah4I4FP0DuF1ogb_qSS5eD86w10Wb1ftjWcoY8zjNO9V3ph-Q2tMQWdDW5kLdeU3-EDzc0HQeou9E0udqmfQoPbuXFQcOkdcbh3eeYejs8sWn3TQprXRwGh_TRYi-CAurXXLxQ8rp-pltUVRIr1B63fXmXhMeCAGwCPEFX9FRRs-YHUszUJl9F9-E0nmdOitiAkKfCC9LhwB9_xKtjmHUM9VaEC9jWOcdvXZutwEoW2XPMOg0Ky-s197F9rfpgHle2gBrXsbvVMvS0D-wXg6vsq6BAHzQ", "e":"AQAB"}"""
283-
DATA1 = {"token_type": "ssh-cert", "key_id": "key1", "req_cnf": _JWK1}
284-
DATA2 = {"token_type": "ssh-cert", "key_id": "key2", "req_cnf": _JWK2}
285-
_SCOPE_USER = ["https://pas.windows.net/CheckMyAccess/Linux/user_impersonation"]
286-
_SCOPE_SP = ["https://pas.windows.net/CheckMyAccess/Linux/.default"]
287-
SCOPE = _SCOPE_SP # Historically there was a separation, at 2021 it is unified
288-
289-
def test_ssh_cert_for_service_principal(self):
290-
# Any SP can obtain an ssh-cert. Here we use the lab app.
291-
result = get_lab_app().acquire_token_for_client(self.SCOPE, data=self.DATA1)
292-
self.assertIsNotNone(result.get("access_token"), "Encountered {}: {}".format(
293-
result.get("error"), result.get("error_description")))
294-
self.assertEqual("ssh-cert", result["token_type"])
295-
296-
def test_ssh_cert_for_user_should_work_with_any_account(self):
297-
result = self._test_acquire_token_interactive(
298-
client_id="04b07795-8ddb-461a-bbee-02f9e1bf7b46", # Azure CLI is one
299-
# of the only 2 clients that are PreAuthz to use ssh cert feature
300-
authority="https://login.microsoftonline.com/common",
301-
scope=self.SCOPE,
302-
data=self.DATA1,
303-
username_uri="https://msidlab.com/api/user?usertype=cloud",
304-
prompt="none" if msal.application._is_running_in_cloud_shell() else None,
305-
) # It already tests reading AT from cache, and using RT to refresh
306-
# acquire_token_silent() would work because we pass in the same key
307-
self.assertIsNotNone(result.get("access_token"), "Encountered {}: {}".format(
308-
result.get("error"), result.get("error_description")))
309-
self.assertEqual("ssh-cert", result["token_type"])
310-
logger.debug("%s.cache = %s",
311-
self.id(), json.dumps(self.app.token_cache._cache, indent=4))
312-
313-
# refresh_token grant can fetch an ssh-cert bound to a different key
314-
account = self.app.get_accounts()[0]
315-
refreshed_ssh_cert = self.app.acquire_token_silent(
316-
self.SCOPE, account=account, data=self.DATA2)
317-
self.assertIsNotNone(refreshed_ssh_cert)
318-
self.assertEqual(refreshed_ssh_cert["token_type"], "ssh-cert")
319-
self.assertNotEqual(result["access_token"], refreshed_ssh_cert['access_token'])
320-
321-
322280
@unittest.skipUnless(
323281
msal.application._is_running_in_cloud_shell(),
324282
"Manually run this test case from inside Cloud Shell")
@@ -697,6 +655,59 @@ def _test_acquire_token_by_client_secret(
697655
self.assertCacheWorksForApp(result, scope)
698656

699657

658+
class PopWithExternalKeyTestCase(LabBasedTestCase):
659+
def _test_service_principal(self):
660+
# Any SP can obtain an ssh-cert. Here we use the lab app.
661+
result = get_lab_app().acquire_token_for_client(self.SCOPE, data=self.DATA1)
662+
self.assertIsNotNone(result.get("access_token"), "Encountered {}: {}".format(
663+
result.get("error"), result.get("error_description")))
664+
self.assertEqual(self.EXPECTED_TOKEN_TYPE, result["token_type"])
665+
666+
def _test_user_account(self):
667+
lab_user = self.get_lab_user(usertype="cloud")
668+
result = self._test_acquire_token_interactive(
669+
client_id="04b07795-8ddb-461a-bbee-02f9e1bf7b46", # Azure CLI is one
670+
# of the only 2 clients that are PreAuthz to use ssh cert feature
671+
authority="https://login.microsoftonline.com/common",
672+
scope=self.SCOPE,
673+
data=self.DATA1,
674+
username=lab_user["username"],
675+
lab_name=lab_user["lab_name"],
676+
prompt="none" if msal.application._is_running_in_cloud_shell() else None,
677+
) # It already tests reading AT from cache, and using RT to refresh
678+
# acquire_token_silent() would work because we pass in the same key
679+
self.assertIsNotNone(result.get("access_token"), "Encountered {}: {}".format(
680+
result.get("error"), result.get("error_description")))
681+
self.assertEqual(self.EXPECTED_TOKEN_TYPE, result["token_type"])
682+
logger.debug("%s.cache = %s",
683+
self.id(), json.dumps(self.app.token_cache._cache, indent=4))
684+
685+
# refresh_token grant can fetch an ssh-cert bound to a different key
686+
account = self.app.get_accounts()[0]
687+
refreshed_ssh_cert = self.app.acquire_token_silent(
688+
self.SCOPE, account=account, data=self.DATA2)
689+
self.assertIsNotNone(refreshed_ssh_cert)
690+
self.assertEqual(self.EXPECTED_TOKEN_TYPE, refreshed_ssh_cert["token_type"])
691+
self.assertNotEqual(result["access_token"], refreshed_ssh_cert['access_token'])
692+
693+
694+
class SshCertTestCase(PopWithExternalKeyTestCase):
695+
EXPECTED_TOKEN_TYPE = "ssh-cert"
696+
_JWK1 = """{"kty":"RSA", "n":"2tNr73xwcj6lH7bqRZrFzgSLj7OeLfbn8216uOMDHuaZ6TEUBDN8Uz0ve8jAlKsP9CQFCSVoSNovdE-fs7c15MxEGHjDcNKLWonznximj8pDGZQjVdfK-7mG6P6z-lgVcLuYu5JcWU_PeEqIKg5llOaz-qeQ4LEDS4T1D2qWRGpAra4rJX1-kmrWmX_XIamq30C9EIO0gGuT4rc2hJBWQ-4-FnE1NXmy125wfT3NdotAJGq5lMIfhjfglDbJCwhc8Oe17ORjO3FsB5CLuBRpYmP7Nzn66lRY3Fe11Xz8AEBl3anKFSJcTvlMnFtu3EpD-eiaHfTgRBU7CztGQqVbiQ", "e":"AQAB"}"""
697+
_JWK2 = """{"kty":"RSA", "n":"72u07mew8rw-ssw3tUs9clKstGO2lvD7ZNxJU7OPNKz5PGYx3gjkhUmtNah4I4FP0DuF1ogb_qSS5eD86w10Wb1ftjWcoY8zjNO9V3ph-Q2tMQWdDW5kLdeU3-EDzc0HQeou9E0udqmfQoPbuXFQcOkdcbh3eeYejs8sWn3TQprXRwGh_TRYi-CAurXXLxQ8rp-pltUVRIr1B63fXmXhMeCAGwCPEFX9FRRs-YHUszUJl9F9-E0nmdOitiAkKfCC9LhwB9_xKtjmHUM9VaEC9jWOcdvXZutwEoW2XPMOg0Ky-s197F9rfpgHle2gBrXsbvVMvS0D-wXg6vsq6BAHzQ", "e":"AQAB"}"""
698+
DATA1 = {"token_type": "ssh-cert", "key_id": "key1", "req_cnf": _JWK1}
699+
DATA2 = {"token_type": "ssh-cert", "key_id": "key2", "req_cnf": _JWK2}
700+
_SCOPE_USER = ["https://pas.windows.net/CheckMyAccess/Linux/user_impersonation"]
701+
_SCOPE_SP = ["https://pas.windows.net/CheckMyAccess/Linux/.default"]
702+
SCOPE = _SCOPE_SP # Historically there was a separation, at 2021 it is unified
703+
704+
def test_service_principal(self):
705+
self._test_service_principal()
706+
707+
def test_user_account(self):
708+
self._test_user_account()
709+
710+
700711
class WorldWideTestCase(LabBasedTestCase):
701712

702713
def test_aad_managed_user(self): # Pure cloud

0 commit comments

Comments
 (0)