Commit 78b3e9d

Fix to_netcdf(compute=False) with Dask distributed (#10730)
* Fix to_netcdf(compute=False) with Dask distributed (fixes #10725)
* Silence incorrect mypy error
1 parent 36ee500 commit 78b3e9d
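
For context, here is a minimal sketch of the pattern this commit fixes. The local cluster, dataset, and file name are illustrative assumptions, not taken from the commit:

    import numpy as np
    import xarray as xr
    from distributed import Client

    client = Client()  # start a local dask.distributed scheduler

    # A chunked (dask-backed) dataset; only chunked variables are written lazily.
    ds = xr.Dataset({"x": ("t", np.arange(10))}).chunk({"t": 5})

    # With compute=False, to_netcdf returns a dask.delayed.Delayed instead of
    # writing immediately; previously this failed under the distributed
    # scheduler (#10725).
    delayed = ds.to_netcdf("example.nc", compute=False)
    delayed.compute()  # the write actually happens here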

3 files changed: +26 −12 lines


doc/whats-new.rst

Lines changed: 3 additions & 0 deletions

@@ -34,6 +34,9 @@ Bug fixes
 - Fix error when encoding an empty :py:class:`numpy.datetime64` array
   (:issue:`10722`, :pull:`10723`). By `Spencer Clark
   <https://github.com/spencerkclark>`_.
+- Fix error from ``to_netcdf(..., compute=False)`` when using Dask Distributed
+  (:issue:`10725`).
+  By `Stephan Hoyer <https://github.com/shoyer>`_.
 - Propagation coordinate attrs in :py:meth:`xarray.Dataset.map` (:issue:`9317`, :pull:`10602`).
   By `Justus Magin <https://github.com/keewis>`_.

xarray/backends/api.py

Lines changed: 16 additions & 11 deletions

@@ -1858,6 +1858,20 @@ def open_mfdataset(
     return combined


+def _get_netcdf_autoclose(dataset: Dataset, engine: T_NetcdfEngine) -> bool:
+    """Should we close files after each write operation?"""
+    scheduler = get_dask_scheduler()
+    have_chunks = any(v.chunks is not None for v in dataset.variables.values())
+
+    autoclose = have_chunks and scheduler in ["distributed", "multiprocessing"]
+    if autoclose and engine == "scipy":
+        raise NotImplementedError(
+            f"Writing netCDF files with the {engine} backend "
+            f"is not currently supported with dask's {scheduler} scheduler"
+        )
+    return autoclose
+
+
 WRITEABLE_STORES: dict[T_NetcdfEngine, Callable] = {
     "netcdf4": backends.NetCDF4DataStore.open,
     "scipy": backends.ScipyDataStore,

@@ -2064,16 +2078,7 @@ def to_netcdf(
     # sanitize unlimited_dims
     unlimited_dims = _sanitize_unlimited_dims(dataset, unlimited_dims)

-    # handle scheduler specific logic
-    scheduler = get_dask_scheduler()
-    have_chunks = any(v.chunks is not None for v in dataset.variables.values())
-
-    autoclose = have_chunks and scheduler in ["distributed", "multiprocessing"]
-    if autoclose and engine == "scipy":
-        raise NotImplementedError(
-            f"Writing netCDF files with the {engine} backend "
-            f"is not currently supported with dask's {scheduler} scheduler"
-        )
+    autoclose = _get_netcdf_autoclose(dataset, engine)

     if path_or_file is None:
         if not compute:

@@ -2116,7 +2121,7 @@ def to_netcdf(
         writes = writer.sync(compute=compute)

     finally:
-        if not multifile:
+        if not multifile and not autoclose:  # type: ignore[redundant-expr,unused-ignore]
             if compute:
                 store.close()
             else:
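
The new helper mostly centralizes logic that previously lived inline in to_netcdf; the behavioral change is in the finally block, which now skips the eager close when autoclose is active, presumably because an autoclosing store closes its file after each write operation (as the docstring says) and the delayed graph still needs the store afterwards. A hedged illustration of the scipy guard, assuming that dask's "processes" scheduler is what get_dask_scheduler reports as "multiprocessing" (the dataset and path below are made up):

    import dask
    import numpy as np
    import xarray as xr

    ds = xr.Dataset({"x": ("t", np.arange(10))}).chunk({"t": 5})

    with dask.config.set(scheduler="processes"):
        # A chunked write with the scipy engine under a multiprocessing-style
        # scheduler should hit the NotImplementedError raised above.
        try:
            ds.to_netcdf("example.nc", engine="scipy", format="NETCDF3_64BIT")
        except NotImplementedError as err:
            print(err)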

xarray/tests/test_distributed.py

Lines changed: 7 additions & 1 deletion

@@ -85,11 +85,13 @@ def tmp_netcdf_filename(tmpdir):


 @pytest.mark.parametrize("engine,nc_format", ENGINES_AND_FORMATS)
+@pytest.mark.parametrize("compute", [True, False])
 def test_dask_distributed_netcdf_roundtrip(
     loop,  # noqa: F811
     tmp_netcdf_filename,
     engine,
     nc_format,
+    compute,
 ):
     if engine not in ENGINES:
         pytest.skip("engine not available")

@@ -107,7 +109,11 @@ def test_dask_distributed_netcdf_roundtrip(
         )
         return

-    original.to_netcdf(tmp_netcdf_filename, engine=engine, format=nc_format)
+    result = original.to_netcdf(
+        tmp_netcdf_filename, engine=engine, format=nc_format, compute=compute
+    )
+    if not compute:
+        result.compute()

     with xr.open_dataset(
         tmp_netcdf_filename, chunks=chunks, engine=engine
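
The new compute=False branch exercises the lazy write path end to end before the round-trip read. A self-contained sketch of the contract it relies on (the Delayed type check is my addition for illustration, not part of the test):

    import numpy as np
    import xarray as xr
    from dask.delayed import Delayed

    ds = xr.Dataset({"x": ("t", np.arange(4))}).chunk({"t": 2})
    delayed = ds.to_netcdf("roundtrip.nc", compute=False)
    assert isinstance(delayed, Delayed)  # to_netcdf's documented lazy return type
    delayed.compute()  # the file is written only when this runs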
