|
1 | 1 | # Note: Since Aug 2019 we move all e2e tests into test_e2e.py, |
2 | 2 | # so this test_application file contains only unit tests without dependency. |
| 3 | +import hashlib |
3 | 4 | import json |
4 | 5 | import logging |
5 | 6 | import sys |
@@ -62,6 +63,35 @@ def test_bytes_to_bytes(self): |
62 | 63 | self.assertEqual(type(_str2bytes(b"some bytes")), type(b"bytes")) |
63 | 64 |
|
64 | 65 |
|
| 66 | +def fake_token_getter( |
| 67 | + *, |
| 68 | + access_token: str = "an access token", |
| 69 | + status_code: int = 200, |
| 70 | + expires_in: int = 3600, |
| 71 | + token_type: str = "Bearer", |
| 72 | + payload: dict = None, |
| 73 | + headers: dict = None, |
| 74 | +): |
| 75 | + """A helper to create a fake token getter, |
| 76 | + which will be consumed by ClientApplication's acquire methods' post parameter. |
| 77 | +
|
| 78 | + Generic mock.patch() is inconvenient because: |
| 79 | + 1. If you patch it at or above oauth2.py _obtain_token(), token cache is not populated. |
| 80 | + 2. If you patch it at request.post(), your test cases become fragile because |
| 81 | + more http round-trips may be added for future flows, |
| 82 | + then your existing test case would break until you mock new round-trips. |
| 83 | + """ |
| 84 | + return lambda url, *args, **kwargs: MinimalResponse( |
| 85 | + status_code=status_code, |
| 86 | + text=json.dumps(payload or { |
| 87 | + "access_token": access_token, |
| 88 | + "expires_in": expires_in, |
| 89 | + "token_type": token_type, |
| 90 | + }), |
| 91 | + headers=headers, |
| 92 | + ) |
| 93 | + |
| 94 | + |
65 | 95 | class TestClientApplicationAcquireTokenSilentErrorBehaviors(unittest.TestCase): |
66 | 96 |
|
67 | 97 | @patch(_OIDC_DISCOVERY, new=_OIDC_DISCOVERY_MOCK) |
@@ -874,3 +904,61 @@ def test_app_did_not_register_redirect_uri_should_error_out(self): |
874 | 904 | parent_window_handle=app.CONSOLE_WINDOW_HANDLE, |
875 | 905 | ) |
876 | 906 | self.assertEqual(result.get("error"), "broker_error") |
| 907 | + |
| 908 | + |
| 909 | +@patch("msal.authority.tenant_discovery", new=Mock(return_value={ |
| 910 | + "authorization_endpoint": "https://contoso.com/placeholder", |
| 911 | + "token_endpoint": "https://contoso.com/placeholder", |
| 912 | + })) |
| 913 | +class AccessTokenToRefreshTestCase(unittest.TestCase): |
| 914 | + scopes = ["scope"] |
| 915 | + token1 = "AT one" |
| 916 | + token1_hash = hashlib.sha256(token1.encode()).hexdigest() |
| 917 | + token2 = "AT two" |
| 918 | + |
| 919 | + def setUp(self): |
| 920 | + self.app = msal.ConfidentialClientApplication("id", client_credential="*") |
| 921 | + # Prepopulate cache |
| 922 | + self.app.acquire_token_for_client( |
| 923 | + self.scopes, post=fake_token_getter(access_token=self.token1)) |
| 924 | + self.assertNotEqual( |
| 925 | + self.app.token_cache._cache, {}, "Cache should have been populated") |
| 926 | + |
| 927 | + def test_mismatching_hash_should_not_trigger_refresh(self): |
| 928 | + result = self.app.acquire_token_for_client( |
| 929 | + self.scopes, |
| 930 | + access_token_sha256_to_refresh="mismatching hash", |
| 931 | + post=fake_token_getter(access_token=self.token2)) |
| 932 | + self.assertEqual(result.get("access_token"), self.token1, "Should hit old token") |
| 933 | + self.assertEqual(result.get("token_source"), self.app._TOKEN_SOURCE_CACHE) |
| 934 | + |
| 935 | + def test_matching_hash_should_trigger_refresh(self): |
| 936 | + result = self.app.acquire_token_for_client( |
| 937 | + self.scopes, |
| 938 | + access_token_sha256_to_refresh=self.token1_hash, |
| 939 | + post=fake_token_getter(access_token=self.token2)) |
| 940 | + self.assertEqual(result.get("access_token"), self.token2, "Should obtain new token") |
| 941 | + self.assertEqual(result.get("token_source"), self.app._TOKEN_SOURCE_IDP) |
| 942 | + |
| 943 | + # A client using old token1 and valid old hash, even with claims challenge, |
| 944 | + # should not trigger refresh, because we can serve it with token2 in cache. |
| 945 | + result = self.app.acquire_token_for_client( |
| 946 | + self.scopes, |
| 947 | + access_token_sha256_to_refresh=self.token1_hash, |
| 948 | + claims_challenge='''{"access_token": { |
| 949 | + "access_token": {"nbf": {"essential": true, "value": "1563308371"} |
| 950 | + }}''', |
| 951 | + post=fake_token_getter(access_token="AT three")) |
| 952 | + self.assertEqual(result.get("access_token"), self.token2, "Token 2 should be returned") |
| 953 | + self.assertEqual(result.get("token_source"), self.app._TOKEN_SOURCE_CACHE) |
| 954 | + |
| 955 | + def test_force_refresh_alone_should_trigger_refresh(self): |
| 956 | + # Note: MSAL Python's acquire_token_for_client() never support force_refresh, |
| 957 | + # but let's ensure _acquire_token_silent_with_error(..., force_refresh=True) |
| 958 | + # bypasses cache, so that other account-based flows can still use it. |
| 959 | + result = self.app._acquire_token_silent_with_error( |
| 960 | + self.scopes, account=None, force_refresh=True, |
| 961 | + post=fake_token_getter(access_token=self.token2)) |
| 962 | + self.assertEqual(result.get("access_token"), self.token2, "Should hit new token") |
| 963 | + self.assertEqual(result.get("token_source"), self.app._TOKEN_SOURCE_IDP) |
| 964 | + |
0 commit comments