Skip to content

Commit 971f110

Browse files
committed
Add test case to show that OBO supports SP
1 parent 6c84adf commit 971f110

File tree

1 file changed

+68
-21
lines changed

1 file changed

+68
-21
lines changed

tests/test_e2e.py

Lines changed: 68 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -649,19 +649,27 @@ def _test_acquire_token_obo(self, config_pca, config_cca,
649649
# Here we just test regional apps won't adversely break OBO
650650
http_client=None,
651651
):
652-
# 1. An app obtains a token representing a user, for our mid-tier service
653-
pca = msal.PublicClientApplication(
654-
config_pca["client_id"], authority=config_pca["authority"],
655-
azure_region=azure_region,
656-
http_client=http_client or MinimalHttpClient())
657-
pca_result = pca.acquire_token_by_username_password(
658-
config_pca["username"],
659-
config_pca["password"],
660-
scopes=config_pca["scope"],
661-
)
662-
self.assertIsNotNone(
663-
pca_result.get("access_token"),
664-
"PCA failed to get AT because %s" % json.dumps(pca_result, indent=2))
652+
if "client_secret" not in config_pca:
653+
# 1.a An app obtains a token representing a user, for our mid-tier service
654+
result = msal.PublicClientApplication(
655+
config_pca["client_id"], authority=config_pca["authority"],
656+
azure_region=azure_region,
657+
http_client=http_client or MinimalHttpClient(),
658+
).acquire_token_by_username_password(
659+
config_pca["username"], config_pca["password"],
660+
scopes=config_pca["scope"],
661+
)
662+
else: # We repurpose the config_pca to contain client_secret for cca app 1
663+
# 1.b An app obtains a token representing itself, for our mid-tier service
664+
result = msal.ConfidentialClientApplication(
665+
config_pca["client_id"], authority=config_pca["authority"],
666+
client_credential=config_pca["client_secret"],
667+
azure_region=azure_region,
668+
http_client=http_client or MinimalHttpClient(),
669+
).acquire_token_for_client(scopes=config_pca["scope"])
670+
assertion = result.get("access_token")
671+
self.assertIsNotNone(assertion, "First app failed to get AT. {}".format(
672+
json.dumps(result, indent=2)))
665673

666674
# 2. Our mid-tier service uses OBO to obtain a token for downstream service
667675
cca = msal.ConfidentialClientApplication(
@@ -674,9 +682,9 @@ def _test_acquire_token_obo(self, config_pca, config_cca,
674682
# That's fine if OBO app uses short-lived msal instance per session.
675683
# Otherwise, the OBO app need to implement a one-cache-per-user setup.
676684
)
677-
cca_result = cca.acquire_token_on_behalf_of(
678-
pca_result['access_token'], config_cca["scope"])
679-
self.assertNotEqual(None, cca_result.get("access_token"), str(cca_result))
685+
cca_result = cca.acquire_token_on_behalf_of(assertion, config_cca["scope"])
686+
self.assertIsNotNone(cca_result.get("access_token"), "OBO call failed: {}".format(
687+
json.dumps(cca_result, indent=2)))
680688

681689
# 3. Now the OBO app can simply store downstream token(s) in same session.
682690
# Alternatively, if you want to persist the downstream AT, and possibly
@@ -685,13 +693,27 @@ def _test_acquire_token_obo(self, config_pca, config_cca,
685693
# Assuming you already did that (which is not shown in this test case),
686694
# the following part shows one of the ways to obtain an AT from cache.
687695
username = cca_result.get("id_token_claims", {}).get("preferred_username")
688-
if username: # It means CCA have requested an IDT w/ "profile" scope
689-
self.assertEqual(config_cca["username"], username)
690696
accounts = cca.get_accounts(username=username)
691-
assert len(accounts) == 1, "App is expected to partition token cache per user"
692-
account = accounts[0]
697+
if username is not None: # It means CCA have requested an IDT w/ "profile" scope
698+
assert config_cca["username"] == username, "Incorrect test case configuration"
699+
self.assertEqual(1, len(accounts), "App is supposed to partition token cache per user")
700+
account = accounts[0] # Alternatively, cca app could just loop through each account
693701
result = cca.acquire_token_silent(config_cca["scope"], account)
694-
self.assertEqual(cca_result["access_token"], result["access_token"])
702+
self.assertTrue(
703+
result and result.get("access_token") == cca_result["access_token"],
704+
"CCA should hit an access token from cache: {}".format(
705+
json.dumps(cca.token_cache._cache, indent=2)))
706+
if "refresh_token" in cca_result:
707+
result = cca.acquire_token_silent(
708+
config_cca["scope"], account=account, force_refresh=True)
709+
self.assertTrue(
710+
result and "access_token" in result,
711+
"CCA should get an AT silently, but we got this instead: {}".format(result))
712+
self.assertNotEqual(
713+
result["access_token"], cca_result["access_token"],
714+
"CCA should get a new AT")
715+
else:
716+
logger.info("AAD did not issue a RT for OBO flow")
695717

696718
def _test_acquire_token_by_client_secret(
697719
self, client_id=None, client_secret=None, authority=None, scope=None,
@@ -933,6 +955,31 @@ def test_acquire_token_obo(self):
933955

934956
self._test_acquire_token_obo(config_pca, config_cca)
935957

958+
@unittest.skipUnless(
959+
os.path.exists("tests/sp_obo.pem"),
960+
"Need a 'tests/sp_obo.pem' private to run OBO for SP test")
961+
def test_acquire_token_obo_for_sp(self):
962+
authority = "https://login.windows-ppe.net/f686d426-8d16-42db-81b7-ab578e110ccd"
963+
with open("tests/sp_obo.pem") as pem:
964+
client_secret = {
965+
"private_key": pem.read(),
966+
"thumbprint": "378938210C976692D7F523B8C4FFBB645D17CE92",
967+
}
968+
midtier_app = {
969+
"authority": authority,
970+
"client_id": "c84e9c32-0bc9-4a73-af05-9efe9982a322",
971+
"client_secret": client_secret,
972+
"scope": ["23d08a1e-1249-4f7c-b5a5-cb11f29b6923/.default"],
973+
#"username": "OBO-Client-PPE", # We do NOT attempt locating initial_app by name
974+
}
975+
initial_app = {
976+
"authority": authority,
977+
"client_id": "9793041b-9078-4942-b1d2-babdc472cc0c",
978+
"client_secret": client_secret,
979+
"scope": [midtier_app["client_id"] + "/.default"],
980+
}
981+
self._test_acquire_token_obo(initial_app, midtier_app)
982+
936983
def test_acquire_token_by_client_secret(self):
937984
# Vastly different than ArlingtonCloudTestCase.test_acquire_token_by_client_secret()
938985
_app = self.get_lab_app_object(

0 commit comments

Comments
 (0)