diff --git a/esmvalcore/_provenance.py b/esmvalcore/_provenance.py index 36bcea32b2..dc669731e5 100644 --- a/esmvalcore/_provenance.py +++ b/esmvalcore/_provenance.py @@ -4,7 +4,6 @@ import logging import os from functools import total_ordering -from pathlib import Path from netCDF4 import Dataset from PIL import Image @@ -214,11 +213,9 @@ def _initialize_activity(self, activity): def _initialize_entity(self): """Initialize the entity representing the file.""" if self.attributes is None: - self.attributes = {} - if "nc" in Path(self.filename).suffix: - with Dataset(self.filename, "r") as dataset: - for attr in dataset.ncattrs(): - self.attributes[attr] = dataset.getncattr(attr) + # This happens for ancestor files of preprocessor files as created + # in esmvalcore.preprocessor.PreprocessorFile.__init__. + self.attributes = copy.deepcopy(self.filename.attributes) attributes = { "attribute:" + str(k).replace(" ", "_"): str(v) diff --git a/esmvalcore/local.py b/esmvalcore/local.py index 43acae37c6..70f30adee2 100644 --- a/esmvalcore/local.py +++ b/esmvalcore/local.py @@ -2,6 +2,7 @@ from __future__ import annotations +import copy import itertools import logging import os @@ -15,15 +16,18 @@ from cf_units import Unit from netCDF4 import Dataset, Variable -from .config import CFG -from .config._config import get_project_config -from .exceptions import RecipeError +from esmvalcore.config import CFG +from esmvalcore.config._config import get_project_config +from esmvalcore.exceptions import RecipeError +from esmvalcore.preprocessor._io import _load_from_file if TYPE_CHECKING: from collections.abc import Iterable - from .esgf import ESGFFile - from .typing import Facets, FacetValue + import iris.cube + + from esmvalcore.esgf import ESGFFile + from esmvalcore.typing import Facets, FacetValue logger = logging.getLogger(__name__) @@ -854,3 +858,34 @@ def facets(self) -> Facets: @facets.setter def facets(self, value: Facets) -> None: self._facets = value + + @property + def 
attributes(self) -> dict[str, Any]: + """Attributes read from the file.""" + if not hasattr(self, "_attributes"): + msg = ( + "Attributes have not been read yet. Call the `to_iris` method " + "first to read the attributes from the file." + ) + raise ValueError(msg) + return self._attributes + + @attributes.setter + def attributes(self, value: dict[str, Any]) -> None: + self._attributes = value + + def to_iris( + self, + ignore_warnings: list[dict[str, Any]] | None = None, + ) -> iris.cube.CubeList: + """Load the data as Iris cubes. + + Returns + ------- + iris.cube.CubeList + The loaded data. + """ + cubes = _load_from_file(self, ignore_warnings=ignore_warnings) + # Cache the attributes. + self.attributes = copy.deepcopy(dict(cubes[0].attributes.globals)) + return cubes diff --git a/esmvalcore/preprocessor/__init__.py b/esmvalcore/preprocessor/__init__.py index da0317e0c5..ff6f560cac 100644 --- a/esmvalcore/preprocessor/__init__.py +++ b/esmvalcore/preprocessor/__init__.py @@ -104,6 +104,7 @@ if TYPE_CHECKING: from collections.abc import Callable, Iterable, Sequence + import prov.model from dask.delayed import Delayed from esmvalcore.dataset import Dataset, File @@ -528,8 +529,9 @@ def __init__( input_files.extend(supplementary.files) ancestors = [TrackedFile(f) for f in input_files] else: - # Multimodel preprocessor functions set ancestors at runtime - # instead of here. + # Multimodel preprocessor functions set ancestors at runtime, + # in `esmvalcore.preprocessor.multi_model_statistics` and + # `esmvalcore.preprocessor.ensemble_statistics` instead of here. 
input_files = [] ancestors = [] @@ -556,6 +558,8 @@ def __init__( ancestors=ancestors, ) + self.activity = None + def check(self) -> None: """Check preprocessor settings.""" check_preprocessor_settings(self.settings) @@ -579,6 +583,10 @@ def cubes(self) -> list[Cube]: """Cubes.""" if self._cubes is None: self._cubes = [ds.load() for ds in self.datasets] # type: ignore + # Initialize provenance after loading the data, so that we can reuse + # the global attributes that have been read from the input files. + self.initialize_provenance(self.activity) + return self._cubes @cubes.setter @@ -669,6 +677,7 @@ def group(self, keys: list) -> str: def _apply_multimodel( products: set[PreprocessorFile], step: str, + activity: prov.model.ProvActivity, debug: bool | None, ) -> set[PreprocessorFile]: """Apply multi model step to products.""" @@ -679,6 +688,10 @@ def _apply_multimodel( step, "\n".join(str(p) for p in products - exclude), ) + for output_product_group in settings.get("output_products", {}).values(): + for output_product in output_product_group.values(): + output_product.initialize_provenance(activity) + result: list[PreprocessorFile] = preprocess( # type: ignore products - exclude, # type: ignore step, @@ -714,50 +727,10 @@ def __init__( self.debug = debug self.write_ncl_interface = write_ncl_interface - def _initialize_product_provenance(self) -> None: - """Initialize product provenance.""" - self._initialize_products(self.products) - self._initialize_multimodel_provenance() - self._initialize_ensemble_provenance() - - def _initialize_multiproduct_provenance(self, step: str) -> None: - input_products = self._get_input_products(step) - if input_products: - statistic_products = set() - - for input_product in input_products: - step_settings = input_product.settings[step] - output_products = step_settings.get("output_products", {}) - - for product in output_products.values(): - statistic_products.update(product.values()) - - 
self._initialize_products(statistic_products) - - def _initialize_multimodel_provenance(self) -> None: - """Initialize provenance for multi-model statistics.""" - step = "multi_model_statistics" - self._initialize_multiproduct_provenance(step) - - def _initialize_ensemble_provenance(self) -> None: - """Initialize provenance for ensemble statistics.""" - step = "ensemble_statistics" - self._initialize_multiproduct_provenance(step) - - def _get_input_products(self, step: str) -> list[PreprocessorFile]: - """Get input products.""" - return [ - product for product in self.products if step in product.settings - ] - - def _initialize_products(self, products: set[PreprocessorFile]) -> None: - """Initialize products.""" - for product in products: - product.initialize_provenance(self.activity) - def _run(self, _) -> list[str]: # noqa: C901,PLR0912 """Run the preprocessor.""" - self._initialize_product_provenance() + for product in self.products: + product.activity = self.activity steps = { step for product in self.products for step in product.settings @@ -773,6 +746,7 @@ def _run(self, _) -> list[str]: # noqa: C901,PLR0912 self.products = _apply_multimodel( self.products, step, + self.activity, self.debug, ) else: diff --git a/esmvalcore/preprocessor/_io.py b/esmvalcore/preprocessor/_io.py index 7c181a7f82..f050c4cfa7 100644 --- a/esmvalcore/preprocessor/_io.py +++ b/esmvalcore/preprocessor/_io.py @@ -113,7 +113,9 @@ def load( Invalid type for ``file``. 
""" - if isinstance(file, (str, Path)): + if hasattr(file, "to_iris"): + cubes = file.to_iris(ignore_warnings=ignore_warnings) + elif isinstance(file, (str, Path)): extension = ( file.suffix if isinstance(file, Path) diff --git a/tests/integration/preprocessor/test_preprocessing_task.py b/tests/integration/preprocessor/test_preprocessing_task.py index e25a361408..20e43a9029 100644 --- a/tests/integration/preprocessor/test_preprocessing_task.py +++ b/tests/integration/preprocessor/test_preprocessing_task.py @@ -1,5 +1,7 @@ """Tests for `esmvalcore.preprocessor.PreprocessingTask`.""" +from pathlib import Path + import iris import iris.cube import pytest @@ -7,6 +9,7 @@ import esmvalcore.preprocessor from esmvalcore.dataset import Dataset +from esmvalcore.local import LocalFile from esmvalcore.preprocessor import PreprocessingTask, PreprocessorFile @@ -15,11 +18,11 @@ def test_load_save_task(tmp_path, mocker, scheduler_lock): """Test that a task that just loads and saves a file.""" # Prepare a test dataset cube = iris.cube.Cube(data=[273.0], var_name="tas", units="K") - in_file = tmp_path / "tas_in.nc" + in_file = LocalFile(tmp_path / "tas_in.nc") iris.save(cube, in_file) dataset = Dataset(short_name="tas") dataset.files = [in_file] - dataset.load = lambda: cube.copy() + dataset.load = lambda: in_file.to_iris()[0] # Create task task = PreprocessingTask( @@ -62,8 +65,8 @@ def test_load_save_and_other_task(tmp_path, monkeypatch): # Prepare test datasets in_cube = iris.cube.Cube(data=[0.0], var_name="tas", units="degrees_C") (tmp_path / "climate_data").mkdir() - file1 = tmp_path / "climate_data" / "tas_dataset1.nc" - file2 = tmp_path / "climate_data" / "tas_dataset2.nc" + file1 = LocalFile(tmp_path / "climate_data" / "tas_dataset1.nc") + file2 = LocalFile(tmp_path / "climate_data" / "tas_dataset2.nc") # Save cubes for reading global attributes into provenance iris.save(in_cube, target=file1) @@ -71,11 +74,11 @@ def test_load_save_and_other_task(tmp_path, monkeypatch): 
 dataset1 = Dataset(short_name="tas", dataset="dataset1") dataset1.files = [file1] - dataset1.load = lambda: in_cube.copy() + dataset1.load = lambda: file1.to_iris()[0] dataset2 = Dataset(short_name="tas", dataset="dataset1") dataset2.files = [file2] - dataset2.load = lambda: in_cube.copy() + dataset2.load = lambda: file2.to_iris()[0] # Create some mock preprocessor functions and patch # `esmvalcore.preprocessor` so it uses them. @@ -83,12 +86,18 @@ def single_preproc_func(cube): cube.data = cube.core_data() + 1.0 return cube - def multi_preproc_func(products): + def multi_preproc_func(products, output_products): + # Preprocessor function that mimics the behaviour of e.g. + # `esmvalcore.preprocessor.multi_model_statistics`. for product in products: cube = product.cubes[0] cube.data = cube.core_data() + 1.0 product.cubes = [cube] - return products + output_product = output_products[""]["mean"] + output_product.cubes = [ + iris.cube.Cube([5.0], var_name="tas", units="degrees_C"), + ] + return products | {output_product} monkeypatch.setattr( esmvalcore.preprocessor, @@ -132,7 +141,17 @@ def multi_preproc_func(products): filename=tmp_path / "tas_dataset2.nc", settings={ "single_preproc_func": {}, - "multi_preproc_func": {}, + "multi_preproc_func": { + "output_products": { + "": { + "mean": PreprocessorFile( + filename=tmp_path / "tas_dataset2_mean.nc", + attributes={"dataset": "dataset2_mean"}, + settings={}, + ), + }, + }, + }, }, datasets=[dataset2], attributes={"dataset": "dataset2"}, @@ -149,9 +168,9 @@ def multi_preproc_func(products): task.run() - # Check that two files were saved and the preprocessor functions were + # Check that three files were saved and the preprocessor functions were # only applied to the second one. 
- assert len(task.products) == 2 + assert len(task.products) == 3 for product in task.products: print(product.filename) assert product.filename.exists() @@ -161,6 +180,11 @@ def multi_preproc_func(products): assert out_cube.data.tolist() == [0.0] elif product.attributes["dataset"] == "dataset2": assert out_cube.data.tolist() == [2.0] + elif product.attributes["dataset"] == "dataset2_mean": + assert out_cube.data.tolist() == [5.0] else: msg = "unexpected product" raise AssertionError(msg) + provenance_file = Path(product.provenance_file) + assert provenance_file.exists() + assert provenance_file.read_text(encoding="utf-8") diff --git a/tests/integration/recipe/test_recipe.py b/tests/integration/recipe/test_recipe.py index 1a8de202e4..adeb030ea3 100644 --- a/tests/integration/recipe/test_recipe.py +++ b/tests/integration/recipe/test_recipe.py @@ -1411,8 +1411,14 @@ def get_diagnostic_filename(basename, cfg, extension="nc"): def simulate_preprocessor_run(task): """Simulate preprocessor run.""" - task._initialize_product_provenance() for product in task.products: + # Populate the LocalFile.attributes attribute and initialize + # provenance as done in `PreprocessorFile.cubes`. 
+ for dataset in product.datasets: + for file in dataset.files: + file.to_iris() + product.initialize_provenance(task.activity) + create_test_file(product.filename) product.save_provenance() @@ -1871,9 +1877,6 @@ def test_ensemble_statistics(tmp_path, patched_datafinder, session): assert len(product_out) == len(datasets) * len(statistics) - task._initialize_product_provenance() - assert next(iter(products)).provenance is not None - def test_multi_model_statistics(tmp_path, patched_datafinder, session): statistics = ["mean", "max"] @@ -1920,9 +1923,6 @@ def test_multi_model_statistics(tmp_path, patched_datafinder, session): assert len(product_out) == len(statistics) - task._initialize_product_provenance() - assert next(iter(products)).provenance is not None - def test_multi_model_statistics_exclude(tmp_path, patched_datafinder, session): statistics = ["mean", "max"] @@ -1976,8 +1976,6 @@ def test_multi_model_statistics_exclude(tmp_path, patched_datafinder, session): for id_, _ in product_out: assert id_ != "OBS" assert id_ == "CMIP5" - task._initialize_product_provenance() - assert next(iter(products)).provenance is not None def test_groupby_combined_statistics(tmp_path, patched_datafinder, session): diff --git a/tests/unit/local/test_to_iris.py b/tests/unit/local/test_to_iris.py new file mode 100644 index 0000000000..15a50729ac --- /dev/null +++ b/tests/unit/local/test_to_iris.py @@ -0,0 +1,33 @@ +import iris.cube +import pytest + +from esmvalcore.local import LocalFile + + +@pytest.fixture +def local_file(tmp_path): + cube = iris.cube.Cube([0]) + cube.attributes.globals["attribute"] = "value" + file = tmp_path / "test.nc" + iris.save(cube, file) + return LocalFile(file) + + +def test_to_iris(local_file): + cubes = local_file.to_iris() + assert len(cubes) == 1 + + +def test_attributes(local_file): + local_file.to_iris() # Load the file to populate attributes + attrs = local_file.attributes + assert attrs["attribute"] == "value" + + +def 
test_attributes_without_loading(local_file): + """Test that accessing attributes without loading the file first raises.""" + with pytest.raises( + ValueError, + match=r"Attributes have not been read yet.*", + ): + local_file.attributes # noqa: B018 diff --git a/tests/unit/provenance/test_trackedfile.py b/tests/unit/provenance/test_trackedfile.py index 1a72735940..9e22ca461b 100644 --- a/tests/unit/provenance/test_trackedfile.py +++ b/tests/unit/provenance/test_trackedfile.py @@ -1,42 +1,59 @@ +from pathlib import Path + import pytest from prov.model import ProvDocument from esmvalcore._provenance import ESMVALTOOL_URI_PREFIX, TrackedFile +from esmvalcore.local import LocalFile + + +@pytest.fixture +def tracked_input_file_nc(): + input_file_nc = LocalFile("/path/to/file.nc") + input_file_nc.attributes = {"a": "A"} + return TrackedFile(filename=input_file_nc) @pytest.fixture -def tracked_file_nc(): +def tracked_output_file_nc(): return TrackedFile( - filename="/path/to/file.nc", + filename=Path("/path/to/file.nc"), attributes={"a": "A"}, - prov_filename="/original/path/to/file.nc", ) @pytest.fixture -def tracked_file_grb(): - return TrackedFile( - filename="/path/to/file.grb", - prov_filename="/original/path/to/file.grb", - ) +def tracked_input_file_grb(): + input_file_grb = LocalFile("/path/to/file.grb") + input_file_grb.attributes = {"a": "A"} + return TrackedFile(filename=input_file_grb) -def test_init_nc(tracked_file_nc): +def test_init_input_nc(tracked_input_file_nc): """Test `esmvalcore._provenance.TrackedFile.__init__`.""" - assert tracked_file_nc.filename == "/path/to/file.nc" - assert tracked_file_nc.attributes == {"a": "A"} - assert tracked_file_nc.prov_filename == "/original/path/to/file.nc" + assert tracked_input_file_nc.filename == LocalFile("/path/to/file.nc") + assert tracked_input_file_nc.attributes is None -def test_init_grb(tracked_file_grb): +def test_init_output_nc(tracked_output_file_nc): """Test `esmvalcore._provenance.TrackedFile.__init__`.""" 
- assert tracked_file_grb.filename == "/path/to/file.grb" - assert tracked_file_grb.attributes is None - assert tracked_file_grb.prov_filename == "/original/path/to/file.grb" + assert tracked_output_file_nc.filename == Path("/path/to/file.nc") + assert tracked_output_file_nc.attributes == {"a": "A"} + +def test_init_grb(tracked_input_file_grb): + """Test `esmvalcore._provenance.TrackedFile.__init__`.""" + assert tracked_input_file_grb.filename == LocalFile("/path/to/file.grb") + assert tracked_input_file_grb.attributes is None -def test_initialize_provenance_nc(tracked_file_nc): + +@pytest.mark.parametrize( + "fixture_name", + ["tracked_input_file_nc", "tracked_output_file_nc"], +) +def test_initialize_provenance_nc(fixture_name, request): """Test `esmvalcore._provenance.TrackedFile.initialize_provenance`.""" + tracked_file_nc = request.getfixturevalue(fixture_name) provenance = ProvDocument() provenance.add_namespace("task", uri=ESMVALTOOL_URI_PREFIX + "task") activity = provenance.activity("task:test-task-name") @@ -48,21 +65,29 @@ def test_initialize_provenance_nc(tracked_file_nc): assert tracked_file_nc.attributes == {"a": "A"} -def test_initialize_provenance_grb(tracked_file_grb): +def test_initialize_provenance_grb(tracked_input_file_grb): """Test `esmvalcore._provenance.TrackedFile.initialize_provenance`.""" provenance = ProvDocument() provenance.add_namespace("task", uri=ESMVALTOOL_URI_PREFIX + "task") activity = provenance.activity("task:test-task-name") - tracked_file_grb.initialize_provenance(activity) - assert isinstance(tracked_file_grb.provenance, ProvDocument) - assert tracked_file_grb.activity == activity - assert str(tracked_file_grb.entity.identifier) == "file:/path/to/file.grb" - assert tracked_file_grb.attributes == {} + tracked_input_file_grb.initialize_provenance(activity) + assert isinstance(tracked_input_file_grb.provenance, ProvDocument) + assert tracked_input_file_grb.activity == activity + assert ( + 
str(tracked_input_file_grb.entity.identifier) + == "file:/path/to/file.grb" + ) + assert tracked_input_file_grb.attributes == {"a": "A"} -def test_copy_provenance(tracked_file_nc): +@pytest.mark.parametrize( + "fixture_name", + ["tracked_input_file_nc", "tracked_output_file_nc"], +) +def test_copy_provenance(fixture_name, request): """Test `esmvalcore._provenance.TrackedFile.copy_provenance`.""" + tracked_file_nc = request.getfixturevalue(fixture_name) provenance = ProvDocument() provenance.add_namespace("task", uri=ESMVALTOOL_URI_PREFIX + "task") activity = provenance.activity("task:test-task-name")