Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions src/zarr/abc/codec.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from __future__ import annotations

from abc import abstractmethod
from typing import TYPE_CHECKING, Any, Generic, TypeVar
from typing import TYPE_CHECKING, Any, Generic, NotRequired, TypedDict, TypeVar

from zarr.abc.metadata import Metadata
from zarr.abc.metadata import Metadata, T
from zarr.core.buffer import Buffer, NDBuffer
from zarr.core.common import ChunkCoords, concurrent_map
from zarr.core.config import config
Expand Down Expand Up @@ -35,7 +35,7 @@
CodecOutput = TypeVar("CodecOutput", bound=NDBuffer | Buffer)


class BaseCodec(Metadata, Generic[CodecInput, CodecOutput]):
class BaseCodec(Generic[CodecInput, CodecOutput, T], Metadata[T]):
"""Generic base class for codecs.

Codecs can be registered via zarr.codecs.registry.
Expand Down Expand Up @@ -153,25 +153,41 @@ async def encode(
return await _batching_helper(self._encode_single, chunks_and_specs)


class ArrayArrayCodec(BaseCodec[NDBuffer, NDBuffer]):
class ArrayArrayCodec(BaseCodec[NDBuffer, NDBuffer, T]):
"""Base class for array-to-array codecs."""

...


class ArrayBytesCodec(BaseCodec[NDBuffer, Buffer]):
class ArrayBytesCodec(BaseCodec[NDBuffer, Buffer, T]):
"""Base class for array-to-bytes codecs."""

...


class BytesBytesCodec(BaseCodec[Buffer, Buffer]):
class BytesBytesCodec(BaseCodec[Buffer, Buffer, T]):
"""Base class for bytes-to-bytes codecs."""

...


Codec = ArrayArrayCodec | ArrayBytesCodec | BytesBytesCodec
Codec = ArrayArrayCodec[Any] | ArrayBytesCodec[Any] | BytesBytesCodec[Any]


class CodecConfigDict(TypedDict):
    """Base ``TypedDict`` for the ``configuration`` mapping of a codec.

    This base declares no keys of its own. Codec-specific configuration
    dictionaries subclass it and add their own keys.
    """


CodecConfigDictType = TypeVar("CodecConfigDictType", bound=CodecConfigDict)


class CodecDict(Generic[CodecConfigDictType], TypedDict):
    """A generic dictionary representing a codec.

    The type parameter is the ``TypedDict`` used for the ``configuration``
    entry; it is bound to ``CodecConfigDict``.
    """

    # Required key: the codec's name.
    name: str
    # Optional key (NotRequired): the codec-specific configuration mapping.
    configuration: NotRequired[CodecConfigDictType]


class ArrayBytesCodecPartialDecodeMixin:
Expand Down
15 changes: 8 additions & 7 deletions src/zarr/abc/metadata.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
from __future__ import annotations

from collections.abc import Sequence
from typing import TYPE_CHECKING
from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Generic, TypeVar, cast

if TYPE_CHECKING:
from typing import Self

from zarr.core.common import JSON

from dataclasses import dataclass, fields

__all__ = ["Metadata"]

T = TypeVar("T", bound=Mapping[str, object])


@dataclass(frozen=True)
class Metadata:
def to_dict(self) -> dict[str, JSON]:
class Metadata(Generic[T]):
def to_dict(self) -> T:
"""
Recursively serialize this model to a dictionary.
This method inspects the fields of self and calls `x.to_dict()` for any fields that
Expand All @@ -35,10 +36,10 @@ def to_dict(self) -> dict[str, JSON]:
else:
out_dict[key] = value

return out_dict
return cast(T, out_dict)

@classmethod
def from_dict(cls, data: dict[str, JSON]) -> Self:
def from_dict(cls, data: T) -> Self:
"""
Create an instance of the model from a dictionary
"""
Expand Down
6 changes: 3 additions & 3 deletions src/zarr/codecs/_v2.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

import numcodecs
from numcodecs.compat import ensure_bytes, ensure_ndarray
Expand All @@ -18,7 +18,7 @@


@dataclass(frozen=True)
class V2Compressor(ArrayBytesCodec):
class V2Compressor(ArrayBytesCodec[Any]):
compressor: numcodecs.abc.Codec | None

is_fixed_size = False
Expand Down Expand Up @@ -66,7 +66,7 @@ def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec)


@dataclass(frozen=True)
class V2Filters(ArrayArrayCodec):
class V2Filters(ArrayArrayCodec[Any]):
filters: tuple[numcodecs.abc.Codec, ...] | None

is_fixed_size = False
Expand Down
30 changes: 24 additions & 6 deletions src/zarr/codecs/blosc.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
from dataclasses import dataclass, replace
from enum import Enum
from functools import cached_property
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, cast

import numcodecs
from numcodecs.blosc import Blosc

from zarr.abc.codec import BytesBytesCodec
from zarr.abc.codec import BytesBytesCodec, CodecConfigDict, CodecDict
from zarr.core.buffer.cpu import as_numpy_array_wrapper
from zarr.core.common import JSON, parse_enum, parse_named_configuration, to_thread
from zarr.registry import register_codec
Expand Down Expand Up @@ -54,6 +54,22 @@ class BloscCname(Enum):
zlib = "zlib"


class BloscCodecConfigDict(CodecConfigDict):
    """A dictionary representing a Blosc codec configuration."""

    typesize: int
    # NOTE(review): `cname` and `shuffle` are typed as the enum classes here,
    # whereas BytesCodecConfigDict types its enum-backed key with string
    # literals. Confirm which form BloscCodec.to_dict actually writes for
    # these two keys; the `cast` in to_dict means a mismatch would not be
    # reported by the type checker.
    cname: BloscCname
    clevel: int
    shuffle: BloscShuffle
    blocksize: int


class BloscCodecDict(CodecDict[BloscCodecConfigDict]):
    """Dictionary form of a Blosc codec.

    A ``CodecDict`` whose ``configuration`` entry is a
    ``BloscCodecConfigDict``.
    """


# See https://zarr.readthedocs.io/en/stable/tutorial.html#configuring-blosc
numcodecs.blosc.use_threads = False

Expand Down Expand Up @@ -83,7 +99,7 @@ def parse_blocksize(data: JSON) -> int:


@dataclass(frozen=True)
class BloscCodec(BytesBytesCodec):
class BloscCodec(BytesBytesCodec[BloscCodecDict]):
is_fixed_size = False

typesize: int | None
Expand Down Expand Up @@ -114,16 +130,16 @@ def __init__(
object.__setattr__(self, "blocksize", blocksize_parsed)

@classmethod
def from_dict(cls, data: dict[str, JSON]) -> Self:
def from_dict(cls, data: BloscCodecDict) -> Self:
_, configuration_parsed = parse_named_configuration(data, "blosc")
return cls(**configuration_parsed) # type: ignore[arg-type]

def to_dict(self) -> dict[str, JSON]:
def to_dict(self) -> BloscCodecDict:
if self.typesize is None:
raise ValueError("`typesize` needs to be set for serialization.")
if self.shuffle is None:
raise ValueError("`shuffle` needs to be set for serialization.")
return {
out_dict = {
"name": "blosc",
"configuration": {
"typesize": self.typesize,
Expand All @@ -134,6 +150,8 @@ def to_dict(self) -> dict[str, JSON]:
},
}

return cast(BloscCodecDict, out_dict)

def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
dtype = array_spec.dtype
new_codec = self
Expand Down
35 changes: 25 additions & 10 deletions src/zarr/codecs/bytes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@

import numpy as np

from zarr.abc.codec import ArrayBytesCodec
from zarr.abc.codec import ArrayBytesCodec, CodecConfigDict, CodecDict
from zarr.core.buffer import Buffer, NDArrayLike, NDBuffer
from zarr.core.common import JSON, parse_enum, parse_named_configuration
from zarr.core.common import parse_enum, parse_named_configuration
from zarr.registry import register_codec

if TYPE_CHECKING:
from typing import Self
from typing import Literal, Self

from zarr.core.array_spec import ArraySpec

Expand All @@ -30,8 +30,21 @@ class Endian(Enum):
default_system_endian = Endian(sys.byteorder)


class BytesCodecConfigDict(CodecConfigDict):
    """A dictionary representing a bytes codec configuration."""

    # Typed with string literals rather than the Endian enum because this
    # describes the serialized dictionary: BytesCodec.to_dict stores
    # `self.endian.value`, not the enum member itself.
    endian: Literal["big", "little"]


class BytesCodecDict(CodecDict[BytesCodecConfigDict]):
    """Dictionary form of a bytes codec.

    A ``CodecDict`` whose optional ``configuration`` entry is a
    ``BytesCodecConfigDict``.
    """


@dataclass(frozen=True)
class BytesCodec(ArrayBytesCodec):
class BytesCodec(ArrayBytesCodec[BytesCodecDict]):
is_fixed_size = True

endian: Endian | None
Expand All @@ -42,18 +55,20 @@ def __init__(self, *, endian: Endian | str | None = default_system_endian) -> No
object.__setattr__(self, "endian", endian_parsed)

@classmethod
def from_dict(cls, data: dict[str, JSON]) -> Self:
def from_dict(cls, data: BytesCodecDict) -> Self:
_, configuration_parsed = parse_named_configuration(
data, "bytes", require_configuration=False
)

configuration_parsed = configuration_parsed or {}
return cls(**configuration_parsed) # type: ignore[arg-type]

def to_dict(self) -> dict[str, JSON]:
if self.endian is None:
return {"name": "bytes"}
else:
return {"name": "bytes", "configuration": {"endian": self.endian.value}}
def to_dict(self) -> BytesCodecDict:
    """Serialize this codec to its dictionary form.

    The ``configuration`` entry is included only when ``self.endian`` is
    set; otherwise the result contains just the codec name.
    """
    if self.endian is None:
        return {"name": "bytes"}

    return {"name": "bytes", "configuration": {"endian": self.endian.value}}

def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
if array_spec.dtype.itemsize == 0:
Expand Down
19 changes: 13 additions & 6 deletions src/zarr/codecs/crc32c_.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import typing_extensions
from crc32c import crc32c

from zarr.abc.codec import BytesBytesCodec
from zarr.core.common import JSON, parse_named_configuration
from zarr.abc.codec import BytesBytesCodec, CodecConfigDict, CodecDict
from zarr.core.common import parse_named_configuration
from zarr.registry import register_codec

if TYPE_CHECKING:
Expand All @@ -18,17 +18,24 @@
from zarr.core.buffer import Buffer


class Crc32cCodecDict(CodecDict[CodecConfigDict]):
    """Dictionary form of a CRC32C codec.

    Parameterized with the base ``CodecConfigDict``, which declares no keys.
    """


@dataclass(frozen=True)
class Crc32cCodec(BytesBytesCodec):
class Crc32cCodec(BytesBytesCodec[Crc32cCodecDict]):
is_fixed_size = True

@classmethod
def from_dict(cls, data: dict[str, JSON]) -> Self:
def from_dict(cls, data: Crc32cCodecDict) -> Self:
parse_named_configuration(data, "crc32c", require_configuration=False)
return cls()

def to_dict(self) -> dict[str, JSON]:
return {"name": "crc32c"}
def to_dict(self) -> Crc32cCodecDict:
    """Serialize this codec to its dictionary form.

    The CRC32C codec has no settings, so only the ``name`` key is emitted.
    """
    # Annotating the literal (instead of `cast`-ing an untyped dict) lets the
    # type checker verify the keys against Crc32cCodecDict, matching the form
    # used by BytesCodec.to_dict.
    out_dict: Crc32cCodecDict = {"name": "crc32c"}
    return out_dict

async def _decode_single(
self,
Expand Down
25 changes: 19 additions & 6 deletions src/zarr/codecs/gzip.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, cast

from numcodecs.gzip import GZip

from zarr.abc.codec import BytesBytesCodec
from zarr.abc.codec import BytesBytesCodec, CodecConfigDict, CodecDict
from zarr.core.buffer.cpu import as_numpy_array_wrapper
from zarr.core.common import JSON, parse_named_configuration, to_thread
from zarr.registry import register_codec
Expand All @@ -17,6 +17,18 @@
from zarr.core.buffer import Buffer


class GzipCodecConfigDict(CodecConfigDict):
    """A dictionary representing a gzip codec configuration."""

    # The codec's `level` attribute, as written by GzipCodec.to_dict.
    level: int


class GzipCodecDict(CodecDict[GzipCodecConfigDict]):
    """Dictionary form of a gzip codec.

    A ``CodecDict`` whose ``configuration`` entry is a
    ``GzipCodecConfigDict``.
    """


def parse_gzip_level(data: JSON) -> int:
if not isinstance(data, (int)):
raise TypeError(f"Expected int, got {type(data)}")
Expand All @@ -28,7 +40,7 @@ def parse_gzip_level(data: JSON) -> int:


@dataclass(frozen=True)
class GzipCodec(BytesBytesCodec):
class GzipCodec(BytesBytesCodec[GzipCodecDict]):
is_fixed_size = False

level: int = 5
Expand All @@ -39,12 +51,13 @@ def __init__(self, *, level: int = 5) -> None:
object.__setattr__(self, "level", level_parsed)

@classmethod
def from_dict(cls, data: dict[str, JSON]) -> Self:
def from_dict(cls, data: GzipCodecDict) -> Self:
_, configuration_parsed = parse_named_configuration(data, "gzip")
return cls(**configuration_parsed) # type: ignore[arg-type]

def to_dict(self) -> dict[str, JSON]:
return {"name": "gzip", "configuration": {"level": self.level}}
def to_dict(self) -> GzipCodecDict:
    """Serialize this codec to its dictionary form.

    Returns a dictionary with the codec ``name`` and a ``configuration``
    entry holding the current ``level``.
    """
    # Annotating the literal (instead of `cast`-ing an untyped dict) lets the
    # type checker verify both the outer keys and the nested configuration
    # against GzipCodecDict.
    out_dict: GzipCodecDict = {
        "name": "gzip",
        "configuration": {"level": self.level},
    }
    return out_dict

async def _decode_single(
self,
Expand Down
Loading