Skip to content
Closed
37 changes: 27 additions & 10 deletions pydantic_settings/sources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,27 +258,44 @@ def __init__(
settings_cls: type[BaseSettings],
init_kwargs: dict[str, Any],
nested_model_default_partial_update: bool | None = None,
case_sensitive: bool | None = None,
):
super().__init__(settings_cls)

self.case_sensitive = case_sensitive if case_sensitive is not None else self.config.get('case_sensitive', False)

self.init_kwargs = {}
init_kwarg_names = set(init_kwargs.keys())
init_kwargs_processed = set()

init_kwargs_lookup: dict[str, str]
if not self.case_sensitive:
init_kwargs_lookup = {k.lower(): k for k in init_kwargs}
else:
init_kwargs_lookup = {k: k for k in init_kwargs}

for field_name, field_info in settings_cls.model_fields.items():
alias_names, *_ = _get_alias_names(field_name, field_info)
init_kwarg_name = init_kwarg_names & set(alias_names)
if init_kwarg_name:
preferred_alias = alias_names[0]
init_kwarg_names -= init_kwarg_name
self.init_kwargs[preferred_alias] = init_kwargs[init_kwarg_name.pop()]
self.init_kwargs.update({key: val for key, val in init_kwargs.items() if key in init_kwarg_names})
canonical_aliases, _ = _get_alias_names(field_name, field_info, case_sensitive=True)
preferred_alias = canonical_aliases[0]

match_aliases, _ = _get_alias_names(field_name, field_info, case_sensitive=self.case_sensitive)
for alias in match_aliases:
original_kwarg_key = init_kwargs_lookup.get(alias)
if original_kwarg_key is not None and original_kwarg_key not in init_kwargs_processed:
self.init_kwargs[preferred_alias] = init_kwargs[original_kwarg_key]
init_kwargs_processed.add(original_kwarg_key)
break

for original_key, value in init_kwargs.items():
if original_key not in init_kwargs_processed:
self.init_kwargs[original_key] = value

super().__init__(settings_cls)
self.nested_model_default_partial_update = (
nested_model_default_partial_update
if nested_model_default_partial_update is not None
else self.config.get('nested_model_default_partial_update', False)
)

def get_field_value(self, field: FieldInfo, field_name: str) -> tuple[Any, str, bool]:
# Nothing to do here. Only implement the return statement to make mypy happy
return None, '', False

def __call__(self) -> dict[str, Any]:
Expand Down
38 changes: 38 additions & 0 deletions tests/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,44 @@ class Settings(BaseSettings):
]


def test_init_settings_source_extra_fields_case_sensitive(monkeypatch):
class CaseSensitiveSettings(BaseSettings):
foo: str = Field(..., alias='FOO')
model_config = SettingsConfigDict(case_sensitive=True, extra='allow')

# Test case-sensitive with missing alias
monkeypatch.setattr(os, 'environ', value={})
init_kwargs = {'Foo': 'wrong_value', 'extra_field': 'extra_value'}
with pytest.raises(ValidationError) as exc_info:
CaseSensitiveSettings(**init_kwargs)
assert exc_info.value.errors(include_url=False) == [
{
'type': 'missing',
'loc': ('FOO',),
'msg': 'Field required',
'input': init_kwargs,
}
]

# Test case-sensitive with correct alias and extra field
monkeypatch.setattr(os, 'environ', value={})
init_kwargs = {'FOO': 'foo_value', 'extra_field': 'extra_value'}
settings = CaseSensitiveSettings(**init_kwargs)
assert settings.foo == 'foo_value'
assert settings.__pydantic_extra__ == {'extra_field': 'extra_value'}

class CaseInsensitiveSettings(BaseSettings):
foo: str = Field(..., alias='FOO')
model_config = SettingsConfigDict(case_sensitive=False, extra='allow')

# Test case-insensitive with extra field
monkeypatch.setattr(os, 'environ', value={})
init_kwargs = {'Foo': 'foo_value', 'extra_field': 'extra_value'}
settings = CaseInsensitiveSettings(**init_kwargs)
assert settings.foo == 'foo_value'
assert settings.__pydantic_extra__ == {'extra_field': 'extra_value'}


@pytest.mark.parametrize('env_nested_delimiter', [None, ''])
def test_case_sensitive_no_nested_delimiter(monkeypatch, env_nested_delimiter):
class Subsettings(BaseSettings):
Expand Down