|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +import dataclasses |
| 4 | +import importlib |
| 5 | +import inspect |
| 6 | +import sys |
| 7 | +from collections.abc import Iterator, Mapping |
| 8 | +from pathlib import Path |
| 9 | +from typing import TYPE_CHECKING, Any, Protocol, Union, runtime_checkable |
| 10 | + |
| 11 | +if TYPE_CHECKING: |
| 12 | + from collections.abc import Generator, Iterable |
| 13 | + |
| 14 | + StrMapping = Mapping[str, Any] |
| 15 | +else: |
| 16 | + StrMapping = Mapping |
| 17 | + |
| 18 | +from ..metadata import _ALL_FIELDS |
| 19 | + |
| 20 | +__all__ = ["load_provider", "process_dynamic_metadata"] |
| 21 | + |
| 22 | + |
| 23 | +def __dir__() -> list[str]: |
| 24 | + return __all__ |
| 25 | + |
| 26 | + |
| 27 | +@runtime_checkable |
| 28 | +class DynamicMetadataProtocol(Protocol): |
| 29 | + def dynamic_metadata( |
| 30 | + self, |
| 31 | + fields: Iterable[str], |
| 32 | + settings: dict[str, Any], |
| 33 | + project: Mapping[str, Any], |
| 34 | + ) -> dict[str, Any]: ... |
| 35 | + |
| 36 | + |
| 37 | +@runtime_checkable |
| 38 | +class DynamicMetadataRequirementsProtocol(DynamicMetadataProtocol, Protocol): |
| 39 | + def get_requires_for_dynamic_metadata( |
| 40 | + self, settings: dict[str, Any] |
| 41 | + ) -> list[str]: ... |
| 42 | + |
| 43 | + |
| 44 | +@runtime_checkable |
| 45 | +class DynamicMetadataWheelProtocol(DynamicMetadataProtocol, Protocol): |
| 46 | + def dynamic_wheel(self, field: str, settings: Mapping[str, Any]) -> bool: ... |
| 47 | + |
| 48 | + |
| 49 | +DMProtocols = Union[ |
| 50 | + DynamicMetadataProtocol, |
| 51 | + DynamicMetadataRequirementsProtocol, |
| 52 | + DynamicMetadataWheelProtocol, |
| 53 | +] |
| 54 | + |
| 55 | + |
| 56 | +def load_provider( |
| 57 | + provider: str, |
| 58 | + provider_path: str | None = None, |
| 59 | +) -> DMProtocols: |
| 60 | + if provider_path is None: |
| 61 | + return importlib.import_module(provider) |
| 62 | + |
| 63 | + if not Path(provider_path).is_dir(): |
| 64 | + msg = "provider-path must be an existing directory" |
| 65 | + raise AssertionError(msg) |
| 66 | + |
| 67 | + try: |
| 68 | + sys.path.insert(0, provider_path) |
| 69 | + return importlib.import_module(provider) |
| 70 | + finally: |
| 71 | + sys.path.pop(0) |
| 72 | + |
| 73 | + |
| 74 | +def _load_dynamic_metadata( |
| 75 | + metadata: Mapping[str, Mapping[str, str]], |
| 76 | +) -> Generator[tuple[str, DMProtocols, dict[str, Any]], None, None]: |
| 77 | + for field, orig_config in metadata.items(): |
| 78 | + if "provider" not in orig_config: |
| 79 | + msg = "Missing provider in dynamic metadata" |
| 80 | + raise KeyError(msg) |
| 81 | + |
| 82 | + if field not in _ALL_FIELDS: |
| 83 | + msg = f"{field} is not a valid field" |
| 84 | + raise KeyError(msg) |
| 85 | + config = dict(orig_config) |
| 86 | + provider = config.pop("provider") |
| 87 | + provider_path = config.pop("provider-path", None) |
| 88 | + loaded_provider = load_provider(provider, provider_path) |
| 89 | + yield field, loaded_provider, config |
| 90 | + |
| 91 | + |
| 92 | +@dataclasses.dataclass |
| 93 | +class DynamicPyProject(StrMapping): |
| 94 | + settings: dict[str, dict[str, Any]] |
| 95 | + project: dict[str, Any] |
| 96 | + providers: dict[str, DMProtocols] |
| 97 | + |
| 98 | + def __getitem__(self, key: str) -> Any: |
| 99 | + # Try to get the settings from either the static file or dynamic metadata provider |
| 100 | + if key in self.project: |
| 101 | + return self.project[key] |
| 102 | + |
| 103 | + # Check if we are in a loop, i.e. something else is already requesting |
| 104 | + # this key while trying to get another key |
| 105 | + if key not in self.providers: |
| 106 | + dep_type = "missing" if key in self.settings else "circular" |
| 107 | + msg = f"Encountered a {dep_type} dependency at {key}" |
| 108 | + raise ValueError(msg) |
| 109 | + |
| 110 | + provider = self.providers.pop(key) |
| 111 | + sig = inspect.signature(provider.dynamic_metadata) |
| 112 | + if len(sig.parameters) < 3: |
| 113 | + # Backcompat for dynamic_metadata without metadata dict |
| 114 | + self.project[key] = provider.dynamic_metadata( # type: ignore[call-arg] |
| 115 | + key, self.settings[key] |
| 116 | + ) |
| 117 | + else: |
| 118 | + self.project[key] = provider.dynamic_metadata(key, self.settings[key], self) |
| 119 | + self.project["dynamic"].remove(key) |
| 120 | + |
| 121 | + return self.project[key] |
| 122 | + |
| 123 | + def __iter__(self) -> Iterator[str]: |
| 124 | + # Iterate over the keys of the static settings |
| 125 | + yield from self.project |
| 126 | + |
| 127 | + # Iterate over the keys of the dynamic metadata providers |
| 128 | + yield from self.providers |
| 129 | + |
| 130 | + def __len__(self) -> int: |
| 131 | + return len(self.project) + len(self.providers) |
| 132 | + |
| 133 | + def __contains__(self, key: object) -> bool: |
| 134 | + return key in self.project or key in self.providers |
| 135 | + |
| 136 | + |
| 137 | +def process_dynamic_metadata( |
| 138 | + project: Mapping[str, Any], |
| 139 | + metadata: Mapping[str, Mapping[str, Any]], |
| 140 | +) -> dict[str, Any]: |
| 141 | + initial = {f: (p, c) for (f, p, c) in _load_dynamic_metadata(metadata)} |
| 142 | + |
| 143 | + settings = DynamicPyProject( |
| 144 | + settings={f: c for f, (_, c) in initial.items()}, |
| 145 | + project=dict(project), |
| 146 | + providers={k: v for k, (v, _) in initial.items()}, |
| 147 | + ) |
| 148 | + |
| 149 | + return dict(settings) |
0 commit comments