Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 42 additions & 12 deletions tests/test_20_open_dataset.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,36 @@
from pathlib import Path

import dask
import pytest
import xarray as xr


@pytest.mark.parametrize("download", [True, False])
class CountingScheduler:
    """Dask scheduler shim that counts how many computes are triggered.

    Each call increments a counter; exceeding ``max_computes`` raises a
    ``RuntimeError`` instead of delegating to ``dask.get``.
    Reference: https://stackoverflow.com/questions/53289286/"""

    def __init__(self, max_computes: int = 0) -> None:
        # Computes observed so far, and the permitted ceiling.
        self.total_computes = 0
        self.max_computes = max_computes

    def __call__(self, dsk, keys, **kwargs):  # type: ignore[no-untyped-def]
        # Count first so the error message reflects the offending call.
        self.total_computes += 1
        if self.total_computes > self.max_computes:
            raise RuntimeError(
                f"Too many computes. Total: {self.total_computes:d} > max: {self.max_computes:d}."
            )
        # Within budget: delegate to the synchronous dask scheduler.
        return dask.get(dsk, keys, **kwargs)


def raise_if_dask_computes(max_computes: int = 0) -> dask.config.set:
    """Return a dask config context that errors on more than ``max_computes`` computes.

    Use as ``with raise_if_dask_computes(): ...`` to assert laziness of the
    enclosed code. The counting is delegated to :class:`CountingScheduler`.
    """
    return dask.config.set(scheduler=CountingScheduler(max_computes))


@pytest.mark.parametrize(
"download",
[True, False],
)
def test_open_dataset(tmp_path: Path, index_node: str, download: bool) -> None:
esgpull_path = tmp_path / "esgpull"
selection = {
Expand All @@ -21,24 +47,24 @@ def test_open_dataset(tmp_path: Path, index_node: str, download: bool) -> None:
'"CMIP6.ScenarioMIP.EC-Earth-Consortium.EC-Earth3-CC.ssp585.r1i1p1f1.fx.areacella.gr.v20210113.areacella_fx_EC-Earth3-CC_ssp585_r1i1p1f1_gr.nc"',
]
}
ds = xr.open_dataset(
selection, # type: ignore[arg-type]
esgpull_path=esgpull_path,
concat_dims="experiment_id",
engine="esgf",
index_node=index_node,
download=download,
chunks={},
)

with raise_if_dask_computes():
ds = xr.open_dataset(
selection, # type: ignore[arg-type]
esgpull_path=esgpull_path,
concat_dims="experiment_id",
engine="esgf",
index_node=index_node,
download=download,
chunks={},
)
assert (esgpull_path / "data" / "CMIP6").exists() is download

# Chunks
for dim in ds.dims:
assert not ds[dim].chunks
assert ds.chunksizes == {
"experiment_id": (1, 1),
"time": (12, 12),
"time": (24,),
"lat": (256,),
"lon": (512,),
"bnds": (2,),
Expand Down Expand Up @@ -83,3 +109,7 @@ def test_open_dataset(tmp_path: Path, index_node: str, download: bool) -> None:
"CMIP6.ScenarioMIP.EC-Earth-Consortium.EC-Earth3-CC.ssp585.r1i1p1f1.Amon.tas.gr.v20210113",
"CMIP6.ScenarioMIP.EC-Earth-Consortium.EC-Earth3-CC.ssp585.r1i1p1f1.fx.areacella.gr.v20210113",
]

# Compute
with raise_if_dask_computes(1):
ds.compute()
6 changes: 4 additions & 2 deletions xarray_esgf/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ def open_dataset(
):
ds = xr.open_dataset(
self._client.fs[file].drs if download else file.url,
chunks=-1,
engine="h5netcdf",
drop_variables=drop_variables,
)
Expand Down Expand Up @@ -171,6 +170,9 @@ def open_dataset(

for name, var in obj.variables.items():
if name not in obj.dims:
var.encoding["preferred_chunks"] = dict(var.chunksizes)
var.encoding["preferred_chunks"] = {
dim: (1,) * var.sizes[dim]
for dim in set(var.dims) & set(concat_dims)
}

return obj