|
1 | 1 | import base64 |
2 | 2 | import json |
3 | 3 | import os |
| 4 | +import re |
4 | 5 |
|
5 | 6 | import jwt |
6 | 7 | import pytest |
7 | 8 | from momento.auth.credential_provider import CredentialProvider |
8 | 9 | from momento.auth.momento_endpoint_resolver import _Base64DecodedV1Token |
| 10 | +from momento.errors.exceptions import InvalidArgumentException |
9 | 11 |
|
10 | 12 | from tests.utils import uuid_str |
11 | 13 |
|
|
23 | 25 | os.environ[test_env_var_name] = test_token |
24 | 26 | os.environ[test_v1_env_var_name] = test_encoded_v1_token.decode("utf-8") |
25 | 27 |
|
| 28 | +# For v2 API key tests |
| 29 | +test_v2_key_message = {"t": "g", "jti": "some-id"} |
| 30 | +test_v2_api_key = jwt.encode(test_v2_key_message, "secret", algorithm="HS512") |
| 31 | +test_v2_key_env_var_name = "MOMENTO_API_KEY" |
| 32 | +test_v2_endpoint = "testEndpoint" |
| 33 | +test_v2_endpoint_env_var_name = "MOMENTO_ENDPOINT" |
| 34 | +os.environ[test_v2_key_env_var_name] = test_v2_api_key |
| 35 | +os.environ[test_v2_endpoint_env_var_name] = test_v2_endpoint |
| 36 | + |
26 | 37 |
|
27 | 38 | @pytest.mark.parametrize( |
28 | 39 | "provider, auth_token, control_endpoint, cache_endpoint", |
@@ -97,3 +108,173 @@ def test_endpoints(provider: CredentialProvider, auth_token: str, control_endpoi |
97 | 108 | def test_env_token_raises_if_not_exists() -> None: |
98 | 109 | with pytest.raises(RuntimeError, match=r"Missing required environment variable"): |
99 | 110 | CredentialProvider.from_environment_variable(env_var_name=uuid_str()) |
| 111 | + |
| 112 | + |
| 113 | +@pytest.mark.parametrize( |
| 114 | + "provider, expected_api_key, expected_control_endpoint, expected_cache_endpoint, expected_token_endpoint", |
| 115 | + [ |
| 116 | + ( |
| 117 | + CredentialProvider.from_api_key_v2( |
| 118 | + api_key=test_v2_api_key, |
| 119 | + endpoint=test_v2_endpoint, |
| 120 | + ), |
| 121 | + test_v2_api_key, |
| 122 | + f"control.{test_v2_endpoint}", |
| 123 | + f"cache.{test_v2_endpoint}", |
| 124 | + f"token.{test_v2_endpoint}", |
| 125 | + ), |
| 126 | + ( |
| 127 | + CredentialProvider.from_environment_variables_v2( |
| 128 | + api_key_env_var=test_v2_key_env_var_name, |
| 129 | + endpoint_env_var=test_v2_endpoint_env_var_name, |
| 130 | + ), |
| 131 | + test_v2_api_key, |
| 132 | + f"control.{test_v2_endpoint}", |
| 133 | + f"cache.{test_v2_endpoint}", |
| 134 | + f"token.{test_v2_endpoint}", |
| 135 | + ), |
| 136 | + ( |
| 137 | + CredentialProvider.from_environment_variables_v2(), |
| 138 | + test_v2_api_key, |
| 139 | + f"control.{test_v2_endpoint}", |
| 140 | + f"cache.{test_v2_endpoint}", |
| 141 | + f"token.{test_v2_endpoint}", |
| 142 | + ), |
| 143 | + ], |
| 144 | +) |
| 145 | +def test_v2_api_key_endpoints( |
| 146 | + provider: CredentialProvider, |
| 147 | + expected_api_key: str, |
| 148 | + expected_control_endpoint: str, |
| 149 | + expected_cache_endpoint: str, |
| 150 | + expected_token_endpoint: str, |
| 151 | +) -> None: |
| 152 | + assert provider.auth_token == expected_api_key |
| 153 | + assert provider.control_endpoint == expected_control_endpoint |
| 154 | + assert provider.cache_endpoint == expected_cache_endpoint |
| 155 | + assert provider.token_endpoint == expected_token_endpoint |
| 156 | + |
| 157 | + |
| 158 | +def test_v2_key_from_string_raises_if_api_key_empty() -> None: |
| 159 | + with pytest.raises(InvalidArgumentException, match="API key cannot be empty"): |
| 160 | + CredentialProvider.from_api_key_v2(api_key="", endpoint=test_v2_endpoint) |
| 161 | + |
| 162 | + |
| 163 | +def test_v2_key_from_string_raises_if_endpoint_empty() -> None: |
| 164 | + with pytest.raises(InvalidArgumentException, match="Endpoint cannot be empty"): |
| 165 | + CredentialProvider.from_api_key_v2(api_key=test_v2_api_key, endpoint="") |
| 166 | + |
| 167 | + |
| 168 | +def test_v2_key_from_env_raises_if_env_var_name_empty() -> None: |
| 169 | + with pytest.raises(InvalidArgumentException, match="API key environment variable name cannot be empty"): |
| 170 | + CredentialProvider.from_environment_variables_v2( |
| 171 | + api_key_env_var="", endpoint_env_var=test_v2_endpoint_env_var_name |
| 172 | + ) |
| 173 | + |
| 174 | + |
| 175 | +def test_v2_key_from_env_raises_if_env_var_missing() -> None: |
| 176 | + with pytest.raises(RuntimeError, match="Missing required environment variable"): |
| 177 | + CredentialProvider.from_environment_variables_v2( |
| 178 | + api_key_env_var=uuid_str(), endpoint_env_var=test_v2_endpoint_env_var_name |
| 179 | + ) |
| 180 | + |
| 181 | + |
| 182 | +def test_v2_key_from_env_raises_if_endpoint_empty() -> None: |
| 183 | + with pytest.raises(InvalidArgumentException, match="Endpoint environment variable name cannot be empty"): |
| 184 | + CredentialProvider.from_environment_variables_v2(api_key_env_var=test_v2_key_env_var_name, endpoint_env_var="") |
| 185 | + |
| 186 | + |
| 187 | +def test_v2_key_from_env_raises_if_api_key_empty_string() -> None: |
| 188 | + empty_api_key_env_var = uuid_str() |
| 189 | + os.environ[empty_api_key_env_var] = "" |
| 190 | + with pytest.raises(RuntimeError, match="Missing required environment variable"): |
| 191 | + CredentialProvider.from_environment_variables_v2( |
| 192 | + api_key_env_var=empty_api_key_env_var, endpoint_env_var=test_v2_endpoint_env_var_name |
| 193 | + ) |
| 194 | + |
| 195 | + |
| 196 | +def test_v2_key_from_string_raises_if_base64_api_key() -> None: |
| 197 | + with pytest.raises( |
| 198 | + InvalidArgumentException, |
| 199 | + match=re.escape( |
| 200 | + "Received an invalid v2 API key. Are you using the correct key and the correct CredentialProvider method?" |
| 201 | + ), |
| 202 | + ): |
| 203 | + CredentialProvider.from_api_key_v2( |
| 204 | + api_key=test_encoded_v1_token.decode("utf-8"), endpoint=test_v2_endpoint_env_var_name |
| 205 | + ) |
| 206 | + |
| 207 | + |
| 208 | +def test_v2_key_from_env_raises_if_base64_api_key() -> None: |
| 209 | + with pytest.raises( |
| 210 | + InvalidArgumentException, |
| 211 | + match=re.escape( |
| 212 | + "Received an invalid v2 API key. Are you using the correct key? Or did you mean to use `from_environment_variable()` with a legacy key instead?" |
| 213 | + ), |
| 214 | + ): |
| 215 | + CredentialProvider.from_environment_variables_v2( |
| 216 | + api_key_env_var=test_v1_env_var_name, endpoint_env_var=test_v2_endpoint_env_var_name |
| 217 | + ) |
| 218 | + |
| 219 | + |
| 220 | +def test_v2_key_from_string_raises_if_pre_v1_token() -> None: |
| 221 | + with pytest.raises( |
| 222 | + InvalidArgumentException, |
| 223 | + match=re.escape( |
| 224 | + "Received an invalid v2 API key. Are you using the correct key and the correct CredentialProvider method?" |
| 225 | + ), |
| 226 | + ): |
| 227 | + CredentialProvider.from_api_key_v2(api_key=test_token, endpoint=test_v2_endpoint_env_var_name) |
| 228 | + |
| 229 | + |
| 230 | +def test_v2_key_from_env_raises_if_pre_v1_token() -> None: |
| 231 | + with pytest.raises( |
| 232 | + InvalidArgumentException, |
| 233 | + match=re.escape( |
| 234 | + "Received an invalid v2 API key. Are you using the correct key? Or did you mean to use `from_environment_variable()` with a legacy key instead?" |
| 235 | + ), |
| 236 | + ): |
| 237 | + CredentialProvider.from_environment_variables_v2( |
| 238 | + api_key_env_var=test_env_var_name, endpoint_env_var=test_v2_endpoint_env_var_name |
| 239 | + ) |
| 240 | + |
| 241 | + |
| 242 | +def test_v2_key_provided_to_from_string() -> None: |
| 243 | + with pytest.raises( |
| 244 | + InvalidArgumentException, |
| 245 | + match=re.escape( |
| 246 | + "Unexpectedly received a v2 API key. Are you using the correct key and the correct CredentialProvider method?" |
| 247 | + ), |
| 248 | + ): |
| 249 | + CredentialProvider.from_string(auth_token=test_v2_api_key) |
| 250 | + |
| 251 | + |
| 252 | +def test_v2_key_provided_to_from_disposable_token() -> None: |
| 253 | + with pytest.raises( |
| 254 | + InvalidArgumentException, |
| 255 | + match=re.escape( |
| 256 | + "Unexpectedly received a v2 API key. Are you using the correct key and the correct CredentialProvider method?" |
| 257 | + ), |
| 258 | + ): |
| 259 | + CredentialProvider.from_disposable_token(auth_token=test_v2_api_key) |
| 260 | + |
| 261 | + |
| 262 | +def test_from_disposable_token_raises_if_token_empty() -> None: |
| 263 | + with pytest.raises(InvalidArgumentException, match="Disposable token cannot be empty."): |
| 264 | + CredentialProvider.from_disposable_token(auth_token="") |
| 265 | + |
| 266 | + |
| 267 | +def test_from_disposable_token_accepts_v1_api_key() -> None: |
| 268 | + provider = CredentialProvider.from_disposable_token(auth_token=test_encoded_v1_token.decode("utf-8")) |
| 269 | + assert provider.auth_token == test_v1_api_key |
| 270 | + assert provider.control_endpoint == "control.test.momentohq.com" |
| 271 | + assert provider.cache_endpoint == "cache.test.momentohq.com" |
| 272 | + assert provider.token_endpoint == "token.test.momentohq.com" |
| 273 | + |
| 274 | + |
| 275 | +def test_from_disposable_token_accepts_pre_v1_token() -> None: |
| 276 | + provider = CredentialProvider.from_disposable_token(auth_token=test_token) |
| 277 | + assert provider.auth_token == test_token |
| 278 | + assert provider.control_endpoint == test_control_endpoint |
| 279 | + assert provider.cache_endpoint == test_cache_endpoint |
| 280 | + assert provider.token_endpoint == f"token.{test_cache_endpoint}" |
0 commit comments