From c1299660a7867db097edf4fb088cd4a9af406cf2 Mon Sep 17 00:00:00 2001 From: Charles Turner Date: Tue, 4 Feb 2025 10:02:48 +0800 Subject: [PATCH 01/17] Add recognised intake-esm datastores on NCI systems to config_developer.yml, skeleton of intake-esm inclusiion following #1218 --- esmvalcore/config-developer.yml | 50 ++++++++++++++++++ esmvalcore/intake/__init__.py | 5 ++ esmvalcore/intake/_data_source.py | 84 +++++++++++++++++++++++++++++++ pyproject.toml | 2 + 4 files changed, 141 insertions(+) create mode 100644 esmvalcore/intake/__init__.py create mode 100644 esmvalcore/intake/_data_source.py diff --git a/esmvalcore/config-developer.yml b/esmvalcore/config-developer.yml index faa009ec8f..40ef86c800 100644 --- a/esmvalcore/config-developer.yml +++ b/esmvalcore/config-developer.yml @@ -38,6 +38,23 @@ CMIP6: SYNDA: '{activity}/{institute}/{dataset}/{exp}/{ensemble}/{mip}/{short_name}/{grid}/{version}' NCI: '{activity}/{institute}/{dataset}/{exp}/{ensemble}/{mip}/{short_name}/{grid}/{version}' input_file: '{short_name}_{mip}_{dataset}_{exp}_{ensemble}_{grid}*.nc' + catalogs: +  NCI: + files: + - /g/data/fs38/catalog/v2/esm/catalog.json + - /g/data/oi10/catalog/v2/esm/catalog.json + facets: + # mapping from recipe facets to intake-esm catalog facets + # TODO: Fix these when Gadi is back up + activity: activity_id + dataset: source_id + ensemble: member_id + exp: experiment_id + grid: grid_label + institute: institution_id + mip: table_id + short_name: variable_id + version: version output_file: '{project}_{dataset}_{mip}_{exp}_{ensemble}_{short_name}_{grid}' cmor_type: 'CMIP6' @@ -56,6 +73,23 @@ CMIP5: SMHI: '{dataset}/{ensemble}/{exp}/{frequency}' SYNDA: '{institute}/{dataset}/{exp}/{frequency}/{modeling_realm}/{mip}/{ensemble}/{version}' input_file: '{short_name}_{mip}_{dataset}_{exp}_{ensemble}*.nc' + catalogs: +  NCI: + files: + - /g/data/al33/catalog/v2/esm/catalog.json + - /g/data/rr3/catalog/v2/esm/catalog.json + facets: + # mapping from recipe facets to 
intake-esm catalog facets + # TODO: Fix these when Gadi is back up + activity: activity_id + dataset: source_id + ensemble: member_id + exp: experiment_id + grid: grid_label + institute: institution_id + mip: table_id + short_name: variable_id + version: version output_file: '{project}_{dataset}_{mip}_{exp}_{ensemble}_{short_name}' CMIP3: @@ -156,6 +190,22 @@ CORDEX: ESGF: '{project.lower}/output/{domain}/{institute}/{driver}/{exp}/{ensemble}/{dataset}/{rcm_version}/{frequency}/{short_name}/{version}' SYNDA: '{domain}/{institute}/{driver}/{exp}/{ensemble}/{dataset}/{rcm_version}/{frequency}/{short_name}/{version}' input_file: '{short_name}_{domain}_{driver}_{exp}_{ensemble}_{institute}-{dataset}_{rcm_version}_{mip}*.nc' + catalogs: +  NCI: + files: + - /g/data/oi10/catalog/v2/esm/catalog.json + facets: + # mapping from recipe facets to intake-esm catalog facets + # TODO: Fix these when Gadi is back up + activity: activity_id + dataset: source_id + ensemble: member_id + exp: experiment_id + grid: grid_label + institute: institution_id + mip: table_id + short_name: variable_id + version: version output_file: '{project}_{institute}_{dataset}_{rcm_version}_{driver}_{domain}_{mip}_{exp}_{ensemble}_{short_name}' cmor_type: 'CMIP5' cmor_path: 'cordex' diff --git a/esmvalcore/intake/__init__.py b/esmvalcore/intake/__init__.py new file mode 100644 index 0000000000..6c18ace9b6 --- /dev/null +++ b/esmvalcore/intake/__init__.py @@ -0,0 +1,5 @@ +"""Find files using an intake-esm catalog and load them.""" + +from .data_source import IntakeDataSource + +__all__ = ["IntakeDataSource"] diff --git a/esmvalcore/intake/_data_source.py b/esmvalcore/intake/_data_source.py new file mode 100644 index 0000000000..2b3e6531e7 --- /dev/null +++ b/esmvalcore/intake/_data_source.py @@ -0,0 +1,84 @@ +import logging +from pathlib import Path + +import intake +import intake_esm + +from ..config._config import get_project_config +from ..local import DataSource, _select_files + +__all__ = 
["IntakeDataSource", "get_project_config", "_select_files"] + +logger = logging.getLogger(__name__) + +_CACHE: dict[Path, intake_esm.core.esm_datastore] = {} + + +def clear_catalog_cache(): + """Clear the catalog cache.""" + _CACHE.clear() + + +def load_catalog(project, drs): + """Load an intake-esm catalog and associated facet mapping.""" + catalog_info = get_project_config(project).get("catalogs", {}) + site = drs.get(project, "default") + if site not in catalog_info: + return None, {} + + catalog_url = Path(catalog_info[site]["file"]).expanduser() + + if catalog_url not in _CACHE: + logger.info( + "Loading intake-esm catalog (this may take some time): %s", + catalog_url, + ) + _CACHE[catalog_url] = intake.open_esm_datastore(catalog_url) + logger.info("Done loading catalog") + + catalog = _CACHE[catalog_url] + facets = catalog_info[site]["facets"] + return catalog, facets + + +def find_files(variable, drs): + """Find files for variable in intake-esm catalog.""" + catalog, facets = load_catalog(variable["project"], drs) + if not catalog: + return [] + + query = {} + for ours, theirs in facets.items(): + if ours == "version" and "version" not in variable: + # skip version if not specified in recipe + continue + query[theirs] = variable[ours] + + selection = catalog.search(**query) + + # Select latest version + if "version" not in variable and "version" in facets: + latest_version = selection.df[facets["version"]].max() + variable["version"] = latest_version + query = { + facets["version"]: latest_version, + } + selection = selection.search(**query) + + filenames = list(selection.df["path"]) + + # Select only files within the time range + filenames = _select_files( + filenames, variable["start_year"], variable["end_year"] + ) + + return filenames + + +class IntakeDataSource(DataSource): + """ + Class to handle loading data using Intake-esm. 
+ """ + + def __init__(): + pass diff --git a/pyproject.toml b/pyproject.toml index 4ce0f4c303..1c8d72dba9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,8 @@ dependencies = [ "fire", "geopy", "humanfriendly", + "intake>=0.7.0", + "intake-esm>=2024.2.6", "iris-grib>=0.20.0", # github.com/ESMValGroup/ESMValCore/issues/2535 "isodate>=0.7.0", "jinja2", From b1b76fb6d40c7c5df0e15f99a1f213eb096d9969 Mon Sep 17 00:00:00 2001 From: Charles Turner Date: Wed, 5 Feb 2025 12:21:38 +0800 Subject: [PATCH 02/17] Skeleton --- esmvalcore/intake/_data_source.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/esmvalcore/intake/_data_source.py b/esmvalcore/intake/_data_source.py index 2b3e6531e7..b0425a7272 100644 --- a/esmvalcore/intake/_data_source.py +++ b/esmvalcore/intake/_data_source.py @@ -19,7 +19,7 @@ def clear_catalog_cache(): _CACHE.clear() -def load_catalog(project, drs): +def load_catalog(project, drs) -> tuple[intake_esm.core.esm_datastore, dict]: """Load an intake-esm catalog and associated facet mapping.""" catalog_info = get_project_config(project).get("catalogs", {}) site = drs.get(project, "default") @@ -80,5 +80,11 @@ class IntakeDataSource(DataSource): Class to handle loading data using Intake-esm. 
""" - def __init__(): + def __init__(self, file, **facets): pass + + def get_glob_patterns(self, **facets): + return super().get_glob_patterns(**facets) + + def find_files(self, **facets): + return super().find_files(**facets) From dd73d1dd88ddf6efa56cb0ca41aa15b71bfb871c Mon Sep 17 00:00:00 2001 From: Charles Turner Date: Wed, 5 Feb 2025 19:57:50 +1100 Subject: [PATCH 03/17] Playing around --- esmvalcore/dataset.py | 40 +++++++++++++++++++++++++++++++++++ esmvalcore/intake/__init__.py | 2 +- 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/esmvalcore/dataset.py b/esmvalcore/dataset.py index 6717485e38..2c39dda1cb 100644 --- a/esmvalcore/dataset.py +++ b/esmvalcore/dataset.py @@ -35,6 +35,8 @@ from esmvalcore.preprocessor import preprocess from esmvalcore.typing import Facets, FacetValue +from .intake import IntakeDataSource + __all__ = [ "Dataset", "INHERITED_FACETS", @@ -156,6 +158,44 @@ def from_recipe( return datasets_from_recipe(recipe, session) + def _from_catalog( + self, + ) + """Load dataset from an intake-esm catalog. + + Raises + ------ + InputFilesNotFound + When no files were found. + + Returns + ------- + iris.cube.Cube + An :mod:`iris` cube with the data corresponding the the dataset. 
+ """ + input_files = list(self.files) + for supplementary_dataset in self.supplementaries: + input_files.extend(supplementary_dataset.files) + esgf.download(input_files, self.session["download_dir"]) + + cube = self._load() + supplementary_cubes = [] + for supplementary_dataset in self.supplementaries: + supplementary_cube = supplementary_dataset._load() + supplementary_cubes.append(supplementary_cube) + + output_file = _get_output_file(self.facets, self.session.preproc_dir) + cubes = preprocess( + [cube], + "add_supplementary_variables", + input_files=input_files, + output_file=output_file, + debug=self.session["save_intermediary_cubes"], + supplementary_cubes=supplementary_cubes, + ) + + return cubes[0] + def _file_to_dataset( self, file: esgf.ESGFFile | local.LocalFile, diff --git a/esmvalcore/intake/__init__.py b/esmvalcore/intake/__init__.py index 6c18ace9b6..33f8fd5354 100644 --- a/esmvalcore/intake/__init__.py +++ b/esmvalcore/intake/__init__.py @@ -1,5 +1,5 @@ """Find files using an intake-esm catalog and load them.""" -from .data_source import IntakeDataSource +from ._data_source import IntakeDataSource __all__ = ["IntakeDataSource"] From ed1676b25b46826ea40044ae22568306e8fdee6e Mon Sep 17 00:00:00 2001 From: Charles Turner Date: Wed, 12 Feb 2025 19:51:51 +1100 Subject: [PATCH 04/17] Almost at a working IntakeDataset.load() --- esmvalcore/config-developer.yml | 8 +- esmvalcore/dataset.py | 38 -------- esmvalcore/intake/__init__.py | 4 +- esmvalcore/intake/_data_source.py | 90 ------------------ esmvalcore/intake/_dataset.py | 147 ++++++++++++++++++++++++++++++ 5 files changed, 153 insertions(+), 134 deletions(-) delete mode 100644 esmvalcore/intake/_data_source.py create mode 100644 esmvalcore/intake/_dataset.py diff --git a/esmvalcore/config-developer.yml b/esmvalcore/config-developer.yml index 40ef86c800..69e8e1e09c 100644 --- a/esmvalcore/config-developer.yml +++ b/esmvalcore/config-developer.yml @@ -39,10 +39,10 @@ CMIP6: NCI: 
'{activity}/{institute}/{dataset}/{exp}/{ensemble}/{mip}/{short_name}/{grid}/{version}' input_file: '{short_name}_{mip}_{dataset}_{exp}_{ensemble}_{grid}*.nc' catalogs: -  NCI: - files: - - /g/data/fs38/catalog/v2/esm/catalog.json - - /g/data/oi10/catalog/v2/esm/catalog.json + NCI: + file: + /g/data/fs38/catalog/v2/esm/catalog.json + # - /g/data/oi10/catalog/v2/esm/catalog.json facets: # mapping from recipe facets to intake-esm catalog facets # TODO: Fix these when Gadi is back up diff --git a/esmvalcore/dataset.py b/esmvalcore/dataset.py index 2c39dda1cb..7e5494b92e 100644 --- a/esmvalcore/dataset.py +++ b/esmvalcore/dataset.py @@ -35,7 +35,6 @@ from esmvalcore.preprocessor import preprocess from esmvalcore.typing import Facets, FacetValue -from .intake import IntakeDataSource __all__ = [ "Dataset", @@ -158,43 +157,6 @@ def from_recipe( return datasets_from_recipe(recipe, session) - def _from_catalog( - self, - ) - """Load dataset from an intake-esm catalog. - - Raises - ------ - InputFilesNotFound - When no files were found. - - Returns - ------- - iris.cube.Cube - An :mod:`iris` cube with the data corresponding the the dataset. 
- """ - input_files = list(self.files) - for supplementary_dataset in self.supplementaries: - input_files.extend(supplementary_dataset.files) - esgf.download(input_files, self.session["download_dir"]) - - cube = self._load() - supplementary_cubes = [] - for supplementary_dataset in self.supplementaries: - supplementary_cube = supplementary_dataset._load() - supplementary_cubes.append(supplementary_cube) - - output_file = _get_output_file(self.facets, self.session.preproc_dir) - cubes = preprocess( - [cube], - "add_supplementary_variables", - input_files=input_files, - output_file=output_file, - debug=self.session["save_intermediary_cubes"], - supplementary_cubes=supplementary_cubes, - ) - - return cubes[0] def _file_to_dataset( self, diff --git a/esmvalcore/intake/__init__.py b/esmvalcore/intake/__init__.py index 33f8fd5354..a153c62bae 100644 --- a/esmvalcore/intake/__init__.py +++ b/esmvalcore/intake/__init__.py @@ -1,5 +1,5 @@ """Find files using an intake-esm catalog and load them.""" -from ._data_source import IntakeDataSource +from ._dataset import IntakeDataset, find_files, load_catalog -__all__ = ["IntakeDataSource"] +__all__ = ["IntakeDataset", "find_files", "load_catalog"] diff --git a/esmvalcore/intake/_data_source.py b/esmvalcore/intake/_data_source.py deleted file mode 100644 index b0425a7272..0000000000 --- a/esmvalcore/intake/_data_source.py +++ /dev/null @@ -1,90 +0,0 @@ -import logging -from pathlib import Path - -import intake -import intake_esm - -from ..config._config import get_project_config -from ..local import DataSource, _select_files - -__all__ = ["IntakeDataSource", "get_project_config", "_select_files"] - -logger = logging.getLogger(__name__) - -_CACHE: dict[Path, intake_esm.core.esm_datastore] = {} - - -def clear_catalog_cache(): - """Clear the catalog cache.""" - _CACHE.clear() - - -def load_catalog(project, drs) -> tuple[intake_esm.core.esm_datastore, dict]: - """Load an intake-esm catalog and associated facet mapping.""" - 
catalog_info = get_project_config(project).get("catalogs", {}) - site = drs.get(project, "default") - if site not in catalog_info: - return None, {} - - catalog_url = Path(catalog_info[site]["file"]).expanduser() - - if catalog_url not in _CACHE: - logger.info( - "Loading intake-esm catalog (this may take some time): %s", - catalog_url, - ) - _CACHE[catalog_url] = intake.open_esm_datastore(catalog_url) - logger.info("Done loading catalog") - - catalog = _CACHE[catalog_url] - facets = catalog_info[site]["facets"] - return catalog, facets - - -def find_files(variable, drs): - """Find files for variable in intake-esm catalog.""" - catalog, facets = load_catalog(variable["project"], drs) - if not catalog: - return [] - - query = {} - for ours, theirs in facets.items(): - if ours == "version" and "version" not in variable: - # skip version if not specified in recipe - continue - query[theirs] = variable[ours] - - selection = catalog.search(**query) - - # Select latest version - if "version" not in variable and "version" in facets: - latest_version = selection.df[facets["version"]].max() - variable["version"] = latest_version - query = { - facets["version"]: latest_version, - } - selection = selection.search(**query) - - filenames = list(selection.df["path"]) - - # Select only files within the time range - filenames = _select_files( - filenames, variable["start_year"], variable["end_year"] - ) - - return filenames - - -class IntakeDataSource(DataSource): - """ - Class to handle loading data using Intake-esm. 
- """ - - def __init__(self, file, **facets): - pass - - def get_glob_patterns(self, **facets): - return super().get_glob_patterns(**facets) - - def find_files(self, **facets): - return super().find_files(**facets) diff --git a/esmvalcore/intake/_dataset.py b/esmvalcore/intake/_dataset.py new file mode 100644 index 0000000000..0edd7ad564 --- /dev/null +++ b/esmvalcore/intake/_dataset.py @@ -0,0 +1,147 @@ +import logging +from pathlib import Path +from typing import Sequence + +import intake +import intake_esm + +from esmvalcore.config._config import get_project_config +from esmvalcore.config import CFG, Config, Session +from ..cmor.table import VariableInfo +from esmvalcore.dataset import Dataset, File +from esmvalcore.typing import Facets, FacetValue + +__all__ = ["IntakeDataset", "get_project_config", "_select_files"] + +logger = logging.getLogger(__name__) + +_CACHE: dict[Path, intake_esm.core.esm_datastore] = {} + + + +def clear_catalog_cache(): + """Clear the catalog cache.""" + _CACHE.clear() + + +def load_catalog(project, drs) -> tuple[intake_esm.core.esm_datastore, dict]: + """Load an intake-esm catalog and associated facet mapping. + + Parameters + ---------- + project : str + The project name, eg. 'CMIP6'. + drs : dict + The DRS configuration. Can be obtained from the global configuration drs + field, eg. CFG['drs']. + + Returns + ------- + intake_esm.core.esm_datastore + The catalog. + dict + The facet mapping - a dictionarry mapping ESMVlCore dataset facet names + to the fields in the intake-esm datastore. 
+ """ + catalog_info = get_project_config(project).get("catalogs", {}) + site = drs.get(project, "default") + if site not in catalog_info: + return None, {} + + catalog_url = Path(catalog_info[site]["file"]).expanduser() + + if catalog_url not in _CACHE: + logger.info( + "Loading intake-esm catalog (this may take some time): %s", + catalog_url, + ) + _CACHE[catalog_url] = intake.open_esm_datastore(catalog_url) + logger.info("Done loading catalog") + + catalog = _CACHE[catalog_url] + facets = catalog_info[site]["facets"] + return catalog, facets + + +def find_files(variable : dict , drs): + """Find files for variable in intake-esm catalog. + + Parameters + ---------- + variable : dict + A dict mapping the variable names used to initialise the IntakeDataset + object to their ESMValCore facet names. For example, + ``` + ACCESS_ESM1_5 = IntakeDataset( + short_name='tos', + project='CMIP6', + ) + ``` + would result in a variable dict of {'short_name': 'tos', 'project': 'CMIP6'}. + drs : dict + The DRS configuration. Can be obtained from the global configuration drs + field, eg. CFG['drs']. + + """ + catalog, facets = load_catalog(variable["project"], drs) + if not catalog: + return [] + + query = {val : variable.get(key) for key, val in facets.items()} + query = {key : val for key, val in query.items() if val is not None} + """ + for ours, theirs in facets.items(): + if ours == 'version' and 'version' not in variable: + # skip version if not specified in recipe + continue + query[theirs] = variable[ours] + + ^ Check this logic is retained + """ + + unmapped = {key: val for key, val in variable.items() if key not in facets} + unmapped.pop("project", None) + + selection = catalog.search(**query) + + # Select latest version + if 'version' in facets and 'version' not in variable: + latest_version = selection.df[facets['version']].max() # These are strings - so wtf doe this mean? 
+ variable['version'] = latest_version + query = { + facets['version']: latest_version, + } + selection = selection.search(**query) + + # Can we do this without all the instantiations into a dataframe? I think so + return selection.df['path'].tolist() + + + +class IntakeDataset(Dataset): + """ + Class to handle loading data using Intake-esm. + """ + + def __init__(self, **facets): + project = facets["project"] + self.catalog, self._facets = load_catalog(project, CFG["drs"]) + super().__init__(**facets) + + @property + def files(self) -> Sequence[File]: + if self._files is None: + self._files = find_files(self.facets, CFG["drs"]) + return self._files + + def load(): + """ + Load the dataset. + """ + super().load() + + + + + + From fa1ea2eac1d68935cebd68291b65559efbe671d4 Mon Sep 17 00:00:00 2001 From: Charles Turner Date: Tue, 25 Feb 2025 14:47:20 +1100 Subject: [PATCH 05/17] Working intake-esm implementation - probably still some kinks to iron out --- esmvalcore/intake/__init__.py | 4 +- esmvalcore/intake/_dataset.py | 146 ++++++++++++++++++++-------------- 2 files changed, 88 insertions(+), 62 deletions(-) diff --git a/esmvalcore/intake/__init__.py b/esmvalcore/intake/__init__.py index a153c62bae..916b475e96 100644 --- a/esmvalcore/intake/__init__.py +++ b/esmvalcore/intake/__init__.py @@ -1,5 +1,5 @@ """Find files using an intake-esm catalog and load them.""" -from ._dataset import IntakeDataset, find_files, load_catalog +from ._dataset import IntakeDataset, load_catalog -__all__ = ["IntakeDataset", "find_files", "load_catalog"] +__all__ = ["IntakeDataset", "load_catalog"] diff --git a/esmvalcore/intake/_dataset.py b/esmvalcore/intake/_dataset.py index 0edd7ad564..c41222d664 100644 --- a/esmvalcore/intake/_dataset.py +++ b/esmvalcore/intake/_dataset.py @@ -1,13 +1,18 @@ import logging +import datetime from pathlib import Path from typing import Sequence +import isodate import intake import intake_esm +import iris +from ncdata.iris_xarray import cubes_from_xarray 
from esmvalcore.config._config import get_project_config from esmvalcore.config import CFG, Config, Session -from ..cmor.table import VariableInfo +from esmvalcore.preprocessor import clip_timerange, fix_metadata +from esmvalcore.cmor.table import VariableInfo from esmvalcore.dataset import Dataset, File from esmvalcore.typing import Facets, FacetValue @@ -63,60 +68,6 @@ def load_catalog(project, drs) -> tuple[intake_esm.core.esm_datastore, dict]: return catalog, facets -def find_files(variable : dict , drs): - """Find files for variable in intake-esm catalog. - - Parameters - ---------- - variable : dict - A dict mapping the variable names used to initialise the IntakeDataset - object to their ESMValCore facet names. For example, - ``` - ACCESS_ESM1_5 = IntakeDataset( - short_name='tos', - project='CMIP6', - ) - ``` - would result in a variable dict of {'short_name': 'tos', 'project': 'CMIP6'}. - drs : dict - The DRS configuration. Can be obtained from the global configuration drs - field, eg. CFG['drs']. - - """ - catalog, facets = load_catalog(variable["project"], drs) - if not catalog: - return [] - - query = {val : variable.get(key) for key, val in facets.items()} - query = {key : val for key, val in query.items() if val is not None} - """ - for ours, theirs in facets.items(): - if ours == 'version' and 'version' not in variable: - # skip version if not specified in recipe - continue - query[theirs] = variable[ours] - - ^ Check this logic is retained - """ - - unmapped = {key: val for key, val in variable.items() if key not in facets} - unmapped.pop("project", None) - - selection = catalog.search(**query) - - # Select latest version - if 'version' in facets and 'version' not in variable: - latest_version = selection.df[facets['version']].max() # These are strings - so wtf doe this mean? 
- variable['version'] = latest_version - query = { - facets['version']: latest_version, - } - selection = selection.search(**query) - - # Can we do this without all the instantiations into a dataframe? I think so - return selection.df['path'].tolist() - - class IntakeDataset(Dataset): """ @@ -131,17 +82,92 @@ def __init__(self, **facets): @property def files(self) -> Sequence[File]: if self._files is None: - self._files = find_files(self.facets, CFG["drs"]) + self._files = self._find_files(self.facets, CFG["drs"]) return self._files - def load(): + def load(self) -> iris.cube.Cube: """ - Load the dataset. + Load the dataset. We should be able to do this by just querying for catalog + for the files in self.files and then loading them into an xarray dataset. + This isthen converted to an iris cube. """ - super().load() + cat_subset = self.catalog.search(path=self.files) + if len(cat_subset) > 1: + raise ValueError("Multiple datasets found for the same query") + elif len(cat_subset) == 0: + raise ValueError("No datasets found for the query") + + ds = cat_subset.to_dask() + da = ds[self.facets["short_name"]] + cube = da.to_iris() + + extra_facets = {k : v for k, v in self.facets.items() if k not in ["short_name", "project", "dataset","mip","frequency"]} + cube = fix_metadata((cube,), + short_name = self.facets["short_name"], + project = self.facets["project"], + dataset = self.facets["dataset"], + mip = self.facets["mip"], + frequency = self.facets.get("frequency"), + **extra_facets + )[0] + + return clip_timerange(cube, self.facets["timerange"]) + + + def _find_files(self, variable : dict , drs) -> Sequence[File]: + """Find files for variable in intake-esm catalog. + + As a side effect, sets the unmapped_facets attribute - this is used to + cache facets which are not in the datastore. + + Parameters + ---------- + variable : dict + A dict mapping the variable names used to initialise the IntakeDataset + object to their ESMValCore facet names. 
For example, + ``` + ACCESS_ESM1_5 = IntakeDataset( + short_name='tos', + project='CMIP6', + ) + ``` + would result in a variable dict of {'short_name': 'tos', 'project': 'CMIP6'}. + drs : dict + The DRS configuration. Can be obtained from the global configuration drs + field, eg. CFG['drs']. + """ + catalog, facets = load_catalog(variable["project"], drs) + if not catalog: + return [] + query = {val : variable.get(key) for key, val in facets.items()} + query = {key : val for key, val in query.items() if val is not None} + """ + for ours, theirs in facets.items(): + if ours == 'version' and 'version' not in variable: + # skip version if not specified in recipe + continue + query[theirs] = variable[ours] - + ^ Check this logic is retained + """ + + unmapped = {key: val for key, val in variable.items() if key not in facets} + unmapped.pop("project", None) + + self._unmapped_facets = unmapped + + selection = catalog.search(**query) + + # Select latest version + if 'version' in facets and 'version' not in variable: + latest_version = max(selection.unique().version) # These are strings - so wtf doe this mean? 
+ variable['version'] = latest_version + query = { + facets['version']: latest_version, + } + selection = selection.search(**query) + return selection.unique().path From 648f119eb9958d421b3102d12e2d127c3c6d778c Mon Sep 17 00:00:00 2001 From: Charles Turner Date: Wed, 12 Mar 2025 15:59:54 +1100 Subject: [PATCH 06/17] Working with multiple catalogues per project --- esmvalcore/config-developer.yml | 41 +++++---- esmvalcore/dataset.py | 2 +- esmvalcore/intake/__init__.py | 4 +- esmvalcore/intake/_dataset.py | 148 ++++++++++++++------------------ 4 files changed, 93 insertions(+), 102 deletions(-) diff --git a/esmvalcore/config-developer.yml b/esmvalcore/config-developer.yml index 69e8e1e09c..4cb8bab9aa 100644 --- a/esmvalcore/config-developer.yml +++ b/esmvalcore/config-developer.yml @@ -40,21 +40,32 @@ CMIP6: input_file: '{short_name}_{mip}_{dataset}_{exp}_{ensemble}_{grid}*.nc' catalogs: NCI: - file: - /g/data/fs38/catalog/v2/esm/catalog.json - # - /g/data/oi10/catalog/v2/esm/catalog.json - facets: - # mapping from recipe facets to intake-esm catalog facets - # TODO: Fix these when Gadi is back up - activity: activity_id - dataset: source_id - ensemble: member_id - exp: experiment_id - grid: grid_label - institute: institution_id - mip: table_id - short_name: variable_id - version: version + - file: + /g/data/fs38/catalog/v2/esm/catalog.json + facets: + activity: activity_id + dataset: source_id + ensemble: member_id + exp: experiment_id + grid: grid_label + institute: institution_id + mip: table_id + short_name: variable_id + version: version + frequency: frequency + - file: + /g/data/oi10/catalog/v2/esm/catalog.json + facets: + activity: activity_id + dataset: source_id + ensemble: member_id + exp: experiment_id + grid: grid_label + institute: institution_id + mip: table_id + short_name: variable_id + version: version + frequency: frequency output_file: '{project}_{dataset}_{mip}_{exp}_{ensemble}_{short_name}_{grid}' cmor_type: 'CMIP6' diff --git 
a/esmvalcore/dataset.py b/esmvalcore/dataset.py index 7e5494b92e..ae2c272699 100644 --- a/esmvalcore/dataset.py +++ b/esmvalcore/dataset.py @@ -771,7 +771,7 @@ def _load(self) -> Cube: "check_level": self.session["check_level"], "cmor_table": self.facets["project"], "mip": self.facets["mip"], - "frequency": self.facets["frequency"], + "frequency": self.facets["frequency"], # Error here "short_name": self.facets["short_name"], } if "timerange" in self.facets: diff --git a/esmvalcore/intake/__init__.py b/esmvalcore/intake/__init__.py index 916b475e96..e3033ab87c 100644 --- a/esmvalcore/intake/__init__.py +++ b/esmvalcore/intake/__init__.py @@ -1,5 +1,5 @@ """Find files using an intake-esm catalog and load them.""" -from ._dataset import IntakeDataset, load_catalog +from ._dataset import IntakeDataset, load_catalogs -__all__ = ["IntakeDataset", "load_catalog"] +__all__ = ["IntakeDataset", "load_catalogs"] diff --git a/esmvalcore/intake/_dataset.py b/esmvalcore/intake/_dataset.py index c41222d664..a8cf2fe894 100644 --- a/esmvalcore/intake/_dataset.py +++ b/esmvalcore/intake/_dataset.py @@ -1,20 +1,20 @@ import logging -import datetime from pathlib import Path from typing import Sequence -import isodate +# import isodate import intake import intake_esm import iris -from ncdata.iris_xarray import cubes_from_xarray +# from ncdata.iris_xarray import cubes_from_xarray from esmvalcore.config._config import get_project_config -from esmvalcore.config import CFG, Config, Session +from esmvalcore.config import CFG from esmvalcore.preprocessor import clip_timerange, fix_metadata -from esmvalcore.cmor.table import VariableInfo +# from esmvalcore.cmor.table import VariableInfo from esmvalcore.dataset import Dataset, File -from esmvalcore.typing import Facets, FacetValue +from esmvalcore.local import LocalFile +# from esmvalcore.typing import Facets, FacetValue __all__ = ["IntakeDataset", "get_project_config", "_select_files"] @@ -29,23 +29,23 @@ def clear_catalog_cache(): 
_CACHE.clear() -def load_catalog(project, drs) -> tuple[intake_esm.core.esm_datastore, dict]: - """Load an intake-esm catalog and associated facet mapping. +def load_catalogs(project, drs) -> tuple[list[intake_esm.core.esm_datastore], list[dict]]: + """Load all intake-esm catalogs for a project and their associated facet mappings. Parameters ---------- project : str The project name, eg. 'CMIP6'. drs : dict - The DRS configuration. Can be obtained from the global configuration drs - field, eg. CFG['drs']. + The DRS configuration. Can be obtained from the global configuration drs + field, eg. CFG['drs']. Returns ------- intake_esm.core.esm_datastore The catalog. dict - The facet mapping - a dictionarry mapping ESMVlCore dataset facet names + The facet mapping - a dictionary mapping ESMVlCore dataset facet names to the fields in the intake-esm datastore. """ catalog_info = get_project_config(project).get("catalogs", {}) @@ -53,19 +53,25 @@ def load_catalog(project, drs) -> tuple[intake_esm.core.esm_datastore, dict]: if site not in catalog_info: return None, {} - catalog_url = Path(catalog_info[site]["file"]).expanduser() - - if catalog_url not in _CACHE: - logger.info( - "Loading intake-esm catalog (this may take some time): %s", - catalog_url, - ) - _CACHE[catalog_url] = intake.open_esm_datastore(catalog_url) - logger.info("Done loading catalog") + catalog_urls = [ + Path(catalog.get('file')).expanduser() for catalog in catalog_info[site] + ] + facet_list = [ + catalog.get('facets') for catalog in catalog_info[site] + ] + + for catalog_url in catalog_urls: + if catalog_url not in _CACHE: + logger.info( + "Loading intake-esm catalog (this may take some time): %s", + catalog_url, + ) + _CACHE[catalog_url] = intake.open_esm_datastore(catalog_url) + logger.info("Successfully loaded catalog %s", catalog_url) - catalog = _CACHE[catalog_url] - facets = catalog_info[site]["facets"] - return catalog, facets + return ( + [_CACHE[cat_url] for cat_url in catalog_urls], 
facet_list + ) @@ -76,7 +82,7 @@ class IntakeDataset(Dataset): def __init__(self, **facets): project = facets["project"] - self.catalog, self._facets = load_catalog(project, CFG["drs"]) + self.catalog, self._facets = load_catalogs(project, CFG["drs"]) super().__init__(**facets) @property @@ -85,47 +91,23 @@ def files(self) -> Sequence[File]: self._files = self._find_files(self.facets, CFG["drs"]) return self._files - def load(self) -> iris.cube.Cube: - """ - Load the dataset. We should be able to do this by just querying for catalog - for the files in self.files and then loading them into an xarray dataset. - This isthen converted to an iris cube. - """ - cat_subset = self.catalog.search(path=self.files) - if len(cat_subset) > 1: - raise ValueError("Multiple datasets found for the same query") - elif len(cat_subset) == 0: - raise ValueError("No datasets found for the query") - - ds = cat_subset.to_dask() - da = ds[self.facets["short_name"]] - cube = da.to_iris() - - extra_facets = {k : v for k, v in self.facets.items() if k not in ["short_name", "project", "dataset","mip","frequency"]} - cube = fix_metadata((cube,), - short_name = self.facets["short_name"], - project = self.facets["project"], - dataset = self.facets["dataset"], - mip = self.facets["mip"], - frequency = self.facets.get("frequency"), - **extra_facets - )[0] - - return clip_timerange(cube, self.facets["timerange"]) - + @property + def filenames(self) -> Sequence[str]: + return [str(f) for f in self.files] def _find_files(self, variable : dict , drs) -> Sequence[File]: - """Find files for variable in intake-esm catalog. + """Find files for variable in all intake-esm catalogs associated with a + project. - As a side effect, sets the unmapped_facets attribute - this is used to + As a side effect, sets the unmapped_facets attribute - this is used to cache facets which are not in the datastore. 
- + Parameters ---------- variable : dict - A dict mapping the variable names used to initialise the IntakeDataset - object to their ESMValCore facet names. For example, + A dict mapping the variable names used to initialise the IntakeDataset + object to their ESMValCore facet names. For example, ``` ACCESS_ESM1_5 = IntakeDataset( short_name='tos', @@ -134,40 +116,38 @@ def _find_files(self, variable : dict , drs) -> Sequence[File]: ``` would result in a variable dict of {'short_name': 'tos', 'project': 'CMIP6'}. drs : dict - The DRS configuration. Can be obtained from the global configuration drs + The DRS configuration. Can be obtained from the global configuration drs field, eg. CFG['drs']. - + """ - catalog, facets = load_catalog(variable["project"], drs) - if not catalog: + catalogs, facets_list = load_catalogs(variable["project"], drs) + if not catalogs: return [] - query = {val : variable.get(key) for key, val in facets.items()} - query = {key : val for key, val in query.items() if val is not None} - """ - for ours, theirs in facets.items(): - if ours == 'version' and 'version' not in variable: - # skip version if not specified in recipe - continue - query[theirs] = variable[ours] + files = [] - ^ Check this logic is retained - """ + for catalog, facets in zip(catalogs, facets_list): + query = {val : variable.get(key) for key, val in facets.items()} + query = {key : val for key, val in query.items() if val is not None} + + unmapped = {key: val for key, val in variable.items() if key not in facets} + unmapped.pop("project", None) + + self._unmapped_facets = unmapped - unmapped = {key: val for key, val in variable.items() if key not in facets} - unmapped.pop("project", None) + selection = catalog.search(**query) - self._unmapped_facets = unmapped + # Select latest version + if 'version' in facets and 'version' not in variable: + latest_version = max(selection.unique().version) # These are strings - so wtf doe this mean? 
+ variable['version'] = latest_version + query = { + facets['version']: latest_version, + } + selection = selection.search(**query) - selection = catalog.search(**query) + files += [LocalFile(f) for f in selection.unique().path] - # Select latest version - if 'version' in facets and 'version' not in variable: - latest_version = max(selection.unique().version) # These are strings - so wtf doe this mean? - variable['version'] = latest_version - query = { - facets['version']: latest_version, - } - selection = selection.search(**query) - return selection.unique().path + self.augment_facets() + return files From 2b91fecce36d10de25d75bdc3870a5208d561255 Mon Sep 17 00:00:00 2001 From: Charles Turner Date: Thu, 13 Mar 2025 09:43:52 +0800 Subject: [PATCH 07/17] Cleanup - mypy & ruff errors --- esmvalcore/config-developer.yml | 43 +++++++++++++-------- esmvalcore/intake/_dataset.py | 68 +++++++++++++++------------------ 2 files changed, 59 insertions(+), 52 deletions(-) diff --git a/esmvalcore/config-developer.yml b/esmvalcore/config-developer.yml index 4cb8bab9aa..046ec34788 100644 --- a/esmvalcore/config-developer.yml +++ b/esmvalcore/config-developer.yml @@ -86,21 +86,34 @@ CMIP5: input_file: '{short_name}_{mip}_{dataset}_{exp}_{ensemble}*.nc' catalogs:  NCI: - files: - - /g/data/al33/catalog/v2/esm/catalog.json - - /g/data/rr3/catalog/v2/esm/catalog.json - facets: - # mapping from recipe facets to intake-esm catalog facets - # TODO: Fix these when Gadi is back up - activity: activity_id - dataset: source_id - ensemble: member_id - exp: experiment_id - grid: grid_label - institute: institution_id - mip: table_id - short_name: variable_id - version: version + - file: + /g/data/rr3/catalog/v2/esm/catalog.json + facets: + # mapping from recipe facets to intake-esm catalog facets + # TODO: Fix these when Gadi is back up + activity: activity_id + dataset: source_id + ensemble: ensemble + exp: experiment + grid: grid_label + institute: institution_id + mip: table_id + 
short_name: variable + version: version + - file: + /g/data/al33/catalog/v2/esm/catalog.json + facets: + # mapping from recipe facets to intake-esm catalog facets + # TODO: Fix these when Gadi is back up + activity: activity_id + dataset: source_id + ensemble: ensemble + exp: experiment + institute: institute + mip: table + short_name: variable + version: version + timerange: time_range output_file: '{project}_{dataset}_{mip}_{exp}_{ensemble}_{short_name}' CMIP3: diff --git a/esmvalcore/intake/_dataset.py b/esmvalcore/intake/_dataset.py index a8cf2fe894..d924bb0081 100644 --- a/esmvalcore/intake/_dataset.py +++ b/esmvalcore/intake/_dataset.py @@ -1,35 +1,31 @@ import logging from pathlib import Path from typing import Sequence -# import isodate +# import isodate import intake import intake_esm -import iris -# from ncdata.iris_xarray import cubes_from_xarray -from esmvalcore.config._config import get_project_config from esmvalcore.config import CFG -from esmvalcore.preprocessor import clip_timerange, fix_metadata -# from esmvalcore.cmor.table import VariableInfo +from esmvalcore.config._config import get_project_config from esmvalcore.dataset import Dataset, File from esmvalcore.local import LocalFile -# from esmvalcore.typing import Facets, FacetValue -__all__ = ["IntakeDataset", "get_project_config", "_select_files"] +__all__ = ["IntakeDataset", "load_catalogs", "clear_catalog_cache"] logger = logging.getLogger(__name__) _CACHE: dict[Path, intake_esm.core.esm_datastore] = {} - def clear_catalog_cache(): """Clear the catalog cache.""" _CACHE.clear() -def load_catalogs(project, drs) -> tuple[list[intake_esm.core.esm_datastore], list[dict]]: +def load_catalogs( + project, drs +) -> tuple[list[intake_esm.core.esm_datastore], list[dict]]: """Load all intake-esm catalogs for a project and their associated facet mappings. 
Parameters @@ -51,14 +47,13 @@ def load_catalogs(project, drs) -> tuple[list[intake_esm.core.esm_datastore], li catalog_info = get_project_config(project).get("catalogs", {}) site = drs.get(project, "default") if site not in catalog_info: - return None, {} + return [None], [{}] catalog_urls = [ - Path(catalog.get('file')).expanduser() for catalog in catalog_info[site] - ] - facet_list = [ - catalog.get('facets') for catalog in catalog_info[site] + Path(catalog.get("file")).expanduser() + for catalog in catalog_info[site] ] + facet_list = [catalog.get("facets") for catalog in catalog_info[site]] for catalog_url in catalog_urls: if catalog_url not in _CACHE: @@ -69,18 +64,13 @@ def load_catalogs(project, drs) -> tuple[list[intake_esm.core.esm_datastore], li _CACHE[catalog_url] = intake.open_esm_datastore(catalog_url) logger.info("Successfully loaded catalog %s", catalog_url) - return ( - [_CACHE[cat_url] for cat_url in catalog_urls], facet_list - ) - + return ([_CACHE[cat_url] for cat_url in catalog_urls], facet_list) class IntakeDataset(Dataset): - """ - Class to handle loading data using Intake-esm. - """ + """Load data using Intake-ESM.""" - def __init__(self, **facets): + def __init__(self, **facets): project = facets["project"] self.catalog, self._facets = load_catalogs(project, CFG["drs"]) super().__init__(**facets) @@ -91,18 +81,20 @@ def files(self) -> Sequence[File]: self._files = self._find_files(self.facets, CFG["drs"]) return self._files + @files.setter + def files(self, value): + self._files = value + @property def filenames(self) -> Sequence[str]: return [str(f) for f in self.files] - def _find_files(self, variable : dict , drs) -> Sequence[File]: - """Find files for variable in all intake-esm catalogs associated with a - project. + def _find_files(self, variable: dict, drs) -> Sequence[File]: # type: ignore[override] + """Find files for variable in all intake-esm catalogs associated with a project. 
As a side effect, sets the unmapped_facets attribute - this is used to cache facets which are not in the datastore. - Parameters ---------- variable : dict @@ -118,7 +110,6 @@ def _find_files(self, variable : dict , drs) -> Sequence[File]: drs : dict The DRS configuration. Can be obtained from the global configuration drs field, eg. CFG['drs']. - """ catalogs, facets_list = load_catalogs(variable["project"], drs) if not catalogs: @@ -126,11 +117,13 @@ def _find_files(self, variable : dict , drs) -> Sequence[File]: files = [] - for catalog, facets in zip(catalogs, facets_list): - query = {val : variable.get(key) for key, val in facets.items()} - query = {key : val for key, val in query.items() if val is not None} + for catalog, facets in zip(catalogs, facets_list, strict=False): + query = {val: variable.get(key) for key, val in facets.items()} + query = {key: val for key, val in query.items() if val is not None} - unmapped = {key: val for key, val in variable.items() if key not in facets} + unmapped = { + key: val for key, val in variable.items() if key not in facets + } unmapped.pop("project", None) self._unmapped_facets = unmapped @@ -138,16 +131,17 @@ def _find_files(self, variable : dict , drs) -> Sequence[File]: selection = catalog.search(**query) # Select latest version - if 'version' in facets and 'version' not in variable: - latest_version = max(selection.unique().version) # These are strings - so wtf doe this mean? - variable['version'] = latest_version + if "version" in facets and "version" not in variable: + latest_version = max( + selection.unique().version + ) # These are strings - need to double check the sorting here. 
+ variable["version"] = latest_version query = { - facets['version']: latest_version, + facets["version"]: latest_version, } selection = selection.search(**query) files += [LocalFile(f) for f in selection.unique().path] - self.augment_facets() return files From c7b8ffb15c9f7677848d422b9bd2433f4370066e Mon Sep 17 00:00:00 2001 From: Charles Turner Date: Thu, 13 Mar 2025 09:54:28 +0800 Subject: [PATCH 08/17] Remove WIP --- esmvalcore/dataset.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/esmvalcore/dataset.py b/esmvalcore/dataset.py index ae2c272699..6717485e38 100644 --- a/esmvalcore/dataset.py +++ b/esmvalcore/dataset.py @@ -35,7 +35,6 @@ from esmvalcore.preprocessor import preprocess from esmvalcore.typing import Facets, FacetValue - __all__ = [ "Dataset", "INHERITED_FACETS", @@ -157,7 +156,6 @@ def from_recipe( return datasets_from_recipe(recipe, session) - def _file_to_dataset( self, file: esgf.ESGFFile | local.LocalFile, @@ -771,7 +769,7 @@ def _load(self) -> Cube: "check_level": self.session["check_level"], "cmor_table": self.facets["project"], "mip": self.facets["mip"], - "frequency": self.facets["frequency"], # Error here + "frequency": self.facets["frequency"], "short_name": self.facets["short_name"], } if "timerange" in self.facets: From 31b35cb331bf6d738a233cacc11b7510b484d95f Mon Sep 17 00:00:00 2001 From: Charles Turner Date: Thu, 13 Mar 2025 09:56:46 +0800 Subject: [PATCH 09/17] Update depenencies & dev environment --- environment.yml | 2 ++ pyproject.toml | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/environment.yml b/environment.yml index 321b0484c5..ae45a1ed8d 100644 --- a/environment.yml +++ b/environment.yml @@ -18,6 +18,8 @@ dependencies: - fire - geopy - humanfriendly + - intake >=2.0.0 + - intake-esm >=2025.2.3 - iris >=3.11 # 3.11 first to support Numpy 2 and Python 3.13 - iris-esmf-regrid >=0.11.0 - iris-grib >=0.20.0 # github.com/ESMValGroup/ESMValCore/issues/2535 diff --git a/pyproject.toml 
b/pyproject.toml index 1c8d72dba9..b832daa17b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,8 +45,8 @@ dependencies = [ "fire", "geopy", "humanfriendly", - "intake>=0.7.0", - "intake-esm>=2024.2.6", + "intake>=2.0.0", + "intake-esm>=2025.2.3", "iris-grib>=0.20.0", # github.com/ESMValGroup/ESMValCore/issues/2535 "isodate>=0.7.0", "jinja2", From a8532a5ba56063318a8f9477432c7e95232d44d4 Mon Sep 17 00:00:00 2001 From: Charles Turner Date: Thu, 13 Mar 2025 09:58:24 +0800 Subject: [PATCH 10/17] Pre-commit modifications --- esmvalcore/config-developer.yml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/esmvalcore/config-developer.yml b/esmvalcore/config-developer.yml index 046ec34788..afd4147a22 100644 --- a/esmvalcore/config-developer.yml +++ b/esmvalcore/config-developer.yml @@ -40,7 +40,7 @@ CMIP6: input_file: '{short_name}_{mip}_{dataset}_{exp}_{ensemble}_{grid}*.nc' catalogs: NCI: - - file: + - file: /g/data/fs38/catalog/v2/esm/catalog.json facets: activity: activity_id @@ -53,7 +53,7 @@ CMIP6: short_name: variable_id version: version frequency: frequency - - file: + - file: /g/data/oi10/catalog/v2/esm/catalog.json facets: activity: activity_id @@ -86,7 +86,7 @@ CMIP5: input_file: '{short_name}_{mip}_{dataset}_{exp}_{ensemble}*.nc' catalogs:  NCI: - - file: + - file: /g/data/rr3/catalog/v2/esm/catalog.json facets: # mapping from recipe facets to intake-esm catalog facets @@ -100,7 +100,7 @@ CMIP5: mip: table_id short_name: variable version: version - - file: + - file: /g/data/al33/catalog/v2/esm/catalog.json facets: # mapping from recipe facets to intake-esm catalog facets @@ -216,7 +216,7 @@ CORDEX: input_file: '{short_name}_{domain}_{driver}_{exp}_{ensemble}_{institute}-{dataset}_{rcm_version}_{mip}*.nc' catalogs:  NCI: - files: + files: - /g/data/oi10/catalog/v2/esm/catalog.json facets: # mapping from recipe facets to intake-esm catalog facets From 568cb8d3e94bd9954c5ef399d649f585ab522438 Mon Sep 17 00:00:00 2001 From: 
Charles Turner Date: Thu, 13 Mar 2025 14:02:31 +0800 Subject: [PATCH 11/17] Fixed most of codacy (mypy-strict?) gripes --- esmvalcore/intake/_dataset.py | 39 +++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/esmvalcore/intake/_dataset.py b/esmvalcore/intake/_dataset.py index d924bb0081..f57efb58b2 100644 --- a/esmvalcore/intake/_dataset.py +++ b/esmvalcore/intake/_dataset.py @@ -1,6 +1,9 @@ +"""Import datsets using Intake-ESM.""" + import logging +from numbers import Number from pathlib import Path -from typing import Sequence +from typing import Any, Sequence # import isodate import intake @@ -24,8 +27,8 @@ def clear_catalog_cache(): def load_catalogs( - project, drs -) -> tuple[list[intake_esm.core.esm_datastore], list[dict]]: + project: str, drs: dict +) -> tuple[list[intake_esm.core.esm_datastore], list[dict[str, str]]]: """Load all intake-esm catalogs for a project and their associated facet mappings. Parameters @@ -44,7 +47,9 @@ def load_catalogs( The facet mapping - a dictionary mapping ESMVlCore dataset facet names to the fields in the intake-esm datastore. 
""" - catalog_info = get_project_config(project).get("catalogs", {}) + catalog_info: dict[str, Any] = get_project_config(project).get( + "catalogs", {} + ) site = drs.get(project, "default") if site not in catalog_info: return [None], [{}] @@ -73,6 +78,7 @@ class IntakeDataset(Dataset): def __init__(self, **facets): project = facets["project"] self.catalog, self._facets = load_catalogs(project, CFG["drs"]) + self._unmapped_facets = {} super().__init__(**facets) @property @@ -82,14 +88,20 @@ def files(self) -> Sequence[File]: return self._files @files.setter - def files(self, value): + def files(self, value: Sequence[File]): + """Manually set the files for the dataset.""" self._files = value @property def filenames(self) -> Sequence[str]: + """String representation of the filenames in the dataset.""" return [str(f) for f in self.files] - def _find_files(self, variable: dict, drs) -> Sequence[File]: # type: ignore[override] + def _find_files( # type: ignore[override] + self, + facet_map: dict[str, str | Sequence[str] | Number], + drs: dict[str, Any], + ) -> Sequence[File]: """Find files for variable in all intake-esm catalogs associated with a project. As a side effect, sets the unmapped_facets attribute - this is used to @@ -111,18 +123,23 @@ def _find_files(self, variable: dict, drs) -> Sequence[File]: # type: ignore[ov The DRS configuration. Can be obtained from the global configuration drs field, eg. CFG['drs']. """ - catalogs, facets_list = load_catalogs(variable["project"], drs) + if not isinstance(facet_map["project"], str): + raise TypeError( + "The project facet must be a string for Intake Datasets." 
+ ) + + catalogs, facets_list = load_catalogs(facet_map["project"], drs) if not catalogs: return [] files = [] for catalog, facets in zip(catalogs, facets_list, strict=False): - query = {val: variable.get(key) for key, val in facets.items()} + query = {val: facet_map.get(key) for key, val in facets.items()} query = {key: val for key, val in query.items() if val is not None} unmapped = { - key: val for key, val in variable.items() if key not in facets + key: val for key, val in facet_map.items() if key not in facets } unmapped.pop("project", None) @@ -131,11 +148,11 @@ def _find_files(self, variable: dict, drs) -> Sequence[File]: # type: ignore[ov selection = catalog.search(**query) # Select latest version - if "version" in facets and "version" not in variable: + if "version" in facets and "version" not in facet_map: latest_version = max( selection.unique().version ) # These are strings - need to double check the sorting here. - variable["version"] = latest_version + facet_map["version"] = latest_version query = { facets["version"]: latest_version, } From 91fee5646bbdfb4bc94e66d7f4e8b88632f19e05 Mon Sep 17 00:00:00 2001 From: Charles Turner Date: Thu, 13 Mar 2025 14:08:35 +0800 Subject: [PATCH 12/17] Fix typo --- esmvalcore/intake/_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/esmvalcore/intake/_dataset.py b/esmvalcore/intake/_dataset.py index f57efb58b2..a7dfe4dbe3 100644 --- a/esmvalcore/intake/_dataset.py +++ b/esmvalcore/intake/_dataset.py @@ -1,4 +1,4 @@ -"""Import datsets using Intake-ESM.""" +"""Import datasets using Intake-ESM.""" import logging from numbers import Number From 9d894b97ffc146cd8196564cfd86244412e4b1a1 Mon Sep 17 00:00:00 2001 From: Charles Turner Date: Wed, 2 Apr 2025 13:16:10 +0800 Subject: [PATCH 13/17] Beginning to work on Bouwe's comments (WIP) --- esmvalcore/config-developer.yml | 74 ----------- esmvalcore/config/_intake.py | 124 ++++++++++++++++++ esmvalcore/{intake => data}/__init__.py | 2 +- 
.../intake/_intake_dataset.py} | 69 ++++++++-- 4 files changed, 185 insertions(+), 84 deletions(-) create mode 100644 esmvalcore/config/_intake.py rename esmvalcore/{intake => data}/__init__.py (62%) rename esmvalcore/{intake/_dataset.py => data/intake/_intake_dataset.py} (71%) diff --git a/esmvalcore/config-developer.yml b/esmvalcore/config-developer.yml index 39a7c30fb6..a8f0181046 100644 --- a/esmvalcore/config-developer.yml +++ b/esmvalcore/config-developer.yml @@ -38,34 +38,6 @@ CMIP6: SYNDA: '{activity}/{institute}/{dataset}/{exp}/{ensemble}/{mip}/{short_name}/{grid}/{version}' NCI: '{activity}/{institute}/{dataset}/{exp}/{ensemble}/{mip}/{short_name}/{grid}/{version}' input_file: '{short_name}_{mip}_{dataset}_{exp}_{ensemble}_{grid}*.nc' - catalogs: - NCI: - - file: - /g/data/fs38/catalog/v2/esm/catalog.json - facets: - activity: activity_id - dataset: source_id - ensemble: member_id - exp: experiment_id - grid: grid_label - institute: institution_id - mip: table_id - short_name: variable_id - version: version - frequency: frequency - - file: - /g/data/oi10/catalog/v2/esm/catalog.json - facets: - activity: activity_id - dataset: source_id - ensemble: member_id - exp: experiment_id - grid: grid_label - institute: institution_id - mip: table_id - short_name: variable_id - version: version - frequency: frequency output_file: '{project}_{dataset}_{mip}_{exp}_{ensemble}_{short_name}_{grid}' cmor_type: 'CMIP6' @@ -84,36 +56,6 @@ CMIP5: SMHI: '{dataset}/{ensemble}/{exp}/{frequency}' SYNDA: '{institute}/{dataset}/{exp}/{frequency}/{modeling_realm}/{mip}/{ensemble}/{version}' input_file: '{short_name}_{mip}_{dataset}_{exp}_{ensemble}*.nc' - catalogs: -  NCI: - - file: - /g/data/rr3/catalog/v2/esm/catalog.json - facets: - # mapping from recipe facets to intake-esm catalog facets - # TODO: Fix these when Gadi is back up - activity: activity_id - dataset: source_id - ensemble: ensemble - exp: experiment - grid: grid_label - institute: institution_id - mip: table_id - 
short_name: variable - version: version - - file: - /g/data/al33/catalog/v2/esm/catalog.json - facets: - # mapping from recipe facets to intake-esm catalog facets - # TODO: Fix these when Gadi is back up - activity: activity_id - dataset: source_id - ensemble: ensemble - exp: experiment - institute: institute - mip: table - short_name: variable - version: version - timerange: time_range output_file: '{project}_{dataset}_{mip}_{exp}_{ensemble}_{short_name}' CMIP3: @@ -214,22 +156,6 @@ CORDEX: ESGF: '{project.lower}/output/{domain}/{institute}/{driver}/{exp}/{ensemble}/{dataset}/{rcm_version}/{frequency}/{short_name}/{version}' SYNDA: '{domain}/{institute}/{driver}/{exp}/{ensemble}/{dataset}/{rcm_version}/{frequency}/{short_name}/{version}' input_file: '{short_name}_{domain}_{driver}_{exp}_{ensemble}_{institute}-{dataset}_{rcm_version}_{mip}*.nc' - catalogs: -  NCI: - files: - - /g/data/oi10/catalog/v2/esm/catalog.json - facets: - # mapping from recipe facets to intake-esm catalog facets - # TODO: Fix these when Gadi is back up - activity: activity_id - dataset: source_id - ensemble: member_id - exp: experiment_id - grid: grid_label - institute: institution_id - mip: table_id - short_name: variable_id - version: version output_file: '{project}_{institute}_{dataset}_{rcm_version}_{driver}_{domain}_{mip}_{exp}_{ensemble}_{short_name}' cmor_type: 'CMIP5' cmor_path: 'cordex' diff --git a/esmvalcore/config/_intake.py b/esmvalcore/config/_intake.py new file mode 100644 index 0000000000..7f4d15560d --- /dev/null +++ b/esmvalcore/config/_intake.py @@ -0,0 +1,124 @@ +"""esgf-pyclient configuration. + +The configuration is read from the file ~/.esmvaltool/esgf-pyclient.yml. 
+""" + +import logging +import os +import stat +from functools import lru_cache +from pathlib import Path + +import yaml + +logger = logging.getLogger(__name__) + +CONFIG_FILE = Path.home() / ".esmvaltool" / "data-intake.yml" + + +def read_config_file() -> dict: + """Read the configuration from file.""" + if CONFIG_FILE.exists(): + logger.info("Loading Intake-ESM configuration from %s", CONFIG_FILE) + mode = os.stat(CONFIG_FILE).st_mode + if mode & stat.S_IRWXG or mode & stat.S_IRWXO: + logger.warning("Correcting unsafe permissions on %s", CONFIG_FILE) + os.chmod(CONFIG_FILE, stat.S_IRUSR | stat.S_IWUSR) + with CONFIG_FILE.open(encoding="utf-8") as file: + cfg = yaml.safe_load(file) + else: + logger.info( + "Using default Intake-ESM configuration, configuration " + "file %s not present.", + CONFIG_FILE, + ) + cfg = {} + + return cfg + + +def load_intake_config(): + """Load the intake-esm configuration.""" + cfg = { + "CMIP6": { + "catalogs": { + "NCI": [ + { + "file": "/g/data/fs38/catalog/v2/esm/catalog.json", + "facets": { + "activity": "activity_id", + "dataset": "source_id", + "ensemble": "member_id", + "exp": "experiment_id", + "grid": "grid_label", + "institute": "institution_id", + "mip": "table_id", + "short_name": "variable_id", + "version": "version", + "frequency": "frequency", + }, + }, + { + "file": "/g/data/oi10/catalog/v2/esm/catalog.json", + "facets": { + "activity": "activity_id", + "dataset": "source_id", + "ensemble": "member_id", + "exp": "experiment_id", + "grid": "grid_label", + "institute": "institution_id", + "mip": "table_id", + "short_name": "variable_id", + "version": "version", + "frequency": "frequency", + }, + }, + ] + } + }, + "CMIP5": { + "catalogs": { + "NCI": [ + { + "file": "/g/data/rr3/catalog/v2/esm/catalog.json", + "facets": { + "activity": "activity_id", + "dataset": "source_id", + "ensemble": "ensemble", + "exp": "experiment", + "grid": "grid_label", + "institute": "institution_id", + "mip": "table_id", + "short_name": 
"variable", + "version": "version", + }, + }, + { + "file": "/g/data/al33/catalog/v2/esm/catalog.json", + "facets": { + "activity": "activity_id", + "dataset": "source_id", + "ensemble": "ensemble", + "exp": "experiment", + "institute": "institute", + "mip": "table", + "short_name": "variable", + "version": "version", + "timerange": "time_range", + }, + }, + ] + } + }, + } + + file_cfg = read_config_file() + cfg.update(file_cfg) + + return cfg + + +@lru_cache() +def get_intake_config(): + """Get the esgf-pyclient configuration.""" + return load_intake_config() diff --git a/esmvalcore/intake/__init__.py b/esmvalcore/data/__init__.py similarity index 62% rename from esmvalcore/intake/__init__.py rename to esmvalcore/data/__init__.py index e3033ab87c..3aedddeac9 100644 --- a/esmvalcore/intake/__init__.py +++ b/esmvalcore/data/__init__.py @@ -1,5 +1,5 @@ """Find files using an intake-esm catalog and load them.""" -from ._dataset import IntakeDataset, load_catalogs +from .intake._intake_dataset import IntakeDataset, load_catalogs __all__ = ["IntakeDataset", "load_catalogs"] diff --git a/esmvalcore/intake/_dataset.py b/esmvalcore/data/intake/_intake_dataset.py similarity index 71% rename from esmvalcore/intake/_dataset.py rename to esmvalcore/data/intake/_intake_dataset.py index a7dfe4dbe3..4e4b9734a4 100644 --- a/esmvalcore/intake/_dataset.py +++ b/esmvalcore/data/intake/_intake_dataset.py @@ -5,7 +5,6 @@ from pathlib import Path from typing import Any, Sequence -# import isodate import intake import intake_esm @@ -13,6 +12,7 @@ from esmvalcore.config._config import get_project_config from esmvalcore.dataset import Dataset, File from esmvalcore.local import LocalFile +from esmvalcore.typing import Facets __all__ = ["IntakeDataset", "load_catalogs", "clear_catalog_cache"] @@ -21,13 +21,13 @@ _CACHE: dict[Path, intake_esm.core.esm_datastore] = {} -def clear_catalog_cache(): +def clear_catalog_cache() -> None: """Clear the catalog cache.""" _CACHE.clear() def 
load_catalogs( - project: str, drs: dict + project: str, drs: dict[str, Any] ) -> tuple[list[intake_esm.core.esm_datastore], list[dict[str, str]]]: """Load all intake-esm catalogs for a project and their associated facet mappings. @@ -73,13 +73,17 @@ def load_catalogs( class IntakeDataset(Dataset): - """Load data using Intake-ESM.""" + """Class to handle loading data using Intake-ESM. - def __init__(self, **facets): - project = facets["project"] + Crucially, we do not subclass Dataset, as this is going to cause problems. + """ + + def __init__(self, **facets: dict[str, Any]) -> None: + project: str = facets["project"] # type: ignore[assignment] + self.facets: Facets = {} self.catalog, self._facets = load_catalogs(project, CFG["drs"]) - self._unmapped_facets = {} - super().__init__(**facets) + self._unmapped_facets: dict[str, Any] = {} + self._files: Sequence[File] | None = None @property def files(self) -> Sequence[File]: @@ -109,7 +113,7 @@ def _find_files( # type: ignore[override] Parameters ---------- - variable : dict + facet_map : dict A dict mapping the variable names used to initialise the IntakeDataset object to their ESMValCore facet names. For example, ``` @@ -162,3 +166,50 @@ def _find_files( # type: ignore[override] self.augment_facets() return files + + +""" +def find_files(*, project, short_name, dataset, **facets): + catalog, facet_map = load_catalogs(project, CFG["drs"]) + + if not isinstance(facet_map["project"], str): + raise TypeError( + "The project facet must be a string for Intake Datasets." 
+ ) + + # catalogs, facets_list = load_catalogs(facet_map["project"], drs) + if not catalogs: + return [] + + files = [] + + for catalog, facets in zip(catalogs, facets_list, strict=False): + query = {val: facet_map.get(key) for key, val in facets.items()} + query = {key: val for key, val in query.items() if val is not None} + + unmapped = { + key: val for key, val in facet_map.items() if key not in facets + } + unmapped.pop("project", None) + + # self._unmapped_facets = unmapped + + selection = catalog.search(**query) + + # Select latest version + if "version" in facets and "version" not in facet_map: + latest_version = max( + selection.unique().version + ) # These are strings - need to double check the sorting here. + facet_map["version"] = latest_version + query = { + facets["version"]: latest_version, + } + selection = selection.search(**query) + + files += [LocalFile(f) for f in selection.unique().path] + + # self.augment_facets() + return files + +""" From 59d0d0211c6c1522f2b71ee91455a7ce442a7033 Mon Sep 17 00:00:00 2001 From: Charles Turner Date: Thu, 3 Apr 2025 13:35:52 +0800 Subject: [PATCH 14/17] Updates - restructured esmvalcore/data/intake following Bouwe's suggestions --- esmvalcore/data/__init__.py | 4 +- esmvalcore/data/intake/_intake_dataset.py | 165 ++++++---------------- 2 files changed, 43 insertions(+), 126 deletions(-) diff --git a/esmvalcore/data/__init__.py b/esmvalcore/data/__init__.py index 3aedddeac9..2df3bd6720 100644 --- a/esmvalcore/data/__init__.py +++ b/esmvalcore/data/__init__.py @@ -1,5 +1,5 @@ """Find files using an intake-esm catalog and load them.""" -from .intake._intake_dataset import IntakeDataset, load_catalogs +from .intake._intake_dataset import clear_catalog_cache, load_catalogs -__all__ = ["IntakeDataset", "load_catalogs"] +__all__ = ["load_catalogs", "clear_catalog_cache"] diff --git a/esmvalcore/data/intake/_intake_dataset.py b/esmvalcore/data/intake/_intake_dataset.py index 4e4b9734a4..d046ed3bc0 100644 --- 
a/esmvalcore/data/intake/_intake_dataset.py +++ b/esmvalcore/data/intake/_intake_dataset.py @@ -1,20 +1,16 @@ """Import datasets using Intake-ESM.""" import logging -from numbers import Number from pathlib import Path from typing import Any, Sequence import intake import intake_esm -from esmvalcore.config import CFG -from esmvalcore.config._config import get_project_config -from esmvalcore.dataset import Dataset, File +from esmvalcore.config._intake import get_intake_config from esmvalcore.local import LocalFile -from esmvalcore.typing import Facets -__all__ = ["IntakeDataset", "load_catalogs", "clear_catalog_cache"] +__all__ = ["load_catalogs", "clear_catalog_cache"] logger = logging.getLogger(__name__) @@ -47,9 +43,10 @@ def load_catalogs( The facet mapping - a dictionary mapping ESMVlCore dataset facet names to the fields in the intake-esm datastore. """ - catalog_info: dict[str, Any] = get_project_config(project).get( - "catalogs", {} + catalog_info: dict[str, Any] = ( + get_intake_config().get(project, {}).get("catalogs", {}) ) + site = drs.get(project, "default") if site not in catalog_info: return [None], [{}] @@ -72,144 +69,64 @@ def load_catalogs( return ([_CACHE[cat_url] for cat_url in catalog_urls], facet_list) -class IntakeDataset(Dataset): - """Class to handle loading data using Intake-ESM. - - Crucially, we do not subclass Dataset, as this is going to cause problems. 
- """ - - def __init__(self, **facets: dict[str, Any]) -> None: - project: str = facets["project"] # type: ignore[assignment] - self.facets: Facets = {} - self.catalog, self._facets = load_catalogs(project, CFG["drs"]) - self._unmapped_facets: dict[str, Any] = {} - self._files: Sequence[File] | None = None - - @property - def files(self) -> Sequence[File]: - if self._files is None: - self._files = self._find_files(self.facets, CFG["drs"]) - return self._files - - @files.setter - def files(self, value: Sequence[File]): - """Manually set the files for the dataset.""" - self._files = value - - @property - def filenames(self) -> Sequence[str]: - """String representation of the filenames in the dataset.""" - return [str(f) for f in self.files] - - def _find_files( # type: ignore[override] - self, - facet_map: dict[str, str | Sequence[str] | Number], - drs: dict[str, Any], - ) -> Sequence[File]: - """Find files for variable in all intake-esm catalogs associated with a project. - - As a side effect, sets the unmapped_facets attribute - this is used to - cache facets which are not in the datastore. - - Parameters - ---------- - facet_map : dict - A dict mapping the variable names used to initialise the IntakeDataset - object to their ESMValCore facet names. For example, - ``` - ACCESS_ESM1_5 = IntakeDataset( - short_name='tos', - project='CMIP6', - ) - ``` - would result in a variable dict of {'short_name': 'tos', 'project': 'CMIP6'}. - drs : dict - The DRS configuration. Can be obtained from the global configuration drs - field, eg. CFG['drs']. - """ - if not isinstance(facet_map["project"], str): - raise TypeError( - "The project facet must be a string for Intake Datasets." 
# esmvalcore/data/intake/_intake_dataset.py
def find_files(
    *, project: str, drs: dict, facets: dict
) -> Sequence[LocalFile]:
    """Find files matching ``facets`` in all intake-esm catalogs of a project.

    Parameters
    ----------
    project : str
        Name of the project whose configured catalogs are searched, e.g.
        ``'CMIP6'``.
    drs : dict
        The DRS configuration. Can be obtained from the global configuration
        drs field, e.g. ``CFG['drs']``.
    facets : dict
        ESMValCore facets to search for, e.g.
        ``{'short_name': 'tos', 'dataset': 'ACCESS-ESM1-5'}``. Each facet is
        translated to a catalog column through that catalog's facet mapping;
        facets without a mapping are not used in the search and are logged.

    Returns
    -------
    Sequence[LocalFile]
        The files found in all catalogs; empty if nothing matched or no
        catalog is configured.
    """
    catalogs, facet_maps = load_catalogs(project, drs)

    files: list[LocalFile] = []

    for catalog, facet_map in zip(catalogs, facet_maps, strict=False):
        # ``load_catalogs`` yields ``[None], [{}]`` when the site has no
        # catalog configured for this project: there is nothing to search.
        if catalog is None:
            continue

        # Translate ESMValCore facet names into this catalog's column names,
        # dropping any facet the catalog has no column for.
        query = {
            facet_map[key]: val
            for key, val in facets.items()
            if facet_map.get(key) is not None
        }

        unused_facets = {
            key: val for key, val in facets.items() if key not in facet_map
        }
        if unused_facets:
            logger.info(
                "Unable to refine datastore search on catalog %s with the following facets %s",
                catalog.esmcat.catalog_file,
                unused_facets,
            )

        selection = catalog.search(**query)

        if not selection:
            continue

        # Select the latest version when the caller did not request one.
        if "version" in facet_map and "version" not in facets:
            version_column = facet_map["version"]
            # NOTE(review): versions are compared as strings - confirm that
            # lexicographic order matches chronological order for the
            # version labels used in these catalogs.
            latest_version = max(selection.unique()[version_column])
            query = {**query, version_column: latest_version}
            selection = selection.search(**query)

        # NOTE(review): assumes the asset column is named ``path`` - confirm
        # against the catalogs in use.
        files += [LocalFile(f) for f in selection.unique().path]

    return files
From 59e420569e9f16f26a0ac8ae2974e48a49a95c56 Mon Sep 17 00:00:00 2001 From: Charles Turner Date: Mon, 12 May 2025 14:53:29 +0800 Subject: [PATCH 16/17] Add `_read_facets` to intake configuration: see https://github.com/intake/intake-esm/pull/717 --- esmvalcore/config/_intake.py | 47 ++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/esmvalcore/config/_intake.py b/esmvalcore/config/_intake.py index 7f4d15560d..e2305afd68 100644 --- a/esmvalcore/config/_intake.py +++ b/esmvalcore/config/_intake.py @@ -8,6 +8,7 @@ import stat from functools import lru_cache from pathlib import Path +from typing import Any import yaml @@ -122,3 +123,49 @@ def load_intake_config(): def get_intake_config(): """Get the esgf-pyclient configuration.""" return load_intake_config() + + +def _read_facets( + cfg: dict, + fhandle: str | None, + project: str | None = None, +) -> tuple[dict[str, Any], str]: + """ + Extract facet mapping from ESMValCore configuration for a given catalog file handle. + + Recursively traverses the ESMValCore configuration structure to find the + facet mapping that corresponds to the specified file handle. + + Parameters + ---------- + cfg : dict + The ESMValCore intake configuration dictionary. + fhandle : str + The file handle/path of the intake-esm catalog to match. + project : str, optional + The current project name in the configuration hierarchy. + + Returns + ------- + tuple + A tuple containing: + - dict: Facet mapping between ESMValCore facets and catalog columns + - str: The project name associated with the catalog file + """ + if fhandle is None: + raise ValueError( + "Unable to ascertain facets without valid file handle." 
+ ) + + for _project, val in cfg.items(): + if not (isinstance(val, list)): + return _read_facets(val, fhandle, project or _project) + for facet_info in val: + file, facets = facet_info.get("file"), facet_info.get("facets") + if file == fhandle: + return facets, project # type: ignore[return-value] + else: + raise ValueError( + f"No facets found for {fhandle} in the config file. " + "Please check the config file and ensure it is valid." + ) From 2527059f389aad335d35ae0621bfe31c492a48d8 Mon Sep 17 00:00:00 2001 From: Charles Turner Date: Tue, 13 May 2025 09:50:12 +0800 Subject: [PATCH 17/17] Add `merge_intake_seach_history` function (see https://github.com/intake/intake-esm/pull/717/commits/73f150e33724010a98b6aed15a39b2c09676a442) --- esmvalcore/data/__init__.py | 7 +++- esmvalcore/data/intake/_interface.py | 54 ++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) create mode 100644 esmvalcore/data/intake/_interface.py diff --git a/esmvalcore/data/__init__.py b/esmvalcore/data/__init__.py index 2df3bd6720..0001840cab 100644 --- a/esmvalcore/data/__init__.py +++ b/esmvalcore/data/__init__.py @@ -1,5 +1,10 @@ """Find files using an intake-esm catalog and load them.""" from .intake._intake_dataset import clear_catalog_cache, load_catalogs +from .intake._interface import merge_intake_search_history -__all__ = ["load_catalogs", "clear_catalog_cache"] +__all__ = [ + "load_catalogs", + "clear_catalog_cache", + "merge_intake_search_history", +] diff --git a/esmvalcore/data/intake/_interface.py b/esmvalcore/data/intake/_interface.py new file mode 100644 index 0000000000..0b257d4773 --- /dev/null +++ b/esmvalcore/data/intake/_interface.py @@ -0,0 +1,54 @@ +import typing + + +def merge_intake_search_history( + search_history: list[dict[str, list[typing.Any]]], +) -> dict[str, typing.Any]: + """Create a facet mapping from an intake-esm search history. 
+ + This takes an intake-esm search history, which typically looks something like + ```python + [ + {'variable_id': ['tos']}, + {'table_id': ['Omon']}, + {'experiment_id': ['historical']}, + {'member_id': ['r1i1p1f1']}, + {'source_id': ['ACCESS-ESM1-5']}, + {'grid_label': ['gn']}, + {'version': ['v.*']}, + ] + ``` + and turns it into something like + ```python + { + 'variable_id': 'tos', + 'table_id': 'Omon', + 'experiment_id': 'historical', + 'member_id': 'r1i1p1f1', + 'source_id': 'ACCESS-ESM1-5', + 'grid_label': 'gn', + 'version': 'v.*', + } + ``` + + Notes + ----- + This function is really quite ugly & could probably be improved. + """ + merged: dict[str, typing.Any] = {} + + for entry in search_history: + for key, value in entry.items(): + if key in merged: + if isinstance(merged[key], list): + merged[key].extend(value) + else: + merged[key] = [merged[key]] + value + else: + merged[key] = value + + for key, val in merged.items(): + if isinstance(val, list) and len(val) == 1: + merged[key] = val[0] + + return merged