Skip to content

Commit fe9190c

Browse files
committed
Fix race condition in from_array for arrays with shards
1 parent 111d765 commit fe9190c

File tree

3 files changed

+80
-10
lines changed

3 files changed

+80
-10
lines changed

changes/3169.bugfix.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix race condition when passing array data in ``create_array(data=...)`` for an array that has a set shard size.

src/zarr/core/array.py

Lines changed: 55 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1174,6 +1174,18 @@ def cdata_shape(self) -> ChunkCoords:
11741174
"""
11751175
return tuple(starmap(ceildiv, zip(self.shape, self.chunks, strict=False)))
11761176

1177+
@property
1178+
def _shard_data_shape(self) -> ChunkCoords:
1179+
"""
1180+
The shape of the shard grid for this array.
1181+
1182+
Returns
1183+
-------
1184+
Tuple[int]
1185+
The shape of the shard grid for this array (the chunk grid if the array has no shards).
1186+
"""
1187+
return tuple(starmap(ceildiv, zip(self.shape, self.shards or self.chunks, strict=False)))
1188+
11771189
@property
11781190
def nchunks(self) -> int:
11791191
"""
@@ -1216,7 +1228,11 @@ async def nbytes_stored(self) -> int:
12161228
return await self.store_path.store.getsize_prefix(self.store_path.path)
12171229

12181230
def _iter_chunk_coords(
1219-
self, *, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None
1231+
self,
1232+
*,
1233+
iter_shards: bool = False,
1234+
origin: Sequence[int] | None = None,
1235+
selection_shape: Sequence[int] | None = None,
12201236
) -> Iterator[ChunkCoords]:
12211237
"""
12221238
Create an iterator over the coordinates of chunks in chunk grid space. If the `origin`
@@ -1228,6 +1244,8 @@ def _iter_chunk_coords(
12281244
12291245
Parameters
12301246
----------
1247+
iter_shards : bool, default=False
1248+
Whether to iterate by shard (if True) or by chunk (if False).
12311249
origin : Sequence[int] | None, default=None
12321250
The origin of the selection relative to the array's chunk grid (or shard grid if `iter_shards` is True).
12331251
selection_shape : Sequence[int] | None, default=None
@@ -1238,7 +1256,11 @@ def _iter_chunk_coords(
12381256
chunk_coords: ChunkCoords
12391257
The coordinates of each chunk in the selection.
12401258
"""
1241-
return _iter_grid(self.cdata_shape, origin=origin, selection_shape=selection_shape)
1259+
return _iter_grid(
1260+
self._shard_data_shape if iter_shards else self.cdata_shape,
1261+
origin=origin,
1262+
selection_shape=selection_shape,
1263+
)
12421264

12431265
def _iter_chunk_keys(
12441266
self, *, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None
@@ -1265,13 +1287,19 @@ def _iter_chunk_keys(
12651287
yield self.metadata.encode_chunk_key(k)
12661288

12671289
def _iter_chunk_regions(
1268-
self, *, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None
1290+
self,
1291+
*,
1292+
iter_shards: bool = False,
1293+
origin: Sequence[int] | None = None,
1294+
selection_shape: Sequence[int] | None = None,
12691295
) -> Iterator[tuple[slice, ...]]:
12701296
"""
12711297
Iterate over the regions spanned by each chunk (or by each shard if `iter_shards` is True).
12721298
12731299
Parameters
12741300
----------
1301+
iter_shards : bool, default=False
1302+
Whether to iterate by shard (if True) or by chunk (if False).
12751303
origin : Sequence[int] | None, default=None
12761304
The origin of the selection relative to the array's chunk grid (or shard grid if `iter_shards` is True).
12771305
selection_shape : Sequence[int] | None, default=None
@@ -1282,11 +1310,12 @@ def _iter_chunk_regions(
12821310
region: tuple[slice, ...]
12831311
A tuple of slice objects representing the region spanned by each chunk in the selection.
12841312
"""
1313+
region_size = (self.shards or self.chunks) if iter_shards else self.chunks
12851314
for cgrid_position in self._iter_chunk_coords(
1286-
origin=origin, selection_shape=selection_shape
1315+
iter_shards=iter_shards, origin=origin, selection_shape=selection_shape
12871316
):
12881317
out: tuple[slice, ...] = ()
1289-
for c_pos, c_shape in zip(cgrid_position, self.chunks, strict=False):
1318+
for c_pos, c_shape in zip(cgrid_position, region_size, strict=False):
12901319
start = c_pos * c_shape
12911320
stop = start + c_shape
12921321
out += (slice(start, stop, 1),)
@@ -2184,6 +2213,13 @@ def cdata_shape(self) -> ChunkCoords:
21842213
"""
21852214
return tuple(starmap(ceildiv, zip(self.shape, self.chunks, strict=False)))
21862215

2216+
@property
2217+
def _shard_data_shape(self) -> ChunkCoords:
2218+
"""
2219+
The shape of the shard grid for this array.
2220+
"""
2221+
return tuple(starmap(ceildiv, zip(self.shape, self.shards or self.chunks, strict=False)))
2222+
21872223
@property
21882224
def nchunks(self) -> int:
21892225
"""
@@ -2271,7 +2307,10 @@ def nbytes_stored(self) -> int:
22712307
return sync(self._async_array.nbytes_stored())
22722308

22732309
def _iter_chunk_keys(
2274-
self, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None
2310+
self,
2311+
*,
2312+
origin: Sequence[int] | None = None,
2313+
selection_shape: Sequence[int] | None = None,
22752314
) -> Iterator[str]:
22762315
"""
22772316
Iterate over the storage keys of each chunk, relative to an optional origin, and optionally
@@ -2294,13 +2333,19 @@ def _iter_chunk_keys(
22942333
)
22952334

22962335
def _iter_chunk_regions(
2297-
self, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None
2336+
self,
2337+
*,
2338+
iter_shards: bool = False,
2339+
origin: Sequence[int] | None = None,
2340+
selection_shape: Sequence[int] | None = None,
22982341
) -> Iterator[tuple[slice, ...]]:
22992342
"""
23002343
Iterate over the regions spanned by each chunk (or by each shard if `iter_shards` is True).
23012344
23022345
Parameters
23032346
----------
2347+
iter_shards : bool, default=False
2348+
Whether to iterate by shard (if True) or by chunk (if False).
23042349
origin : Sequence[int] | None, default=None
23052350
The origin of the selection relative to the array's chunk grid (or shard grid if `iter_shards` is True).
23062351
selection_shape : Sequence[int] | None, default=None
@@ -2312,7 +2357,7 @@ def _iter_chunk_regions(
23122357
A tuple of slice objects representing the region spanned by each chunk in the selection.
23132358
"""
23142359
yield from self._async_array._iter_chunk_regions(
2315-
origin=origin, selection_shape=selection_shape
2360+
iter_shards=iter_shards, origin=origin, selection_shape=selection_shape
23162361
)
23172362

23182363
def __array__(
@@ -4100,7 +4145,7 @@ async def _copy_array_region(chunk_coords: ChunkCoords | slice, _data: Array) ->
41004145

41014146
# Stream data from the source array to the new array
41024147
await concurrent_map(
4103-
[(region, data) for region in result._iter_chunk_regions()],
4148+
[(region, data) for region in result._iter_chunk_regions(iter_shards=True)],
41044149
_copy_array_region,
41054150
zarr.core.config.config.get("async.concurrency"),
41064151
)
@@ -4111,7 +4156,7 @@ async def _copy_arraylike_region(chunk_coords: slice, _data: NDArrayLike) -> Non
41114156

41124157
# Stream data from the source array to the new array
41134158
await concurrent_map(
4114-
[(region, data) for region in result._iter_chunk_regions()],
4159+
[(region, data) for region in result._iter_chunk_regions(iter_shards=True)],
41154160
_copy_arraylike_region,
41164161
zarr.core.config.config.get("async.concurrency"),
41174162
)

tests/test_array.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1644,6 +1644,30 @@ async def test_from_array(
16441644
assert result.chunks == new_chunks
16451645

16461646

1647+
@pytest.mark.parametrize("store", ["memory"], indirect=True)
1648+
@pytest.mark.parametrize("chunks", [(10, 10)])
1649+
@pytest.mark.parametrize("shards", [(60, 60)])
1650+
async def test_from_array_shards(
1651+
store: Store,
1652+
zarr_format: ZarrFormat,
1653+
chunks: tuple[int, ...],
1654+
shards: tuple[int, ...],
1655+
) -> None:
1656+
# Regression test for https://github.com/zarr-developers/zarr-python/issues/3169
1657+
source_data = np.arange(3600).reshape((60, 60))
1658+
1659+
zarr.create_array(
1660+
store=store,
1661+
data=source_data,
1662+
chunks=chunks,
1663+
shards=shards,
1664+
)
1665+
1666+
array = zarr.open_array(store=store)
1667+
1668+
assert np.array_equal(array[:], source_data)
1669+
1670+
16471671
@pytest.mark.parametrize("store", ["local"], indirect=True)
16481672
@pytest.mark.parametrize("chunks", ["keep", "auto"])
16491673
@pytest.mark.parametrize("write_data", [True, False])

0 commit comments

Comments
 (0)