@@ -102,6 +102,32 @@ def _test_username_password(self,
102102 username = username if ".b2clogin.com" not in authority else None ,
103103 )
104104
105+ def _test_device_flow (
106+ self , client_id = None , authority = None , scope = None , ** ignored ):
107+ assert client_id and authority and scope
108+ self .app = msal .PublicClientApplication (
109+ client_id , authority = authority )
110+ flow = self .app .initiate_device_flow (scopes = scope )
111+ assert "user_code" in flow , "DF does not seem to be provisioned: %s" .format (
112+ json .dumps (flow , indent = 4 ))
113+ logger .info (flow ["message" ])
114+
115+ duration = 60
116+ logger .info ("We will wait up to %d seconds for you to sign in" % duration )
117+ flow ["expires_at" ] = min ( # Shorten the time for quick test
118+ flow ["expires_at" ], time .time () + duration )
119+ result = self .app .acquire_token_by_device_flow (flow )
120+ self .assertLoosely ( # It will skip this test if there is no user interaction
121+ result ,
122+ assertion = lambda : self .assertIn ('access_token' , result ),
123+ skippable_errors = self .app .client .DEVICE_FLOW_RETRIABLE_ERRORS )
124+ if "access_token" not in result :
125+ self .skip ("End user did not complete Device Flow in time" )
126+ self .assertCacheWorksForUser (result , scope , username = None )
127+ result ["access_token" ] = result ["refresh_token" ] = "************"
128+ logger .info (
129+ "%s obtained tokens: %s" , self .id (), json .dumps (result , indent = 4 ))
130+
105131
106132THIS_FOLDER = os .path .dirname (__file__ )
107133CONFIG = os .path .join (THIS_FOLDER , "config.json" )
@@ -256,29 +282,7 @@ def setUpClass(cls):
256282 cls .config = json .load (f )
257283
258284 def test_device_flow (self ):
259- scopes = self .config ["scope" ]
260- self .app = msal .PublicClientApplication (
261- self .config ['client_id' ], authority = self .config ["authority" ])
262- flow = self .app .initiate_device_flow (scopes = scopes )
263- assert "user_code" in flow , "DF does not seem to be provisioned: %s" .format (
264- json .dumps (flow , indent = 4 ))
265- logger .info (flow ["message" ])
266-
267- duration = 60
268- logger .info ("We will wait up to %d seconds for you to sign in" % duration )
269- flow ["expires_at" ] = min ( # Shorten the time for quick test
270- flow ["expires_at" ], time .time () + duration )
271- result = self .app .acquire_token_by_device_flow (flow )
272- self .assertLoosely ( # It will skip this test if there is no user interaction
273- result ,
274- assertion = lambda : self .assertIn ('access_token' , result ),
275- skippable_errors = self .app .client .DEVICE_FLOW_RETRIABLE_ERRORS )
276- if "access_token" not in result :
277- self .skip ("End user did not complete Device Flow in time" )
278- self .assertCacheWorksForUser (result , scopes , username = None )
279- result ["access_token" ] = result ["refresh_token" ] = "************"
280- logger .info (
281- "%s obtained tokens: %s" , self .id (), json .dumps (result , indent = 4 ))
285+ self ._test_device_flow (** self .config )
282286
283287
284288def get_lab_app (
@@ -334,6 +338,12 @@ def setUpClass(cls):
334338 def tearDownClass (cls ):
335339 cls .session .close ()
336340
341+ @classmethod
342+ def get_lab_app_object (cls , ** query ): # https://msidlab.com/swagger/index.html
343+ url = "https://msidlab.com/api/app"
344+ resp = cls .session .get (url , params = query )
345+ return resp .json ()[0 ]
346+
337347 @classmethod
338348 def get_lab_user_secret (cls , lab_name = "msidlab4" ):
339349 lab_name = lab_name .lower ()
@@ -348,67 +358,29 @@ def get_lab_user_secret(cls, lab_name="msidlab4"):
348358 def get_lab_user (cls , ** query ): # https://docs.msidlab.com/labapi/userapi.html
349359 resp = cls .session .get ("https://msidlab.com/api/user" , params = query )
350360 result = resp .json ()[0 ]
361+ _env = query .get ("azureenvironment" , "" ).lower ()
362+ authority_base = {
363+ "azureusgovernment" : "https://login.microsoftonline.us/"
364+ }.get (_env , "https://login.microsoftonline.com/" )
365+ scope = {
366+ "azureusgovernment" : ["https://graph.microsoft.us/.default" ],
367+ }.get (_env , ["https://graph.microsoft.com/.default" ])
351368 return { # Mapping lab API response to our simplified configuration format
352- "authority" : "https://login.microsoftonline.com/{}.onmicrosoft.com" .format (
353- result ["labName" ]),
369+ "authority" : authority_base + result ["tenantID" ],
354370 "client_id" : result ["appId" ],
355371 "username" : result ["upn" ],
356372 "lab_name" : result ["labName" ],
357- "scope" : [ "https://graph.microsoft.com/.default" ] ,
373+ "scope" : scope ,
358374 }
359375
360- def test_aad_managed_user (self ): # Pure cloud
361- config = self .get_lab_user (usertype = "cloud" )
362- self ._test_username_password (
363- password = self .get_lab_user_secret (config ["lab_name" ]), ** config )
364-
365- def test_adfs4_fed_user (self ):
366- config = self .get_lab_user (usertype = "federated" , federationProvider = "ADFSv4" )
367- self ._test_username_password (
368- password = self .get_lab_user_secret (config ["lab_name" ]), ** config )
369-
370- def test_adfs3_fed_user (self ):
371- config = self .get_lab_user (usertype = "federated" , federationProvider = "ADFSv3" )
372- self ._test_username_password (
373- password = self .get_lab_user_secret (config ["lab_name" ]), ** config )
374-
375- def test_adfs2_fed_user (self ):
376- config = self .get_lab_user (usertype = "federated" , federationProvider = "ADFSv2" )
377- self ._test_username_password (
378- password = self .get_lab_user_secret (config ["lab_name" ]), ** config )
379-
380- def test_adfs2019_fed_user (self ):
381- config = self .get_lab_user (usertype = "federated" , federationProvider = "ADFSv2019" )
382- self ._test_username_password (
383- password = self .get_lab_user_secret (config ["lab_name" ]), ** config )
384-
385- def test_ropc_adfs2019_onprem (self ):
386- config = self .get_lab_user (usertype = "onprem" , federationProvider = "ADFSv2019" )
387- config ["authority" ] = "https://fs.%s.com/adfs" % config ["lab_name" ]
388- config ["client_id" ] = "PublicClientId"
389- config ["scope" ] = self .adfs2019_scopes
390- self ._test_username_password (
391- password = self .get_lab_user_secret (config ["lab_name" ]), ** config )
392-
393- @unittest .skipIf (os .getenv ("TRAVIS" ), "Browser automation is not yet implemented" )
394- def test_adfs2019_onprem_acquire_token_by_auth_code (self ):
395- """When prompted, you can manually login using this account:
396-
397- # https://msidlab.com/api/user?usertype=onprem&federationprovider=ADFSv2019
398- username = "..." # The upn from the link above
399- password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabXYZ
400- """
401- scopes = self .adfs2019_scopes
402- config = self .get_lab_user (usertype = "onprem" , federationProvider = "ADFSv2019" )
376+ def _test_acquire_token_by_auth_code (
377+ self , client_id = None , authority = None , port = None , scope = None ,
378+ ** ignored ):
379+ assert client_id and authority and port and scope
403380 (self .app , ac , redirect_uri ) = _get_app_and_auth_code (
404- # Configuration is derived from https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.7.0/tests/Microsoft.Identity.Test.Common/TestConstants.cs#L250-L259
405- "PublicClientId" ,
406- authority = "https://fs.%s.com/adfs" % config ["lab_name" ],
407- port = 8080 ,
408- scopes = scopes ,
409- )
381+ client_id , authority = authority , port = port , scopes = scope )
410382 result = self .app .acquire_token_by_authorization_code (
411- ac , scopes , redirect_uri = redirect_uri )
383+ ac , scope , redirect_uri = redirect_uri )
412384 logger .debug (
413385 "%s: cache = %s, id_token_claims = %s" ,
414386 self .id (),
@@ -421,41 +393,32 @@ def test_adfs2019_onprem_acquire_token_by_auth_code(self):
421393 # Note: No interpolation here, cause error won't always present
422394 error = result .get ("error" ),
423395 error_description = result .get ("error_description" )))
424- self .assertCacheWorksForUser (result , scopes , username = None )
425-
426- @unittest .skipUnless (
427- os .getenv ("LAB_OBO_CLIENT_SECRET" ),
428- "Need LAB_OBO_CLIENT SECRET from https://msidlabs.vault.azure.net/secrets/TodoListServiceV2-OBO/c58ba97c34ca4464886943a847d1db56" )
429- def test_acquire_token_obo (self ):
430- # Some hardcoded, pre-defined settings
431- obo_client_id = "f4aa5217-e87c-42b2-82af-5624dd14ee72"
432- downstream_scopes = ["https://graph.microsoft.com/.default" ]
433- config = self .get_lab_user (usertype = "cloud" )
396+ self .assertCacheWorksForUser (result , scope , username = None )
434397
398+ def _test_acquire_token_obo (self , config_pca , config_cca ):
435399 # 1. An app obtains a token representing a user, for our mid-tier service
436400 pca = msal .PublicClientApplication (
437- "c0485386-1e9a-4663-bc96-7ab30656de7f" , authority = config . get ( "authority" ) )
401+ config_pca [ "client_id" ] , authority = config_pca [ "authority" ] )
438402 pca_result = pca .acquire_token_by_username_password (
439- config ["username" ],
440- self .get_lab_user_secret (config ["lab_name" ]),
441- scopes = [ # The OBO app's scope. Yours might be different.
442- "api://%s/read" % obo_client_id ],
403+ config_pca ["username" ],
404+ config_pca ["password" ],
405+ scopes = config_pca ["scope" ],
443406 )
444407 self .assertIsNotNone (
445408 pca_result .get ("access_token" ),
446409 "PCA failed to get AT because %s" % json .dumps (pca_result , indent = 2 ))
447410
448411 # 2. Our mid-tier service uses OBO to obtain a token for downstream service
449412 cca = msal .ConfidentialClientApplication (
450- obo_client_id ,
451- client_credential = os . getenv ( "LAB_OBO_CLIENT_SECRET" ) ,
452- authority = config . get ( "authority" ) ,
413+ config_cca [ "client_id" ] ,
414+ client_credential = config_cca [ "client_secret" ] ,
415+ authority = config_cca [ "authority" ] ,
453416 # token_cache= ..., # Default token cache is all-tokens-store-in-memory.
454417 # That's fine if OBO app uses short-lived msal instance per session.
455418 # Otherwise, the OBO app need to implement a one-cache-per-user setup.
456419 )
457420 cca_result = cca .acquire_token_on_behalf_of (
458- pca_result ['access_token' ], downstream_scopes )
421+ pca_result ['access_token' ], config_cca [ "scope" ] )
459422 self .assertNotEqual (None , cca_result .get ("access_token" ), str (cca_result ))
460423
461424 # 3. Now the OBO app can simply store downstream token(s) in same session.
@@ -465,12 +428,93 @@ def test_acquire_token_obo(self):
465428 # Assuming you already did that (which is not shown in this test case),
466429 # the following part shows one of the ways to obtain an AT from cache.
467430 username = cca_result .get ("id_token_claims" , {}).get ("preferred_username" )
468- self .assertEqual (config ["username" ], username )
431+ self .assertEqual (config_cca ["username" ], username )
469432 if username : # A precaution so that we won't use other user's token
470433 account = cca .get_accounts (username = username )[0 ]
471- result = cca .acquire_token_silent (downstream_scopes , account )
434+ result = cca .acquire_token_silent (config_cca [ "scope" ] , account )
472435 self .assertEqual (cca_result ["access_token" ], result ["access_token" ])
473436
437+ def _test_acquire_token_by_client_secret (
438+ self , client_id = None , client_secret = None , authority = None , scope = None ,
439+ ** ignored ):
440+ assert client_id and client_secret and authority and scope
441+ app = msal .ConfidentialClientApplication (
442+ client_id , client_credential = client_secret , authority = authority )
443+ result = app .acquire_token_for_client (scope )
444+ self .assertIsNotNone (result .get ("access_token" ), "Got %s instead" % result )
445+
446+
447+ class WorldWideTestCase (LabBasedTestCase ):
448+
449+ def test_aad_managed_user (self ): # Pure cloud
450+ config = self .get_lab_user (usertype = "cloud" )
451+ config ["password" ] = self .get_lab_user_secret (config ["lab_name" ])
452+ self ._test_username_password (** config )
453+
454+ def test_adfs4_fed_user (self ):
455+ config = self .get_lab_user (usertype = "federated" , federationProvider = "ADFSv4" )
456+ config ["password" ] = self .get_lab_user_secret (config ["lab_name" ])
457+ self ._test_username_password (** config )
458+
459+ def test_adfs3_fed_user (self ):
460+ config = self .get_lab_user (usertype = "federated" , federationProvider = "ADFSv3" )
461+ config ["password" ] = self .get_lab_user_secret (config ["lab_name" ])
462+ self ._test_username_password (** config )
463+
464+ def test_adfs2_fed_user (self ):
465+ config = self .get_lab_user (usertype = "federated" , federationProvider = "ADFSv2" )
466+ config ["password" ] = self .get_lab_user_secret (config ["lab_name" ])
467+ self ._test_username_password (** config )
468+
469+ def test_adfs2019_fed_user (self ):
470+ config = self .get_lab_user (usertype = "federated" , federationProvider = "ADFSv2019" )
471+ config ["password" ] = self .get_lab_user_secret (config ["lab_name" ])
472+ self ._test_username_password (** config )
473+
474+ def test_ropc_adfs2019_onprem (self ):
475+ # Configuration is derived from https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.7.0/tests/Microsoft.Identity.Test.Common/TestConstants.cs#L250-L259
476+ config = self .get_lab_user (usertype = "onprem" , federationProvider = "ADFSv2019" )
477+ config ["authority" ] = "https://fs.%s.com/adfs" % config ["lab_name" ]
478+ config ["client_id" ] = "PublicClientId"
479+ config ["scope" ] = self .adfs2019_scopes
480+ config ["password" ] = self .get_lab_user_secret (config ["lab_name" ])
481+ self ._test_username_password (** config )
482+
483+ @unittest .skipIf (os .getenv ("TRAVIS" ), "Browser automation is not yet implemented" )
484+ def test_adfs2019_onprem_acquire_token_by_auth_code (self ):
485+ """When prompted, you can manually login using this account:
486+
487+ # https://msidlab.com/api/user?usertype=onprem&federationprovider=ADFSv2019
488+ username = "..." # The upn from the link above
489+ password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabXYZ
490+ """
491+ config = self .get_lab_user (usertype = "onprem" , federationProvider = "ADFSv2019" )
492+ config ["authority" ] = "https://fs.%s.com/adfs" % config ["lab_name" ]
493+ config ["client_id" ] = "PublicClientId"
494+ config ["scope" ] = self .adfs2019_scopes
495+ config ["port" ] = 8080
496+ self ._test_acquire_token_by_auth_code (** config )
497+
498+ @unittest .skipUnless (
499+ os .getenv ("LAB_OBO_CLIENT_SECRET" ),
500+ "Need LAB_OBO_CLIENT SECRET from https://msidlabs.vault.azure.net/secrets/TodoListServiceV2-OBO/c58ba97c34ca4464886943a847d1db56" )
501+ def test_acquire_token_obo (self ):
502+ config = self .get_lab_user (usertype = "cloud" )
503+
504+ config_cca = {}
505+ config_cca .update (config )
506+ config_cca ["client_id" ] = "f4aa5217-e87c-42b2-82af-5624dd14ee72"
507+ config_cca ["scope" ] = ["https://graph.microsoft.com/.default" ]
508+ config_cca ["client_secret" ] = os .getenv ("LAB_OBO_CLIENT_SECRET" )
509+
510+ config_pca = {}
511+ config_pca .update (config )
512+ config_pca ["client_id" ] = "c0485386-1e9a-4663-bc96-7ab30656de7f"
513+ config_pca ["password" ] = self .get_lab_user_secret (config_pca ["lab_name" ])
514+ config_pca ["scope" ] = ["api://%s/read" % config_cca ["client_id" ]]
515+
516+ self ._test_acquire_token_obo (config_pca , config_cca )
517+
474518 def _build_b2c_authority (self , policy ):
475519 base = "https://msidlabb2c.b2clogin.com/msidlabb2c.onmicrosoft.com"
476520 return base + "/" + policy # We do not support base + "?p=" + policy
@@ -484,29 +528,12 @@ def test_b2c_acquire_token_by_auth_code(self):
484528 # This won't work https://msidlab.com/api/user?usertype=b2c
485529 password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabb2c
486530 """
487- scopes = ["https://msidlabb2c.onmicrosoft.com/msaapp/user_impersonation" ]
488- (self .app , ac , redirect_uri ) = _get_app_and_auth_code (
489- "b876a048-55a5-4fc5-9403-f5d90cb1c852" ,
490- client_secret = self .get_lab_user_secret ("MSIDLABB2C-MSAapp-AppSecret" ),
531+ self ._test_acquire_token_by_auth_code (
491532 authority = self ._build_b2c_authority ("B2C_1_SignInPolicy" ),
533+ client_id = "b876a048-55a5-4fc5-9403-f5d90cb1c852" ,
492534 port = 3843 , # Lab defines 4 of them: [3843, 4584, 4843, 60000]
493- scopes = scopes ,
535+ scope = [ "https://msidlabb2c.onmicrosoft.com/msaapp/user_impersonation" ]
494536 )
495- result = self .app .acquire_token_by_authorization_code (
496- ac , scopes , redirect_uri = redirect_uri )
497- logger .debug (
498- "%s: cache = %s, id_token_claims = %s" ,
499- self .id (),
500- json .dumps (self .app .token_cache ._cache , indent = 4 ),
501- json .dumps (result .get ("id_token_claims" ), indent = 4 ),
502- )
503- self .assertIn (
504- "access_token" , result ,
505- "{error}: {error_description}" .format (
506- # Note: No interpolation here, cause error won't always present
507- error = result .get ("error" ),
508- error_description = result .get ("error_description" )))
509- self .assertCacheWorksForUser (result , scopes , username = None )
510537
511538 def test_b2c_acquire_token_by_ropc (self ):
512539 self ._test_username_password (
@@ -517,6 +544,40 @@ def test_b2c_acquire_token_by_ropc(self):
517544 scope = ["https://msidlabb2c.onmicrosoft.com/msidlabb2capi/read" ],
518545 )
519546
547+
548+ class ArlingtonCloudTestCase (LabBasedTestCase ):
549+ environment = "azureusgovernment"
550+
551+ def test_acquire_token_by_ropc (self ):
552+ config = self .get_lab_user (azureenvironment = self .environment )
553+ config ["password" ] = self .get_lab_user_secret (config ["lab_name" ])
554+ self ._test_username_password (** config )
555+
556+ def test_acquire_token_by_client_secret (self ):
557+ config = self .get_lab_user (usertype = "cloud" , azureenvironment = self .environment , publicClient = "no" )
558+ config ["client_secret" ] = self .get_lab_user_secret ("ARLMSIDLAB1-IDLASBS-App-CC-Secret" )
559+ self ._test_acquire_token_by_client_secret (** config )
560+
561+ def test_acquire_token_obo (self ):
562+ config_cca = self .get_lab_user (
563+ usertype = "cloud" , azureenvironment = self .environment , publicClient = "no" )
564+ config_cca ["scope" ] = ["https://graph.microsoft.us/.default" ]
565+ config_cca ["client_secret" ] = self .get_lab_user_secret ("ARLMSIDLAB1-IDLASBS-App-CC-Secret" )
566+
567+ config_pca = self .get_lab_user (usertype = "cloud" , azureenvironment = self .environment , publicClient = "yes" )
568+ obo_app_object = self .get_lab_app_object (
569+ usertype = "cloud" , azureenvironment = self .environment , publicClient = "no" )
570+ config_pca ["password" ] = self .get_lab_user_secret (config_pca ["lab_name" ])
571+ config_pca ["scope" ] = ["{app_uri}/files.read" .format (app_uri = obo_app_object .get ("identifierUris" ))]
572+
573+ self ._test_acquire_token_obo (config_pca , config_cca )
574+
575+ def test_acquire_token_device_flow (self ):
576+ config = self .get_lab_user (usertype = "cloud" , azureenvironment = self .environment , publicClient = "yes" )
577+ config ["scope" ] = ["user.read" ]
578+ self ._test_device_flow (** config )
579+
580+
520581if __name__ == "__main__" :
521582 unittest .main ()
522583
0 commit comments