From c04d7dc3ea3f83f6e9baf09fcda798231cfed979 Mon Sep 17 00:00:00 2001 From: Norman Rzepka Date: Fri, 18 Oct 2024 21:36:20 +0200 Subject: [PATCH 1/3] refactors codec pipeline for v2 --- src/zarr/codecs/_v2.py | 117 ++++++++++++++++++----------------------- src/zarr/core/array.py | 7 ++- tests/test_v2.py | 9 ++++ 3 files changed, 62 insertions(+), 71 deletions(-) diff --git a/src/zarr/codecs/_v2.py b/src/zarr/codecs/_v2.py index 0f50264be8..83f8ee80bd 100644 --- a/src/zarr/codecs/_v2.py +++ b/src/zarr/codecs/_v2.py @@ -1,24 +1,25 @@ from __future__ import annotations -import asyncio from dataclasses import dataclass from typing import TYPE_CHECKING import numcodecs -from numcodecs.compat import ensure_bytes, ensure_ndarray +import numcodecs.compat +from numcodecs.compat import ensure_bytes, ensure_ndarray_like -from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec -from zarr.core.buffer import Buffer, NDBuffer, default_buffer_prototype +from zarr.abc.codec import ArrayBytesCodec from zarr.registry import get_ndbuffer_class if TYPE_CHECKING: import numcodecs.abc from zarr.core.array_spec import ArraySpec + from zarr.core.buffer import Buffer, NDBuffer @dataclass(frozen=True) -class V2Compressor(ArrayBytesCodec): +class V2Codec(ArrayBytesCodec): + filters: tuple[numcodecs.abc.Codec, ...] | None compressor: numcodecs.abc.Codec | None is_fixed_size = False @@ -28,81 +29,63 @@ async def _decode_single( chunk_bytes: Buffer, chunk_spec: ArraySpec, ) -> NDBuffer: - if self.compressor is not None: - chunk_numpy_array = ensure_ndarray( - await asyncio.to_thread(self.compressor.decode, chunk_bytes.as_array_like()) - ) + cdata = chunk_bytes.as_array_like() + # decompress + if self.compressor: + chunk = self.compressor.decode(cdata) else: - chunk_numpy_array = ensure_ndarray(chunk_bytes.as_array_like()) + chunk = cdata + + # apply filters + if self.filters: + for f in reversed(self.filters): + chunk = f.decode(chunk) + + # view as numpy array with correct dtype + chunk = ensure_ndarray_like(chunk) + # special case object dtype, because incorrect handling can lead to + # segfaults and other bad things happening + if chunk_spec.dtype != object: + chunk = chunk.view(chunk_spec.dtype) + elif chunk.dtype != object: + # If we end up here, someone must have hacked around with the filters. + # We cannot deal with object arrays unless there is an object + # codec in the filter chain, i.e., a filter that converts from object + # array to something else during encoding, and converts back to object + # array during decoding. + raise RuntimeError("cannot read object array without object codec") - # ensure correct dtype - if str(chunk_numpy_array.dtype) != chunk_spec.dtype and not chunk_spec.dtype.hasobject: - chunk_numpy_array = chunk_numpy_array.view(chunk_spec.dtype) + # ensure correct chunk shape + chunk = chunk.reshape(-1, order="A") + chunk = chunk.reshape(chunk_spec.shape, order=chunk_spec.order) - return get_ndbuffer_class().from_numpy_array(chunk_numpy_array) + return get_ndbuffer_class().from_ndarray_like(chunk) async def _encode_single( - self, - chunk_array: NDBuffer, - _chunk_spec: ArraySpec, - ) -> Buffer | None: - chunk_numpy_array = chunk_array.as_numpy_array() - if self.compressor is not None: - if ( - not chunk_numpy_array.flags.c_contiguous - and not chunk_numpy_array.flags.f_contiguous - ): - chunk_numpy_array = chunk_numpy_array.copy(order="A") - encoded_chunk_bytes = ensure_bytes( - await asyncio.to_thread(self.compressor.encode, chunk_numpy_array) - ) - else: - encoded_chunk_bytes = ensure_bytes(chunk_numpy_array) - - return default_buffer_prototype().buffer.from_bytes(encoded_chunk_bytes) - - def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int: - raise NotImplementedError - - -@dataclass(frozen=True) -class V2Filters(ArrayArrayCodec): - filters: tuple[numcodecs.abc.Codec, ...] | None - - is_fixed_size = False - - async def _decode_single( self, chunk_array: NDBuffer, chunk_spec: ArraySpec, - ) -> NDBuffer: - chunk_ndarray = chunk_array.as_ndarray_like() - # apply filters in reverse order - if self.filters is not None: - for filter in self.filters[::-1]: - chunk_ndarray = await asyncio.to_thread(filter.decode, chunk_ndarray) + ) -> Buffer | None: + chunk = chunk_array.as_ndarray_like() - # ensure correct chunk shape - if chunk_ndarray.shape != chunk_spec.shape: - chunk_ndarray = chunk_ndarray.reshape( - chunk_spec.shape, - order=chunk_spec.order, - ) + # apply filters + if self.filters: + for f in self.filters: + chunk = f.encode(chunk) - return get_ndbuffer_class().from_ndarray_like(chunk_ndarray) + # check object encoding + if ensure_ndarray_like(chunk).dtype == object: + raise RuntimeError("cannot write object array without object codec") - async def _encode_single( - self, - chunk_array: NDBuffer, - chunk_spec: ArraySpec, - ) -> NDBuffer | None: - chunk_ndarray = chunk_array.as_ndarray_like().ravel(order=chunk_spec.order) + # compress + if self.compressor: + cdata = self.compressor.encode(chunk) + else: + cdata = chunk - if self.filters is not None: - for filter in self.filters: - chunk_ndarray = await asyncio.to_thread(filter.encode, chunk_ndarray) + cdata = ensure_bytes(cdata) - return get_ndbuffer_class().from_ndarray_like(chunk_ndarray) + return chunk_spec.prototype.buffer.from_bytes(cdata) def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int: raise NotImplementedError diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index da477056ee..522a090c4e 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -12,7 +12,7 @@ from zarr._compat import _deprecate_positional_args from zarr.abc.store import Store, set_or_delete from zarr.codecs import _get_default_array_bytes_codec -from zarr.codecs._v2 import V2Compressor, V2Filters +from zarr.codecs._v2 import V2Codec from zarr.core.attributes import Attributes from zarr.core.buffer import ( BufferPrototype, @@ -116,9 +116,8 @@ def create_codec_pipeline(metadata: ArrayMetadata) -> CodecPipeline: if isinstance(metadata, ArrayV3Metadata): return get_pipeline_class().from_codecs(metadata.codecs) elif isinstance(metadata, ArrayV2Metadata): - return get_pipeline_class().from_codecs( - [V2Filters(metadata.filters), V2Compressor(metadata.compressor)] - ) + v2_codec = V2Codec(filters=metadata.filters, compressor=metadata.compressor) + return get_pipeline_class().from_codecs([v2_codec]) else: raise TypeError diff --git a/tests/test_v2.py b/tests/test_v2.py index 439b15b64c..3dd17848fb 100644 --- a/tests/test_v2.py +++ b/tests/test_v2.py @@ -121,3 +121,12 @@ async def test_create_dtype_str(dtype: Any) -> None: arr[:] = ["a", "bb", "ccc"] result = arr[:] np.testing.assert_array_equal(result, np.array(["a", "bb", "ccc"], dtype="object")) + + +@pytest.mark.parametrize("filters", [[], [numcodecs.Delta(dtype=" None: + array_fixture = [42] + arr = zarr.create(shape=1, dtype=" Date: Fri, 18 Oct 2024 21:40:16 +0200 Subject: [PATCH 2/3] async --- src/zarr/codecs/_v2.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/zarr/codecs/_v2.py b/src/zarr/codecs/_v2.py index 83f8ee80bd..89894e2b91 100644 --- a/src/zarr/codecs/_v2.py +++ b/src/zarr/codecs/_v2.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio from dataclasses import dataclass from typing import TYPE_CHECKING @@ -32,14 +33,14 @@ async def _decode_single( cdata = chunk_bytes.as_array_like() # decompress if self.compressor: - chunk = self.compressor.decode(cdata) + chunk = await asyncio.to_thread(self.compressor.decode, cdata) else: chunk = cdata # apply filters if self.filters: for f in reversed(self.filters): - chunk = f.decode(chunk) + chunk = await asyncio.to_thread(f.decode, chunk) # view as numpy array with correct dtype chunk = ensure_ndarray_like(chunk) @@ -71,7 +72,7 @@ async def _encode_single( # apply filters if self.filters: for f in self.filters: - chunk = f.encode(chunk) + chunk = await asyncio.to_thread(f.encode, chunk) # check object encoding if ensure_ndarray_like(chunk).dtype == object: @@ -79,7 +80,7 @@ async def _encode_single( # compress if self.compressor: - cdata = self.compressor.encode(chunk) + cdata = await asyncio.to_thread(self.compressor.encode, chunk) else: cdata = chunk From c608fef1300f95e94f26103c957045d6a718959f Mon Sep 17 00:00:00 2001 From: Norman Rzepka Date: Fri, 18 Oct 2024 21:43:31 +0200 Subject: [PATCH 3/3] rm ensure_bytes --- src/zarr/codecs/_v2.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/zarr/codecs/_v2.py b/src/zarr/codecs/_v2.py index 89894e2b91..30504ad204 100644 --- a/src/zarr/codecs/_v2.py +++ b/src/zarr/codecs/_v2.py @@ -5,8 +5,7 @@ from typing import TYPE_CHECKING import numcodecs -import numcodecs.compat -from numcodecs.compat import ensure_bytes, ensure_ndarray_like +from numcodecs.compat import ensure_ndarray_like from zarr.abc.codec import ArrayBytesCodec from zarr.registry import get_ndbuffer_class @@ -84,8 +83,6 @@ async def _encode_single( else: cdata = chunk - cdata = ensure_bytes(cdata) - return chunk_spec.prototype.buffer.from_bytes(cdata) def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int: