Possible race condition writing to zarr store with dask distributed and region kwarg #8222
Replies: 10 comments 2 replies
-
dask 2022.3.0 pyhd8ed1ab_0 conda-forge
-
Doing `local_ds = nuvh_ds.compute()` and then `local_ds.to_zarr(output_zarrstore, consolidated=True, region={"Time": slice(start_isel, end_isel)})` creates a proper dataset without holes.
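The same workaround restated as a block for readability (all names come from the comment above). Presumably it works because `.compute()` materializes the dask-backed result on the client, so the region is then written from in-memory NumPy data by a single process instead of concurrently by the workers:

```python
# Workaround from the comment above: materialize the dask-backed result
# first, then write the region from plain NumPy data in one process.
local_ds = nuvh_ds.compute()
local_ds.to_zarr(
    output_zarrstore,
    consolidated=True,
    region={"Time": slice(start_isel, end_isel)},
)
```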
-
Updating to the latest dask/zarr did not change the behaviour.
-
Hi @rodgerduffett, thanks for the issue. These sorts of issues can be difficult because there are so many moving pieces. Could you try reducing the size of your example a lot? Could you also fill out the issue template? Thanks
-
Thank you for looking at this. I understand it is a tricky kind of problem! Could you advise on how to fill out the issue template? When I created this, I clicked 'New issue' and it popped up the edit box with '### What is your issue?'. There does not seem to be more to it than that.
-
It would help if you provided a minimal example that we could run using LocalCluster.
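For reference, a rough skeleton of the kind of example being asked for (a sketch only; the dataset construction and the region writes would go where the comments indicate):

```python
# Minimal skeleton for a self-contained reproducer on a single machine:
# a LocalCluster stands in for the Slurm cluster from the original report.
from dask.distributed import Client, LocalCluster

if __name__ == "__main__":
    cluster = LocalCluster(n_workers=2, threads_per_worker=2)
    with Client(cluster) as client:
        # build a small synthetic dataset here ...
        # ... then perform the concurrent to_zarr region writes here
        print(client)
```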
-
Closing, but feel free to reopen with an MCVE.
-
```python
import xarray as xr
import numpy as np
import pandas as pd
from dask.distributed import Client
import dask.bag as db

time_len = 10
np.random.seed(0)
temperature = 15 + 8 * np.random.randn(2, 2, time_len)
precipitation = 10 * np.random.rand(2, 2, time_len)
lon = [[-99.83, -99.32], [-99.79, -99.23]]
lat = [[42.25, 42.21], [42.63, 42.59]]
time = pd.date_range("2014-09-06", periods=time_len)

ds = xr.Dataset(
    data_vars=dict(
        temperature=(["x", "y", "time"], temperature),
        precipitation=(["x", "y", "time"], precipitation),
    ),
    coords=dict(
        lon=(["x", "y"], lon),
        lat=(["x", "y"], lat),
        time=time,
    ),
    attrs=dict(description="Weather related data."),
)

# Write the full store up front, then overwrite it region by region.
ds.to_zarr('example_race.zarr', mode='w')


def func(ii):
    time_slice = slice(ii, ii + 1)
    # lon/lat do not share a dimension with the region, so drop them
    ds_sub = ds.isel(time=time_slice).drop_vars(["lon", "lat"])
    print("Saving zarr time region: ", time_slice)
    ds_sub.to_zarr('example_race.zarr',
                   consolidated=True,
                   mode="r+",
                   region={"time": time_slice})


if __name__ == '__main__':
    with Client(threads_per_worker=2, n_workers=2, memory_limit="20.GiB") as client:
        inputs = db.from_sequence(np.arange(0, time_len, 1), npartitions=4)
        db.map(func, ii=inputs).compute()
```
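For what it's worth, one way to check the resulting store for clobbered or missing data after the script above finishes (assuming it runs in the same session, so `ds` is still defined):

```python
# Compare the store written by the workers against the in-memory original;
# assert_allclose raises if any region write was lost or landed in the wrong place.
import xarray as xr

written = xr.open_zarr("example_race.zarr")
xr.testing.assert_allclose(written, ds)
print("store matches the in-memory dataset")
```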
-
Hi @nicdel8888. Thanks for your example. In general, Xarray+Zarr+Dask do not enforce synchronization when writing from distributed processes. It is left to the user to make sure that separate writes do not clobber one another. That said, when I run your example, I don't see any missing data in the resulting store. Your regions do not overlap in this example, so I don't see how you would run into a conflict. One note: you probably want to avoid running
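To make the "do not clobber one another" point concrete, here is a hedged sketch of the usual pattern for safe concurrent region writes: initialize the store with metadata only and make every region cover whole chunks (one time step per chunk below). It reuses `ds` and the store path from the example above; `write_one_step` is a hypothetical helper, not an xarray API.

```python
import xarray as xr  # assumes `ds` from the example above is in scope

# Initialize the store with one chunk per time step. compute=False writes
# the metadata (and the NumPy-backed coordinates) now and defers the data.
ds.chunk({"time": 1}).to_zarr("example_race.zarr", mode="w", compute=False)

def write_one_step(ii):
    # Each call fills exactly one chunk along "time", so no two workers
    # ever write to the same chunk file and no locking is needed.
    ds.isel(time=slice(ii, ii + 1)).drop_vars(["lon", "lat"]).to_zarr(
        "example_race.zarr", mode="r+", region={"time": slice(ii, ii + 1)}
    )
```

The `func` in the example above could call such a helper from each bag partition, since every call then touches a disjoint set of chunk files.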
-
What is your issue?
I have a dataset created using apply_ufunc that randomly fails to write with a 'FileNotFound' error. This appears to become more frequent when the job is started with more workers in the cluster. After the job dies with the error, the file is found to be present and appears to contain data, i.e. it is non-zero in size.
I am running a SlurmCluster. Output goes to a BeeGFS filesystem mounted locally on each node in the cluster. I have not been able to find any indication of general filesystem or networking errors on the nodes during the job.
The basic pseudocode flow is:
The error when the process dies is:
This file does exist by the time I am able to look at it after the job has died.
Please advise on how to troubleshoot this?
Many thanks!