Skip to content

Commit f265989

Browse files
authored
Avoid issue with deepcopying/pickling IntakeESGFDatasets (#2990)
1 parent e926bc3 commit f265989

File tree

5 files changed

+66
-4
lines changed

5 files changed

+66
-4
lines changed

esmvalcore/_recipe/recipe.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,12 @@ def _update_multiproduct(
520520

521521
if step == "ensemble_statistics":
522522
check.ensemble_statistics_preproc(settings)
523-
grouping = ["project", "dataset", "exp", "sub_experiment"]
523+
grouping: tuple[str, ...] | None = (
524+
"project",
525+
"dataset",
526+
"exp",
527+
"sub_experiment",
528+
)
524529
else:
525530
check.multimodel_statistics_preproc(settings)
526531
grouping = settings.get("groupby", None)

esmvalcore/io/intake_esgf.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from __future__ import annotations
2424

2525
import copy
26+
import logging
2627
from dataclasses import dataclass, field
2728
from pathlib import Path
2829
from typing import TYPE_CHECKING, Any
@@ -47,6 +48,8 @@
4748
"IntakeESGFDataset",
4849
]
4950

51+
logger = logging.getLogger(__name__)
52+
5053

5154
class _CachingCatalog(intake_esgf.ESGFCatalog):
5255
"""An ESGF catalog that caches to_path_dict results."""
@@ -122,7 +125,16 @@ def __hash__(self) -> int:
122125

123126
def prepare(self) -> None:
124127
"""Prepare the data for access."""
125-
self.catalog.to_path_dict(minimal_keys=False)
128+
try:
129+
self.catalog.to_path_dict(minimal_keys=False, quiet=True)
130+
except intake_esgf.exceptions.DatasetLoadError:
131+
logger.error(
132+
"Failed to download dataset '%s' from the ESGF. Error messages:\n%s",
133+
self.name,
134+
self.catalog.session_log(),
135+
)
136+
raise
137+
126138
for index in self.catalog.indices:
127139
# Set the sessions to None to avoid issues with pickling
128140
# requests_cache.CachedSession objects when max_parallel_tasks > 1.

esmvalcore/preprocessor/__init__.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
extract_levels,
7070
extract_location,
7171
extract_point,
72+
is_dataset,
7273
regrid,
7374
)
7475
from esmvalcore.preprocessor._rolling_window import rolling_window_statistics
@@ -619,7 +620,7 @@ def __init__(
619620
self,
620621
filename: Path,
621622
attributes: dict[str, Any] | None = None,
622-
settings: dict[str, Any] | None = None,
623+
settings: dict[str, dict[str, Any]] | None = None,
623624
datasets: list[Dataset] | None = None,
624625
) -> None:
625626
if datasets is not None:
@@ -644,6 +645,22 @@ def __init__(
644645
# Set some preprocessor settings (move all defaults here?)
645646
if settings is None:
646647
settings = {}
648+
649+
# Create a copy of any datasets in settings. This drops the information
650+
# in Dataset.files and avoids issues with deepcopying and pickling
651+
# those files. This is needed because
652+
# esmvalcore.io.intake_esgf.IntakeESGFDataset objects use a
653+
# requests_cache.CachedSession object that cannot be deepcopied or
654+
# pickled.
655+
settings = {
656+
fn: {
657+
arg: (
658+
value.copy() if is_dataset(value) else copy.deepcopy(value)
659+
)
660+
for arg, value in kwargs.items()
661+
}
662+
for fn, kwargs in settings.items()
663+
}
647664
self.settings = copy.deepcopy(settings)
648665
if attributes is None:
649666
attributes = {}

tests/integration/recipe/test_recipe.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -971,6 +971,11 @@ def test_reference_dataset(tmp_path, patched_datafinder, session, monkeypatch):
971971
)
972972

973973
assert product.settings["regrid"]["target_grid"] == reference.datasets[0]
974+
# Check that the target dataset does not have files, to prevent pickling
975+
# errors: https://github.com/ESMValGroup/ESMValCore/issues/2989.
976+
# The files can be found again at load time.
977+
assert product.settings["regrid"]["target_grid"]._files is None
978+
974979
assert product.settings["extract_levels"]["levels"] == levels
975980

976981
get_reference_levels.assert_called_once_with(reference.datasets[0])

tests/unit/io/test_intake_esgf.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from typing import TYPE_CHECKING
77

88
import intake_esgf
9+
import intake_esgf.exceptions
910
import iris.cube
1011
import pandas as pd
1112
import pytest
@@ -35,7 +36,29 @@ def test_prepare(mocker: MockerFixture) -> None:
3536
dataset = IntakeESGFDataset(name="id", facets={}, catalog=cat)
3637

3738
dataset.prepare()
38-
to_path_mock.assert_called_once_with(minimal_keys=False)
39+
to_path_mock.assert_called_once_with(minimal_keys=False, quiet=True)
40+
41+
42+
def test_prepare_fails(mocker: MockerFixture) -> None:
43+
"""IntakeESGFDataset.prepare should log catalog.session_log() on failure."""
44+
cat = intake_esgf.ESGFCatalog()
45+
exc = intake_esgf.exceptions.DatasetLoadError(
46+
["CMCC.CMCC-CMS.historical.day.atmos.day.r1i1p1.sfcWind"],
47+
None,
48+
)
49+
to_path_mock = mocker.patch.object(
50+
cat,
51+
"to_path_dict",
52+
autospec=True,
53+
side_effect=exc,
54+
)
55+
session_log_mock = mocker.patch.object(cat, "session_log", autospec=True)
56+
dataset = IntakeESGFDataset(name="id", facets={}, catalog=cat)
57+
58+
with pytest.raises(intake_esgf.exceptions.DatasetLoadError):
59+
dataset.prepare()
60+
to_path_mock.assert_called_once_with(minimal_keys=False, quiet=True)
61+
session_log_mock.assert_called_once_with()
3962

4063

4164
def test_attributes_raises_before_to_iris() -> None:

0 commit comments

Comments
 (0)