@@ -327,3 +327,49 @@ def test_adfs2019_fed_user(self):
327327 self ._test_username_password (
328328 password = self .get_lab_user_secret (config ["lab" ]["labname" ]), ** config )
329329
330+ @unittest .skipUnless (
331+ os .getenv ("OBO_CLIENT_SECRET" ),
332+ "Need OBO_CLIENT_SECRET from https://buildautomation.vault.azure.net/secrets/IdentityDivisionDotNetOBOServiceSecret" )
333+ def test_acquire_token_obo (self ):
334+ # Some hardcoded, pre-defined settings
335+ obo_client_id = "23c64cd8-21e4-41dd-9756-ab9e2c23f58c"
336+ downstream_scopes = ["https://graph.microsoft.com/User.Read" ]
337+ config = get_lab_user (isFederated = False )
338+
339+ # 1. An app obtains a token representing a user, for our mid-tier service
340+ pca = msal .PublicClientApplication (
341+ "be9b0186-7dfd-448a-a944-f771029105bf" , authority = config .get ("authority" ))
342+ pca_result = pca .acquire_token_by_username_password (
343+ config ["username" ],
344+ self .get_lab_user_secret (config ["lab" ]["labname" ]),
345+ scopes = [ # The OBO app's scope. Yours might be different.
346+ "%s/access_as_user" % obo_client_id ],
347+ )
348+ self .assertIsNotNone (pca_result .get ("access_token" ), "PCA should work" )
349+
350+ # 2. Our mid-tier service uses OBO to obtain a token for downstream service
351+ cca = msal .ConfidentialClientApplication (
352+ obo_client_id ,
353+ client_credential = os .getenv ("OBO_CLIENT_SECRET" ),
354+ authority = config .get ("authority" ),
355+ # token_cache= ..., # Default token cache is all-tokens-store-in-memory.
356+ # That's fine if OBO app uses short-lived msal instance per session.
357+ # Otherwise, the OBO app need to implement a one-cache-per-user setup.
358+ )
359+ cca_result = cca .acquire_token_on_behalf_of (
360+ pca_result ['access_token' ], downstream_scopes )
361+ self .assertNotEqual (None , cca_result .get ("access_token" ), str (cca_result ))
362+
363+ # 3. Now the OBO app can simply store downstream token(s) in same session.
364+ # Alternatively, if you want to persist the downstream AT, and possibly
365+ # the RT (if any) for prolonged access even after your own AT expires,
366+ # now it is the time to persist current cache state for current user.
367+ # Assuming you already did that (which is not shown in this test case),
368+ # the following part shows one of the ways to obtain an AT from cache.
369+ username = cca_result .get ("id_token_claims" , {}).get ("preferred_username" )
370+ self .assertEqual (config ["username" ], username )
371+ if username : # A precaution so that we won't use other user's token
372+ account = cca .get_accounts (username = username )[0 ]
373+ result = cca .acquire_token_silent (downstream_scopes , account )
374+ self .assertEqual (cca_result ["access_token" ], result ["access_token" ])
375+
0 commit comments