From 8780499905de2c170c1ec8d0fcdf502a076e94d8 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Mon, 23 Jun 2025 12:09:28 +0200 Subject: [PATCH 1/7] (chore): handle new zarr dtype API --- python/zarrs/pipeline.py | 5 +- python/zarrs/utils.py | 6 +- src/chunk_item.rs | 1 + tests/test_v2.py | 247 +++++++++++++++++++-------------------- tests/test_vlen.py | 82 +++++-------- 5 files changed, 154 insertions(+), 187 deletions(-) diff --git a/python/zarrs/pipeline.py b/python/zarrs/pipeline.py index 6ebc24d..2efce5e 100644 --- a/python/zarrs/pipeline.py +++ b/python/zarrs/pipeline.py @@ -21,6 +21,7 @@ from zarr.core.chunk_grids import ChunkGrid from zarr.core.common import ChunkCoords from zarr.core.indexing import SelectorTuple + from zarr.dtype import ZDType from ._internal import CodecPipelineImpl, codec_metadata_v2_to_v3 from .utils import ( @@ -134,7 +135,7 @@ def __iter__(self) -> Iterator[Codec]: yield from self.codecs def validate( - self, *, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid + self, *, shape: ChunkCoords, dtype: ZDType, chunk_grid: ChunkGrid ) -> None: raise NotImplementedError("validate") @@ -236,7 +237,7 @@ def _raise_error_on_unsupported_batch_dtype( # https://github.com/LDeakin/zarrs/blob/0532fe983b7b42b59dbf84e50a2fe5e6f7bad4ce/zarrs_metadata/src/v2_to_v3.rs#L289-L293 for VSUMm # Further, our pipeline does not support variable-length objects due to limitations on decode_into, so object/np.dtypes.StringDType is also out if any( - info.dtype.kind in {"V", "S", "U", "M", "m", "O", "T"} + info.dtype.to_native_dtype().kind in {"V", "S", "U", "M", "m", "O", "T"} for (_, info, _, _, _) in batch_info ): raise UnsupportedDataTypeError() diff --git a/python/zarrs/utils.py b/python/zarrs/utils.py index 35743b8..81d5977 100644 --- a/python/zarrs/utils.py +++ b/python/zarrs/utils.py @@ -8,7 +8,6 @@ import numpy as np from zarr.core.array_spec import ArraySpec from zarr.core.indexing import SelectorTuple, is_integer -from zarr.core.metadata.v2 import _default_fill_value from zarrs._internal import Basic, WithSubset @@ -17,6 +16,7 @@ from types import EllipsisType from zarr.abc.store import ByteGetter, ByteSetter + from zarr.dtype import ZDType # adapted from https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor @@ -139,9 +139,9 @@ def get_shape_for_selector( return resulting_shape_from_index(shape, selector_tuple, drop_axes, pad=pad) -def get_implicit_fill_value(dtype: np.dtype, fill_value: Any) -> Any: +def get_implicit_fill_value(dtype: ZDType, fill_value: Any) -> Any: if fill_value is None: - fill_value = _default_fill_value(dtype) + fill_value = dtype.default_scalar() return fill_value diff --git a/src/chunk_item.rs b/src/chunk_item.rs index feac5cb..da0d9f8 100644 --- a/src/chunk_item.rs +++ b/src/chunk_item.rs @@ -68,6 +68,7 @@ impl Basic { let chunk_shape = chunk_spec.getattr("shape")?.extract()?; let mut dtype: String = chunk_spec .getattr("dtype")? + .call_method0("to_native_dtype")? .call_method0("__str__")? 
.extract()?; if dtype == "object" { diff --git a/tests/test_v2.py b/tests/test_v2.py index fd8592e..e332f1c 100644 --- a/tests/test_v2.py +++ b/tests/test_v2.py @@ -15,6 +15,9 @@ from zarr import config from zarr.abc.store import Store from zarr.core.buffer.core import default_buffer_prototype +from zarr.core.dtype import FixedLengthUTF32, Structured, VariableLengthUTF8 +from zarr.core.dtype.npy.bytes import NullTerminatedBytes +from zarr.core.dtype.wrapper import ZDType from zarr.core.sync import sync from zarr.storage import LocalStore, StorePath @@ -40,33 +43,7 @@ def test_simple(store: StorePath) -> None: assert np.array_equal(data, a[:, :]) -@pytest.mark.parametrize( - ("dtype", "fill_value"), - [ - ("bool", False), - ("int64", 0), - ("float64", 0.0), - ("|S1", b""), - ("|U1", ""), - ("object", ""), - (str, ""), - ], -) -def test_implicit_fill_value(store: LocalStore, dtype: str, fill_value: Any) -> None: - arr = zarr.create( - store=store, shape=(4,), fill_value=None, zarr_format=2, dtype=dtype - ) - assert arr.metadata.fill_value is None - assert arr.metadata.to_dict()["fill_value"] is None - result = arr[:] - numpy_dtype = np.dtype(object) if dtype is str else np.dtype(dtype) - expected = np.full(arr.shape, fill_value, dtype=numpy_dtype) - np.testing.assert_array_equal(result, expected) - - -def test_codec_pipeline(tmp_path) -> None: - # https://github.com/zarr-developers/zarr-python/issues/2243 - store = LocalStore(tmp_path) +def test_codec_pipeline(store) -> None: array = zarr.create( store=store, shape=(1,), @@ -82,15 +59,15 @@ def test_codec_pipeline(tmp_path) -> None: @pytest.mark.parametrize( - ("dtype", "expected_dtype", "fill_value", "fill_value_encoding"), + ("dtype", "expected_dtype", "fill_value", "fill_value_json"), [ - ("|S", "|S0", b"X", "WA=="), - ("|V", "|V0", b"X", "WA=="), + ("|S1", "|S1", b"X", "WA=="), + ("|V1", "|V1", b"X", "WA=="), ("|V10", "|V10", b"X", "WAAAAAAAAAAAAA=="), ], ) async def test_v2_encode_decode( - dtype, expected_dtype, fill_value, fill_value_encoding, tmp_path + dtype, expected_dtype, fill_value, fill_value_json ) -> None: with config.set( { @@ -98,7 +75,7 @@ async def test_v2_encode_decode( "array.v2_default_compressor.bytes": None, } ): - store = zarr.storage.LocalStore(tmp_path) + store = zarr.storage.MemoryStore() g = zarr.group(store=store, zarr_format=2) g.create_array( name="foo", @@ -119,8 +96,8 @@ async def test_v2_encode_decode( "chunks": [3], "compressor": None, "dtype": expected_dtype, - "fill_value": fill_value_encoding, - "filters": [{"id": "vlen-bytes"}] if dtype == "|S" else None, + "fill_value": fill_value_json, + "filters": None, "order": "C", "shape": [3], "zarr_format": 2, @@ -133,42 +110,24 @@ async def test_v2_encode_decode( np.testing.assert_equal(data, expected) -@pytest.mark.parametrize("dtype_value", [["|S", b"Y"], ["|U", "Y"], ["O", b"Y"]]) -def test_v2_encode_decode_with_data(dtype_value, tmp_path): - dtype, value = dtype_value - with config.set( - { - "array.v2_default_filters": { - "string": [{"id": "vlen-utf8"}], - "bytes": [{"id": "vlen-bytes"}], - }, - } - ): - expected = np.full((3,), value, dtype=dtype) - a = zarr.create( - store=tmp_path, - shape=(3,), - zarr_format=2, - dtype=dtype, - ) - a[:] = expected - data = a[:] - np.testing.assert_equal(data, expected) - - -@pytest.mark.parametrize("dtype", [str, "str"]) -async def test_create_dtype_str(dtype: Any, tmp_path) -> None: - # see https://github.com/zarr-developers/zarr-python/issues/2627 for why this test - # is probably wrong - arr = 
zarr.create(store=tmp_path, shape=3, dtype=dtype, zarr_format=2) - assert arr.dtype.kind == "O" - assert arr.metadata.to_dict()["dtype"] == "|O" - assert arr.metadata.filters == (numcodecs.vlen.VLenBytes(),) - arr[:] = [b"a", b"bb", b"ccc"] - result = arr[:] - np.testing.assert_array_equal( - result, np.array([b"a", b"bb", b"ccc"], dtype="object") +@pytest.mark.parametrize( + ("dtype", "value"), + [ + (NullTerminatedBytes(length=1), b"Y"), + (FixedLengthUTF32(length=1), "Y"), + (VariableLengthUTF8(), "Y"), + ], +) +def test_v2_encode_decode_with_data(dtype: ZDType[Any, Any], value: str): + expected = np.full((3,), value, dtype=dtype.to_native_dtype()) + a = zarr.create( + shape=(3,), + zarr_format=2, + dtype=dtype, ) + a[:] = expected + data = a[:] + np.testing.assert_equal(data, expected) @pytest.mark.parametrize( @@ -217,19 +176,17 @@ def test_create_array_defaults(store: Store): ) +@pytest.mark.parametrize("numpy_order", ["C", "F"]) @pytest.mark.parametrize( - "array_order", ["C", pytest.param("F", marks=[pytest.mark.xfail])] -) -@pytest.mark.parametrize("data_order", ["C", "F"]) -@pytest.mark.parametrize( - "memory_order", ["C", pytest.param("F", marks=[pytest.mark.xfail])] + "zarr_order", ["C", pytest.param("F", marks=pytest.mark.xfail())] ) def test_v2_non_contiguous( - array_order: Literal["C", "F"], - data_order: Literal["C", "F"], - memory_order: Literal["C", "F"], - tmp_path: Path, + numpy_order: Literal["C", "F"], zarr_order: Literal["C", "F"], tmp_path: Path ) -> None: + """ + Make sure zarr v2 arrays save data using the memory order given to the zarr array, + not the memory order of the original numpy array. + """ store = LocalStore(tmp_path / "a_store") arr = zarr.create_array( store, @@ -241,26 +198,29 @@ def test_v2_non_contiguous( filters=None, compressors=None, overwrite=True, - order=array_order, - config={"order": memory_order}, + order=zarr_order, ) - # Non-contiguous write - a = np.arange(arr.shape[0] * arr.shape[1]).reshape(arr.shape, order=data_order) + # Non-contiguous write, using numpy memory order + a = np.arange(arr.shape[0] * arr.shape[1]).reshape(arr.shape, order=numpy_order) arr[6:9, 3:6] = a[6:9, 3:6] # The slice on the RHS is important np.testing.assert_array_equal(arr[6:9, 3:6], a[6:9, 3:6]) + buf = sync(store.get("2.1", default_buffer_prototype())) + assert buf is not None np.testing.assert_array_equal( a[6:9, 3:6], - np.frombuffer( - sync(store.get("2.1", default_buffer_prototype())).to_bytes(), - dtype="float64", - ).reshape((3, 3), order=array_order), + np.frombuffer(buf.to_bytes(), dtype="float64").reshape( + (3, 3), order=zarr_order + ), ) - if memory_order == "F": - assert (arr[6:9, 3:6]).flags.f_contiguous + # After writing and reading from zarr array, order should be same as zarr order + sub_arr = arr[6:9, 3:6] + assert isinstance(sub_arr, np.ndarray) + if zarr_order == "F": + assert (sub_arr).flags.f_contiguous else: - assert (arr[6:9, 3:6]).flags.c_contiguous + assert (sub_arr).flags.c_contiguous store = LocalStore(tmp_path / "other_store") arr = zarr.create_array( @@ -273,18 +233,19 @@ def test_v2_non_contiguous( compressors=None, filters=None, overwrite=True, - order=array_order, - config={"order": memory_order}, + order=zarr_order, ) - # Contiguous write - a = np.arange(9).reshape((3, 3), order=data_order) - if data_order == "F": - assert a.flags.f_contiguous - else: - assert a.flags.c_contiguous + a = np.arange(9).reshape((3, 3), order=numpy_order) arr[6:9, 3:6] = a np.testing.assert_array_equal(arr[6:9, 3:6], a) + # After writing and 
reading from zarr array, order should be same as zarr order + sub_arr = arr[6:9, 3:6] + assert isinstance(sub_arr, np.ndarray) + if zarr_order == "F": + assert (sub_arr).flags.f_contiguous + else: + assert (sub_arr).flags.c_contiguous def test_default_compressor_deprecation_warning(): @@ -292,38 +253,6 @@ def test_default_compressor_deprecation_warning(): zarr.storage.default_compressor = "zarr.codecs.zstd.ZstdCodec()" -@pytest.mark.parametrize( - "dtype_expected", - [ - ["b", "zstd", None], - ["i", "zstd", None], - ["f", "zstd", None], - ["|S1", "zstd", "vlen-bytes"], - ["|U1", "zstd", "vlen-utf8"], - ], -) -def test_default_filters_and_compressor(dtype_expected: Any) -> None: - with config.set( - { - "array.v2_default_compressor": { - "numeric": {"id": "zstd", "level": "0"}, - "string": {"id": "zstd", "level": "0"}, - "bytes": {"id": "zstd", "level": "0"}, - }, - "array.v2_default_filters": { - "numeric": [], - "string": [{"id": "vlen-utf8"}], - "bytes": [{"id": "vlen-bytes"}], - }, - } - ): - dtype, expected_compressor, expected_filter = dtype_expected - arr = zarr.create(shape=(3,), path="foo", store={}, zarr_format=2, dtype=dtype) - assert arr.metadata.compressor.codec_id == expected_compressor - if expected_filter is not None: - assert arr.metadata.filters[0].codec_id == expected_filter - - @pytest.mark.parametrize("fill_value", [None, (b"", 0, 0.0)], ids=["no_fill", "fill"]) def test_structured_dtype_roundtrip(fill_value, tmp_path) -> None: a = np.array( @@ -344,3 +273,63 @@ def test_structured_dtype_roundtrip(fill_value, tmp_path) -> None: za[...] = a za = zarr.open_array(store=array_path) assert (a == za[:]).all() + + +@pytest.mark.parametrize( + ( + "fill_value", + "dtype", + "expected_result", + ), + [ + ( + ("Alice", 30), + np.dtype([("name", "U10"), ("age", "i4")]), + np.array([("Alice", 30)], dtype=[("name", "U10"), ("age", "i4")])[0], + ), + ( + ["Bob", 25], + np.dtype([("name", "U10"), ("age", "i4")]), + np.array([("Bob", 25)], dtype=[("name", "U10"), ("age", "i4")])[0], + ), + ( + b"\x01\x00\x00\x00\x02\x00\x00\x00", + np.dtype([("x", "i4"), ("y", "i4")]), + np.array([(1, 2)], dtype=[("x", "i4"), ("y", "i4")])[0], + ), + ], + ids=[ + "tuple_input", + "list_input", + "bytes_input", + ], +) +def test_parse_structured_fill_value_valid( + fill_value: Any, dtype: np.dtype[Any], expected_result: Any +) -> None: + zdtype = Structured.from_native_dtype(dtype) + result = zdtype.cast_scalar(fill_value) + assert result.dtype == expected_result.dtype + assert result == expected_result + if isinstance(expected_result, np.void): + for name in expected_result.dtype.names or []: + assert result[name] == expected_result[name] + + +@pytest.mark.parametrize("fill_value", [None, b"x"], ids=["no_fill", "fill"]) +def test_other_dtype_roundtrip(fill_value, tmp_path) -> None: + a = np.array([b"a\0\0", b"bb", b"ccc"], dtype="V7") + array_path = tmp_path / "data.zarr" + za = zarr.create( + shape=(3,), + store=array_path, + chunks=(2,), + fill_value=fill_value, + zarr_format=2, + dtype=a.dtype, + ) + if fill_value is not None: + assert (np.array([fill_value] * a.shape[0], dtype=a.dtype) == za[:]).all() + za[...] 
= a + za = zarr.open_array(store=array_path) + assert (a == za[:]).all() diff --git a/tests/test_vlen.py b/tests/test_vlen.py index afb3610..b9e29eb 100644 --- a/tests/test_vlen.py +++ b/tests/test_vlen.py @@ -3,32 +3,44 @@ import numpy as np import pytest import zarr +from zarr import Array from zarr.abc.codec import Codec from zarr.abc.store import Store from zarr.codecs import ZstdCodec -from zarr.core.metadata.v3 import ArrayV3Metadata, DataType -from zarr.core.strings import _NUMPY_SUPPORTS_VLEN_STRING +from zarr.core.dtype import get_data_type_from_native_dtype +from zarr.core.dtype.npy.string import _NUMPY_SUPPORTS_VLEN_STRING +from zarr.core.metadata.v3 import ArrayV3Metadata from zarr.storage import StorePath -numpy_str_dtypes: list[type | str | None] = [None, str, "str", np.dtypes.StrDType] -expected_zarr_string_dtype: np.dtype[Any] +numpy_str_dtypes: list[type | str | None] = [ + None, + str, + "str", + np.dtypes.StrDType, + "S", + "U", +] +expected_array_string_dtype: np.dtype[Any] if _NUMPY_SUPPORTS_VLEN_STRING: numpy_str_dtypes.append(np.dtypes.StringDType) - expected_zarr_string_dtype = np.dtypes.StringDType() + expected_array_string_dtype = np.dtypes.StringDType() else: - expected_zarr_string_dtype = np.dtype("O") + expected_array_string_dtype = np.dtype("O") -@pytest.mark.parametrize("store", ["local"], indirect=["store"]) +@pytest.mark.filterwarnings( + "ignore::zarr.core.dtype.common.UnstableSpecificationWarning" +) +@pytest.mark.parametrize("store", ["memory", "local"], indirect=["store"]) @pytest.mark.parametrize("dtype", numpy_str_dtypes) @pytest.mark.parametrize("as_object_array", [False, True]) @pytest.mark.parametrize("compressor", [None, ZstdCodec()]) def test_vlen_string( store: Store, dtype: np.dtype[Any] | None, + compressor: Codec | None, *, as_object_array: bool, - compressor: Codec | None, ) -> None: strings = ["hello", "world", "this", "is", "a", "test"] data = np.array(strings, dtype=dtype).reshape((2, 3)) @@ -47,54 +59,18 @@ def test_vlen_string( # should also work if input array is an object array, provided we explicitly specified # a stringlike dtype when creating the Array if as_object_array: - data = data.astype("O") - - a[:, :] = data - assert np.array_equal(data, a[:, :]) - assert a.metadata.data_type == DataType.string - assert a.dtype == expected_zarr_string_dtype - - # test round trip - b = zarr.open(sp) - assert isinstance(b.metadata, ArrayV3Metadata) # needed for mypy - assert np.array_equal(data, b[:, :]) - assert b.metadata.data_type == DataType.string - assert a.dtype == expected_zarr_string_dtype - - -@pytest.mark.parametrize("store", ["local"], indirect=["store"]) -@pytest.mark.parametrize("as_object_array", [False, True]) -@pytest.mark.parametrize("compressor", [None, ZstdCodec()]) -def test_vlen_bytes( - store: Store, *, as_object_array: bool, compressor: Codec | None -) -> None: - bstrings = [b"hello", b"world", b"this", b"is", b"a", b"test"] - data = np.array(bstrings).reshape((2, 3)) - assert data.dtype == "|S5" + data_obj = data.astype("O") - sp = StorePath(store, path="string") - a = zarr.create_array( - sp, - shape=data.shape, - chunks=data.shape, - dtype=data.dtype, - fill_value=b"", - compressors=compressor, - ) - assert isinstance(a.metadata, ArrayV3Metadata) # needed for mypy - - # should also work if input array is an object array, provided we explicitly specified - # a bytesting-like dtype when creating the Array - if as_object_array: - data = data.astype("O") - a[:, :] = data + a[:, :] = data_obj + else: + a[:, :] = data 
assert np.array_equal(data, a[:, :]) - assert a.metadata.data_type == DataType.bytes - assert a.dtype == "O" + assert a.metadata.data_type == get_data_type_from_native_dtype(data.dtype) + assert a.dtype == data.dtype # test round trip - b = zarr.open(sp) + b = Array.open(sp) assert isinstance(b.metadata, ArrayV3Metadata) # needed for mypy assert np.array_equal(data, b[:, :]) - assert b.metadata.data_type == DataType.bytes - assert a.dtype == "O" + assert b.metadata.data_type == get_data_type_from_native_dtype(data.dtype) + assert a.dtype == data.dtype From 754d775c9f141a4d3aed7f3403056f7c56ce6038 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Mon, 23 Jun 2025 12:17:25 +0200 Subject: [PATCH 2/7] (fix): dep in pyproject toml for zarr from main --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 57e02d4..bddaaa3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,7 @@ dependencies = [ 'donfig', 'pytest', 'universal_pathlib>=0.2.0', - "zarr>=3.0.3,<3.1", + "zarr @ git+https://github.com/zarr-developers/zarr-python", ] [project.optional-dependencies] From b8e730462ff2c2125b4732700f7ffc9094168b4d Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Mon, 30 Jun 2025 09:22:06 +0200 Subject: [PATCH 3/7] (fix): use local store for v2 --- tests/test_v2.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_v2.py b/tests/test_v2.py index e332f1c..59d7635 100644 --- a/tests/test_v2.py +++ b/tests/test_v2.py @@ -67,7 +67,7 @@ def test_codec_pipeline(store) -> None: ], ) async def test_v2_encode_decode( - dtype, expected_dtype, fill_value, fill_value_json + dtype, expected_dtype, fill_value, fill_value_json, tmp_path ) -> None: with config.set( { @@ -75,7 +75,7 @@ async def test_v2_encode_decode( "array.v2_default_compressor.bytes": None, } ): - store = zarr.storage.MemoryStore() + store = zarr.storage.LocalStore(tmp_path) g = zarr.group(store=store, zarr_format=2) g.create_array( name="foo", From ae47cfcb3b9324b13ce507eb84fae81ac952daae Mon Sep 17 00:00:00 2001 From: Lachlan Deakin Date: Sun, 13 Jul 2025 08:45:45 +1000 Subject: [PATCH 4/7] fix: don't use `default_filters`/`default_compressor` --- tests/test_v2.py | 67 +++++++++++++++++++++--------------------------- 1 file changed, 29 insertions(+), 38 deletions(-) diff --git a/tests/test_v2.py b/tests/test_v2.py index 59d7635..bf59d22 100644 --- a/tests/test_v2.py +++ b/tests/test_v2.py @@ -69,45 +69,36 @@ def test_codec_pipeline(store) -> None: async def test_v2_encode_decode( dtype, expected_dtype, fill_value, fill_value_json, tmp_path ) -> None: - with config.set( - { - "array.v2_default_filters.bytes": [{"id": "vlen-bytes"}], - "array.v2_default_compressor.bytes": None, - } - ): - store = zarr.storage.LocalStore(tmp_path) - g = zarr.group(store=store, zarr_format=2) - g.create_array( - name="foo", - shape=(3,), - chunks=(3,), - dtype=dtype, - fill_value=fill_value, - compressor=None, - ) + store = zarr.storage.LocalStore(tmp_path) + g = zarr.group(store=store, zarr_format=2) + g.create_array( + name="foo", + shape=(3,), + chunks=(3,), + dtype=dtype, + fill_value=fill_value, + compressor=None, + ) - result = await store.get( - "foo/.zarray", zarr.core.buffer.default_buffer_prototype() - ) - assert result is not None - - serialized = json.loads(result.to_bytes()) - expected = { - "chunks": [3], - "compressor": None, - "dtype": expected_dtype, - "fill_value": fill_value_json, - "filters": None, - "order": "C", - "shape": [3], - "zarr_format": 2, - 
"dimension_separator": ".", - } - assert serialized == expected - - data = zarr.open_array(store=store, path="foo")[:] - expected = np.full((3,), b"X", dtype=dtype) - np.testing.assert_equal(data, expected) + result = await store.get("foo/.zarray", zarr.core.buffer.default_buffer_prototype()) + assert result is not None + + serialized = json.loads(result.to_bytes()) + expected = { + "chunks": [3], + "compressor": None, + "dtype": expected_dtype, + "fill_value": fill_value_json, + "filters": None, + "order": "C", + "shape": [3], + "zarr_format": 2, + "dimension_separator": ".", + } + assert serialized == expected + + data = zarr.open_array(store=store, path="foo")[:] + np.testing.assert_equal(data, np.full((3,), b"X", dtype=dtype)) @pytest.mark.parametrize( From aed60eb5f8c773600f10c0362663eed7957d436b Mon Sep 17 00:00:00 2001 From: Lachlan Deakin Date: Sat, 12 Jul 2025 16:53:37 +1000 Subject: [PATCH 5/7] feat: initialise from `ArrayMetadata` and store --- python/zarrs/_internal.pyi | 3 +- python/zarrs/pipeline.py | 67 ++++++++++++++++++++++++-------------- src/chunk_item.rs | 13 ++------ src/lib.rs | 52 +++++++++++++++++++---------- src/store.rs | 4 +-- src/store/filesystem.rs | 2 +- src/store/http.rs | 2 +- src/store/manager.rs | 61 ---------------------------------- tests/test_v2.py | 34 ++++++++++++++++++- tests/test_vlen.py | 9 +++++ 10 files changed, 125 insertions(+), 122 deletions(-) delete mode 100644 src/store/manager.rs diff --git a/python/zarrs/_internal.pyi b/python/zarrs/_internal.pyi index b5fa0f5..bad2ff2 100644 --- a/python/zarrs/_internal.pyi +++ b/python/zarrs/_internal.pyi @@ -13,7 +13,8 @@ class Basic: class CodecPipelineImpl: def __new__( cls, - metadata: builtins.str, + array_metadata: builtins.str, + store_config: StoreConfig, *, validate_checksums: builtins.bool | None = None, store_empty_chunks: builtins.bool | None = None, diff --git a/python/zarrs/pipeline.py b/python/zarrs/pipeline.py index 2efce5e..27a0d90 100644 --- a/python/zarrs/pipeline.py +++ b/python/zarrs/pipeline.py @@ -2,20 +2,22 @@ import asyncio import json -import re from dataclasses import dataclass from typing import TYPE_CHECKING, TypedDict +from warnings import warn import numpy as np from zarr.abc.codec import Codec, CodecPipeline +from zarr.codecs._v2 import V2Codec from zarr.core import BatchedCodecPipeline from zarr.core.config import config +from zarr.core.metadata import ArrayMetadata, ArrayV2Metadata, ArrayV3Metadata if TYPE_CHECKING: from collections.abc import Generator, Iterable, Iterator from typing import Any, Self - from zarr.abc.store import ByteGetter, ByteSetter + from zarr.abc.store import ByteGetter, ByteSetter, Store from zarr.core.array_spec import ArraySpec from zarr.core.buffer import Buffer, NDArrayLike, NDBuffer from zarr.core.chunk_grids import ChunkGrid @@ -40,10 +42,14 @@ class UnsupportedMetadataError(Exception): pass -def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl | None: +def get_codec_pipeline_impl( + metadata: ArrayMetadata, store: Store +) -> CodecPipelineImpl | None: try: + array_metadata_json = json.dumps(metadata.to_dict()) return CodecPipelineImpl( - codec_metadata_json, + array_metadata_json, + store_config=store, validate_checksums=config.get("codec_pipeline.validate_checksums", None), store_empty_chunks=config.get("array.write_empty_chunks", None), chunk_concurrent_minimum=config.get( @@ -55,10 +61,11 @@ def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl | Non 
num_threads=config.get("threading.max_workers", None), ) except TypeError as e: - if re.match(r"codec (delta|zlib) is not supported", str(e)): - return None - else: - raise e + warn( + f"Array is unsupported by ZarrsCodecPipeline: {e}", + category=UserWarning, + ) + return None def codecs_to_dict(codecs: Iterable[Codec]) -> Generator[dict[str, Any], None, None]: @@ -89,37 +96,47 @@ class ZarrsCodecPipelineState(TypedDict): codecs: tuple[Codec, ...] +def array_metadata_to_codecs(metadata: ArrayMetadata) -> list[Codec]: + if isinstance(metadata, ArrayV3Metadata): + return metadata.codecs + elif isinstance(metadata, ArrayV2Metadata): + v2_codec = V2Codec(filters=metadata.filters, compressor=metadata.compressor) + return [v2_codec] + + @dataclass class ZarrsCodecPipeline(CodecPipeline): - codecs: tuple[Codec, ...] + metadata: ArrayMetadata + store: Store impl: CodecPipelineImpl | None - codec_metadata_json: str python_impl: BatchedCodecPipeline def __getstate__(self) -> ZarrsCodecPipelineState: - return {"codec_metadata_json": self.codec_metadata_json, "codecs": self.codecs} + return {"metadata": self.metadata, "store": self.store} def __setstate__(self, state: ZarrsCodecPipelineState): - self.codecs = state["codecs"] - self.codec_metadata_json = state["codec_metadata_json"] - self.impl = get_codec_pipeline_impl(self.codec_metadata_json) - self.python_impl = BatchedCodecPipeline.from_codecs(self.codecs) + self.metadata = state["metadata"] + self.store = state["store"] + self.impl = get_codec_pipeline_impl(self.metadata, self.store) + codecs = array_metadata_to_codecs(self.metadata) + self.python_impl = BatchedCodecPipeline.from_codecs(codecs) def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: - raise NotImplementedError("evolve_from_array_spec") + return self @classmethod def from_codecs(cls, codecs: Iterable[Codec]) -> Self: - codec_metadata = list(codecs_to_dict(codecs)) - codec_metadata_json = json.dumps(codec_metadata) - # TODO: upstream zarr-python has not settled on how to deal with configs yet - # Should they be checked when an array is created, or when an operation is performed? 
- # https://github.com/zarr-developers/zarr-python/issues/2409 - # https://github.com/zarr-developers/zarr-python/pull/2429#issuecomment-2566976567 + return BatchedCodecPipeline.from_codecs(codecs) + + @classmethod + def from_array_metadata_and_store( + cls, array_metadata: ArrayMetadata, store: Store + ) -> Self: + codecs = array_metadata_to_codecs(array_metadata) return cls( - codec_metadata_json=codec_metadata_json, - codecs=tuple(codecs), - impl=get_codec_pipeline_impl(codec_metadata_json), + metadata=array_metadata, + store=store, + impl=get_codec_pipeline_impl(array_metadata, store), python_impl=BatchedCodecPipeline.from_codecs(codecs), ) diff --git a/src/chunk_item.rs b/src/chunk_item.rs index da0d9f8..eaea38e 100644 --- a/src/chunk_item.rs +++ b/src/chunk_item.rs @@ -14,10 +14,9 @@ use zarrs::{ storage::StoreKey, }; -use crate::{store::StoreConfig, utils::PyErrExt}; +use crate::utils::PyErrExt; pub(crate) trait ChunksItem { - fn store_config(&self) -> StoreConfig; fn key(&self) -> &StoreKey; fn representation(&self) -> &ChunkRepresentation; } @@ -26,7 +25,6 @@ pub(crate) trait ChunksItem { #[gen_stub_pyclass] #[pyclass] pub(crate) struct Basic { - store: StoreConfig, key: StoreKey, representation: ChunkRepresentation, } @@ -62,7 +60,7 @@ fn fill_value_to_bytes(dtype: &str, fill_value: &Bound<'_, PyAny>) -> PyResult, chunk_spec: &Bound<'_, PyAny>) -> PyResult { - let store: StoreConfig = byte_interface.getattr("store")?.extract()?; + // let store: StoreConfig = byte_interface.getattr("store")?.extract()?; let path: String = byte_interface.getattr("path")?.extract()?; let chunk_shape = chunk_spec.getattr("shape")?.extract()?; @@ -79,7 +77,6 @@ impl Basic { let fill_value: Bound<'_, PyAny> = chunk_spec.getattr("fill_value")?; let fill_value_bytes = fill_value_to_bytes(&dtype, &fill_value)?; Ok(Self { - store, key: StoreKey::new(path).map_py_err::()?, representation: get_chunk_representation(chunk_shape, &dtype, fill_value_bytes)?, }) @@ -118,9 +115,6 @@ impl WithSubset { } impl ChunksItem for Basic { - fn store_config(&self) -> StoreConfig { - self.store.clone() - } fn key(&self) -> &StoreKey { &self.key } @@ -130,9 +124,6 @@ impl ChunksItem for Basic { } impl ChunksItem for WithSubset { - fn store_config(&self) -> StoreConfig { - self.item.store.clone() - } fn key(&self) -> &StoreKey { &self.item.key } diff --git a/src/lib.rs b/src/lib.rs index afd70cd..30a55ac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,14 +20,15 @@ use unsafe_cell_slice::UnsafeCellSlice; use utils::is_whole_chunk; use zarrs::array::codec::{ ArrayPartialDecoderTraits, ArrayToBytesCodecTraits, CodecOptions, CodecOptionsBuilder, + StoragePartialDecoder, }; use zarrs::array::{ - copy_fill_value_into, update_array_bytes, ArrayBytes, ArrayBytesFixedDisjointView, ArraySize, - CodecChain, FillValue, + copy_fill_value_into, update_array_bytes, Array, ArrayBytes, ArrayBytesFixedDisjointView, + ArrayMetadata, ArraySize, CodecChain, FillValue, }; use zarrs::array_subset::ArraySubset; -use zarrs::metadata::v3::MetadataV3; -use zarrs::storage::StoreKey; +use zarrs::storage::store::MemoryStore; +use zarrs::storage::{ReadableWritableListableStorage, StorageHandle, StoreKey}; mod chunk_item; mod concurrency; @@ -41,14 +42,14 @@ mod utils; use crate::chunk_item::ChunksItem; use crate::concurrency::ChunkConcurrentLimitAndCodecOptions; use crate::metadata_v2::codec_metadata_v2_to_v3; -use crate::store::StoreManager; +use crate::store::StoreConfig; use crate::utils::{PyErrExt as _, PyUntypedArrayExt as _}; // TODO: Use a OnceLock 
for store with get_or_try_init when stabilised? #[gen_stub_pyclass] #[pyclass] pub struct CodecPipelineImpl { - pub(crate) stores: StoreManager, + pub(crate) store: ReadableWritableListableStorage, pub(crate) codec_chain: Arc, pub(crate) codec_options: CodecOptions, pub(crate) chunk_concurrent_minimum: usize, @@ -63,7 +64,7 @@ impl CodecPipelineImpl { codec_chain: &CodecChain, codec_options: &CodecOptions, ) -> PyResult> { - let value_encoded = self.stores.get(item)?; + let value_encoded = self.store.get(item.key()).map_py_err::()?; let value_decoded = if let Some(value_encoded) = value_encoded { let value_encoded: Vec = value_encoded.into(); // zero-copy in this case codec_chain @@ -94,7 +95,7 @@ impl CodecPipelineImpl { .map_py_err::()?; if value_decoded.is_fill_value(item.representation().fill_value()) { - self.stores.erase(item) + self.store.erase(item.key()).map_py_err::() } else { let value_encoded = codec_chain .encode(value_decoded, item.representation(), codec_options) @@ -102,7 +103,9 @@ impl CodecPipelineImpl { .map_py_err::()?; // Store the encoded chunk - self.stores.set(item, value_encoded.into()) + self.store + .set(item.key(), value_encoded.into()) + .map_py_err::() } } @@ -204,7 +207,8 @@ impl CodecPipelineImpl { #[pymethods] impl CodecPipelineImpl { #[pyo3(signature = ( - metadata, + array_metadata, + store_config, *, validate_checksums=None, store_empty_chunks=None, @@ -214,17 +218,22 @@ impl CodecPipelineImpl { ))] #[new] fn new( - metadata: &str, + array_metadata: &str, + store_config: StoreConfig, validate_checksums: Option, store_empty_chunks: Option, chunk_concurrent_minimum: Option, chunk_concurrent_maximum: Option, num_threads: Option, ) -> PyResult { - let metadata: Vec = - serde_json::from_str(metadata).map_py_err::()?; - let codec_chain = - Arc::new(CodecChain::from_metadata(&metadata).map_py_err::()?); + let metadata: ArrayMetadata = + serde_json::from_str(array_metadata).map_py_err::()?; + + // TODO: Add a direct metadata -> codec chain method to zarrs + let store = Arc::new(MemoryStore::new()); + let array = Array::new_with_metadata(store, "/", metadata).map_py_err::()?; + let codec_chain = Arc::new(array.codecs().clone()); + let mut codec_options = CodecOptionsBuilder::new(); if let Some(validate_checksums) = validate_checksums { codec_options = codec_options.validate_checksums(validate_checksums); @@ -240,8 +249,11 @@ impl CodecPipelineImpl { chunk_concurrent_maximum.unwrap_or(rayon::current_num_threads()); let num_threads = num_threads.unwrap_or(rayon::current_num_threads()); + let store: ReadableWritableListableStorage = + (&store_config).try_into().map_py_err::()?; + Ok(Self { - stores: StoreManager::default(), + store, codec_chain, codec_options, chunk_concurrent_minimum, @@ -281,7 +293,9 @@ impl CodecPipelineImpl { partial_chunk_descriptions, map, |item| { - let input_handle = self.stores.decoder(item)?; + let storage_handle = Arc::new(StorageHandle::new(self.store.clone())); + let input_handle = + StoragePartialDecoder::new(storage_handle, item.key().clone()); let partial_decoder = self .codec_chain .clone() @@ -331,7 +345,9 @@ impl CodecPipelineImpl { && chunk_subset.shape() == item.representation().shape_u64() { // See zarrs::array::Array::retrieve_chunk_into - if let Some(chunk_encoded) = self.stores.get(&item)? { + if let Some(chunk_encoded) = + self.store.get(item.key()).map_py_err::()? 
+ { // Decode the encoded data into the output buffer let chunk_encoded: Vec = chunk_encoded.into(); self.codec_chain.decode_into( diff --git a/src/store.rs b/src/store.rs index 8fc3ee6..8c6b206 100644 --- a/src/store.rs +++ b/src/store.rs @@ -15,13 +15,11 @@ use crate::{runtime::tokio_block_on, utils::PyErrExt}; mod filesystem; mod http; -mod manager; pub use self::filesystem::FilesystemStoreConfig; pub use self::http::HttpStoreConfig; -pub(crate) use self::manager::StoreManager; -#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Debug, Clone)] #[gen_stub_pyclass_enum] pub enum StoreConfig { Filesystem(FilesystemStoreConfig), diff --git a/src/store/filesystem.rs b/src/store/filesystem.rs index 8ee865b..9bf5b16 100644 --- a/src/store/filesystem.rs +++ b/src/store/filesystem.rs @@ -6,7 +6,7 @@ use zarrs::{filesystem::FilesystemStore, storage::ReadableWritableListableStorag use crate::utils::PyErrExt; -#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Debug, Clone)] #[gen_stub_pyclass] #[pyclass] pub struct FilesystemStoreConfig { diff --git a/src/store/http.rs b/src/store/http.rs index 0c7820b..725d5a4 100644 --- a/src/store/http.rs +++ b/src/store/http.rs @@ -6,7 +6,7 @@ use zarrs::storage::ReadableWritableListableStorage; use super::opendal_builder_to_sync_store; -#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Debug, Clone)] #[gen_stub_pyclass] #[pyclass] pub struct HttpStoreConfig { diff --git a/src/store/manager.rs b/src/store/manager.rs deleted file mode 100644 index d32a68d..0000000 --- a/src/store/manager.rs +++ /dev/null @@ -1,61 +0,0 @@ -use std::{ - collections::BTreeMap, - sync::{Arc, Mutex}, -}; - -use pyo3::{exceptions::PyRuntimeError, PyResult}; -use zarrs::{ - array::codec::StoragePartialDecoder, - storage::{Bytes, MaybeBytes, ReadableWritableListableStorage, StorageHandle}, -}; - -use crate::{chunk_item::ChunksItem, store::PyErrExt as _}; - -use super::StoreConfig; - -#[derive(Default)] -pub(crate) struct StoreManager(Mutex>); - -impl StoreManager { - fn store(&self, item: &I) -> PyResult { - use std::collections::btree_map::Entry::{Occupied, Vacant}; - match self - .0 - .lock() - .map_py_err::()? - .entry(item.store_config()) - { - Occupied(e) => Ok(e.get().clone()), - Vacant(e) => Ok(e.insert((&item.store_config()).try_into()?).clone()), - } - } - - pub(crate) fn get(&self, item: &I) -> PyResult { - self.store(item)? - .get(item.key()) - .map_py_err::() - } - - pub(crate) fn set(&self, item: &I, value: Bytes) -> PyResult<()> { - self.store(item)? - .set(item.key(), value) - .map_py_err::() - } - - pub(crate) fn erase(&self, item: &I) -> PyResult<()> { - self.store(item)? - .erase(item.key()) - .map_py_err::() - } - - pub(crate) fn decoder(&self, item: &I) -> PyResult { - // Partially decode the chunk into the output buffer - let storage_handle = Arc::new(StorageHandle::new(self.store(item)?)); - // NOTE: Normally a storage transformer would exist between the storage handle and the input handle - // but zarr-python does not support them nor forward them to the codec pipeline - Ok(StoragePartialDecoder::new( - storage_handle, - item.key().clone(), - )) - } -} diff --git a/tests/test_v2.py b/tests/test_v2.py index bf59d22..3a47059 100644 --- a/tests/test_v2.py +++ b/tests/test_v2.py @@ -43,6 +43,9 @@ def test_simple(store: StorePath) -> None: assert np.array_equal(data, a[:, :]) +@pytest.mark.filterwarnings( + "ignore:Array is unsupported by ZarrsCodecPipeline. 
codec delta is not supported:UserWarning" +) def test_codec_pipeline(store) -> None: array = zarr.create( store=store, @@ -58,6 +61,13 @@ def test_codec_pipeline(store) -> None: np.testing.assert_array_equal(result, expected) +@pytest.mark.filterwarnings( + "ignore:Array is unsupported by ZarrsCodecPipeline. data type |S1 is not supported:UserWarning" +) +@pytest.mark.filterwarnings( + # TODO: Fix handling of string fill values for Zarr v2 bytes data + "ignore:Array is unsupported by ZarrsCodecPipeline. incompatible fill value ..+. for data type bytes:UserWarning" +) @pytest.mark.parametrize( ("dtype", "expected_dtype", "fill_value", "fill_value_json"), [ @@ -101,6 +111,12 @@ async def test_v2_encode_decode( np.testing.assert_equal(data, np.full((3,), b"X", dtype=dtype)) +@pytest.mark.filterwarnings( + "ignore:Array is unsupported by ZarrsCodecPipeline. data type |U1 is not supported:UserWarning" +) +@pytest.mark.filterwarnings( + "ignore:Array is unsupported by ZarrsCodecPipeline. data type |S1 is not supported:UserWarning" +) @pytest.mark.parametrize( ("dtype", "value"), [ @@ -109,9 +125,10 @@ async def test_v2_encode_decode( (VariableLengthUTF8(), "Y"), ], ) -def test_v2_encode_decode_with_data(dtype: ZDType[Any, Any], value: str): +def test_v2_encode_decode_with_data(dtype: ZDType[Any, Any], value: str, tmp_path): expected = np.full((3,), value, dtype=dtype.to_native_dtype()) a = zarr.create( + store=tmp_path, shape=(3,), zarr_format=2, dtype=dtype, @@ -121,6 +138,9 @@ def test_v2_encode_decode_with_data(dtype: ZDType[Any, Any], value: str): np.testing.assert_equal(data, expected) +@pytest.mark.filterwarnings( + "ignore:Array is unsupported by ZarrsCodecPipeline. codec delta is not supported:UserWarning" +) @pytest.mark.parametrize( "filters", [[], [numcodecs.Delta(dtype=" None: a = np.array( @@ -307,6 +331,14 @@ def test_parse_structured_fill_value_valid( assert result[name] == expected_result[name] +@pytest.mark.filterwarnings( + # TODO: Permit this in zarrs? + "ignore:Array is unsupported by ZarrsCodecPipeline. unsupported Zarr V2 array. unsupported fill value Null for data type bytes:UserWarning" +) +@pytest.mark.filterwarnings( + # TODO: Fix handling of string fill values for Zarr v2 bytes data + "ignore:Array is unsupported by ZarrsCodecPipeline. incompatible fill value .eAAAAAAAAA==. for data type bytes:UserWarning" +) @pytest.mark.parametrize("fill_value", [None, b"x"], ids=["no_fill", "fill"]) def test_other_dtype_roundtrip(fill_value, tmp_path) -> None: a = np.array([b"a\0\0", b"bb", b"ccc"], dtype="V7") diff --git a/tests/test_vlen.py b/tests/test_vlen.py index b9e29eb..2c4ffaa 100644 --- a/tests/test_vlen.py +++ b/tests/test_vlen.py @@ -28,6 +28,15 @@ expected_array_string_dtype = np.dtype("O") +@pytest.mark.filterwarnings( + "ignore:Array is unsupported by ZarrsCodecPipeline. data type fixed_length_utf32 is not supported:UserWarning" +) +@pytest.mark.filterwarnings( + "ignore:Array is unsupported by ZarrsCodecPipeline. data type null_terminated_bytes is not supported:UserWarning" +) +@pytest.mark.filterwarnings( + "ignore:Array is unsupported by ZarrsCodecPipeline. 
data type fixed_length_utf32 is not supported:UserWarning" +) @pytest.mark.filterwarnings( "ignore::zarr.core.dtype.common.UnstableSpecificationWarning" ) From ae270852d353cf236c391bb235e58ab60fd046d3 Mon Sep 17 00:00:00 2001 From: Lachlan Deakin Date: Wed, 16 Jul 2025 08:02:22 +1000 Subject: [PATCH 6/7] Apply suggestion from @ilan-gold Co-authored-by: Ilan Gold --- src/chunk_item.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/chunk_item.rs b/src/chunk_item.rs index eaea38e..c4d592f 100644 --- a/src/chunk_item.rs +++ b/src/chunk_item.rs @@ -60,7 +60,6 @@ fn fill_value_to_bytes(dtype: &str, fill_value: &Bound<'_, PyAny>) -> PyResult, chunk_spec: &Bound<'_, PyAny>) -> PyResult { - // let store: StoreConfig = byte_interface.getattr("store")?.extract()?; let path: String = byte_interface.getattr("path")?.extract()?; let chunk_shape = chunk_spec.getattr("shape")?.extract()?; From f5940403843a2eae314f64fdf6f46ceb49175a13 Mon Sep 17 00:00:00 2001 From: Ilan Gold Date: Wed, 16 Jul 2025 12:17:51 +0200 Subject: [PATCH 7/7] chore: add spy for pipeline (#105) --- pyproject.toml | 1 + tests/test_pipeline.py | 25 +++++++++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index b2e3a06..cd96d31 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ test = [ "pytest", "pytest-asyncio", "pytest-xdist", + "pytest-mock", ] doc = ["sphinx>=7.4.6", "myst-parser"] dev = [ diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 70d5ffc..215fc15 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -256,3 +256,28 @@ def test_roundtrip_read_only_zarrs( assert np.all( res == store_values, ), res + + +@pytest.mark.parametrize( + "codec", + [zarr.codecs.BloscCodec(), zarr.codecs.GzipCodec(), zarr.codecs.ZstdCodec()], +) +@pytest.mark.parametrize("should_shard", [True, False]) +def test_pipeline_used( + mocker, codec: zarr.abc.codec.BaseCodec, tmp_path: Path, *, should_shard: bool +): + z = zarr.create_array( + tmp_path / "foo.zarr", + dtype=np.uint16, + shape=(80, 100), + chunks=(10, 10), + shards=(20, 20) if should_shard else None, + compressors=[codec], + ) + spy_read = mocker.spy(z._async_array.codec_pipeline, "read") + spy_write = mocker.spy(z._async_array.codec_pipeline, "write") + assert isinstance(z._async_array.codec_pipeline, zarrs.ZarrsCodecPipeline) + z[...] = np.random.random(z.shape) + z[...] + assert spy_read.call_count == 1 + assert spy_write.call_count == 1
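
The series above migrates every dtype touchpoint from raw numpy dtypes to zarr-python's new ZDType wrappers. Below is a minimal sketch of that surface, using only the methods the patches themselves call (from_native_dtype, to_native_dtype, default_scalar, cast_scalar); the rest of the zarr.dtype API is assumed from context:

    import numpy as np
    from zarr.core.dtype import Structured

    # Wrap a native structured dtype in its ZDType equivalent, as
    # test_parse_structured_fill_value_valid does above.
    zdtype = Structured.from_native_dtype(np.dtype([("x", "i4"), ("y", "i4")]))

    # A ZDType wraps a numpy dtype rather than subclassing it, so code that
    # needs numpy attributes such as .kind must unwrap it first -- this is
    # what patch 1 adds in _raise_error_on_unsupported_batch_dtype (Python)
    # and in chunk_item.rs (Rust, via call_method0("to_native_dtype")).
    assert zdtype.to_native_dtype().kind == "V"

    # Implicit fill values now come from the wrapper itself, replacing the
    # removed zarr.core.metadata.v2._default_fill_value helper (see utils.py).
    implicit_fill = zdtype.default_scalar()

    # User-supplied fill values are coerced through cast_scalar, as in the
    # structured fill-value test above.
    cast = zdtype.cast_scalar((1, 2))
    assert cast == np.array([(1, 2)], dtype=zdtype.to_native_dtype())[0]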
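
Patch 5 replaces the per-chunk StoreManager with a pipeline constructed once from the array metadata and the store, mirroring zarr-python's new from_array_metadata_and_store hook. A hedged sketch of that entry point, relying only on names visible in the diff (the store path and array parameters are illustrative):

    import numpy as np
    import zarr
    import zarrs

    store = zarr.storage.LocalStore("example.zarr")
    arr = zarr.create_array(store, shape=(100,), chunks=(10,), dtype="float64")

    # The Rust side now receives the serialised ArrayMetadata plus the store
    # config up front (CodecPipelineImpl::new), instead of resolving a store
    # for every chunk item as the deleted src/store/manager.rs did.
    pipeline = zarrs.ZarrsCodecPipeline.from_array_metadata_and_store(
        arr.metadata, store
    )

    # impl is None when zarrs cannot handle the array; get_codec_pipeline_impl
    # then emits the "Array is unsupported by ZarrsCodecPipeline" UserWarning
    # and python_impl (a BatchedCodecPipeline) serves as the fallback.
    assert pipeline.impl is not None  # a plain float64 array is supported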
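
For Zarr v2 arrays, the fallback path flattens filters and compressor into a single V2Codec via array_metadata_to_codecs. The sketch below shows the unsupported-codec behaviour that the new filterwarnings marks ignore; whether the warning fires at array creation or on first chunk access is an assumption here, so the example exercises both:

    import warnings

    import numcodecs
    import numpy as np
    import zarr
    from zarrs.pipeline import array_metadata_to_codecs

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        # zarrs has no delta codec, so get_codec_pipeline_impl returns None
        # (patch 5 turned the old re.match/raise logic into a UserWarning).
        arr = zarr.create(
            store=zarr.storage.MemoryStore(),
            shape=(1,),
            dtype="int16",
            zarr_format=2,
            filters=[numcodecs.Delta(dtype="int16")],
        )
        arr[...]  # data still round-trips via the BatchedCodecPipeline fallback
    assert any(
        "Array is unsupported by ZarrsCodecPipeline" in str(w.message) for w in caught
    )

    # The v2 filters/compressor pair is presented to the fallback as one codec.
    assert len(array_metadata_to_codecs(arr.metadata)) == 1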
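
Patch 7's spy test presumes the pipeline has already been registered globally; that registration step is not shown anywhere in the series, so the config key below is taken from zarrs' usage documentation rather than from this diff:

    import numpy as np
    import zarr
    import zarrs  # must be imported so the pipeline class below can be resolved

    zarr.config.set({"codec_pipeline.path": "zarrs.ZarrsCodecPipeline"})

    z = zarr.create_array(
        "data.zarr", dtype=np.uint16, shape=(80, 100), chunks=(10, 10)
    )
    # The same check test_pipeline_used performs before spying on read/write:
    assert isinstance(z._async_array.codec_pipeline, zarrs.ZarrsCodecPipeline)
    z[...] = np.random.random(z.shape)  # encoded and stored by the Rust impl

From there, mocker.spy on the pipeline's read and write methods, as in the new test, confirms each round trip hits the Rust implementation exactly once.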