Skip to content

Commit c827225

Browse files
committed
Add test and make max gap and max coalesce size config options
1 parent 590117e commit c827225

File tree

3 files changed

+38
-11
lines changed

3 files changed

+38
-11
lines changed

src/zarr/codecs/sharding.py

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -569,9 +569,10 @@ async def _load_partial_shard_maybe(
569569
def _coalesce_chunks(
570570
self,
571571
chunks: list[_ChunkCoordsByteSlice],
572-
max_gap_bytes: int = 2**20, # 1MiB
573-
coalesce_max_bytes: int = 100 * 2**20, # 100MiB
574572
) -> list[list[_ChunkCoordsByteSlice]]:
573+
max_gap_bytes = config.get("sharding.read.coalesce_max_gap_bytes")
574+
coalesce_max_bytes = config.get("sharding.read.coalesce_max_bytes")
575+
575576
sorted_chunks = sorted(chunks, key=lambda c: c.byte_slice.start)
576577

577578
groups = []
@@ -590,15 +591,6 @@ def _coalesce_chunks(
590591

591592
groups.append(current_group)
592593

593-
from pprint import pprint
594-
595-
pprint(
596-
[
597-
f"{len(g)} chunks, {(g[-1].byte_slice.stop - g[0].byte_slice.start) / 1e6:.1f}MB"
598-
for g in groups
599-
]
600-
)
601-
602594
return groups
603595

604596
async def _get_group_bytes(

src/zarr/core/config.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,12 @@ def enable_gpu(self) -> ConfigSet:
108108
},
109109
"async": {"concurrency": 10, "timeout": None},
110110
"threading": {"max_workers": None},
111+
"sharding": {
112+
"read": {
113+
"coalesce_max_bytes": 100 * 2**20, # 100MiB
114+
"coalesce_max_gap_bytes": 2**20, # 1MiB
115+
}
116+
},
111117
"json_indent": 2,
112118
"codec_pipeline": {
113119
"path": "zarr.core.codec_pipeline.BatchedCodecPipeline",

tests/test_codecs/test_sharding.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,35 @@ def test_sharding_partial_read(
197197
assert np.all(read_data == 1)
198198

199199

200+
@pytest.mark.parametrize("index_location", ["start", "end"])
201+
@pytest.mark.parametrize("store", ["local", "memory", "zip"], indirect=["store"])
202+
def test_sharding_multiple_chunks_partial_shard_read(
203+
store: Store, index_location: ShardingCodecIndexLocation
204+
) -> None:
205+
array_shape = (8, 64)
206+
shard_shape = (4, 32)
207+
chunk_shape = (2, 4)
208+
209+
data = np.arange(np.prod(array_shape), dtype="float32").reshape(array_shape)
210+
211+
a = zarr.create_array(
212+
StorePath(store),
213+
shape=data.shape,
214+
chunks=chunk_shape,
215+
shards={"shape": shard_shape, "index_location": index_location},
216+
compressors=BloscCodec(cname="lz4"),
217+
dtype=data.dtype,
218+
fill_value=1,
219+
)
220+
a[:] = data
221+
222+
# Reads 2.5 (3 full, one partial) chunks each from 2 shards (a subset of both shards)
223+
assert np.allclose(a[0, 22:42], np.arange(22, 42, dtype="float32"))
224+
225+
# Reads 2 chunks from both shards along dimension 0
226+
assert np.allclose(a[:, 0], np.arange(0, data.size, array_shape[1], dtype="float32"))
227+
228+
200229
@pytest.mark.parametrize(
201230
"array_fixture",
202231
[

0 commit comments

Comments
 (0)