2929from tests .http_client import MinimalHttpClient , MinimalResponse
3030from msal .oauth2cli import AuthCodeReceiver
3131from msal .oauth2cli .oidc import decode_part
32+ from msal .application import _build_req_cnf
3233
3334try :
3435 import pymsalruntime
@@ -791,12 +792,12 @@ def test_user_account(self):
791792 self ._test_user_account ()
792793
793794
794- def _data_for_pop (key ):
795- raw_req_cnf = json .dumps ({"kid" : key , "xms_ksl" : "sw" })
795+ def _data_for_pop (key_id ):
796796 return { # Sampled from Azure CLI's plugin connectedk8s
797797 'token_type' : 'pop' ,
798- 'key_id' : key ,
799- "req_cnf" : base64 .urlsafe_b64encode (raw_req_cnf .encode ('utf-8' )).decode ('utf-8' ).rstrip ('=' ),
798+ 'key_id' : key_id ,
799+ "req_cnf" : _build_req_cnf (
800+ {"kid" : key_id , "xms_ksl" : "sw" }, remove_padding = True ),
800801 # Note: Sending raw_req_cnf without base64 encoding would result in an http 500 error
801802 } # See also https://github.com/Azure/azure-cli-extensions/blob/main/src/connectedk8s/azext_connectedk8s/_clientproxyutils.py#L86-L92
802803
@@ -817,6 +818,38 @@ def test_user_account(self):
817818 self ._test_user_account ()
818819
819820
821+ class CdtTestCase (LabBasedTestCase ):
822+ _JWK1 = {"kty" :"RSA" , "n" :"2tNr73xwcj6lH7bqRZrFzgSLj7OeLfbn8216uOMDHuaZ6TEUBDN8Uz0ve8jAlKsP9CQFCSVoSNovdE-fs7c15MxEGHjDcNKLWonznximj8pDGZQjVdfK-7mG6P6z-lgVcLuYu5JcWU_PeEqIKg5llOaz-qeQ4LEDS4T1D2qWRGpAra4rJX1-kmrWmX_XIamq30C9EIO0gGuT4rc2hJBWQ-4-FnE1NXmy125wfT3NdotAJGq5lMIfhjfglDbJCwhc8Oe17ORjO3FsB5CLuBRpYmP7Nzn66lRY3Fe11Xz8AEBl3anKFSJcTvlMnFtu3EpD-eiaHfTgRBU7CztGQqVbiQ" , "e" :"AQAB" }
823+ def test_service_principal (self ):
824+ """
825+ app = get_lab_app(
826+ authority="https://login.microsoftonline.com/microsoft.onmicrosoft.com"
827+ "?dc=ESTS-PUB-JPELR1-AZ1-FD000-TEST1",
828+ )
829+ """
830+ app = msal .ConfidentialClientApplication (
831+ os .getenv ("RAY_APP_CLIENT_ID" ),
832+ client_credential = os .getenv ("RAY_APP_CLIENT_SECRET" ),
833+ authority = "https://login.microsoftonline.com/msidlab4.onmicrosoft.com"
834+ "?dc=ESTS-PUB-JPELR1-AZ1-FD000-TEST1" , # Accessible within AzVPN
835+ )
836+ from http .client import HTTPConnection
837+ HTTPConnection .debuglevel = 1
838+ result = app .acquire_token_for_client (
839+ [f"{ app .client_id } /.default" ],
840+ delegation_constraints = [
841+ {"typ" : "usr" , "a" : "C" , "target" : ["constraint1" , "constraint4" ]},
842+ {"typ" : "app" , "a" : "R" , "target" : ["constraint2" , "constraint5" ]},
843+ {"typ" : "subscription" , "a" : "U" , "target" : ["constraint3" ]},
844+ ],
845+ req_ds_cnf = self ._JWK1 ,
846+ )
847+ self .assertIsNotNone (result .get ("access_token" ), "Encountered {}: {}" .format (
848+ result .get ("error" ), result .get ("error_description" )))
849+ print ("Test case result:" , result )
850+ self .assertIsNotNone (result .get ("xms_ds_nonce" ))
851+
852+
820853class WorldWideTestCase (LabBasedTestCase ):
821854
822855 def test_aad_managed_user (self ): # Pure cloud
0 commit comments