diff --git a/src/zarr/codecs/_v2.py b/src/zarr/codecs/_v2.py index 0f50264be8..30504ad204 100644 --- a/src/zarr/codecs/_v2.py +++ b/src/zarr/codecs/_v2.py @@ -5,20 +5,21 @@ from typing import TYPE_CHECKING import numcodecs -from numcodecs.compat import ensure_bytes, ensure_ndarray +from numcodecs.compat import ensure_ndarray_like -from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec -from zarr.core.buffer import Buffer, NDBuffer, default_buffer_prototype +from zarr.abc.codec import ArrayBytesCodec from zarr.registry import get_ndbuffer_class if TYPE_CHECKING: import numcodecs.abc from zarr.core.array_spec import ArraySpec + from zarr.core.buffer import Buffer, NDBuffer @dataclass(frozen=True) -class V2Compressor(ArrayBytesCodec): +class V2Codec(ArrayBytesCodec): + filters: tuple[numcodecs.abc.Codec, ...] | None compressor: numcodecs.abc.Codec | None is_fixed_size = False @@ -28,81 +29,61 @@ async def _decode_single( chunk_bytes: Buffer, chunk_spec: ArraySpec, ) -> NDBuffer: - if self.compressor is not None: - chunk_numpy_array = ensure_ndarray( - await asyncio.to_thread(self.compressor.decode, chunk_bytes.as_array_like()) - ) + cdata = chunk_bytes.as_array_like() + # decompress + if self.compressor: + chunk = await asyncio.to_thread(self.compressor.decode, cdata) else: - chunk_numpy_array = ensure_ndarray(chunk_bytes.as_array_like()) + chunk = cdata + + # apply filters + if self.filters: + for f in reversed(self.filters): + chunk = await asyncio.to_thread(f.decode, chunk) + + # view as numpy array with correct dtype + chunk = ensure_ndarray_like(chunk) + # special case object dtype, because incorrect handling can lead to + # segfaults and other bad things happening + if chunk_spec.dtype != object: + chunk = chunk.view(chunk_spec.dtype) + elif chunk.dtype != object: + # If we end up here, someone must have hacked around with the filters. + # We cannot deal with object arrays unless there is an object + # codec in the filter chain, i.e., a filter that converts from object + # array to something else during encoding, and converts back to object + # array during decoding. + raise RuntimeError("cannot read object array without object codec") - # ensure correct dtype - if str(chunk_numpy_array.dtype) != chunk_spec.dtype and not chunk_spec.dtype.hasobject: - chunk_numpy_array = chunk_numpy_array.view(chunk_spec.dtype) + # ensure correct chunk shape + chunk = chunk.reshape(-1, order="A") + chunk = chunk.reshape(chunk_spec.shape, order=chunk_spec.order) - return get_ndbuffer_class().from_numpy_array(chunk_numpy_array) + return get_ndbuffer_class().from_ndarray_like(chunk) async def _encode_single( - self, - chunk_array: NDBuffer, - _chunk_spec: ArraySpec, - ) -> Buffer | None: - chunk_numpy_array = chunk_array.as_numpy_array() - if self.compressor is not None: - if ( - not chunk_numpy_array.flags.c_contiguous - and not chunk_numpy_array.flags.f_contiguous - ): - chunk_numpy_array = chunk_numpy_array.copy(order="A") - encoded_chunk_bytes = ensure_bytes( - await asyncio.to_thread(self.compressor.encode, chunk_numpy_array) - ) - else: - encoded_chunk_bytes = ensure_bytes(chunk_numpy_array) - - return default_buffer_prototype().buffer.from_bytes(encoded_chunk_bytes) - - def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int: - raise NotImplementedError - - -@dataclass(frozen=True) -class V2Filters(ArrayArrayCodec): - filters: tuple[numcodecs.abc.Codec, ...] | None - - is_fixed_size = False - - async def _decode_single( self, chunk_array: NDBuffer, chunk_spec: ArraySpec, - ) -> NDBuffer: - chunk_ndarray = chunk_array.as_ndarray_like() - # apply filters in reverse order - if self.filters is not None: - for filter in self.filters[::-1]: - chunk_ndarray = await asyncio.to_thread(filter.decode, chunk_ndarray) - - # ensure correct chunk shape - if chunk_ndarray.shape != chunk_spec.shape: - chunk_ndarray = chunk_ndarray.reshape( - chunk_spec.shape, - order=chunk_spec.order, - ) + ) -> Buffer | None: + chunk = chunk_array.as_ndarray_like() - return get_ndbuffer_class().from_ndarray_like(chunk_ndarray) + # apply filters + if self.filters: + for f in self.filters: + chunk = await asyncio.to_thread(f.encode, chunk) - async def _encode_single( - self, - chunk_array: NDBuffer, - chunk_spec: ArraySpec, - ) -> NDBuffer | None: - chunk_ndarray = chunk_array.as_ndarray_like().ravel(order=chunk_spec.order) + # check object encoding + if ensure_ndarray_like(chunk).dtype == object: + raise RuntimeError("cannot write object array without object codec") - if self.filters is not None: - for filter in self.filters: - chunk_ndarray = await asyncio.to_thread(filter.encode, chunk_ndarray) + # compress + if self.compressor: + cdata = await asyncio.to_thread(self.compressor.encode, chunk) + else: + cdata = chunk - return get_ndbuffer_class().from_ndarray_like(chunk_ndarray) + return chunk_spec.prototype.buffer.from_bytes(cdata) def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int: raise NotImplementedError diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index 8c4d797e9a..a4b86b85ae 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -13,7 +13,7 @@ from zarr._compat import _deprecate_positional_args from zarr.abc.store import Store, set_or_delete from zarr.codecs import _get_default_array_bytes_codec -from zarr.codecs._v2 import V2Compressor, V2Filters +from zarr.codecs._v2 import V2Codec from zarr.core.attributes import Attributes from zarr.core.buffer import ( BufferPrototype, @@ -118,9 +118,8 @@ def create_codec_pipeline(metadata: ArrayMetadata) -> CodecPipeline: if isinstance(metadata, ArrayV3Metadata): return get_pipeline_class().from_codecs(metadata.codecs) elif isinstance(metadata, ArrayV2Metadata): - return get_pipeline_class().from_codecs( - [V2Filters(metadata.filters), V2Compressor(metadata.compressor)] - ) + v2_codec = V2Codec(filters=metadata.filters, compressor=metadata.compressor) + return get_pipeline_class().from_codecs([v2_codec]) else: raise TypeError diff --git a/tests/test_v2.py b/tests/test_v2.py index 439b15b64c..3dd17848fb 100644 --- a/tests/test_v2.py +++ b/tests/test_v2.py @@ -121,3 +121,12 @@ async def test_create_dtype_str(dtype: Any) -> None: arr[:] = ["a", "bb", "ccc"] result = arr[:] np.testing.assert_array_equal(result, np.array(["a", "bb", "ccc"], dtype="object")) + + +@pytest.mark.parametrize("filters", [[], [numcodecs.Delta(dtype=" None: + array_fixture = [42] + arr = zarr.create(shape=1, dtype="