Skip to content

chore/hollow out zarr3 #780

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
346 changes: 30 additions & 316 deletions numcodecs/zarr3.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,330 +25,44 @@

from __future__ import annotations

import asyncio
import math
from dataclasses import dataclass, replace
from functools import cached_property
from importlib.metadata import version
from typing import Any, Self
from warnings import warn

import numpy as np
from packaging.version import Version

import numcodecs

# Import guard: fail at import time, with one consistent message, when zarr is
# missing or older than the minimum release this integration supports.
# NOTE(review): the earlier messages named 3.0.0 and 3.0.9 while the check
# enforced 3.0.8; the enforced value is kept here -- confirm the intended minimum.
_MIN_ZARR_VERSION = "3.0.8"

try:
    import zarr  # noqa: F401

    # Look the installed version up once; it is reused in the error message.
    zarr_version = version('zarr')
    if Version(zarr_version) < Version(_MIN_ZARR_VERSION):  # pragma: no cover
        msg = (
            f"zarr {_MIN_ZARR_VERSION} or later is required to use the numcodecs "
            f"zarr integration. Got {zarr_version}."
        )
        raise ImportError(msg)
except ImportError as e:  # pragma: no cover
    # Handles both a missing zarr installation and the version check above; the
    # original error is chained so its detail stays visible in the traceback.
    raise ImportError(
        f"zarr {_MIN_ZARR_VERSION} or later is required to use the numcodecs zarr integration."
    ) from e

from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec, BytesBytesCodec
from zarr.abc.metadata import Metadata
from zarr.core.array_spec import ArraySpec
from zarr.core.buffer import Buffer, BufferPrototype, NDBuffer
from zarr.core.buffer.cpu import as_numpy_array_wrapper
from zarr.core.common import JSON, parse_named_configuration, product

CODEC_PREFIX = "numcodecs."


def _from_zarr_dtype(dtype: Any) -> np.dtype:
    """
    Get a numpy data type from an array spec, depending on the zarr version.

    With zarr 3.1.0 or later the result of ``dtype.to_native_dtype()`` is
    returned; with older releases ``dtype`` is handed back unchanged.
    """
    installed = Version(version('zarr'))
    if installed < Version("3.1.0"):
        return dtype  # pragma: no cover
    return dtype.to_native_dtype()


def _to_zarr_dtype(dtype: np.dtype) -> Any:
    """
    Convert a numpy data type to the representation used by the installed zarr.

    With zarr 3.1.0 or later the dtype is passed through
    ``zarr.dtype.parse_data_type(dtype, zarr_format=3)``; with older releases
    the numpy dtype is returned as-is.
    """
    if Version(version('zarr')) < Version("3.1.0"):
        return dtype  # pragma: no cover

    # Imported lazily: this branch only runs for zarr >= 3.1.0.
    from zarr.dtype import parse_data_type

    return parse_data_type(dtype, zarr_format=3)


def _expect_name_prefix(codec_name: str) -> str:
    """
    Return ``codec_name`` with the leading ``CODEC_PREFIX`` removed.

    Raises
    ------
    ValueError
        If ``codec_name`` does not start with ``CODEC_PREFIX``.
    """
    if codec_name.startswith(CODEC_PREFIX):
        return codec_name[len(CODEC_PREFIX):]
    raise ValueError(
        f"Expected name to start with '{CODEC_PREFIX}'. Got {codec_name} instead."
    )  # pragma: no cover


def _parse_codec_configuration(data: dict[str, JSON]) -> dict[str, JSON]:
    """
    Turn a zarr named-configuration dict into a numcodecs-style config dict.

    ``data`` is split into a name and a configuration by
    ``parse_named_configuration``. The name must start with ``CODEC_PREFIX``;
    the remainder of the name becomes the ``"id"`` entry of the result, followed
    by the configuration entries.

    Raises
    ------
    ValueError
        If the parsed name does not start with ``CODEC_PREFIX`` (raised by
        ``_expect_name_prefix``).
    """
    parsed_name, parsed_configuration = parse_named_configuration(data)
    # _expect_name_prefix performs the prefix validation and raises the
    # ValueError itself, so a second startswith() check here is redundant.
    codec_id = _expect_name_prefix(parsed_name)
    return {"id": codec_id, **parsed_configuration}


@dataclass(frozen=True)
class _NumcodecsCodec(Metadata):
    """Shared base for the zarr v3 codec classes that delegate to a numcodecs codec."""

    # Prefixed codec name; set on subclasses by __init_subclass__ when a
    # ``codec_name`` class keyword is given.
    codec_name: str
    # numcodecs configuration; __init__ guarantees it carries an "id" entry.
    codec_config: dict[str, JSON]

    def __init_subclass__(cls, *, codec_name: str | None = None, **kwargs):
        """To be used only when creating the actual public-facing codec class."""
        super().__init_subclass__(**kwargs)
        if codec_name is None:
            # Subclasses declared without a codec name are left untouched.
            return

        prefixed_name = f"{CODEC_PREFIX}{codec_name}"
        doc_target = f"{CODEC_PREFIX}{codec_name}.{cls.__name__}"
        cls.codec_name = prefixed_name
        cls.__doc__ = f"\nSee :class:`{doc_target}` for more details and parameters.\n"

    def __init__(self, **codec_config: JSON) -> None:
        """
        Store the numcodecs configuration and warn about limited portability.

        An ``"id"`` entry equal to the un-prefixed codec name is added when
        missing; a different ``"id"`` raises ``ValueError``.
        """
        if not self.codec_name:
            raise ValueError(
                "The codec name needs to be supplied through the `codec_name` attribute."
            )  # pragma: no cover
        expected_id = _expect_name_prefix(self.codec_name)

        if "id" in codec_config:
            if codec_config["id"] != expected_id:
                raise ValueError(
                    f"Codec id does not match {expected_id}. Got: {codec_config['id']}."
                )  # pragma: no cover
        else:
            codec_config = {"id": expected_id, **codec_config}

        # The dataclass is frozen, so the attribute is written through object.__setattr__.
        object.__setattr__(self, "codec_config", codec_config)
        warn(
            "Numcodecs codecs are not in the Zarr version 3 specification and "
            "may not be supported by other zarr implementations.",
            category=UserWarning,
            stacklevel=2,
        )

    @cached_property
    def _codec(self) -> numcodecs.abc.Codec:
        """The numcodecs codec built from ``codec_config`` (created once, then cached)."""
        return numcodecs.get_codec(self.codec_config)

    @classmethod
    def from_dict(cls, data: dict[str, JSON]) -> Self:
        """Build an instance from a zarr named-configuration dict."""
        return cls(**_parse_codec_configuration(data))

    def to_dict(self) -> dict[str, JSON]:
        """Return the ``name`` / ``configuration`` dict, with ``"id"`` left out of the configuration."""
        configuration = {
            key: value for key, value in self.codec_config.items() if key != "id"
        }
        return {"name": self.codec_name, "configuration": configuration}

    def compute_encoded_size(self, input_byte_length: int, chunk_spec: ArraySpec) -> int:
        """Not implemented on this base class."""
        raise NotImplementedError  # pragma: no cover

    # Override __repr__ because dynamically constructed classes don't seem to work otherwise
    def __repr__(self) -> str:
        shown_config = {
            key: value for key, value in self.codec_config.items() if key != "id"
        }
        return f"{self.__class__.__name__}(codec_name={self.codec_name!r}, codec_config={shown_config!r})"


class _NumcodecsBytesBytesCodec(_NumcodecsCodec, BytesBytesCodec):
    """Bytes-to-bytes codec base: both directions run the numcodecs codec in a worker thread."""

    def __init__(self, **codec_config: JSON) -> None:
        super().__init__(**codec_config)

    async def _decode_single(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Buffer:
        """Decode one chunk via ``as_numpy_array_wrapper`` without blocking the event loop."""
        decoded = await asyncio.to_thread(
            as_numpy_array_wrapper, self._codec.decode, chunk_bytes, chunk_spec.prototype
        )
        return decoded

    def _encode(self, chunk_bytes: Buffer, prototype: BufferPrototype) -> Buffer:
        """Synchronously encode ``chunk_bytes`` and wrap the result in a ``prototype`` buffer."""
        encoded = self._codec.encode(chunk_bytes.as_array_like())
        # Required for checksum codecs: an ndarray result is converted to bytes first.
        payload = encoded.tobytes() if isinstance(encoded, np.ndarray) else encoded
        return prototype.buffer.from_bytes(payload)

    async def _encode_single(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Buffer:
        """Encode one chunk by running ``_encode`` in a worker thread."""
        target_prototype = chunk_spec.prototype
        return await asyncio.to_thread(self._encode, chunk_bytes, target_prototype)


class _NumcodecsArrayArrayCodec(_NumcodecsCodec, ArrayArrayCodec):
    """Array-to-array codec base: both directions run the numcodecs codec in a worker thread."""

    def __init__(self, **codec_config: JSON) -> None:
        super().__init__(**codec_config)

    async def _decode_single(self, chunk_array: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
        """Decode one chunk and reshape the result to ``chunk_spec.shape``."""
        encoded_view = chunk_array.as_ndarray_like()
        decoded = await asyncio.to_thread(self._codec.decode, encoded_view)
        reshaped = decoded.reshape(chunk_spec.shape)
        return chunk_spec.prototype.nd_buffer.from_ndarray_like(reshaped)

    async def _encode_single(self, chunk_array: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
        """Encode one chunk; the codec output is wrapped without reshaping."""
        raw_view = chunk_array.as_ndarray_like()
        encoded = await asyncio.to_thread(self._codec.encode, raw_view)
        return chunk_spec.prototype.nd_buffer.from_ndarray_like(encoded)


class _NumcodecsArrayBytesCodec(_NumcodecsCodec, ArrayBytesCodec):
    """Array-to-bytes codec base: both directions run the numcodecs codec in a worker thread."""

    def __init__(self, **codec_config: JSON) -> None:
        super().__init__(**codec_config)

    async def _decode_single(self, chunk_buffer: Buffer, chunk_spec: ArraySpec) -> NDBuffer:
        """Decode the chunk's bytes and reshape the result to ``chunk_spec.shape``."""
        raw_bytes = chunk_buffer.to_bytes()
        decoded = await asyncio.to_thread(self._codec.decode, raw_bytes)
        reshaped = decoded.reshape(chunk_spec.shape)
        return chunk_spec.prototype.nd_buffer.from_ndarray_like(reshaped)

    async def _encode_single(self, chunk_ndbuffer: NDBuffer, chunk_spec: ArraySpec) -> Buffer:
        """Encode the chunk's array and wrap the codec output in a bytes buffer."""
        array_view = chunk_ndbuffer.as_ndarray_like()
        encoded = await asyncio.to_thread(self._codec.encode, array_view)
        return chunk_spec.prototype.buffer.from_bytes(encoded)


# bytes-to-bytes codecs
class Blosc(_NumcodecsBytesBytesCodec, codec_name="blosc"):
    # Bytes-to-bytes codec registered as CODEC_PREFIX + "blosc"; all behavior is inherited.
    ...


class LZ4(_NumcodecsBytesBytesCodec, codec_name="lz4"):
    # Bytes-to-bytes codec registered as CODEC_PREFIX + "lz4"; all behavior is inherited.
    ...


class Zstd(_NumcodecsBytesBytesCodec, codec_name="zstd"):
    # Bytes-to-bytes codec registered as CODEC_PREFIX + "zstd"; all behavior is inherited.
    ...


class Zlib(_NumcodecsBytesBytesCodec, codec_name="zlib"):
    # Bytes-to-bytes codec registered as CODEC_PREFIX + "zlib"; all behavior is inherited.
    ...


class GZip(_NumcodecsBytesBytesCodec, codec_name="gzip"):
    # Bytes-to-bytes codec registered as CODEC_PREFIX + "gzip"; all behavior is inherited.
    ...


class BZ2(_NumcodecsBytesBytesCodec, codec_name="bz2"):
    # Bytes-to-bytes codec registered as CODEC_PREFIX + "bz2"; all behavior is inherited.
    ...


class LZMA(_NumcodecsBytesBytesCodec, codec_name="lzma"):
    # Bytes-to-bytes codec registered as CODEC_PREFIX + "lzma"; all behavior is inherited.
    ...


class Shuffle(_NumcodecsBytesBytesCodec, codec_name="shuffle"):
    # Bytes-to-bytes codec registered as CODEC_PREFIX + "shuffle".

    def evolve_from_array_spec(self, array_spec: ArraySpec) -> Shuffle:
        """
        Fill in ``elementsize`` from the array's data type when it is not configured.

        Returns a new ``Shuffle`` whose ``elementsize`` is the item size of the
        array dtype, or ``self`` when ``elementsize`` is already set.
        """
        if self.codec_config.get("elementsize") is not None:
            return self  # pragma: no cover
        native_dtype = _from_zarr_dtype(array_spec.dtype)
        updated_config = {**self.codec_config, "elementsize": native_dtype.itemsize}
        return Shuffle(**updated_config)


# array-to-array codecs ("filters")
class Delta(_NumcodecsArrayArrayCodec, codec_name="delta"):
    # Array-to-array codec registered as CODEC_PREFIX + "delta".

    def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec:
        """Return ``chunk_spec`` with its dtype replaced by the configured ``astype``, if any."""
        astype = self.codec_config.get("astype")
        if not astype:
            # No (or empty) ``astype``: the chunk spec is passed through unchanged.
            return chunk_spec
        encoded_dtype = _to_zarr_dtype(np.dtype(astype))  # type: ignore[call-overload]
        return replace(chunk_spec, dtype=encoded_dtype)


class BitRound(_NumcodecsArrayArrayCodec, codec_name="bitround"):
    # Array-to-array codec registered as CODEC_PREFIX + "bitround"; all behavior is inherited.
    ...


class FixedScaleOffset(_NumcodecsArrayArrayCodec, codec_name="fixedscaleoffset"):
    # Array-to-array codec registered as CODEC_PREFIX + "fixedscaleoffset".

    def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec:
        """Return ``chunk_spec`` with its dtype replaced by the configured ``astype``, if any."""
        astype = self.codec_config.get("astype")
        if not astype:
            return chunk_spec
        encoded_dtype = _to_zarr_dtype(np.dtype(astype))  # type: ignore[call-overload]
        return replace(chunk_spec, dtype=encoded_dtype)

    def evolve_from_array_spec(self, array_spec: ArraySpec) -> FixedScaleOffset:
        """Fill in ``dtype`` from the array's data type when it is not configured."""
        if self.codec_config.get("dtype") is not None:
            return self
        native_dtype = _from_zarr_dtype(array_spec.dtype)
        updated_config = {**self.codec_config, "dtype": str(native_dtype)}
        return FixedScaleOffset(**updated_config)


class Quantize(_NumcodecsArrayArrayCodec, codec_name="quantize"):
    # Array-to-array codec registered as CODEC_PREFIX + "quantize".

    def __init__(self, **codec_config: JSON) -> None:
        super().__init__(**codec_config)

    def evolve_from_array_spec(self, array_spec: ArraySpec) -> Quantize:
        """Fill in ``dtype`` from the array's data type when it is not configured."""
        if self.codec_config.get("dtype") is not None:
            return self
        native_dtype = _from_zarr_dtype(array_spec.dtype)
        updated_config = {**self.codec_config, "dtype": str(native_dtype)}
        return Quantize(**updated_config)


class PackBits(_NumcodecsArrayArrayCodec, codec_name="packbits"):
    # Array-to-array codec registered as CODEC_PREFIX + "packbits".

    def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec:
        """
        Describe the encoded chunk: a 1-D ``uint8`` array.

        Its length is one plus the element count divided by eight, rounded up.
        NOTE(review): the extra leading element is presumably a header byte
        written by the numcodecs codec -- confirm.
        """
        element_count = product(chunk_spec.shape)
        packed_length = 1 + math.ceil(element_count / 8)
        packed_dtype = _to_zarr_dtype(np.dtype("uint8"))
        return replace(chunk_spec, shape=(packed_length,), dtype=packed_dtype)

    # todo: remove this type: ignore when this class can be defined w.r.t.
    # a single zarr dtype API
    def validate(self, *, dtype: np.dtype[Any], **_kwargs) -> None:  # type: ignore[override]
        """Raise ``ValueError`` unless the array data type is boolean."""
        if _from_zarr_dtype(dtype) != np.dtype("bool"):
            raise ValueError(f"Packbits filter requires bool dtype. Got {dtype}.")


class AsType(_NumcodecsArrayArrayCodec, codec_name="astype"):
    # Array-to-array codec registered as CODEC_PREFIX + "astype".

    def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec:
        """Return ``chunk_spec`` with its dtype replaced by the configured ``encode_dtype``."""
        encode_dtype = self.codec_config["encode_dtype"]
        encoded_dtype = _to_zarr_dtype(np.dtype(encode_dtype))  # type: ignore[arg-type]
        return replace(chunk_spec, dtype=encoded_dtype)

    def evolve_from_array_spec(self, array_spec: ArraySpec) -> AsType:
        """Fill in ``decode_dtype`` from the array's data type when it is not configured."""
        if self.codec_config.get("decode_dtype") is not None:
            return self
        # TODO: remove these coverage exemptions the correct way, i.e. with tests
        native_dtype = _from_zarr_dtype(array_spec.dtype)  # pragma: no cover
        updated_config = {**self.codec_config, "decode_dtype": str(native_dtype)}  # pragma: no cover
        return AsType(**updated_config)  # pragma: no cover


# bytes-to-bytes checksum codecs
class _NumcodecsChecksumCodec(_NumcodecsBytesBytesCodec):
    """Bytes-to-bytes base for the checksum codecs."""

    def compute_encoded_size(self, input_byte_length: int, chunk_spec: ArraySpec) -> int:
        """Return the encoded size: the input length plus 4 bytes."""
        # NOTE(review): 4 presumably is the width of the stored checksum -- confirm.
        added_bytes = 4
        return input_byte_length + added_bytes  # pragma: no cover


class CRC32(_NumcodecsChecksumCodec, codec_name="crc32"):
    # Checksum codec registered as CODEC_PREFIX + "crc32"; all behavior is inherited.
    ...


class CRC32C(_NumcodecsChecksumCodec, codec_name="crc32c"):
    # Checksum codec registered as CODEC_PREFIX + "crc32c"; all behavior is inherited.
    ...


class Adler32(_NumcodecsChecksumCodec, codec_name="adler32"):
    # Checksum codec registered as CODEC_PREFIX + "adler32"; all behavior is inherited.
    ...


class Fletcher32(_NumcodecsChecksumCodec, codec_name="fletcher32"):
    # Checksum codec registered as CODEC_PREFIX + "fletcher32"; all behavior is inherited.
    ...


class JenkinsLookup3(_NumcodecsChecksumCodec, codec_name="jenkins_lookup3"):
    # Checksum codec registered as CODEC_PREFIX + "jenkins_lookup3"; all behavior is inherited.
    ...


# array-to-bytes codecs
class PCodec(_NumcodecsArrayBytesCodec, codec_name="pcodec"):
    # Array-to-bytes codec registered as CODEC_PREFIX + "pcodec"; all behavior is inherited.
    ...


class ZFPY(_NumcodecsArrayBytesCodec, codec_name="zfpy"):
    # Array-to-bytes codec registered as CODEC_PREFIX + "zfpy"; all behavior is inherited.
    ...

msg = "zarr could not be imported. Zarr 3.1.0 or later is required to use the numcodecs zarr integration."
raise ImportError(msg) from e

from zarr.codecs._numcodecs import (
BZ2,
CRC32,
CRC32C,
LZ4,
LZMA,
ZFPY,
Adler32,
AsType,
BitRound,
Blosc,
Delta,
FixedScaleOffset,
Fletcher32,
GZip,
JenkinsLookup3,
PackBits,
PCodec,
Quantize,
Shuffle,
Zlib,
Zstd,
)

__all__ = [
"BZ2",
Expand Down
6 changes: 4 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -252,13 +252,15 @@ c-compiler = ">=1.9.0,<2"
cxx-compiler = ">=1.9.0,<2"
hatch = '==1.14.1'

[tool.hatch.metadata]
allow-direct-references = true

[[tool.hatch.envs.test.matrix]]
python = ["3.11"]
zarr = ["3.0.10", "3.1.0"]

[tool.hatch.envs.test]
dependencies = [
"zarr=={matrix:zarr}"
"zarr @ git+https://github.com/d-v-b/zarr-python.git@b6b2260a953e38bd4e432508b2063c5beda11703"
]
numpy="==2.2"
features = ["test"]
Expand Down
Loading