def read_config_file() -> dict:
    """Read the Intake-ESM configuration from ``CONFIG_FILE``.

    If the file exists and is accessible by group or others, its
    permissions are reset to owner read/write before it is parsed.

    Returns
    -------
    dict
        The mapping parsed from the YAML file, or an empty dict when the
        file does not exist or contains no YAML document.

    Raises
    ------
    ValueError
        If the top-level YAML node of the file is not a mapping.
    """
    if not CONFIG_FILE.exists():
        logger.info(
            "Using default Intake-ESM configuration, configuration "
            "file %s not present.",
            CONFIG_FILE,
        )
        return {}

    logger.info("Loading Intake-ESM configuration from %s", CONFIG_FILE)
    mode = os.stat(CONFIG_FILE).st_mode
    # Any group/other permission bit counts as unsafe.
    if mode & (stat.S_IRWXG | stat.S_IRWXO):
        logger.warning("Correcting unsafe permissions on %s", CONFIG_FILE)
        os.chmod(CONFIG_FILE, stat.S_IRUSR | stat.S_IWUSR)
    with CONFIG_FILE.open(encoding="utf-8") as file:
        cfg = yaml.safe_load(file)

    # ``yaml.safe_load`` yields None for an empty file; the result of this
    # function is passed to ``dict.update``, which cannot accept None.
    if cfg is None:
        return {}
    if not isinstance(cfg, dict):
        raise ValueError(
            f"Invalid Intake-ESM configuration in {CONFIG_FILE}: expected "
            f"a mapping at the top level, got {type(cfg).__name__}."
        )
    return cfg
+ "frequency": "frequency", + }, + }, + { + "file": "/g/data/oi10/catalog/v2/esm/catalog.json", + "facets": { + "activity": "activity_id", + "dataset": "source_id", + "ensemble": "member_id", + "exp": "experiment_id", + "grid": "grid_label", + "institute": "institution_id", + "mip": "table_id", + "short_name": "variable_id", + "version": "version", + "frequency": "frequency", + }, + }, + ] + } + }, + "CMIP5": { + "catalogs": { + "NCI": [ + { + "file": "/g/data/rr3/catalog/v2/esm/catalog.json", + "facets": { + "activity": "activity_id", + "dataset": "source_id", + "ensemble": "ensemble", + "exp": "experiment", + "grid": "grid_label", + "institute": "institution_id", + "mip": "table_id", + "short_name": "variable", + "version": "version", + }, + }, + { + "file": "/g/data/al33/catalog/v2/esm/catalog.json", + "facets": { + "activity": "activity_id", + "dataset": "source_id", + "ensemble": "ensemble", + "exp": "experiment", + "institute": "institute", + "mip": "table", + "short_name": "variable", + "version": "version", + "timerange": "time_range", + }, + }, + ] + } + }, + } + + file_cfg = read_config_file() + cfg.update(file_cfg) + + return cfg + + +@lru_cache() +def get_intake_config(): + """Get the esgf-pyclient configuration.""" + return load_intake_config() + + +def _read_facets( + cfg: dict, + fhandle: str | None, + project: str | None = None, +) -> tuple[dict[str, Any], str]: + """ + Extract facet mapping from ESMValCore configuration for a given catalog file handle. + + Recursively traverses the ESMValCore configuration structure to find the + facet mapping that corresponds to the specified file handle. + + Parameters + ---------- + cfg : dict + The ESMValCore intake configuration dictionary. + fhandle : str + The file handle/path of the intake-esm catalog to match. + project : str, optional + The current project name in the configuration hierarchy. 
+ + Returns + ------- + tuple + A tuple containing: + - dict: Facet mapping between ESMValCore facets and catalog columns + - str: The project name associated with the catalog file + """ + if fhandle is None: + raise ValueError( + "Unable to ascertain facets without valid file handle." + ) + + for _project, val in cfg.items(): + if not (isinstance(val, list)): + return _read_facets(val, fhandle, project or _project) + for facet_info in val: + file, facets = facet_info.get("file"), facet_info.get("facets") + if file == fhandle: + return facets, project # type: ignore[return-value] + else: + raise ValueError( + f"No facets found for {fhandle} in the config file. " + "Please check the config file and ensure it is valid." + ) diff --git a/esmvalcore/data/__init__.py b/esmvalcore/data/__init__.py new file mode 100644 index 0000000000..0001840cab --- /dev/null +++ b/esmvalcore/data/__init__.py @@ -0,0 +1,10 @@ +"""Find files using an intake-esm catalog and load them.""" + +from .intake._intake_dataset import clear_catalog_cache, load_catalogs +from .intake._interface import merge_intake_search_history + +__all__ = [ + "load_catalogs", + "clear_catalog_cache", + "merge_intake_search_history", +] diff --git a/esmvalcore/data/intake/_intake_dataset.py b/esmvalcore/data/intake/_intake_dataset.py new file mode 100644 index 0000000000..d046ed3bc0 --- /dev/null +++ b/esmvalcore/data/intake/_intake_dataset.py @@ -0,0 +1,132 @@ +"""Import datasets using Intake-ESM.""" + +import logging +from pathlib import Path +from typing import Any, Sequence + +import intake +import intake_esm + +from esmvalcore.config._intake import get_intake_config +from esmvalcore.local import LocalFile + +__all__ = ["load_catalogs", "clear_catalog_cache"] + +logger = logging.getLogger(__name__) + +_CACHE: dict[Path, intake_esm.core.esm_datastore] = {} + + +def clear_catalog_cache() -> None: + """Clear the catalog cache.""" + _CACHE.clear() + + +def load_catalogs( + project: str, drs: dict[str, Any] +) -> 
tuple[list[intake_esm.core.esm_datastore], list[dict[str, str]]]: + """Load all intake-esm catalogs for a project and their associated facet mappings. + + Parameters + ---------- + project : str + The project name, eg. 'CMIP6'. + drs : dict + The DRS configuration. Can be obtained from the global configuration drs + field, eg. CFG['drs']. + + Returns + ------- + intake_esm.core.esm_datastore + The catalog. + dict + The facet mapping - a dictionary mapping ESMVlCore dataset facet names + to the fields in the intake-esm datastore. + """ + catalog_info: dict[str, Any] = ( + get_intake_config().get(project, {}).get("catalogs", {}) + ) + + site = drs.get(project, "default") + if site not in catalog_info: + return [None], [{}] + + catalog_urls = [ + Path(catalog.get("file")).expanduser() + for catalog in catalog_info[site] + ] + facet_list = [catalog.get("facets") for catalog in catalog_info[site]] + + for catalog_url in catalog_urls: + if catalog_url not in _CACHE: + logger.info( + "Loading intake-esm catalog (this may take some time): %s", + catalog_url, + ) + _CACHE[catalog_url] = intake.open_esm_datastore(catalog_url) + logger.info("Successfully loaded catalog %s", catalog_url) + + return ([_CACHE[cat_url] for cat_url in catalog_urls], facet_list) + + +def find_files( + *, project: str, drs: dict, facets: dict +) -> Sequence[LocalFile]: + """Find files for variable in all intake-esm catalogs associated with a project. + + Parameters + ---------- + facet_map : dict + A dict mapping the variable names used to initialise the IntakeDataset + object to their ESMValCore facet names. For example, + ``` + ACCESS_ESM1_5 = IntakeDataset( + short_name='tos', + project='CMIP6', + ) + ``` + would result in a variable dict of {'short_name': 'tos', 'project': 'CMIP6'}. + drs : dict + The DRS configuration. Can be obtained from the global configuration drs + field, eg. CFG['drs']. 
def merge_intake_search_history(
    search_history: list[dict[str, list[typing.Any]]],
) -> dict[str, typing.Any]:
    """Create a facet mapping from an intake-esm search history.

    This takes an intake-esm search history, which typically looks something like
    ```python
    [
        {'variable_id': ['tos']},
        {'table_id': ['Omon']},
        {'experiment_id': ['historical']},
        {'member_id': ['r1i1p1f1']},
        {'source_id': ['ACCESS-ESM1-5']},
        {'grid_label': ['gn']},
        {'version': ['v.*']},
    ]
    ```
    and turns it into something like
    ```python
    {
        'variable_id': 'tos',
        'table_id': 'Omon',
        'experiment_id': 'historical',
        'member_id': 'r1i1p1f1',
        'source_id': 'ACCESS-ESM1-5',
        'grid_label': 'gn',
        'version': 'v.*',
    }
    ```

    Parameters
    ----------
    search_history : list of dict
        One dict per search step, mapping a catalog column to the values
        searched for. The input is not modified.

    Returns
    -------
    dict
        One entry per column, in order of first appearance. Values from
        repeated columns are concatenated in search order; a column with
        exactly one value is reduced to that bare value.
    """
    collected: dict[str, list[typing.Any]] = {}

    for entry in search_history:
        for key, value in entry.items():
            # Always accumulate into a list owned by this function so the
            # caller's lists are never extended in place.
            values = value if isinstance(value, list) else [value]
            collected.setdefault(key, []).extend(values)

    return {
        key: values[0] if len(values) == 1 else values
        for key, values in collected.items()
    }