@@ -477,9 +477,10 @@ def get_lab_app(
477477 if os .getenv (env_client_id ) and os .getenv (env_client_cert_path ):
478478 # id came from https://docs.msidlab.com/accounts/confidentialclient.html
479479 client_id = os .getenv (env_client_id )
480- # Cert came from https://ms.portal.azure.com/#@microsoft.onmicrosoft.com/asset/Microsoft_Azure_KeyVault/Certificate/https://msidlabs.vault.azure.net/certificates/LabVaultAccessCert
481480 client_credential = {
482- "private_key_pfx_path" : os .getenv (env_client_cert_path ),
481+ "private_key_pfx_path" :
482+ # Cert came from https://ms.portal.azure.com/#@microsoft.onmicrosoft.com/asset/Microsoft_Azure_KeyVault/Certificate/https://msidlabs.vault.azure.net/certificates/LabAuth
483+ os .getenv (env_client_cert_path ),
483484 "public_certificate" : True , # Opt in for SNI
484485 }
485486 elif os .getenv (env_client_id ) and os .getenv (env_name2 ):
@@ -649,19 +650,27 @@ def _test_acquire_token_obo(self, config_pca, config_cca,
649650 # Here we just test regional apps won't adversely break OBO
650651 http_client = None ,
651652 ):
652- # 1. An app obtains a token representing a user, for our mid-tier service
653- pca = msal .PublicClientApplication (
654- config_pca ["client_id" ], authority = config_pca ["authority" ],
655- azure_region = azure_region ,
656- http_client = http_client or MinimalHttpClient ())
657- pca_result = pca .acquire_token_by_username_password (
658- config_pca ["username" ],
659- config_pca ["password" ],
660- scopes = config_pca ["scope" ],
661- )
662- self .assertIsNotNone (
663- pca_result .get ("access_token" ),
664- "PCA failed to get AT because %s" % json .dumps (pca_result , indent = 2 ))
653+ if "client_secret" not in config_pca :
654+ # 1.a An app obtains a token representing a user, for our mid-tier service
655+ result = msal .PublicClientApplication (
656+ config_pca ["client_id" ], authority = config_pca ["authority" ],
657+ azure_region = azure_region ,
658+ http_client = http_client or MinimalHttpClient (),
659+ ).acquire_token_by_username_password (
660+ config_pca ["username" ], config_pca ["password" ],
661+ scopes = config_pca ["scope" ],
662+ )
663+ else : # We repurpose the config_pca to contain client_secret for cca app 1
664+ # 1.b An app obtains a token representing itself, for our mid-tier service
665+ result = msal .ConfidentialClientApplication (
666+ config_pca ["client_id" ], authority = config_pca ["authority" ],
667+ client_credential = config_pca ["client_secret" ],
668+ azure_region = azure_region ,
669+ http_client = http_client or MinimalHttpClient (),
670+ ).acquire_token_for_client (scopes = config_pca ["scope" ])
671+ assertion = result .get ("access_token" )
672+ self .assertIsNotNone (assertion , "First app failed to get AT. {}" .format (
673+ json .dumps (result , indent = 2 )))
665674
666675 # 2. Our mid-tier service uses OBO to obtain a token for downstream service
667676 cca = msal .ConfidentialClientApplication (
@@ -674,9 +683,9 @@ def _test_acquire_token_obo(self, config_pca, config_cca,
674683 # That's fine if OBO app uses short-lived msal instance per session.
675684 # Otherwise, the OBO app need to implement a one-cache-per-user setup.
676685 )
677- cca_result = cca .acquire_token_on_behalf_of (
678- pca_result [ 'access_token' ], config_cca [ "scope" ])
679- self . assertNotEqual ( None , cca_result . get ( "access_token" ), str ( cca_result ))
686+ cca_result = cca .acquire_token_on_behalf_of (assertion , config_cca [ "scope" ])
687+ self . assertIsNotNone ( cca_result . get ( "access_token" ), "OBO call failed: {}" . format (
688+ json . dumps ( cca_result , indent = 2 ) ))
680689
681690 # 3. Now the OBO app can simply store downstream token(s) in same session.
682691 # Alternatively, if you want to persist the downstream AT, and possibly
@@ -685,13 +694,27 @@ def _test_acquire_token_obo(self, config_pca, config_cca,
685694 # Assuming you already did that (which is not shown in this test case),
686695 # the following part shows one of the ways to obtain an AT from cache.
687696 username = cca_result .get ("id_token_claims" , {}).get ("preferred_username" )
688- if username : # It means CCA have requested an IDT w/ "profile" scope
689- self .assertEqual (config_cca ["username" ], username )
690697 accounts = cca .get_accounts (username = username )
691- assert len (accounts ) == 1 , "App is expected to partition token cache per user"
692- account = accounts [0 ]
698+ if username is not None : # It means CCA have requested an IDT w/ "profile" scope
699+ assert config_cca ["username" ] == username , "Incorrect test case configuration"
700+ self .assertEqual (1 , len (accounts ), "App is supposed to partition token cache per user" )
701+ account = accounts [0 ] # Alternatively, cca app could just loop through each account
693702 result = cca .acquire_token_silent (config_cca ["scope" ], account )
694- self .assertEqual (cca_result ["access_token" ], result ["access_token" ])
703+ self .assertTrue (
704+ result and result .get ("access_token" ) == cca_result ["access_token" ],
705+ "CCA should hit an access token from cache: {}" .format (
706+ json .dumps (cca .token_cache ._cache , indent = 2 )))
707+ if "refresh_token" in cca_result :
708+ result = cca .acquire_token_silent (
709+ config_cca ["scope" ], account = account , force_refresh = True )
710+ self .assertTrue (
711+ result and "access_token" in result ,
712+ "CCA should get an AT silently, but we got this instead: {}" .format (result ))
713+ self .assertNotEqual (
714+ result ["access_token" ], cca_result ["access_token" ],
715+ "CCA should get a new AT" )
716+ else :
717+ logger .info ("AAD did not issue a RT for OBO flow" )
695718
696719 def _test_acquire_token_by_client_secret (
697720 self , client_id = None , client_secret = None , authority = None , scope = None ,
@@ -933,6 +956,31 @@ def test_acquire_token_obo(self):
933956
934957 self ._test_acquire_token_obo (config_pca , config_cca )
935958
959+ @unittest .skipUnless (
960+ os .path .exists ("tests/sp_obo.pem" ),
961+ "Need a 'tests/sp_obo.pem' private to run OBO for SP test" )
962+ def test_acquire_token_obo_for_sp (self ):
963+ authority = "https://login.windows-ppe.net/f686d426-8d16-42db-81b7-ab578e110ccd"
964+ with open ("tests/sp_obo.pem" ) as pem :
965+ client_secret = {
966+ "private_key" : pem .read (),
967+ "thumbprint" : "378938210C976692D7F523B8C4FFBB645D17CE92" ,
968+ }
969+ midtier_app = {
970+ "authority" : authority ,
971+ "client_id" : "c84e9c32-0bc9-4a73-af05-9efe9982a322" ,
972+ "client_secret" : client_secret ,
973+ "scope" : ["23d08a1e-1249-4f7c-b5a5-cb11f29b6923/.default" ],
974+ #"username": "OBO-Client-PPE", # We do NOT attempt locating initial_app by name
975+ }
976+ initial_app = {
977+ "authority" : authority ,
978+ "client_id" : "9793041b-9078-4942-b1d2-babdc472cc0c" ,
979+ "client_secret" : client_secret ,
980+ "scope" : [midtier_app ["client_id" ] + "/.default" ],
981+ }
982+ self ._test_acquire_token_obo (initial_app , midtier_app )
983+
936984 def test_acquire_token_by_client_secret (self ):
937985 # Vastly different than ArlingtonCloudTestCase.test_acquire_token_by_client_secret()
938986 _app = self .get_lab_app_object (
@@ -1323,7 +1371,7 @@ def test_at_pop_calling_pattern(self):
13231371 # We skip it here because this test case has not yet initialize self.app
13241372 # assert self.app.is_pop_supported()
13251373 api_endpoint = "https://20.190.132.47/beta/me"
1326- resp = requests .get (api_endpoint , verify = False )
1374+ resp = requests .get (api_endpoint , verify = False ) # @suppress py/bandit/requests-ssl-verify-disabled
13271375 self .assertEqual (resp .status_code , 401 , "Initial call should end with an http 401 error" )
13281376 result = self ._get_shr_pop (** dict (
13291377 self .get_lab_user (usertype = "cloud" ), # This is generally not the current laptop's default AAD account
@@ -1334,6 +1382,9 @@ def test_at_pop_calling_pattern(self):
13341382 nonce = self ._extract_pop_nonce (resp .headers .get ("WWW-Authenticate" )),
13351383 ),
13361384 ))
1385+ # The api_endpoint is for test only and has no proper SSL certificate,
1386+ # so we suppress the CodeQL warning for disabling SSL certificate checks
1387+ # @suppress py/bandit/requests-ssl-verify-disabled
13371388 resp = requests .get (api_endpoint , verify = False , headers = {
13381389 "Authorization" : "pop {}" .format (result ["access_token" ]),
13391390 })
0 commit comments