|
11 | 11 | ClientApplication, PublicClientApplication, ConfidentialClientApplication, |
12 | 12 | _str2bytes, _merge_claims_challenge_and_capabilities, |
13 | 13 | ) |
| 14 | +from msal.oauth2cli.oidc import decode_part |
14 | 15 | from tests import unittest |
15 | 16 | from tests.test_token_cache import build_id_token, build_response |
16 | 17 | from tests.http_client import MinimalHttpClient, MinimalResponse |
@@ -856,3 +857,85 @@ def test_app_did_not_register_redirect_uri_should_error_out(self): |
856 | 857 | ) |
857 | 858 | self.assertEqual(result.get("error"), "broker_error") |
858 | 859 |
|
| 860 | + |
| 861 | +class CdtTestCase(unittest.TestCase): |
| 862 | + |
| 863 | + def createConstraint(self, typ: str, action: str, targets: list[str]) -> dict: |
| 864 | + return {"ver": "1.0", "typ": typ, "a": action, "target": [ |
| 865 | + {"val": t} for t in targets |
| 866 | + ]} |
| 867 | + |
| 868 | + def test_constraint_format(self): |
| 869 | + self.assertEqual([ |
| 870 | + self.createConstraint("ns:usr", "create", ["guid1", "guid2"]), |
| 871 | + self.createConstraint("ns:app", "update", ["guid3", "guid4"]), |
| 872 | + self.createConstraint("ns:subscription", "read", ["guid5", "guid6"]), |
| 873 | + ], [ # Format defined in https://microsoft-my.sharepoint-df.com/:w:/p/rohitshende/EZgP9niwOvhKn-CUbj1NgG4BTZ6FSD9_16vXvsaXTiUzkg?e=j5DcQu&nav=eyJoIjoiODU5NDAyNjI4In0 |
| 874 | + {"ver": "1.0", "typ": "ns:usr", "a": "create", "target": [ |
| 875 | + {"val": "guid1"}, {"val": "guid2"}, |
| 876 | + ], |
| 877 | + }, |
| 878 | + {"ver": "1.0", "typ": "ns:app", "a": "update", "target": [ |
| 879 | + {"val": "guid3"}, {"val": "guid4"}, |
| 880 | + ], |
| 881 | + }, |
| 882 | + {"ver": "1.0", "typ": "ns:subscription", "a": "read", "target": [ |
| 883 | + {"val": "guid5"}, {"val": "guid6"}, |
| 884 | + ], |
| 885 | + }, |
| 886 | + ], "Constraint format is correct") # MSAL actually accepts arbitrary JSON blob |
| 887 | + |
| 888 | + def assertCdt(self, result: dict, constraints: list[dict]) -> None: |
| 889 | + self.assertIsNotNone( |
| 890 | + result.get("access_token"), "Encountered {}: {}".format( |
| 891 | + result.get("error"), result.get("error_description"))) |
| 892 | + _expectancy = "The return value should look like a Bearer response" |
| 893 | + self.assertEqual(result["token_type"], "Bearer", _expectancy) |
| 894 | + self.assertNotIn("xms_ds_nonce", result, _expectancy) |
| 895 | + headers = json.loads(decode_part(result["access_token"].split(".")[0])) |
| 896 | + self.assertEqual(headers.get("typ"), "cdt+jwt", "typ should be cdt+jwt") |
| 897 | + payload = json.loads(decode_part(result["access_token"].split(".")[1])) |
| 898 | + self.assertIsNotNone(payload.get("t") and payload.get("c")) |
| 899 | + cdt_envelope = json.loads(decode_part(payload["c"].split(".")[1])) |
| 900 | + self.assertIn("xms_ds_nonce", cdt_envelope) |
| 901 | + self.assertEqual(cdt_envelope["constraints"], constraints) |
| 902 | + |
| 903 | + def assertAppObtainsCdt(self, client_app, scopes) -> None: |
| 904 | + constraints1 = [self.createConstraint("ns:usr", "create", ["guid1"])] |
| 905 | + result = client_app.acquire_token_for_client( |
| 906 | + scopes, delegation_constraints=constraints1, |
| 907 | + ) |
| 908 | + self.assertCdt(result, constraints1) |
| 909 | + |
| 910 | + constraints2 = [self.createConstraint("ns:app", "update", ["guid2"])] |
| 911 | + result = client_app.acquire_token_for_client( |
| 912 | + scopes, delegation_constraints=constraints2, |
| 913 | + ) |
| 914 | + self.assertEqual(result["token_source"], "cache", "App token Should hit cache") |
| 915 | + self.assertCdt(result, constraints2) |
| 916 | + |
| 917 | + result = client_app.acquire_token_for_client( |
| 918 | + scopes, delegation_constraints=constraints2, |
| 919 | + delegation_confirmation_key=client_app._get_rsa_key("new"), |
| 920 | + ) |
| 921 | + self.assertEqual( |
| 922 | + result["token_source"], "identity_provider", |
| 923 | + "Different key should result in a new app token") |
| 924 | + self.assertCdt(result, constraints2) |
| 925 | + |
| 926 | + @patch("msal.authority.tenant_discovery", new=Mock(return_value={ |
| 927 | + "authorization_endpoint": "https://contoso.com/placeholder", |
| 928 | + "token_endpoint": "https://contoso.com/placeholder", |
| 929 | + })) |
| 930 | + def test_acquire_token_for_client_should_return_a_cdt(self): |
| 931 | + app = msal.ConfidentialClientApplication("id", client_credential="secret") |
| 932 | + with patch.object(app.http_client, "post", return_value=MinimalResponse( |
| 933 | + status_code=200, text=json.dumps({ |
| 934 | + "token_type": "Bearer", |
| 935 | + "access_token": "app token", |
| 936 | + "expires_in": 3600, |
| 937 | + "xms_ds_nonce": "nonce", |
| 938 | + }))) as mocked_post: |
| 939 | + self.assertAppObtainsCdt(app, ["scope1", "scope2"]) |
| 940 | + mocked_post.assert_called_once() |
| 941 | + |
0 commit comments