diff --git a/pydantic_settings/sources/providers/gcp.py b/pydantic_settings/sources/providers/gcp.py index ba202222..62f356a7 100644 --- a/pydantic_settings/sources/providers/gcp.py +++ b/pydantic_settings/sources/providers/gcp.py @@ -37,10 +37,11 @@ class GoogleSecretManagerMapping(Mapping[str, Optional[str]]): _loaded_secrets: dict[str, str | None] _secret_client: SecretManagerServiceClient - def __init__(self, secret_client: SecretManagerServiceClient, project_id: str) -> None: + def __init__(self, secret_client: SecretManagerServiceClient, project_id: str, case_sensitive: bool) -> None: self._loaded_secrets = {} self._secret_client = secret_client self._project_id = project_id + self._case_sensitive = case_sensitive @property def _gcp_project_path(self) -> str: @@ -48,15 +49,22 @@ def _gcp_project_path(self) -> str: @cached_property def _secret_names(self) -> list[str]: - return [ - self._secret_client.parse_secret_path(secret.name).get('secret', '') - for secret in self._secret_client.list_secrets(parent=self._gcp_project_path) - ] + rv: list[str] = [] + + secrets = self._secret_client.list_secrets(parent=self._gcp_project_path) + for secret in secrets: + name = self._secret_client.parse_secret_path(secret.name).get('secret', '') + if not self._case_sensitive: + name = name.lower() + rv.append(name) + return rv def _secret_version_path(self, key: str, version: str = 'latest') -> str: return self._secret_client.secret_version_path(self._project_id, key, version) def __getitem__(self, key: str) -> str | None: + if not self._case_sensitive: + key = key.lower() if key not in self._loaded_secrets: # If we know the key isn't available in secret manager, raise a key error if key not in self._secret_names: @@ -92,6 +100,7 @@ def __init__( env_parse_none_str: str | None = None, env_parse_enums: bool | None = None, secret_client: SecretManagerServiceClient | None = None, + case_sensitive: bool | None = True, ) -> None: # Import Google Packages if they haven't already been imported if SecretManagerServiceClient is None or Credentials is None or google_auth_default is None: @@ -124,7 +133,7 @@ def __init__( super().__init__( settings_cls, - case_sensitive=True, + case_sensitive=case_sensitive, env_prefix=env_prefix, env_ignore_empty=False, env_parse_none_str=env_parse_none_str, @@ -132,7 +141,9 @@ def __init__( ) def _load_env_vars(self) -> Mapping[str, Optional[str]]: - return GoogleSecretManagerMapping(self._secret_client, project_id=self._project_id) + return GoogleSecretManagerMapping( + self._secret_client, project_id=self._project_id, case_sensitive=self.case_sensitive + ) def __repr__(self) -> str: return f'{self.__class__.__name__}(project_id={self._project_id!r}, env_nested_delimiter={self.env_nested_delimiter!r})' diff --git a/tests/test_source_gcp_secret_manager.py b/tests/test_source_gcp_secret_manager.py index d5b13b1f..e43c45ad 100644 --- a/tests/test_source_gcp_secret_manager.py +++ b/tests/test_source_gcp_secret_manager.py @@ -57,7 +57,7 @@ def mock_access_secret_version(name: str): @pytest.fixture def secret_manager_mapping(mock_secret_client): - return GoogleSecretManagerMapping(mock_secret_client, 'test-project') + return GoogleSecretManagerMapping(mock_secret_client, project_id='test-project', case_sensitive=True) @pytest.fixture @@ -96,6 +96,13 @@ def test_secret_manager_mapping_getitem_success(self, secret_manager_mapping): value = secret_manager_mapping['test-secret'] assert value == 'test-value' + def test_secret_manager_mapping_getitem_case_insensitive_success(self, mock_secret_client): + case_insensitive_mapping = GoogleSecretManagerMapping( + mock_secret_client, project_id='test-project', case_sensitive=False + ) + value = case_insensitive_mapping['TEST-SECRET'] + assert value == 'test-value' + def test_secret_manager_mapping_getitem_nonexistent_key(self, secret_manager_mapping): with pytest.raises(KeyError): _ = secret_manager_mapping['nonexistent-secret']