Skip to content

Commit 3c5ec3f

Browse files
committed
concurrent streaming for equal chunk sizes
1 parent 91152be commit 3c5ec3f

File tree

4 files changed

+52
-27
lines changed

4 files changed

+52
-27
lines changed

src/zarr/api/asynchronous.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
from zarr.core.group import AsyncGroup, ConsolidatedMetadata, GroupMetadata
2828
from zarr.core.metadata import ArrayMetadataDict, ArrayV2Metadata, ArrayV3Metadata
2929
from zarr.core.metadata.v2 import _default_filters_and_compressor
30-
from zarr.core.sync import sync
3130
from zarr.errors import NodeTypeValidationError
3231
from zarr.storage import (
3332
StoreLike,
@@ -562,16 +561,19 @@ async def array(
562561

563562
new_array = await create(data.shape, **kwargs)
564563

565-
async def _copy_chunk(chunk_coords: ChunkCoords) -> None:
564+
async def _copy_chunk(chunk_coords: ChunkCoords|slice) -> None:
566565
arr = await data._async_array.getitem(chunk_coords)
567566
await new_array.setitem(chunk_coords, arr)
568567

569-
# Stream data from the source array to the new array
570-
await concurrent_map(
571-
[(region,) for region in data._iter_chunk_regions()],
572-
_copy_chunk,
573-
config.get("async.concurrency"),
574-
)
568+
if new_array.chunks == data.chunks:
569+
# Stream data from the source array to the new array
570+
await concurrent_map(
571+
[(region,) for region in data._iter_chunk_regions()],
572+
_copy_chunk,
573+
config.get("async.concurrency"),
574+
)
575+
else:
576+
await _copy_chunk(slice(None))
575577
return new_array
576578

577579
# ensure data is array-like

tests/conftest.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,12 @@ async def store(request: pytest.FixtureRequest, tmpdir: LEGACY_PATH) -> Store:
7676
return await parse_store(param, str(tmpdir))
7777

7878

79+
@pytest.fixture
async def store2(request: pytest.FixtureRequest, tmpdir: LEGACY_PATH) -> Store:
    """Return a second store located at the ``store2`` path under ``tmpdir``.

    The kind of store is read from ``request.param`` and handed to
    ``parse_store`` together with the stringified path.
    """
    store_kind = request.param
    store_root = str(tmpdir / "store2")
    return await parse_store(store_kind, store_root)
83+
84+
7985
@pytest.fixture(params=["local", "memory", "zip"])
8086
def sync_store(request: pytest.FixtureRequest, tmp_path: LEGACY_PATH) -> Store:
8187
result = sync(parse_store(request.param, str(tmp_path)))

tests/test_array.py

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,13 @@
22
import json
33
import math
44
import pickle
5-
import time
65
from itertools import accumulate
76
from typing import Any, Literal
87

98
import numcodecs
109
import numpy as np
1110
import pytest
1211
from numcodecs import Zstd
13-
from numpy import dtype
1412
from numpy.ma.testutils import assert_array_equal
1513

1614
import zarr.api.asynchronous
@@ -893,31 +891,40 @@ async def test_scalar_array() -> None:
893891
assert arr.shape == ()
894892

895893

896-
async def test_creation_from_other_zarr(tmpdir):
894+
@pytest.mark.parametrize("store", ["memory", "local"], indirect=True)
895+
@pytest.mark.parametrize("store2", ["memory", "local"], indirect=True)
896+
@pytest.mark.parametrize("src_chunks", [(1, 2), (5, 5), (5, 10)])
897+
@pytest.mark.parametrize("new_chunks", [(1, 2), (5, 5), (5, 10)])
898+
async def test_creation_from_other_zarr(store, store2, src_chunks, new_chunks):
897899
src_fill_value = 2
898900
src_dtype = np.dtype("uint8")
899901
src_attributes = {}
900-
src_chunks = (2, 2)
901902

902-
src = zarr.create((10, 10), chunks=src_chunks, dtype=src_dtype, store=LocalStore(str(tmpdir)), fill_value = src_fill_value, attributes=src_attributes)
903-
src[:] = np.arange(100).reshape((10,10))
904-
905-
result = zarr.array(src, store=MemoryStore())
906-
assert_array_equal(result[:], src[:])
907-
assert result.fill_value == src_fill_value
908-
assert result.dtype==src_dtype
909-
assert result.attrs.asdict() == src_attributes
910-
assert result.chunks == src_chunks
903+
src = zarr.create(
904+
(10, 10),
905+
chunks=src_chunks,
906+
dtype=src_dtype,
907+
store=store,
908+
fill_value=src_fill_value,
909+
attributes=src_attributes,
910+
)
911+
src[:] = np.arange(100).reshape((10, 10))
911912

912913
new_fill_value = 3
913914
new_dtype = np.dtype("uint16")
914-
new_attributes = {"foo":"bar"}
915-
new_chunks = (5, 10)
916-
917-
result2 = zarr.array(src, store=MemoryStore(), chunks=new_chunks, dtype=new_dtype, fill_value = new_fill_value, attributes=new_attributes)
915+
new_attributes = {"foo": "bar"}
916+
917+
result2 = zarr.array(
918+
src,
919+
store=store2,
920+
chunks=new_chunks,
921+
dtype=new_dtype,
922+
fill_value=new_fill_value,
923+
attributes=new_attributes,
924+
)
918925

919926
assert_array_equal(result2[:], src[:])
920927
assert result2.fill_value == new_fill_value
921928
assert result2.dtype == new_dtype
922929
assert result2.attrs == new_attributes
923-
assert result2.chunks == new_chunks
930+
assert result2.chunks == new_chunks

tests/test_indexing.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1872,7 +1872,7 @@ def test_iter_grid(
18721872
"""
18731873
Test that iter_grid works as expected for 1, 2, and 3 dimensions.
18741874
"""
1875-
grid_shape = (10,2,7)[:ndim]
1875+
grid_shape = (10, 2, 7)[:ndim]
18761876

18771877
if origin_0d is not None:
18781878
origin_kwarg = origin_0d * ndim
@@ -1954,3 +1954,13 @@ def test_vectorized_indexing_incompatible_shape(store) -> None:
19541954
)
19551955
with pytest.raises(ValueError, match="Attempting to set"):
19561956
arr[np.array([1, 2]), np.array([1, 2])] = np.array([[-1, -2], [-3, -4]])
1957+
1958+
1959+
def test_iter_chunk_regions():
    """Check that every region from ``_iter_chunk_regions`` can be read and overwritten.

    The array starts filled with ones; each region is verified to hold ones,
    is set to zero, and is then verified to hold zeros.
    """
    arr = zarr.create((10, 10), chunks=(2, 3))
    arr[:] = 1
    for region in arr._iter_chunk_regions():
        before = arr[region]
        assert_array_equal(before, np.ones_like(before))
        arr[region] = 0
        after = arr[region]
        assert_array_equal(after, np.zeros_like(after))

0 commit comments

Comments
 (0)