Skip to content

Commit c04d7dc

Browse files
committed
refactors codec pipeline for v2
1 parent 5f3a512 commit c04d7dc

File tree

3 files changed

+62
-71
lines changed

3 files changed

+62
-71
lines changed

src/zarr/codecs/_v2.py

Lines changed: 50 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,25 @@
11
from __future__ import annotations
22

3-
import asyncio
43
from dataclasses import dataclass
54
from typing import TYPE_CHECKING
65

76
import numcodecs
8-
from numcodecs.compat import ensure_bytes, ensure_ndarray
7+
import numcodecs.compat
8+
from numcodecs.compat import ensure_bytes, ensure_ndarray_like
99

10-
from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec
11-
from zarr.core.buffer import Buffer, NDBuffer, default_buffer_prototype
10+
from zarr.abc.codec import ArrayBytesCodec
1211
from zarr.registry import get_ndbuffer_class
1312

1413
if TYPE_CHECKING:
1514
import numcodecs.abc
1615

1716
from zarr.core.array_spec import ArraySpec
17+
from zarr.core.buffer import Buffer, NDBuffer
1818

1919

2020
@dataclass(frozen=True)
class V2Codec(ArrayBytesCodec):
    """Single codec implementing the zarr v2 encoding model.

    A v2 chunk is produced by running the array through ``filters`` in order
    and then through ``compressor``; decoding applies the inverse chain. Both
    attributes may be ``None``, in which case that stage is a no-op.

    NOTE(review): the refactor that merged ``V2Compressor``/``V2Filters`` into
    this class dropped the ``asyncio.to_thread`` offloading the old codecs
    had, so CPU-bound numcodecs work ran directly on the event loop. The
    ``to_thread`` calls are restored here; results are unchanged.
    """

    # Filter chain applied before compression on encode (reverse on decode).
    filters: tuple[numcodecs.abc.Codec, ...] | None
    # Final compressor (e.g. zlib/blosc), or None for uncompressed chunks.
    compressor: numcodecs.abc.Codec | None

    # Encoded size depends on the data, so it cannot be computed up front.
    is_fixed_size = False

    async def _decode_single(
        self,
        chunk_bytes: Buffer,
        chunk_spec: ArraySpec,
    ) -> NDBuffer:
        """Decode one chunk: decompress, un-apply filters, fix dtype/shape.

        Raises RuntimeError for object-dtype chunks with no object codec.
        """
        cdata = chunk_bytes.as_array_like()
        # decompress — numcodecs is synchronous, so run it in a worker thread
        # to avoid blocking the event loop on CPU-bound work
        if self.compressor:
            chunk = await asyncio.to_thread(self.compressor.decode, cdata)
        else:
            chunk = cdata

        # apply filters in reverse of the encoding order
        if self.filters:
            for f in reversed(self.filters):
                chunk = await asyncio.to_thread(f.decode, chunk)

        # view as ndarray-like with correct dtype
        chunk = ensure_ndarray_like(chunk)
        # special case object dtype, because incorrect handling can lead to
        # segfaults and other bad things happening
        if chunk_spec.dtype != object:
            chunk = chunk.view(chunk_spec.dtype)
        elif chunk.dtype != object:
            # If we end up here, someone must have hacked around with the filters.
            # We cannot deal with object arrays unless there is an object
            # codec in the filter chain, i.e., a filter that converts from object
            # array to something else during encoding, and converts back to object
            # array during decoding.
            raise RuntimeError("cannot read object array without object codec")

        # ensure correct chunk shape: flatten preserving memory order, then
        # reshape to the chunk's shape in the metadata-specified order
        chunk = chunk.reshape(-1, order="A")
        chunk = chunk.reshape(chunk_spec.shape, order=chunk_spec.order)

        return get_ndbuffer_class().from_ndarray_like(chunk)

    async def _encode_single(
        self,
        chunk_array: NDBuffer,
        chunk_spec: ArraySpec,
    ) -> Buffer | None:
        """Encode one chunk: apply filters, then compress, then wrap as Buffer.

        Raises RuntimeError if an object array reaches this point without an
        object codec in the filter chain.
        """
        chunk = chunk_array.as_ndarray_like()

        # apply filters in declaration order; offload CPU-bound codec work
        if self.filters:
            for f in self.filters:
                chunk = await asyncio.to_thread(f.encode, chunk)

        # check object encoding — object arrays must have been converted to a
        # concrete dtype by a filter before compression
        if ensure_ndarray_like(chunk).dtype == object:
            raise RuntimeError("cannot write object array without object codec")

        # compress
        if self.compressor:
            cdata = await asyncio.to_thread(self.compressor.encode, chunk)
        else:
            cdata = chunk

        cdata = ensure_bytes(cdata)

        return chunk_spec.prototype.buffer.from_bytes(cdata)

    def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
        # Compressed size is data-dependent and cannot be predicted.
        raise NotImplementedError

src/zarr/core/array.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from zarr._compat import _deprecate_positional_args
1313
from zarr.abc.store import Store, set_or_delete
1414
from zarr.codecs import _get_default_array_bytes_codec
15-
from zarr.codecs._v2 import V2Compressor, V2Filters
15+
from zarr.codecs._v2 import V2Codec
1616
from zarr.core.attributes import Attributes
1717
from zarr.core.buffer import (
1818
BufferPrototype,
def create_codec_pipeline(metadata: ArrayMetadata) -> CodecPipeline:
    """Build the codec pipeline for an array from its metadata.

    v3 metadata carries an explicit codec chain; v2 metadata's filters and
    compressor are wrapped in a single ``V2Codec``.

    Raises TypeError for any other metadata type.
    """
    if isinstance(metadata, ArrayV3Metadata):
        return get_pipeline_class().from_codecs(metadata.codecs)
    elif isinstance(metadata, ArrayV2Metadata):
        v2_codec = V2Codec(filters=metadata.filters, compressor=metadata.compressor)
        return get_pipeline_class().from_codecs([v2_codec])
    else:
        # Include the offending type so the failure is diagnosable; a bare
        # TypeError gives the caller nothing to go on.
        raise TypeError(
            f"Expected ArrayV2Metadata or ArrayV3Metadata, got {type(metadata).__name__}"
        )
124123

tests/test_v2.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,3 +121,12 @@ async def test_create_dtype_str(dtype: Any) -> None:
121121
arr[:] = ["a", "bb", "ccc"]
122122
result = arr[:]
123123
np.testing.assert_array_equal(result, np.array(["a", "bb", "ccc"], dtype="object"))
124+
125+
126+
@pytest.mark.parametrize("filters", [[], [numcodecs.Delta(dtype="<i4")], [numcodecs.Zlib(level=2)]])
def test_v2_filters_codecs(filters: Any) -> None:
    """Round-trip a v2 array through each filter configuration.

    Uses a multi-element, non-constant fixture so that Delta produces
    non-trivial deltas and Zlib compresses real data — a single constant
    element would make both codecs trivially pass.
    """
    array_fixture = np.arange(10, dtype="<i4")
    arr = zarr.create(shape=array_fixture.shape, dtype="<i4", zarr_format=2, filters=filters)
    arr[:] = array_fixture
    result = arr[:]
    np.testing.assert_array_equal(result, array_fixture)

0 commit comments

Comments
 (0)