55from typing import TYPE_CHECKING
66
77import numcodecs
8- from numcodecs .compat import ensure_bytes , ensure_ndarray
8+ from numcodecs .compat import ensure_ndarray_like
99
10- from zarr .abc .codec import ArrayArrayCodec , ArrayBytesCodec
11- from zarr .core .buffer import Buffer , NDBuffer , default_buffer_prototype
10+ from zarr .abc .codec import ArrayBytesCodec
1211from zarr .registry import get_ndbuffer_class
1312
1413if TYPE_CHECKING :
1514 import numcodecs .abc
1615
1716 from zarr .core .array_spec import ArraySpec
17+ from zarr .core .buffer import Buffer , NDBuffer
1818
1919
@dataclass(frozen=True)
class V2Codec(ArrayBytesCodec):
    """Codec translating between Zarr v2 stored chunk bytes and in-memory arrays.

    Applies the numcodecs filter chain and the compressor in the order the
    Zarr v2 format mandates: on encode, filters run first (in declaration
    order) and the compressor last; on decode, the compressor runs first and
    the filters in reverse order.
    """

    # Filter chain applied before compression (v2 "filters" metadata), or None.
    filters: tuple[numcodecs.abc.Codec, ...] | None
    # Terminal compressor (v2 "compressor" metadata), or None for raw chunks.
    compressor: numcodecs.abc.Codec | None

    # Encoded size depends on the chunk contents, so it cannot be predicted.
    is_fixed_size = False

    async def _decode_single(
        self,
        chunk_bytes: Buffer,
        chunk_spec: ArraySpec,
    ) -> NDBuffer:
        """Decode one stored chunk into an NDBuffer matching ``chunk_spec``.

        Raises:
            RuntimeError: if ``chunk_spec`` expects an object dtype but the
                filter chain did not produce object data (no object codec).
        """
        cdata = chunk_bytes.as_array_like()
        # decompress -- numcodecs codecs are synchronous, so run them off
        # the event loop in a worker thread
        if self.compressor:
            chunk = await asyncio.to_thread(self.compressor.decode, cdata)
        else:
            chunk = cdata

        # apply filters in reverse of the encode order
        if self.filters:
            for f in reversed(self.filters):
                chunk = await asyncio.to_thread(f.decode, chunk)

        # view as numpy array with correct dtype
        chunk = ensure_ndarray_like(chunk)
        # special case object dtype, because incorrect handling can lead to
        # segfaults and other bad things happening
        if chunk_spec.dtype != object:
            chunk = chunk.view(chunk_spec.dtype)
        elif chunk.dtype != object:
            # If we end up here, someone must have hacked around with the filters.
            # We cannot deal with object arrays unless there is an object
            # codec in the filter chain, i.e., a filter that converts from object
            # array to something else during encoding, and converts back to object
            # array during decoding.
            raise RuntimeError("cannot read object array without object codec")

        # ensure correct chunk shape: flatten first so the final reshape is
        # well-defined regardless of the decoded array's incidental shape
        chunk = chunk.reshape(-1, order="A")
        chunk = chunk.reshape(chunk_spec.shape, order=chunk_spec.order)

        return get_ndbuffer_class().from_ndarray_like(chunk)

    async def _encode_single(
        self,
        chunk_array: NDBuffer,
        chunk_spec: ArraySpec,
    ) -> Buffer | None:
        """Encode one in-memory chunk into the bytes stored for a v2 chunk.

        Raises:
            RuntimeError: if the (filtered) data still has object dtype,
                which cannot be serialized without an object codec.
        """
        chunk = chunk_array.as_ndarray_like()

        # apply filters in declaration order
        if self.filters:
            for f in self.filters:
                chunk = await asyncio.to_thread(f.encode, chunk)

        # check object encoding: an object array must have been converted to
        # a concrete dtype by an object codec somewhere in the filter chain
        if ensure_ndarray_like(chunk).dtype == object:
            raise RuntimeError("cannot write object array without object codec")

        # compress
        if self.compressor:
            cdata = await asyncio.to_thread(self.compressor.encode, chunk)
        else:
            cdata = chunk

        return chunk_spec.prototype.buffer.from_bytes(cdata)

    def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
        """Not computable: compressed/filtered size depends on chunk contents."""
        raise NotImplementedError
0 commit comments