diff --git a/conftest.py b/conftest.py index 61e0c4ff0..9406e3d98 100644 --- a/conftest.py +++ b/conftest.py @@ -44,6 +44,30 @@ def pytest_runtest_setup(item): pytest.skip("set --run-minio-tests to run tests requiring docker and minio") +def _xarray_subset(): + ds = xr.tutorial.open_dataset("air_temperature") + return ds.isel(time=slice(0, 10), lat=slice(0, 90), lon=slice(0, 180)) + + +@pytest.fixture(params=[2, 3]) +def zarr_store(tmpdir, request): + ds = _xarray_subset() + filepath = f"{tmpdir}/air.zarr" + ds.to_zarr(filepath, zarr_format=request.param) + ds.close() + return filepath + + +@pytest.fixture() +def zarr_store_scalar(tmpdir): + import zarr + + store = zarr.storage.MemoryStore() + zarr_store_scalar = zarr.create_array(store=store, shape=(), dtype="int8") + zarr_store_scalar[()] = 42 + return zarr_store_scalar + + # Common codec configurations DELTA_CODEC = {"name": "numcodecs.delta", "configuration": {"dtype": "`_. - Added experimental ManifestStore (:pull:`490`). - Added :py:meth:`ManifestStore.to_virtual_dataset()` method (:pull:`522`). By `Tom Nicholas `_. diff --git a/virtualizarr/backend.py b/virtualizarr/backend.py index 16901c9ca..90881b73b 100644 --- a/virtualizarr/backend.py +++ b/virtualizarr/backend.py @@ -28,6 +28,7 @@ KerchunkVirtualBackend, NetCDF3VirtualBackend, TIFFVirtualBackend, + ZarrVirtualBackend, ) from virtualizarr.readers.api import VirtualBackend from virtualizarr.utils import _FsspecFSFromFilepath @@ -43,6 +44,7 @@ # TODO add entrypoint to allow external libraries to add to this mapping VIRTUAL_BACKENDS = { "kerchunk": KerchunkVirtualBackend, + "zarr": ZarrVirtualBackend, "dmrpp": DMRPPVirtualBackend, "hdf5": HDFVirtualBackend, "netcdf4": HDFVirtualBackend, # note this is the same as for hdf5 @@ -70,6 +72,7 @@ class FileType(AutoName): fits = auto() dmrpp = auto() kerchunk = auto() + zarr = auto() def automatically_determine_filetype( @@ -87,8 +90,7 @@ def automatically_determine_filetype( # TODO how do we handle kerchunk json / parquet here? if Path(filepath).suffix == ".zarr": - # TODO we could imagine opening an existing zarr store, concatenating it, and writing a new virtual one... - raise NotImplementedError() + return FileType.zarr # Read magic bytes from local or remote file fpath = _FsspecFSFromFilepath( diff --git a/virtualizarr/codecs.py b/virtualizarr/codecs.py index fa8d01d90..46e342f18 100644 --- a/virtualizarr/codecs.py +++ b/virtualizarr/codecs.py @@ -94,10 +94,20 @@ def get_codec_config(codec: ZarrCodec) -> dict[str, Any]: """ Extract configuration from a codec, handling both zarr-python and numcodecs codecs. """ + if hasattr(codec, "codec_config"): return codec.codec_config elif hasattr(codec, "get_config"): return codec.get_config() + elif hasattr(codec, "_zstd_codec"): + # related issue: https://github.com/zarr-developers/VirtualiZarr/issues/514 + # very silly workaround. codec.to_dict for zstd gives: + # {'name': 'zstd', 'configuration': {'level': 0, 'checksum': False}} + # which when passed through ArrayV2Metadata -> numcodecs.get_codec gives the error: + # *** numcodecs.errors.UnknownCodecError: codec not available: 'None' + # if codec._zstd_codec.get_config() : {'id': 'zstd', 'level': 0, 'checksum': False} + # is passed to numcodecs.get_codec. It works fine. + return codec._zstd_codec.get_config() elif hasattr(codec, "to_dict"): return codec.to_dict() else: diff --git a/virtualizarr/manifests/manifest.py b/virtualizarr/manifests/manifest.py index 4f712b54e..981dd0759 100644 --- a/virtualizarr/manifests/manifest.py +++ b/virtualizarr/manifests/manifest.py @@ -1,4 +1,3 @@ -import json import re from collections.abc import ItemsView, Iterable, Iterator, KeysView, ValuesView from pathlib import PosixPath @@ -84,7 +83,8 @@ def validate_and_normalize_path_to_uri(path: str, fs_root: str | None = None) -> return urlunparse(components) elif any(path.startswith(prefix) for prefix in VALID_URI_PREFIXES): - if not PosixPath(path).suffix: + # Question: This feels fragile, is there a better way to ID a Zarr + if not PosixPath(path).suffix and "zarr" not in path: raise ValueError( f"entries in the manifest must be paths to files, but this path has no file suffix: {path}" ) @@ -96,7 +96,7 @@ def validate_and_normalize_path_to_uri(path: str, fs_root: str | None = None) -> # using PosixPath here ensures a clear error would be thrown on windows (whose paths and platform are not officially supported) _path = PosixPath(path) - if not _path.suffix: + if not _path.suffix and "zarr" not in path: raise ValueError( f"entries in the manifest must be paths to files, but this path has no file suffix: {path}" ) @@ -436,21 +436,6 @@ def __eq__(self, other: Any) -> bool: lengths_equal = (self._lengths == other._lengths).all() return paths_equal and offsets_equal and lengths_equal - @classmethod - def from_zarr_json(cls, filepath: str) -> "ChunkManifest": - """Create a ChunkManifest from a Zarr manifest.json file.""" - - with open(filepath, "r") as manifest_file: - entries = json.load(manifest_file) - - return cls(entries=entries) - - def to_zarr_json(self, filepath: str) -> None: - """Write the manifest to a Zarr manifest.json file.""" - entries = self.dict() - with open(filepath, "w") as json_file: - json.dump(entries, json_file, indent=4, separators=(", ", ": ")) - def rename_paths( self, new: str | Callable[[str], str], diff --git a/virtualizarr/manifests/store.py b/virtualizarr/manifests/store.py index 01f2db679..42cd1c614 100644 --- a/virtualizarr/manifests/store.py +++ b/virtualizarr/manifests/store.py @@ -18,7 +18,7 @@ from virtualizarr.manifests.array import ManifestArray from virtualizarr.manifests.group import ManifestGroup -from virtualizarr.vendor.zarr.metadata import dict_to_buffer +from virtualizarr.vendor.zarr.core.metadata import dict_to_buffer if TYPE_CHECKING: from collections.abc import AsyncGenerator, Iterable, Mapping diff --git a/virtualizarr/readers/__init__.py b/virtualizarr/readers/__init__.py index aa3e4e640..3d887844c 100644 --- a/virtualizarr/readers/__init__.py +++ b/virtualizarr/readers/__init__.py @@ -5,6 +5,9 @@ from virtualizarr.readers.kerchunk import KerchunkVirtualBackend from virtualizarr.readers.netcdf3 import NetCDF3VirtualBackend from virtualizarr.readers.tiff import TIFFVirtualBackend +from virtualizarr.readers.zarr import ( + ZarrVirtualBackend, +) __all__ = [ "DMRPPVirtualBackend", @@ -14,4 +17,5 @@ "KerchunkVirtualBackend", "NetCDF3VirtualBackend", "TIFFVirtualBackend", + "ZarrVirtualBackend", ] diff --git a/virtualizarr/readers/zarr.py b/virtualizarr/readers/zarr.py new file mode 100644 index 000000000..0cea7a16f --- /dev/null +++ b/virtualizarr/readers/zarr.py @@ -0,0 +1,196 @@ +from __future__ import annotations + +import asyncio +from pathlib import Path # noqa +from typing import ( + Any, + Hashable, + Iterable, + Mapping, + Optional, +) + +import numpy as np +from xarray import Dataset, Index +from zarr.api.asynchronous import open_group as open_group_async +from zarr.core.metadata import ArrayV3Metadata + +from virtualizarr.manifests import ( + ChunkManifest, + ManifestArray, + ManifestGroup, + ManifestStore, +) +from virtualizarr.manifests.manifest import validate_and_normalize_path_to_uri # noqa +from virtualizarr.readers.api import VirtualBackend +from virtualizarr.vendor.zarr.core.common import _concurrent_map + +FillValueT = bool | str | float | int | list | None + +ZARR_DEFAULT_FILL_VALUE: dict[str, FillValueT] = { + # numpy dtypes's hierarchy lets us avoid checking for all the widths + # https://numpy.org/doc/stable/reference/arrays.scalars.html + np.dtype("bool").kind: False, + np.dtype("int").kind: 0, + np.dtype("float").kind: 0.0, + np.dtype("complex").kind: [0.0, 0.0], + np.dtype("datetime64").kind: 0, +} + + +import zarr + + +async def get_chunk_mapping_prefix(zarr_array: zarr.AsyncArray, filepath: str) -> dict: + """Create a dictionary to pass into ChunkManifest __init__""" + + # TODO: For when we want to support reading V2 we should parse the /c/ and "/" between chunks + if zarr_array.shape == (): + # If we have a scalar array `c` + # https://zarr-specs.readthedocs.io/en/latest/v3/chunk-key-encodings/default/index.html#description + + prefix = zarr_array.name.lstrip("/") + "/c" + prefix_keys = [(prefix,)] + _lengths = [await zarr_array.store.getsize("c")] + _dict_keys = ["c"] + _paths = [filepath + "/" + _dict_keys[0]] + + else: + prefix = zarr_array.name.lstrip("/") + "/c/" + prefix_keys = [(x,) async for x in zarr_array.store.list_prefix(prefix)] + _lengths = await _concurrent_map(prefix_keys, zarr_array.store.getsize) + chunk_keys = [x[0].split(prefix)[1] for x in prefix_keys] + _dict_keys = [key.replace("/", ".") for key in chunk_keys] + _paths = [filepath + "/" + prefix + key for key in chunk_keys] + + _offsets = [0] * len(_lengths) + return { + key: {"path": path, "offset": offset, "length": length} + for key, path, offset, length in zip( + _dict_keys, + _paths, + _offsets, + _lengths, + ) + } + + +async def build_chunk_manifest( + zarr_array: zarr.AsyncArray, filepath: str +) -> ChunkManifest: + """Build a ChunkManifest from a dictionary""" + chunk_map = await get_chunk_mapping_prefix(zarr_array=zarr_array, filepath=filepath) + return ChunkManifest(chunk_map) + + +def get_metadata(zarr_array: zarr.AsyncArray[Any]) -> ArrayV3Metadata: + fill_value = zarr_array.metadata.fill_value + if fill_value is not None: + fill_value = ZARR_DEFAULT_FILL_VALUE[zarr_array.metadata.fill_value.dtype.kind] + + zarr_format = zarr_array.metadata.zarr_format + + if zarr_format == 2: + # TODO: Once we want to support V2, we will have to deconstruct the + # zarr_array codecs etc. and reconstruct them with create_v3_array_metadata + raise NotImplementedError("Reading Zarr V2 currently not supported.") + + elif zarr_format == 3: + return zarr_array.metadata + + else: + raise NotImplementedError("Zarr format is not recognized as v2 or v3.") + + +async def _construct_manifest_array(zarr_array: zarr.AsyncArray[Any], filepath: str): + array_metadata = get_metadata(zarr_array=zarr_array) + + chunk_manifest = await build_chunk_manifest(zarr_array, filepath=filepath) + return ManifestArray(metadata=array_metadata, chunkmanifest=chunk_manifest) + + +async def _construct_manifest_group( + filepath: str, + *, + reader_options: Optional[dict] = None, + drop_variables: str | Iterable[str] | None = None, + group: str | None = None, +): + reader_options = reader_options or {} + zarr_group = await open_group_async( + filepath, + storage_options=reader_options.get("storage_options"), + path=group, + mode="r", + ) + + zarr_array_keys = [key async for key in zarr_group.array_keys()] + + _drop_vars: list[Hashable] = [] if drop_variables is None else list(drop_variables) + + zarr_arrays = await asyncio.gather( + *[zarr_group.getitem(var) for var in zarr_array_keys if var not in _drop_vars] + ) + + manifest_arrays = await asyncio.gather( + *[ + _construct_manifest_array(zarr_array=array, filepath=filepath) # type: ignore[arg-type] + for array in zarr_arrays + ] + ) + + manifest_dict = { + array.basename: result for array, result in zip(zarr_arrays, manifest_arrays) + } + return ManifestGroup(manifest_dict, attributes=zarr_group.attrs) + + +def _construct_manifest_store( + filepath: str, + *, + reader_options: Optional[dict] = None, + drop_variables: str | Iterable[str] | None = None, + group: str | None = None, +) -> ManifestStore: + import asyncio + + manifest_group = asyncio.run( + _construct_manifest_group( + filepath=filepath, + group=group, + drop_variables=drop_variables, + reader_options=reader_options, + ) + ) + return ManifestStore(manifest_group) + + +class ZarrVirtualBackend(VirtualBackend): + @staticmethod + def open_virtual_dataset( + filepath: str, + group: str | None = None, + drop_variables: str | Iterable[str] | None = None, + loadable_variables: Iterable[str] | None = None, + decode_times: bool | None = None, + indexes: Mapping[str, Index] | None = None, + virtual_backend_kwargs: Optional[dict] = None, + reader_options: Optional[dict] = None, + ) -> Dataset: + filepath = validate_and_normalize_path_to_uri( + filepath, fs_root=Path.cwd().as_uri() + ) + + manifest_store = _construct_manifest_store( + filepath=filepath, + group=group, + drop_variables=drop_variables, + reader_options=reader_options, + ) + + ds = manifest_store.to_virtual_dataset( + loadable_variables=loadable_variables, + decode_times=decode_times, + indexes=indexes, + ) + return ds diff --git a/virtualizarr/tests/test_integration.py b/virtualizarr/tests/test_integration.py index e73c5ce6d..af61df828 100644 --- a/virtualizarr/tests/test_integration.py +++ b/virtualizarr/tests/test_integration.py @@ -158,6 +158,25 @@ def roundtrip_as_in_memory_icechunk( ], ) class TestRoundtrip: + def test_zarr_roundtrip( + self, + tmp_path, + roundtrip_func: RoundtripFunction, + ): + air_zarr_path = tmp_path / "air_temperature.zarr" + with xr.tutorial.open_dataset("air_temperature", decode_times=False) as ds: + # TODO: for now we will save as Zarr V3. Later we can parameterize it for V2. + ds.to_zarr(air_zarr_path, zarr_format=3, consolidated=False) + with open_virtual_dataset(str(air_zarr_path)) as vds: + roundtrip = roundtrip_func(vds, tmp_path, decode_times=False) + + # assert all_close to original dataset + xrt.assert_allclose(roundtrip, ds) + + # assert coordinate attributes are maintained + for coord in ds.coords: + assert ds.coords[coord].attrs == roundtrip.coords[coord].attrs + @parametrize_over_hdf_backends def test_roundtrip_no_concat( self, @@ -175,7 +194,6 @@ def test_roundtrip_no_concat( # use open_dataset_via_kerchunk to read it as references with open_virtual_dataset(str(air_nc_path), backend=hdf_backend) as vds: roundtrip = roundtrip_func(vds, tmp_path, decode_times=False) - # assert all_close to original dataset xrt.assert_allclose(roundtrip, ds) diff --git a/virtualizarr/tests/test_readers/test_zarr.py b/virtualizarr/tests/test_readers/test_zarr.py new file mode 100644 index 000000000..5a4057293 --- /dev/null +++ b/virtualizarr/tests/test_readers/test_zarr.py @@ -0,0 +1,91 @@ +import numpy as np +import pytest + +from virtualizarr import open_virtual_dataset +from virtualizarr.manifests import ManifestArray +from virtualizarr.readers.zarr import get_chunk_mapping_prefix +from virtualizarr.tests import requires_network + + +@requires_network +@pytest.mark.parametrize( + "zarr_store", + [ + pytest.param( + 2, + id="Zarr V2", + marks=pytest.mark.skip(reason="Zarr V2 not currently supported."), + ), + pytest.param(3, id="Zarr V3"), + ], + indirect=True, +) +class TestOpenVirtualDatasetZarr: + def test_loadable_variables(self, zarr_store, loadable_variables=["time", "air"]): + # check loadable variables + vds = open_virtual_dataset( + filepath=zarr_store, loadable_variables=loadable_variables + ) + assert isinstance(vds["time"].data, np.ndarray) + assert isinstance(vds["air"].data, np.ndarray), type(vds["air"].data) + + def test_drop_variables(self, zarr_store, drop_variables=["air"]): + # check variable is dropped + vds = open_virtual_dataset(filepath=zarr_store, drop_variables=drop_variables) + assert len(vds.data_vars) == 0 + + def test_manifest_indexing(self, zarr_store): + vds = open_virtual_dataset(filepath=zarr_store) + assert "0.0.0" in vds["air"].data.manifest.dict().keys() + + def test_virtual_dataset_zarr_attrs(self, zarr_store): + import zarr + + zg = zarr.open_group(zarr_store) + vds = open_virtual_dataset(filepath=zarr_store, loadable_variables=[]) + + non_var_arrays = ["time", "lat", "lon"] + + # check dims and coords are present + assert set(vds.coords) == set(non_var_arrays) + assert set(vds.dims) == set(non_var_arrays) + # check vars match + assert set(vds.keys()) == set(["air"]) + + # check top level attrs + assert zg.attrs.asdict() == vds.attrs + + arrays = [val for val in zg.keys()] + + # arrays are ManifestArrays + for array in arrays: + # check manifest array ArrayV3Metadata dtype + assert isinstance(vds[array].data, ManifestArray) + # compare manifest array ArrayV3Metadata + expected = zg[array].metadata.to_dict() + # Check attributes + assert expected["attributes"] == vds[array].attrs + assert expected["dimension_names"] == vds[array].dims + expected.pop( + "dimension_names" + ) # dimension_names are removed in conversion to virtual variable + expected[ + "attributes" + ] = {} # attributes are removed in conversion to virtual variable + actual = vds[array].data.metadata.to_dict() + assert expected == actual + + +def test_scalar_get_chunk_mapping_prefix(zarr_store_scalar): + # Use a scalar zarr store with a /c/ representing the scalar: + # https://zarr-specs.readthedocs.io/en/latest/v3/chunk-key-encodings/default/index.html#description + + import asyncio + + chunk_map = asyncio.run( + get_chunk_mapping_prefix( + zarr_array=zarr_store_scalar, filepath=str(zarr_store_scalar.store_path) + ) + ) + assert chunk_map["c"]["offset"] == 0 + assert chunk_map["c"]["length"] == 10 diff --git a/virtualizarr/utils.py b/virtualizarr/utils.py index b25b6c1e8..90c0678a2 100644 --- a/virtualizarr/utils.py +++ b/virtualizarr/utils.py @@ -14,6 +14,7 @@ if TYPE_CHECKING: import fsspec.core import fsspec.spec + import upath from obstore import ReadableFile from obstore.store import ObjectStore @@ -62,6 +63,7 @@ class _FsspecFSFromFilepath: filepath: str reader_options: Optional[dict] = field(default_factory=dict) fs: fsspec.AbstractFileSystem = field(init=False) + upath: upath.core.UPath = field(init=False) def open_file(self) -> OpenFileType: """Calls `.open` on fsspec.Filesystem instantiation using self.filepath as an input. @@ -77,18 +79,25 @@ def read_bytes(self, bytes: int) -> bytes: with self.open_file() as of: return of.read(bytes) + def get_mapper(self): + """Returns a mapper for use with Zarr""" + return self.fs.get_mapper(self.filepath) + def __post_init__(self) -> None: """Initialize the fsspec filesystem object""" import fsspec from upath import UPath - universal_filepath = UPath(self.filepath) - protocol = universal_filepath.protocol + if not isinstance(self.filepath, UPath): + upath = UPath(self.filepath) + + self.upath = upath + self.protocol = upath.protocol self.reader_options = self.reader_options or {} storage_options = self.reader_options.get("storage_options", {}) # type: ignore - self.fs = fsspec.filesystem(protocol, **storage_options) + self.fs = fsspec.filesystem(self.protocol, **storage_options) def check_for_collisions( @@ -168,7 +177,6 @@ def convert_v3_to_v2_metadata( array_filters: tuple[ArrayArrayCodec, ...] bytes_compressors: tuple[BytesBytesCodec, ...] array_filters, _, bytes_compressors = extract_codecs(v3_metadata.codecs) - # Handle compressor configuration compressor_config: dict[str, Any] | None = None if bytes_compressors: @@ -182,6 +190,7 @@ def convert_v3_to_v2_metadata( # Handle filter configurations filter_configs = [get_codec_config(filter_) for filter_ in array_filters] + v2_metadata = ArrayV2Metadata( shape=v3_metadata.shape, dtype=v3_metadata.data_type.to_numpy(), diff --git a/virtualizarr/vendor/__init__.py b/virtualizarr/vendor/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/virtualizarr/vendor/zarr/__init__.py b/virtualizarr/vendor/zarr/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/virtualizarr/vendor/zarr/core/__init__.py b/virtualizarr/vendor/zarr/core/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/virtualizarr/vendor/zarr/core/common.py b/virtualizarr/vendor/zarr/core/common.py new file mode 100644 index 000000000..b6363db76 --- /dev/null +++ b/virtualizarr/vendor/zarr/core/common.py @@ -0,0 +1,34 @@ +import asyncio +from itertools import starmap +from typing import ( + Any, + Awaitable, + Callable, + Iterable, + TypeVar, +) + +# Vendored directly from Zarr-python V3's private API +# https://github.com/zarr-developers/zarr-python/blob/458299857141a5470ba3956d8a1607f52ac33857/src/zarr/core/common.py#L53 +T = TypeVar("T", bound=tuple[Any, ...]) +V = TypeVar("V") + + +async def _concurrent_map( + items: Iterable[T], + func: Callable[..., Awaitable[V]], + limit: int | None = None, +) -> list[V]: + if limit is None: + return await asyncio.gather(*list(starmap(func, items))) + + else: + sem = asyncio.Semaphore(limit) + + async def run(item: tuple[Any]) -> V: + async with sem: + return await func(*item) + + return await asyncio.gather( + *[asyncio.ensure_future(run(item)) for item in items] + ) diff --git a/virtualizarr/vendor/zarr/metadata.py b/virtualizarr/vendor/zarr/core/metadata.py similarity index 100% rename from virtualizarr/vendor/zarr/metadata.py rename to virtualizarr/vendor/zarr/core/metadata.py diff --git a/virtualizarr/xarray.py b/virtualizarr/xarray.py index 124103a81..36b810b27 100644 --- a/virtualizarr/xarray.py +++ b/virtualizarr/xarray.py @@ -77,7 +77,6 @@ def construct_virtual_dataset( return replace_virtual_with_loadable_vars( fully_virtual_ds, loadable_ds, loadable_variables ) - else: # TODO pre-ManifestStore codepath, remove once all readers use ManifestStore approach