@@ -931,3 +931,62 @@ def test_acquire_token_for_client_should_return_a_cdt(self):
931931 self .assertAppObtainsCdt (app , ["scope1" , "scope2" ])
932932 self .assertEqual (mocked_post .call_count , 2 )
933933
934+
935+ @patch ("msal.authority.tenant_discovery" , new = Mock (return_value = {
936+ "authorization_endpoint" : "https://contoso.com/placeholder" ,
937+ "token_endpoint" : "https://contoso.com/placeholder" ,
938+ }))
939+ class FmiTestCase (unittest .TestCase ):
940+
941+ def assertFmi (self , result : dict ) -> None :
942+ self .assertIsNotNone (
943+ result .get ("access_token" ), "Encountered {}: {}" .format (
944+ result .get ("error" ), result .get ("error_description" )))
945+ self .assertIn (result ["token_type" ], ("fmi_cred" , "fmi_token" ))
946+
947+ def assertAppObtainsAndCaches (self , app , scopes , fmi_path ) -> dict :
948+ result = app .acquire_token_for_client (scopes , fmi_path = fmi_path )
949+ self .assertFmi (result )
950+ self .assertEqual (result ["token_source" ], "identity_provider" )
951+
952+ result = app .acquire_token_for_client (scopes , fmi_path = fmi_path )
953+ self .assertFmi (result )
954+ self .assertEqual (result ["token_source" ], "cache" )
955+
956+ return result
957+
958+ def assertAppCachesByFmipath (self , app , scopes , fmi_path ) -> dict :
959+ with patch .object (app .http_client , "post" , return_value = MinimalResponse (
960+ status_code = 200 , text = json .dumps ({
961+ "token_type" : "fmi_cred"
962+ if scopes == ["api://AzureFMITokenExchange" ] else "fmi_token" ,
963+ "access_token" : "payload" ,
964+ "expires_in" : 3600 ,
965+ }))) as mocked_post :
966+ result = self .assertAppObtainsAndCaches (app , scopes , fmi_path )
967+ self .assertAppObtainsAndCaches (app , scopes , fmi_path + "/subpath" )
968+ self .assertEqual (mocked_post .call_count , 2 )
969+ self .assertEqual (
970+ 2 ,
971+ len (list (app .token_cache .search (
972+ msal .TokenCache .CredentialType .ACCESS_TOKEN ))),
973+ "Should cache both tokens" )
974+ logger .debug ("cache=%s" , json .dumps (app .token_cache ._cache , indent = 2 ))
975+ return result
976+
977+ def test_fmi_with_rma_and_sub_rma (self ):
978+ fmi_cred = self .assertAppCachesByFmipath (
979+ msal .ConfidentialClientApplication (
980+ "rma_client_id" ,
981+ client_credential = {"client_assertion" : "some assertion" },
982+ ),
983+ ["api://AzureFMITokenExchange" ],
984+ "/eid1/c/<cloud>/t/<tenantid>/a/<rma>/<fmi_path>" )
985+ self .assertAppCachesByFmipath (
986+ msal .ConfidentialClientApplication (
987+ "urn:microsoft:identity:fmi" ,
988+ client_credential = {"client_assertion" : fmi_cred ["access_token" ]},
989+ ),
990+ ["https://graph.microsoft.com/.default" ],
991+ "/eid1/c/<cloud>/t/<tenantid>/a/<rma>/<fmi_path>/foo" )
992+
0 commit comments