1313logging .basicConfig (level = logging .INFO )
1414
1515
16+ def _get_app_and_auth_code (
17+ client_id ,
18+ client_secret = None ,
19+ authority = "https://login.microsoftonline.com/common" ,
20+ port = 44331 ,
21+ scopes = ["https://graph.windows.net/.default" ],
22+ ):
23+ from msal .oauth2cli .authcode import obtain_auth_code
24+ app = msal .ClientApplication (client_id , client_secret , authority = authority )
25+ redirect_uri = "http://localhost:%d" % port
26+ ac = obtain_auth_code (port , auth_uri = app .get_authorization_request_url (
27+ scopes , redirect_uri = redirect_uri ))
28+ assert ac is not None
29+ return (app , ac , redirect_uri )
30+
1631@unittest .skipIf (os .getenv ("TRAVIS_TAG" ), "Skip e2e tests during tagged release" )
1732class E2eTestCase (unittest .TestCase ):
1833
@@ -49,9 +64,15 @@ def assertCacheWorksForUser(self, result_from_wire, scope, username=None):
4964 result_from_cache = self .app .acquire_token_silent (scope , account = account )
5065 self .assertIsNotNone (result_from_cache ,
5166 "We should get a result from acquire_token_silent(...) call" )
52- self .assertNotEqual (
53- result_from_wire ['access_token' ], result_from_cache ['access_token' ],
54- "We should get a fresh AT (via RT)" )
67+ self .assertIsNotNone (
68+ # We used to assert it this way:
69+ # result_from_wire['access_token'] != result_from_cache['access_token']
70+ # but ROPC in B2C tends to return the same AT we obtained seconds ago.
71+ # Now looking back, "refresh_token grant would return a brand new AT"
72+ # was just an empirical observation but never a committment in specs,
73+ # so we adjust our way to assert here.
74+ (result_from_cache or {}).get ("access_token" ),
75+ "We should get an AT from acquire_token_silent(...) call" )
5576
5677 def assertCacheWorksForApp (self , result_from_wire , scope ):
5778 # Going to test acquire_token_silent(...) to locate an AT from cache
@@ -70,7 +91,10 @@ def _test_username_password(self,
7091 username , password , scopes = scope )
7192 self .assertLoosely (result )
7293 # self.assertEqual(None, result.get("error"), str(result))
73- self .assertCacheWorksForUser (result , scope , username = username )
94+ self .assertCacheWorksForUser (
95+ result , scope ,
96+ username = username if ".b2clogin.com" not in authority else None ,
97+ )
7498
7599
76100THIS_FOLDER = os .path .dirname (__file__ )
@@ -95,23 +119,17 @@ def test_username_password(self):
95119 self ._test_username_password (** self .config )
96120
97121 def _get_app_and_auth_code (self ):
98- from msal .oauth2cli .authcode import obtain_auth_code
99- app = msal .ClientApplication (
122+ return _get_app_and_auth_code (
100123 self .config ["client_id" ],
101- client_credential = self .config .get ("client_secret" ),
102- authority = self .config .get ("authority" ))
103- port = self .config .get ("listen_port" , 44331 )
104- redirect_uri = "http://localhost:%s" % port
105- auth_request_uri = app .get_authorization_request_url (
106- self .config ["scope" ], redirect_uri = redirect_uri )
107- ac = obtain_auth_code (port , auth_uri = auth_request_uri )
108- self .assertNotEqual (ac , None )
109- return (app , ac , redirect_uri )
124+ client_secret = self .config .get ("client_secret" ),
125+ authority = self .config .get ("authority" ),
126+ port = self .config .get ("listen_port" , 44331 ),
127+ scopes = self .config ["scope" ],
128+ )
110129
111130 def test_auth_code (self ):
112131 self .skipUnlessWithConfig (["client_id" , "scope" ])
113132 (self .app , ac , redirect_uri ) = self ._get_app_and_auth_code ()
114-
115133 result = self .app .acquire_token_by_authorization_code (
116134 ac , self .config ["scope" ], redirect_uri = redirect_uri )
117135 logger .debug ("%s.cache = %s" ,
@@ -314,7 +332,7 @@ def get_lab_user_secret(cls, lab_name="msidlab4"):
314332 lab_name = lab_name .lower ()
315333 if lab_name not in cls ._secrets :
316334 logger .info ("Querying lab user password for %s" , lab_name )
317- # Note: Short link won't work "https://aka.ms/GetLabUserSecret?Secret=%s"
335+ # Short link only works in browser "https://aka.ms/GetLabUserSecret?Secret=%s"
318336 # So we use the official link written in here
319337 # https://microsoft.sharepoint-df.com/teams/MSIDLABSExtended/SitePages/Programmatically-accessing-LAB-API%27s.aspx
320338 url = ("https://request.msidlab.com/api/GetLabUserSecret?code=KpY5uCcoKo0aW8VOL/CUO3wnu9UF2XbSnLFGk56BDnmQiwD80MQ7HA==&Secret=%s"
@@ -417,3 +435,49 @@ def test_acquire_token_obo(self):
417435 result = cca .acquire_token_silent (downstream_scopes , account )
418436 self .assertEqual (cca_result ["access_token" ], result ["access_token" ])
419437
438+ def _build_b2c_authority (self , policy ):
439+ base = "https://msidlabb2c.b2clogin.com/msidlabb2c.onmicrosoft.com"
440+ return base + "/" + policy # We do not support base + "?p=" + policy
441+
442+ @unittest .skipIf (os .getenv ("TRAVIS" ), "Browser automation is not yet implemented" )
443+ def test_b2c_acquire_token_by_auth_code (self ):
444+ """
445+ When prompted, you can manually login using this account:
446+
447+ 448+ # This won't work https://msidlab.com/api/user?usertype=b2c
449+ password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabb2c
450+ """
451+ scopes = ["https://msidlabb2c.onmicrosoft.com/msaapp/user_impersonation" ]
452+ (self .app , ac , redirect_uri ) = _get_app_and_auth_code (
453+ "b876a048-55a5-4fc5-9403-f5d90cb1c852" ,
454+ client_secret = self .get_lab_user_secret ("MSIDLABB2C-MSAapp-AppSecret" ),
455+ authority = self ._build_b2c_authority ("B2C_1_SignInPolicy" ),
456+ port = 3843 , # Lab defines 4 of them: [3843, 4584, 4843, 60000]
457+ scopes = scopes ,
458+ )
459+ result = self .app .acquire_token_by_authorization_code (
460+ ac , scopes , redirect_uri = redirect_uri )
461+ logger .debug (
462+ "%s: cache = %s, id_token_claims = %s" ,
463+ self .id (),
464+ json .dumps (self .app .token_cache ._cache , indent = 4 ),
465+ json .dumps (result .get ("id_token_claims" ), indent = 4 ),
466+ )
467+ self .assertIn (
468+ "access_token" , result ,
469+ "{error}: {error_description}" .format (
470+ # Note: No interpolation here, cause error won't always present
471+ error = result .get ("error" ),
472+ error_description = result .get ("error_description" )))
473+ self .assertCacheWorksForUser (result , scopes , username = None )
474+
475+ def test_b2c_acquire_token_by_ropc (self ):
476+ self ._test_username_password (
477+ authority = self ._build_b2c_authority ("B2C_1_ROPC_Auth" ),
478+ client_id = "e3b9ad76-9763-4827-b088-80c7a7888f79" ,
479+ 480+ password = self .get_lab_user_secret ("msidlabb2c" ),
481+ scope = ["https://msidlabb2c.onmicrosoft.com/msidlabb2capi/read" ],
482+ )
483+
0 commit comments