 from typing import TYPE_CHECKING

 import numcodecs
-from numcodecs.compat import ensure_bytes, ensure_ndarray
+from numcodecs.compat import ensure_ndarray_like

-from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec
-from zarr.core.buffer import Buffer, NDBuffer, default_buffer_prototype
+from zarr.abc.codec import ArrayBytesCodec
 from zarr.registry import get_ndbuffer_class

 if TYPE_CHECKING:
     import numcodecs.abc

     from zarr.core.array_spec import ArraySpec
+    from zarr.core.buffer import Buffer, NDBuffer


 @dataclass(frozen=True)
-class V2Compressor(ArrayBytesCodec):
+class V2Codec(ArrayBytesCodec):
+    filters: tuple[numcodecs.abc.Codec, ...] | None
     compressor: numcodecs.abc.Codec | None

     is_fixed_size = False
@@ -28,81 +29,61 @@ async def _decode_single(
         chunk_bytes: Buffer,
         chunk_spec: ArraySpec,
     ) -> NDBuffer:
-        if self.compressor is not None:
-            chunk_numpy_array = ensure_ndarray(
-                await asyncio.to_thread(self.compressor.decode, chunk_bytes.as_array_like())
-            )
+        cdata = chunk_bytes.as_array_like()
+        # decompress
+        if self.compressor:
+            chunk = await asyncio.to_thread(self.compressor.decode, cdata)
         else:
-            chunk_numpy_array = ensure_ndarray(chunk_bytes.as_array_like())
+            chunk = cdata
+
+        # apply filters
+        if self.filters:
+            for f in reversed(self.filters):
+                chunk = await asyncio.to_thread(f.decode, chunk)
+
+        # view as numpy array with correct dtype
+        chunk = ensure_ndarray_like(chunk)
+        # special case object dtype, because incorrect handling can lead to
+        # segfaults and other bad things happening
+        if chunk_spec.dtype != object:
+            chunk = chunk.view(chunk_spec.dtype)
+        elif chunk.dtype != object:
+            # If we end up here, someone must have hacked around with the filters.
+            # We cannot deal with object arrays unless there is an object
+            # codec in the filter chain, i.e., a filter that converts from object
+            # array to something else during encoding, and converts back to object
+            # array during decoding.
+            raise RuntimeError("cannot read object array without object codec")

-        # ensure correct dtype
-        if str(chunk_numpy_array.dtype) != chunk_spec.dtype and not chunk_spec.dtype.hasobject:
-            chunk_numpy_array = chunk_numpy_array.view(chunk_spec.dtype)
+        # ensure correct chunk shape
+        chunk = chunk.reshape(-1, order="A")
+        chunk = chunk.reshape(chunk_spec.shape, order=chunk_spec.order)

-        return get_ndbuffer_class().from_numpy_array(chunk_numpy_array)
+        return get_ndbuffer_class().from_ndarray_like(chunk)

     async def _encode_single(
-        self,
-        chunk_array: NDBuffer,
-        _chunk_spec: ArraySpec,
-    ) -> Buffer | None:
-        chunk_numpy_array = chunk_array.as_numpy_array()
-        if self.compressor is not None:
-            if (
-                not chunk_numpy_array.flags.c_contiguous
-                and not chunk_numpy_array.flags.f_contiguous
-            ):
-                chunk_numpy_array = chunk_numpy_array.copy(order="A")
-            encoded_chunk_bytes = ensure_bytes(
-                await asyncio.to_thread(self.compressor.encode, chunk_numpy_array)
-            )
-        else:
-            encoded_chunk_bytes = ensure_bytes(chunk_numpy_array)
-
-        return default_buffer_prototype().buffer.from_bytes(encoded_chunk_bytes)
-
-    def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
-        raise NotImplementedError
-
-
-@dataclass(frozen=True)
-class V2Filters(ArrayArrayCodec):
-    filters: tuple[numcodecs.abc.Codec, ...] | None
-
-    is_fixed_size = False
-
-    async def _decode_single(
         self,
         chunk_array: NDBuffer,
         chunk_spec: ArraySpec,
-    ) -> NDBuffer:
-        chunk_ndarray = chunk_array.as_ndarray_like()
-        # apply filters in reverse order
-        if self.filters is not None:
-            for filter in self.filters[::-1]:
-                chunk_ndarray = await asyncio.to_thread(filter.decode, chunk_ndarray)
-
-        # ensure correct chunk shape
-        if chunk_ndarray.shape != chunk_spec.shape:
-            chunk_ndarray = chunk_ndarray.reshape(
-                chunk_spec.shape,
-                order=chunk_spec.order,
-            )
+    ) -> Buffer | None:
+        chunk = chunk_array.as_ndarray_like()

-        return get_ndbuffer_class().from_ndarray_like(chunk_ndarray)
+        # apply filters
+        if self.filters:
+            for f in self.filters:
+                chunk = await asyncio.to_thread(f.encode, chunk)

-    async def _encode_single(
-        self,
-        chunk_array: NDBuffer,
-        chunk_spec: ArraySpec,
-    ) -> NDBuffer | None:
-        chunk_ndarray = chunk_array.as_ndarray_like().ravel(order=chunk_spec.order)
+        # check object encoding
+        if ensure_ndarray_like(chunk).dtype == object:
+            raise RuntimeError("cannot write object array without object codec")

-        if self.filters is not None:
-            for filter in self.filters:
-                chunk_ndarray = await asyncio.to_thread(filter.encode, chunk_ndarray)
+        # compress
+        if self.compressor:
+            cdata = await asyncio.to_thread(self.compressor.encode, chunk)
+        else:
+            cdata = chunk

-        return get_ndbuffer_class().from_ndarray_like(chunk_ndarray)
+        return chunk_spec.prototype.buffer.from_bytes(cdata)

     def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
         raise NotImplementedError
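
For reference, below is a minimal sketch of the ordering the merged V2Codec implements: filters in order, then the compressor, on encode; the compressor, then the filters in reverse, then a dtype view and reshape, on decode. It uses plain numcodecs objects outside of zarr, and the Delta filter / Blosc compressor pairing is an illustrative assumption, not something taken from this change.

```python
import numpy as np
import numcodecs
from numcodecs.compat import ensure_ndarray_like

# illustrative codec choices (assumption): one filter plus one compressor
filters = (numcodecs.Delta(dtype="i4"),)
compressor = numcodecs.Blosc()

chunk = np.arange(16, dtype="i4").reshape(4, 4)

# encode: apply filters in order, then compress
encoded = chunk
for f in filters:
    encoded = f.encode(encoded)
encoded = compressor.encode(encoded)

# decode: decompress, apply filters in reverse, then view/reshape
decoded = compressor.decode(encoded)
for f in reversed(filters):
    decoded = f.decode(decoded)
decoded = ensure_ndarray_like(decoded).view(chunk.dtype).reshape(chunk.shape)

assert np.array_equal(decoded, chunk)
```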