-
Couldn't load subscription status.
- Fork 43
Intake-ESM Integration based on #1218 #2690
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 13 commits
c129966
b1b76fb
dd73d1d
ed1676b
fa1ea2e
648f119
2b91fec
c7b8ffb
31b35cb
a8532a5
7e56959
568cb8d
91fee56
9d894b9
59d0d02
2050081
59e4205
2527059
4641965
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,34 @@ CMIP6: | |
| SYNDA: '{activity}/{institute}/{dataset}/{exp}/{ensemble}/{mip}/{short_name}/{grid}/{version}' | ||
| NCI: '{activity}/{institute}/{dataset}/{exp}/{ensemble}/{mip}/{short_name}/{grid}/{version}' | ||
| input_file: '{short_name}_{mip}_{dataset}_{exp}_{ensemble}_{grid}*.nc' | ||
| catalogs: | ||
| NCI: | ||
| - file: | ||
| /g/data/fs38/catalog/v2/esm/catalog.json | ||
| facets: | ||
| activity: activity_id | ||
| dataset: source_id | ||
| ensemble: member_id | ||
| exp: experiment_id | ||
| grid: grid_label | ||
| institute: institution_id | ||
| mip: table_id | ||
| short_name: variable_id | ||
| version: version | ||
| frequency: frequency | ||
| - file: | ||
| /g/data/oi10/catalog/v2/esm/catalog.json | ||
| facets: | ||
| activity: activity_id | ||
| dataset: source_id | ||
| ensemble: member_id | ||
| exp: experiment_id | ||
| grid: grid_label | ||
| institute: institution_id | ||
| mip: table_id | ||
| short_name: variable_id | ||
| version: version | ||
| frequency: frequency | ||
| output_file: '{project}_{dataset}_{mip}_{exp}_{ensemble}_{short_name}_{grid}' | ||
| cmor_type: 'CMIP6' | ||
|
|
||
|
|
@@ -56,6 +84,36 @@ CMIP5: | |
| SMHI: '{dataset}/{ensemble}/{exp}/{frequency}' | ||
| SYNDA: '{institute}/{dataset}/{exp}/{frequency}/{modeling_realm}/{mip}/{ensemble}/{version}' | ||
| input_file: '{short_name}_{mip}_{dataset}_{exp}_{ensemble}*.nc' | ||
| catalogs: | ||
| NCI: | ||
| - file: | ||
| /g/data/rr3/catalog/v2/esm/catalog.json | ||
| facets: | ||
| # mapping from recipe facets to intake-esm catalog facets | ||
| # TODO: Fix these when Gadi is back up | ||
| activity: activity_id | ||
| dataset: source_id | ||
| ensemble: ensemble | ||
| exp: experiment | ||
| grid: grid_label | ||
| institute: institution_id | ||
| mip: table_id | ||
| short_name: variable | ||
| version: version | ||
| - file: | ||
| /g/data/al33/catalog/v2/esm/catalog.json | ||
| facets: | ||
| # mapping from recipe facets to intake-esm catalog facets | ||
| # TODO: Fix these when Gadi is back up | ||
| activity: activity_id | ||
| dataset: source_id | ||
| ensemble: ensemble | ||
| exp: experiment | ||
| institute: institute | ||
| mip: table | ||
| short_name: variable | ||
| version: version | ||
| timerange: time_range | ||
| output_file: '{project}_{dataset}_{mip}_{exp}_{ensemble}_{short_name}' | ||
|
|
||
| CMIP3: | ||
|
|
@@ -156,6 +214,22 @@ CORDEX: | |
| ESGF: '{project.lower}/output/{domain}/{institute}/{driver}/{exp}/{ensemble}/{dataset}/{rcm_version}/{frequency}/{short_name}/{version}' | ||
| SYNDA: '{domain}/{institute}/{driver}/{exp}/{ensemble}/{dataset}/{rcm_version}/{frequency}/{short_name}/{version}' | ||
| input_file: '{short_name}_{domain}_{driver}_{exp}_{ensemble}_{institute}-{dataset}_{rcm_version}_{mip}*.nc' | ||
| catalogs: | ||
| NCI: | ||
| - file: | ||
| /g/data/oi10/catalog/v2/esm/catalog.json | ||
| facets: | ||
| # mapping from recipe facets to intake-esm catalog facets | ||
| # TODO: Fix these when Gadi is back up | ||
|
||
| activity: activity_id | ||
| dataset: source_id | ||
| ensemble: member_id | ||
| exp: experiment_id | ||
| grid: grid_label | ||
| institute: institution_id | ||
| mip: table_id | ||
| short_name: variable_id | ||
| version: version | ||
| output_file: '{project}_{institute}_{dataset}_{rcm_version}_{driver}_{domain}_{mip}_{exp}_{ensemble}_{short_name}' | ||
| cmor_type: 'CMIP5' | ||
| cmor_path: 'cordex' | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| """Find files using an intake-esm catalog and load them.""" | ||
|
|
||
| from ._dataset import IntakeDataset, load_catalogs | ||
|
|
||
| __all__ = ["IntakeDataset", "load_catalogs"] | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,164 @@ | ||
| """Import datasets using Intake-ESM.""" | ||
|
|
||
| import logging | ||
| from numbers import Number | ||
| from pathlib import Path | ||
| from typing import Any, Sequence | ||
|
|
||
| # import isodate | ||
| import intake | ||
| import intake_esm | ||
|
|
||
| from esmvalcore.config import CFG | ||
| from esmvalcore.config._config import get_project_config | ||
| from esmvalcore.dataset import Dataset, File | ||
| from esmvalcore.local import LocalFile | ||
|
|
||
| __all__ = ["IntakeDataset", "load_catalogs", "clear_catalog_cache"] | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| _CACHE: dict[Path, intake_esm.core.esm_datastore] = {} | ||
|
|
||
|
|
||
def clear_catalog_cache():
    """Empty the module-level cache of opened intake-esm catalogs.

    Every entry of ``_CACHE`` is removed, so the next call to
    ``load_catalogs`` opens its catalogs again instead of reusing the
    cached datastores.
    """
    _CACHE.clear()
|
|
||
|
|
||
def load_catalogs(
    project: str, drs: dict
) -> tuple[list[intake_esm.core.esm_datastore], list[dict[str, str]]]:
    """Load the intake-esm catalogs of a project and their facet mappings.

    Opened catalogs are stored in the module-level ``_CACHE`` keyed by
    their path, so each catalog file is opened at most once until the
    cache is cleared.

    Parameters
    ----------
    project : str
        The project name, eg. 'CMIP6'.
    drs : dict
        The DRS configuration. Can be obtained from the global configuration
        drs field, eg. CFG['drs']. The entry for ``project`` selects the
        site whose catalogs are loaded; ``"default"`` is used when the
        project has no entry.

    Returns
    -------
    list of intake_esm.core.esm_datastore
        The catalogs configured for the project at the selected site, in
        configuration order. Empty if no catalogs are configured for that
        site.
    list of dict
        The facet mappings, one per catalog and in the same order. Each is
        a dictionary mapping ESMValCore dataset facet names to the fields
        in the corresponding intake-esm datastore.
    """
    catalog_info: dict[str, Any] = get_project_config(project).get(
        "catalogs", {}
    )
    site = drs.get(project, "default")
    if site not in catalog_info:
        # Nothing configured for this site: return empty lists (not a list
        # holding ``None``) so that callers can rely on ``if not catalogs``
        # and never receive a non-catalog entry.
        return [], []

    site_catalogs = catalog_info[site]
    catalog_urls = [
        Path(catalog.get("file")).expanduser() for catalog in site_catalogs
    ]
    facet_list = [catalog.get("facets") for catalog in site_catalogs]

    for catalog_url in catalog_urls:
        if catalog_url not in _CACHE:
            logger.info(
                "Loading intake-esm catalog (this may take some time): %s",
                catalog_url,
            )
            _CACHE[catalog_url] = intake.open_esm_datastore(catalog_url)
            logger.info("Successfully loaded catalog %s", catalog_url)

    return ([_CACHE[cat_url] for cat_url in catalog_urls], facet_list)
|
|
||
|
|
||
class IntakeDataset(Dataset):
    """Load data using Intake-ESM.

    The files of the dataset are looked up in the intake-esm catalogs that
    ``load_catalogs`` returns for the dataset's project and the DRS
    configuration in ``CFG["drs"]``.
    """

    def __init__(self, **facets):
        """Load the project's catalogs and initialise the dataset.

        Parameters
        ----------
        **facets
            The facets describing the dataset. ``project`` is required; it
            selects which catalogs are loaded.
        """
        project = facets["project"]
        # The catalog attributes are assigned before the base class
        # initialiser runs, as in the original implementation.
        self.catalog, self._facets = load_catalogs(project, CFG["drs"])
        self._unmapped_facets = {}
        super().__init__(**facets)

    @property
    def files(self) -> Sequence[File]:
        """The files of the dataset, searched for on first access."""
        if self._files is None:
            self._files = self._find_files(self.facets, CFG["drs"])
        return self._files

    @files.setter
    def files(self, value: Sequence[File]):
        """Manually set the files for the dataset."""
        self._files = value

    @property
    def filenames(self) -> Sequence[str]:
        """String representation of the filenames in the dataset."""
        return [str(f) for f in self.files]

    def _find_files(  # type: ignore[override]
        self,
        facet_map: dict[str, str | Sequence[str] | Number],
        drs: dict[str, Any],
    ) -> Sequence[File]:
        """Find files in all intake-esm catalogs associated with a project.

        As a side effect, sets the ``_unmapped_facets`` attribute - this is
        used to cache facets which are not in the datastore. When no
        version was requested and a catalog maps the ``version`` facet, the
        latest version found is also written back into ``facet_map``.

        Parameters
        ----------
        facet_map : dict
            A dict mapping the ESMValCore facet names used to initialise the
            IntakeDataset object to their values. For example::

                ACCESS_ESM1_5 = IntakeDataset(
                    short_name='tos',
                    project='CMIP6',
                )

            would result in a facet_map of
            {'short_name': 'tos', 'project': 'CMIP6'}.
        drs : dict
            The DRS configuration. Can be obtained from the global
            configuration drs field, eg. CFG['drs'].

        Returns
        -------
        Sequence[File]
            The files found in the catalogs, in catalog order. Empty if no
            catalogs are configured for the project.

        Raises
        ------
        TypeError
            If the ``project`` facet is not a string.
        """
        if not isinstance(facet_map["project"], str):
            raise TypeError(
                "The project facet must be a string for Intake Datasets."
            )

        catalogs, facets_list = load_catalogs(facet_map["project"], drs)
        if not catalogs:
            return []

        files = []

        for catalog, facets in zip(catalogs, facets_list, strict=False):
            if catalog is None:
                # Placeholder for "no catalog configured": nothing to search.
                continue

            # Translate ESMValCore facet names to catalog column names and
            # drop the facets that were not specified for this dataset.
            query = {
                column: facet_map.get(facet)
                for facet, column in facets.items()
            }
            query = {
                column: value
                for column, value in query.items()
                if value is not None
            }

            # Facets this catalog has no column for (last catalog wins).
            self._unmapped_facets = {
                facet: value
                for facet, value in facet_map.items()
                if facet not in facets and facet != "project"
            }

            selection = catalog.search(**query)

            # Select latest version
            if "version" in facets and "version" not in facet_map:
                version_column = facets["version"]
                # Use the mapped column name for the lookup, consistent
                # with the search below.
                versions = list(selection.unique()[version_column])
                # An empty search result has no versions; max() would raise
                # ValueError, so leave the (empty) selection untouched.
                if versions:
                    # These are strings - need to double check the sorting
                    # here.
                    latest_version = max(versions)
                    # Recorded on the facets; later catalogs are therefore
                    # searched with this same version.
                    facet_map["version"] = latest_version
                    selection = selection.search(
                        **{version_column: latest_version}
                    )

            files.extend(LocalFile(path) for path in selection.unique().path)

        self.augment_facets()
        return files
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The plan was to not further extend config-developer, but rather move this to the new configuration that lives in
~/.config/esmvaltool. See #2371 for an example of what we thought the configuration should look like.