9 changes: 3 additions & 6 deletions esmvalcore/_provenance.py
@@ -4,7 +4,6 @@
import logging
import os
from functools import total_ordering
from pathlib import Path

from netCDF4 import Dataset
from PIL import Image
@@ -214,11 +213,9 @@ def _initialize_activity(self, activity):
def _initialize_entity(self):
"""Initialize the entity representing the file."""
if self.attributes is None:
self.attributes = {}
if "nc" in Path(self.filename).suffix:
with Dataset(self.filename, "r") as dataset:
for attr in dataset.ncattrs():
self.attributes[attr] = dataset.getncattr(attr)
# This happens for ancestor files of preprocessor files as created
# in esmvalcore.preprocessor.PreprocessorFile.__init__.
self.attributes = copy.deepcopy(self.filename.attributes)

attributes = {
"attribute:" + str(k).replace(" ", "_"): str(v)
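For context, a minimal standalone sketch of the pattern this hunk switches to, using simplified stand-in classes (not the real ESMValCore implementations): the file object already carries the global attributes cached at load time, so the provenance entity deep-copies them instead of re-opening the file with netCDF4:

```python
import copy


class FileWithCachedAttributes:
    """Stand-in for a file object whose attributes were cached at load time."""

    def __init__(self, path, attributes):
        self.path = path
        self.attributes = attributes


class TrackedFileSketch:
    """Simplified sketch of provenance entity initialization."""

    def __init__(self, filename):
        self.filename = filename
        self.attributes = None

    def initialize_entity(self):
        if self.attributes is None:
            # Reuse the attributes cached on the file object instead of
            # re-opening the file with netCDF4.Dataset.
            self.attributes = copy.deepcopy(self.filename.attributes)
        return {
            "attribute:" + str(k).replace(" ", "_"): str(v)
            for k, v in self.attributes.items()
        }


ancestor = FileWithCachedAttributes("tas.nc", {"source id": "ModelX"})
print(TrackedFileSketch(ancestor).initialize_entity())
# {'attribute:source_id': 'ModelX'}
```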
45 changes: 40 additions & 5 deletions esmvalcore/local.py
@@ -2,6 +2,7 @@

from __future__ import annotations

import copy
import itertools
import logging
import os
@@ -15,15 +16,18 @@
from cf_units import Unit
from netCDF4 import Dataset, Variable

from .config import CFG
from .config._config import get_project_config
from .exceptions import RecipeError
from esmvalcore.config import CFG
from esmvalcore.config._config import get_project_config
from esmvalcore.exceptions import RecipeError
from esmvalcore.preprocessor._io import _load_from_file

if TYPE_CHECKING:
from collections.abc import Iterable

from .esgf import ESGFFile
from .typing import Facets, FacetValue
import iris.cube

from esmvalcore.esgf import ESGFFile
from esmvalcore.typing import Facets, FacetValue

logger = logging.getLogger(__name__)

@@ -854,3 +858,34 @@ def facets(self) -> Facets:
@facets.setter
def facets(self, value: Facets) -> None:
self._facets = value

@property
def attributes(self) -> dict[str, Any]:
"""Attributes read from the file."""
if not hasattr(self, "_attributes"):
msg = (
"Attributes have not been read yet. Call the `to_iris` method "
"first to read the attributes from the file."
)
raise ValueError(msg)
return self._attributes

@attributes.setter
def attributes(self, value: dict[str, Any]) -> None:
self._attributes = value

def to_iris(
self,
ignore_warnings: list[dict[str, Any]] | None = None,
) -> iris.cube.CubeList:
"""Load the data as Iris cubes.
Returns
-------
iris.cube.CubeList
The loaded data.
"""
cubes = _load_from_file(self, ignore_warnings=ignore_warnings)
Contributor: where does this sit inside the stack - as in, can we not pass already loaded cube(lists) sitting in memory, or do we have to load separately? It's just that iris I/O is costly.

Member Author: The preprocessor function load calls this method instead of accessing the files itself. That makes it possible to introduce more classes like LocalFile for different data sources in #2765 without further modifications of the existing code.

Contributor: Ok - I am not 100% sold on that, but let's see the final structure when we use it via #2765 - definitely not the place to complain here πŸ˜ƒ

# Cache the attributes.
self.attributes = copy.deepcopy(dict(cubes[0].attributes.globals))
return cubes
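For readers following along, a self-contained sketch of the caching contract added here, using a plain dict loader as a stand-in for _load_from_file and iris (the class and loader are illustrative, not the real API):

```python
import copy


class LazyAttributesFile:
    """Sketch: attributes become available only after the file is loaded."""

    def __init__(self, path):
        self.path = path

    @property
    def attributes(self):
        if not hasattr(self, "_attributes"):
            msg = (
                "Attributes have not been read yet. Call the `to_iris` "
                "method first to read the attributes from the file."
            )
            raise ValueError(msg)
        return self._attributes

    @attributes.setter
    def attributes(self, value):
        self._attributes = value

    def to_iris(self):
        # Stand-in for _load_from_file: pretend we loaded one cube and
        # cache a copy of its global attributes.
        cubes = [{"attributes": {"source_id": "ModelX"}}]
        self.attributes = copy.deepcopy(cubes[0]["attributes"])
        return cubes


f = LazyAttributesFile("tas.nc")
try:
    f.attributes
except ValueError as exc:
    print(exc)  # attributes not read yet
cubes = f.to_iris()
print(f.attributes)  # {'source_id': 'ModelX'}
```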
62 changes: 18 additions & 44 deletions esmvalcore/preprocessor/__init__.py
@@ -104,6 +104,7 @@
if TYPE_CHECKING:
from collections.abc import Callable, Iterable, Sequence

import prov.model
from dask.delayed import Delayed

from esmvalcore.dataset import Dataset, File
@@ -528,8 +529,9 @@ def __init__(
input_files.extend(supplementary.files)
ancestors = [TrackedFile(f) for f in input_files]
else:
# Multimodel preprocessor functions set ancestors at runtime
# instead of here.
# Multimodel preprocessor functions set ancestors at runtime,
# in `esmvalcore.preprocessor.multi_model_statistics` and
# `esmvalcore.preprocessor.ensemble_statistics` instead of here.
input_files = []
ancestors = []

@@ -556,6 +558,8 @@
ancestors=ancestors,
)

self.activity = None

def check(self) -> None:
"""Check preprocessor settings."""
check_preprocessor_settings(self.settings)
@@ -579,6 +583,10 @@ def cubes(self) -> list[Cube]:
"""Cubes."""
if self._cubes is None:
self._cubes = [ds.load() for ds in self.datasets] # type: ignore
# Initialize provenance after loading the data, so that we can reuse
# the global attributes that have been read from the input files.
self.initialize_provenance(self.activity)

return self._cubes

@cubes.setter
@@ -669,6 +677,7 @@ def group(self, keys: list) -> str:
def _apply_multimodel(
products: set[PreprocessorFile],
step: str,
activity: prov.model.ProvActivity,
debug: bool | None,
) -> set[PreprocessorFile]:
"""Apply multi model step to products."""
@@ -679,6 +688,10 @@
step,
"\n".join(str(p) for p in products - exclude),
)
for output_product_group in settings.get("output_products", {}).values():
for output_product in output_product_group.values():
output_product.initialize_provenance(activity)

result: list[PreprocessorFile] = preprocess( # type: ignore
products - exclude, # type: ignore
step,
@@ -714,50 +727,10 @@ def __init__(
self.debug = debug
self.write_ncl_interface = write_ncl_interface

def _initialize_product_provenance(self) -> None:
"""Initialize product provenance."""
self._initialize_products(self.products)
self._initialize_multimodel_provenance()
self._initialize_ensemble_provenance()

def _initialize_multiproduct_provenance(self, step: str) -> None:
input_products = self._get_input_products(step)
if input_products:
statistic_products = set()

for input_product in input_products:
step_settings = input_product.settings[step]
output_products = step_settings.get("output_products", {})

for product in output_products.values():
statistic_products.update(product.values())

self._initialize_products(statistic_products)

def _initialize_multimodel_provenance(self) -> None:
"""Initialize provenance for multi-model statistics."""
step = "multi_model_statistics"
self._initialize_multiproduct_provenance(step)

def _initialize_ensemble_provenance(self) -> None:
"""Initialize provenance for ensemble statistics."""
step = "ensemble_statistics"
self._initialize_multiproduct_provenance(step)

def _get_input_products(self, step: str) -> list[PreprocessorFile]:
"""Get input products."""
return [
product for product in self.products if step in product.settings
]

def _initialize_products(self, products: set[PreprocessorFile]) -> None:
"""Initialize products."""
for product in products:
product.initialize_provenance(self.activity)

def _run(self, _) -> list[str]: # noqa: C901,PLR0912
"""Run the preprocessor."""
self._initialize_product_provenance()
for product in self.products:
product.activity = self.activity

steps = {
step for product in self.products for step in product.settings
@@ -773,6 +746,7 @@
self.products = _apply_multimodel(
self.products,
step,
self.activity,
self.debug,
)
else:
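The net effect of these changes to PreprocessingTask and PreprocessorFile is that provenance initialization moves from an up-front pass to the first access of cubes. A stripped-down sketch of that flow, assuming placeholder bodies rather than the real implementations:

```python
class ProductSketch:
    """Sketch: provenance is initialized lazily, on first data access."""

    def __init__(self, loaders):
        self.loaders = loaders
        self.activity = None  # set by the task before cubes are accessed
        self._cubes = None
        self.provenance_initialized = False

    def initialize_provenance(self, activity):
        # In ESMValCore this builds the PROV records from the input file
        # attributes cached during loading; here we only record the call.
        self.provenance_initialized = activity is not None

    @property
    def cubes(self):
        if self._cubes is None:
            self._cubes = [load() for load in self.loaders]
            # The input attributes are cached now, so provenance can be
            # initialized from them.
            self.initialize_provenance(self.activity)
        return self._cubes


product = ProductSketch([lambda: "cube"])
product.activity = "task-activity"  # as done in PreprocessingTask._run
assert not product.provenance_initialized
product.cubes  # first access loads data and initializes provenance
assert product.provenance_initialized
```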
4 changes: 3 additions & 1 deletion esmvalcore/preprocessor/_io.py
@@ -113,7 +113,9 @@ def load(
Invalid type for ``file``.
"""
if isinstance(file, (str, Path)):
if hasattr(file, "to_iris"):
cubes = file.to_iris(ignore_warnings=ignore_warnings)
elif isinstance(file, (str, Path)):
extension = (
file.suffix
if isinstance(file, Path)
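The new hasattr check turns load into a duck-typed dispatcher: any object with a to_iris method can serve as an input file, which is what lets data-source classes other than LocalFile (see the #2765 discussion above) plug in without changes to load itself. A minimal sketch, with hypothetical stand-in names:

```python
from pathlib import Path


def load_sketch(file):
    """Hypothetical stand-in for the dispatch in esmvalcore.preprocessor.load."""
    if hasattr(file, "to_iris"):
        # File-like objects load themselves; LocalFile is one example.
        return file.to_iris()
    if isinstance(file, (str, Path)):
        # Plain paths fall back to the extension-based loaders.
        return [f"cube loaded from {file}"]
    msg = f"Invalid type for file: {type(file)}"
    raise TypeError(msg)


class CustomSourceFile:
    """Hypothetical data-source class implementing the same protocol."""

    def to_iris(self, ignore_warnings=None):
        return ["cube from a non-local source"]


print(load_sketch("tas.nc"))
print(load_sketch(CustomSourceFile()))
```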
46 changes: 35 additions & 11 deletions tests/integration/preprocessor/test_preprocessing_task.py
@@ -1,12 +1,15 @@
"""Tests for `esmvalcore.preprocessor.PreprocessingTask`."""

from pathlib import Path

import iris
import iris.cube
import pytest
from prov.model import ProvDocument

import esmvalcore.preprocessor
from esmvalcore.dataset import Dataset
from esmvalcore.local import LocalFile
from esmvalcore.preprocessor import PreprocessingTask, PreprocessorFile


@@ -15,11 +18,11 @@ def test_load_save_task(tmp_path, mocker, scheduler_lock):
"""Test that a task that just loads and saves a file."""
# Prepare a test dataset
cube = iris.cube.Cube(data=[273.0], var_name="tas", units="K")
in_file = tmp_path / "tas_in.nc"
in_file = LocalFile(tmp_path / "tas_in.nc")
iris.save(cube, in_file)
dataset = Dataset(short_name="tas")
dataset.files = [in_file]
dataset.load = lambda: cube.copy()
dataset.load = lambda: in_file.to_iris()[0]

# Create task
task = PreprocessingTask(
@@ -62,33 +65,39 @@ def test_load_save_and_other_task(tmp_path, monkeypatch):
# Prepare test datasets
in_cube = iris.cube.Cube(data=[0.0], var_name="tas", units="degrees_C")
(tmp_path / "climate_data").mkdir()
file1 = tmp_path / "climate_data" / "tas_dataset1.nc"
file2 = tmp_path / "climate_data" / "tas_dataset2.nc"
file1 = LocalFile(tmp_path / "climate_data" / "tas_dataset1.nc")
file2 = LocalFile(tmp_path / "climate_data" / "tas_dataset2.nc")

# Save cubes for reading global attributes into provenance
iris.save(in_cube, target=file1)
iris.save(in_cube, target=file2)

dataset1 = Dataset(short_name="tas", dataset="dataset1")
dataset1.files = [file1]
dataset1.load = lambda: in_cube.copy()
dataset1.load = lambda: file1.to_iris()[0]

dataset2 = Dataset(short_name="tas", dataset="dataset1")
dataset2.files = [file2]
dataset2.load = lambda: in_cube.copy()
dataset2.load = lambda: file2.to_iris()[0]

# Create some mock preprocessor functions and patch
# `esmvalcore.preprocessor` so it uses them.
def single_preproc_func(cube):
cube.data = cube.core_data() + 1.0
return cube

def multi_preproc_func(products):
def multi_preproc_func(products, output_products):
# Preprocessor function that mimics the behaviour of e.g.
# `esmvalcore.preprocessor.multi_model_statistics`.
for product in products:
cube = product.cubes[0]
cube.data = cube.core_data() + 1.0
product.cubes = [cube]
return products
output_product = output_products[""]["mean"]
output_product.cubes = [
iris.cube.Cube([5.0], var_name="tas", units="degrees_C"),
]
return products | {output_product}

monkeypatch.setattr(
esmvalcore.preprocessor,
@@ -132,7 +141,17 @@ def multi_preproc_func(products):
filename=tmp_path / "tas_dataset2.nc",
settings={
"single_preproc_func": {},
"multi_preproc_func": {},
"multi_preproc_func": {
"output_products": {
"": {
"mean": PreprocessorFile(
filename=tmp_path / "tas_dataset2_mean.nc",
attributes={"dataset": "dataset2_mean"},
settings={},
),
},
},
},
},
datasets=[dataset2],
attributes={"dataset": "dataset2"},
@@ -149,9 +168,9 @@ def multi_preproc_func(products):

task.run()

# Check that two files were saved and the preprocessor functions were
# Check that three files were saved and the preprocessor functions were
# only applied to the second one.
assert len(task.products) == 2
assert len(task.products) == 3
for product in task.products:
print(product.filename)
assert product.filename.exists()
@@ -161,6 +180,11 @@ def multi_preproc_func(products):
assert out_cube.data.tolist() == [0.0]
elif product.attributes["dataset"] == "dataset2":
assert out_cube.data.tolist() == [2.0]
elif product.attributes["dataset"] == "dataset2_mean":
assert out_cube.data.tolist() == [5.0]
else:
msg = "unexpected product"
raise AssertionError(msg)
provenance_file = Path(product.provenance_file)
assert provenance_file.exists()
assert provenance_file.read_text(encoding="utf-8")
16 changes: 7 additions & 9 deletions tests/integration/recipe/test_recipe.py
@@ -1411,8 +1411,14 @@ def get_diagnostic_filename(basename, cfg, extension="nc"):

def simulate_preprocessor_run(task):
"""Simulate preprocessor run."""
task._initialize_product_provenance()
for product in task.products:
# Populate the LocalFile.attributes attribute and initialize
# provenance as done in `PreprocessingTask.cubes`.
for dataset in product.datasets:
for file in dataset.files:
file.to_iris()
product.initialize_provenance(task.activity)

create_test_file(product.filename)
product.save_provenance()

@@ -1871,9 +1877,6 @@ def test_ensemble_statistics(tmp_path, patched_datafinder, session):

assert len(product_out) == len(datasets) * len(statistics)

task._initialize_product_provenance()
assert next(iter(products)).provenance is not None


def test_multi_model_statistics(tmp_path, patched_datafinder, session):
statistics = ["mean", "max"]
@@ -1920,9 +1923,6 @@ def test_multi_model_statistics(tmp_path, patched_datafinder, session):

assert len(product_out) == len(statistics)

task._initialize_product_provenance()
assert next(iter(products)).provenance is not None


def test_multi_model_statistics_exclude(tmp_path, patched_datafinder, session):
statistics = ["mean", "max"]
@@ -1976,8 +1976,6 @@ def test_multi_model_statistics_exclude(tmp_path, patched_datafinder, session):
for id_, _ in product_out:
assert id_ != "OBS"
assert id_ == "CMIP5"
task._initialize_product_provenance()
assert next(iter(products)).provenance is not None


def test_groupby_combined_statistics(tmp_path, patched_datafinder, session):