22
33import asyncio
44from dataclasses import dataclass
5- from typing import TYPE_CHECKING , TypeGuard
5+ from typing import TYPE_CHECKING , Self , TypeGuard , overload
6+ from typing_extensions import Literal
67
78import numpy as np
89from numcodecs .compat import ensure_bytes , ensure_ndarray_like
910
10- from zarr .abc .codec import ArrayBytesCodec , Numcodec
11+ from zarr .abc .codec import ArrayArrayCodec , ArrayBytesCodec , BaseCodec , BytesBytesCodec , CodecJSON , CodecJSON_V2 , Numcodec
12+ from zarr .core .buffer .core import BufferPrototype
13+ from zarr .core .common import BaseConfig , NamedConfig , ZarrFormat
1114from zarr .registry import get_ndbuffer_class
1215
1316if TYPE_CHECKING :
@@ -133,3 +136,100 @@ async def _encode_single(
133136
134137 def compute_encoded_size (self , _input_byte_length : int , _chunk_spec : ArraySpec ) -> int :
135138 raise NotImplementedError
139+
140+
141+ @dataclass (frozen = True , kw_only = True )
142+ class NumcodecsWrapper :
143+ codec : Numcodec
144+
145+ @overload
146+ def to_json (self , zarr_format : Literal [2 ]) -> CodecJSON_V2 [str ]: ...
147+ @overload
148+ def to_json (self , zarr_format : Literal [3 ]) -> NamedConfig [str , BaseConfig ]: ...
149+
150+ def to_json (self , zarr_format : ZarrFormat ) -> CodecJSON_V2 [str ] | NamedConfig [str , BaseConfig ]:
151+ if zarr_format == 2 :
152+ return self .codec .get_config ()
153+ elif zarr_format == 3 :
154+ config = self .codec .get_config ()
155+ config_no_id = {k : v for k , v in config .items () if k != "id" }
156+ return {"name" : config ["id" ], "configuration" : config_no_id }
157+ raise ValueError (f"Unsupported zarr format: { zarr_format } " ) # pragma: no cover
158+
159+ @classmethod
160+ def _from_json_v2 (cls , data : CodecJSON ) -> Self :
161+ raise NotADirectoryError (
162+ "This class does not support creating instances from JSON data for Zarr format 2."
163+ )
164+
165+ @classmethod
166+ def _from_json_v3 (cls , data : CodecJSON ) -> Self :
167+ raise NotImplementedError (
168+ "This class does not support creating instances from JSON data for Zarr format 3."
169+ )
170+
171+ def compute_encoded_size (self , input_byte_length : int , chunk_spec : ArraySpec ) -> int :
172+ raise NotImplementedError
173+
174+ def to_array_array (self ) -> NumcodecsArrayArrayCodec :
175+ """
176+ Use the ``_codec`` attribute to create a NumcodecsArrayArrayCodec.
177+ """
178+ return NumcodecsArrayArrayCodec (codec = self .codec )
179+
180+ def to_bytes_bytes (self ) -> NumcodecsBytesBytesCodec :
181+ """
182+ Use the ``_codec`` attribute to create a NumcodecsBytesBytesCodec.
183+ """
184+ return NumcodecsBytesBytesCodec (codec = self .codec )
185+
186+ def to_array_bytes (self ) -> NumcodecsArrayBytesCodec :
187+ """
188+ Use the ``_codec`` attribute to create a NumcodecsArrayBytesCodec.
189+ """
190+ return NumcodecsArrayBytesCodec (codec = self .codec )
191+
192+
193+ class NumcodecsBytesBytesCodec (NumcodecsWrapper , BytesBytesCodec ):
194+ async def _decode_single (self , chunk_data : Buffer , chunk_spec : ArraySpec ) -> Buffer :
195+ return await asyncio .to_thread (
196+ as_numpy_array_wrapper ,
197+ self .codec .decode ,
198+ chunk_data ,
199+ chunk_spec .prototype ,
200+ )
201+
202+ def _encode (self , chunk_bytes : Buffer , prototype : BufferPrototype ) -> Buffer :
203+ encoded = self .codec .encode (chunk_bytes .as_array_like ())
204+ if isinstance (encoded , np .ndarray ): # Required for checksum codecs
205+ return prototype .buffer .from_bytes (encoded .tobytes ())
206+ return prototype .buffer .from_bytes (encoded )
207+
208+ async def _encode_single (self , chunk_data : Buffer , chunk_spec : ArraySpec ) -> Buffer :
209+ return await asyncio .to_thread (self ._encode , chunk_data , chunk_spec .prototype )
210+
211+
212+ @dataclass (kw_only = True , frozen = True )
213+ class NumcodecsArrayArrayCodec (NumcodecsWrapper , ArrayArrayCodec ):
214+ async def _decode_single (self , chunk_data : NDBuffer , chunk_spec : ArraySpec ) -> NDBuffer :
215+ chunk_ndarray = chunk_data .as_ndarray_like ()
216+ out = await asyncio .to_thread (self .codec .decode , chunk_ndarray )
217+ return chunk_spec .prototype .nd_buffer .from_ndarray_like (out .reshape (chunk_spec .shape )) # type: ignore[union-attr]
218+
219+ async def _encode_single (self , chunk_data : NDBuffer , chunk_spec : ArraySpec ) -> NDBuffer :
220+ chunk_ndarray = chunk_data .as_ndarray_like ()
221+ out = await asyncio .to_thread (self .codec .encode , chunk_ndarray )
222+ return chunk_spec .prototype .nd_buffer .from_ndarray_like (out ) # type: ignore[arg-type]
223+
224+
225+ @dataclass (kw_only = True , frozen = True )
226+ class NumcodecsArrayBytesCodec (NumcodecsWrapper , ArrayBytesCodec ):
227+ async def _decode_single (self , chunk_data : Buffer , chunk_spec : ArraySpec ) -> NDBuffer :
228+ chunk_bytes = chunk_data .to_bytes ()
229+ out = await asyncio .to_thread (self .codec .decode , chunk_bytes )
230+ return chunk_spec .prototype .nd_buffer .from_ndarray_like (out .reshape (chunk_spec .shape ))
231+
232+ async def _encode_single (self , chunk_data : NDBuffer , chunk_spec : ArraySpec ) -> Buffer :
233+ chunk_ndarray = chunk_data .as_ndarray_like ()
234+ out = await asyncio .to_thread (self .codec .encode , chunk_ndarray )
235+ return chunk_spec .prototype .buffer .from_bytes (out )
0 commit comments