conftest.py (8 additions & 2 deletions)
@@ -9,6 +9,7 @@
import h5py # type: ignore[import]
import numpy as np
import pytest
import ujson
import xarray as xr
from obstore.store import LocalStore
from xarray.core.variable import Variable
@@ -289,11 +290,16 @@ def netcdf4_virtual_dataset(netcdf4_file):


@pytest.fixture
def netcdf4_inlined_ref(netcdf4_file):
def netcdf4_inlined_ref(tmp_path, netcdf4_file):
"""Create an inlined reference from a NetCDF4 file."""
from kerchunk.hdf import SingleHdf5ToZarr

return SingleHdf5ToZarr(netcdf4_file, inline_threshold=1000).translate()
ref_filepath = tmp_path / "ref.json"
refs = SingleHdf5ToZarr(netcdf4_file, inline_threshold=1000).translate()
with open(ref_filepath, "w") as json_file:
ujson.dump(refs, json_file)

return f"file://{ref_filepath}"


# HDF5 file fixtures
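With this change the fixture serialises the kerchunk references to `ref.json` and returns a `file://` URL instead of the raw dict. For orientation, here is a hedged sketch (not taken from this PR) of roughly what such a reference file contains when a chunk is small enough to be inlined; the variable name, the abbreviated `.zarray` metadata, and the payload are invented:

```python
# Hedged illustration, not from this PR: rough shape of a kerchunk reference
# dict in which a small chunk has been inlined as base64. The variable name
# "a", the abbreviated .zarray metadata, and the payload are invented.
import ujson

refs = {
    "version": 1,
    "refs": {
        ".zgroup": '{"zarr_format": 2}',
        "a/.zarray": '{"shape": [4], "chunks": [4], "dtype": "<i4", ...}',  # abbreviated
        "a/0": "base64:AAAAAAEAAAACAAAAAwAAAA==",  # inlined chunk payload
    },
}

with open("ref.json", "w") as f:
    ujson.dump(refs, f)
```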
virtualizarr/manifests/manifest.py (1 addition & 0 deletions)
@@ -19,6 +19,7 @@
"file:///",
"http://",
"https://",
"memory://",
}


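Adding `memory://` to the recognised URI prefixes is what allows a chunk manifest entry to point at the in-memory cache. A minimal sketch, assuming the prefix check is the relevant validation here; the UUID segment stands in for the key the translator generates:

```python
# Minimal sketch: a manifest entry using the newly accepted "memory://" prefix.
# The UUID segment is a placeholder for the key manifest_from_inline_reference
# generates; offset and length mirror how the translator builds the entry.
from virtualizarr.manifests import ChunkManifest

manifest = ChunkManifest(
    {
        "0": {
            "path": "memory://kerchunk/2b1d0f52-9c1e-4a57-8f3d-6c1d2a9e7b41",
            "offset": 0,
            "length": 16,
        }
    }
)
```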
virtualizarr/parsers/kerchunk/json.py (6 additions & 0 deletions)
@@ -1,6 +1,7 @@
from collections.abc import Iterable

import ujson
from obstore.store import MemoryStore

from virtualizarr.manifests import ManifestStore
from virtualizarr.parsers.kerchunk.translator import manifestgroup_from_kerchunk_refs
@@ -55,6 +56,8 @@ def __call__(
"""
store, path_after_prefix = registry.resolve(url)

cache = MemoryStore()

# we need the whole thing so just get the entire contents in one request
resp = store.get(path_after_prefix)
content = resp.bytes().to_bytes()
@@ -65,5 +68,8 @@ def __call__(
group=self.group,
fs_root=self.fs_root,
skip_variables=self.skip_variables,
cache=cache,
)

registry.register("memory://kerchunk/", cache)
return ManifestStore(group=manifestgroup, registry=registry)
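The parser now creates an obstore `MemoryStore` up front, threads it through translation as `cache`, and registers it under the `memory://kerchunk/` prefix so later chunk reads resolve against it. A small sketch of that put/get round trip in isolation, assuming the `MemoryStore` calls behave as this diff already uses them:

```python
# Small round trip through the caching pattern used above, assuming the
# MemoryStore.put / .get calls behave as this diff already uses them.
from obstore.store import MemoryStore

cache = MemoryStore()
cache.put("example-key", b"decoded inline chunk bytes")

# In the parser, registry.register("memory://kerchunk/", cache) makes paths
# like "memory://kerchunk/example-key" resolve to this store.
resp = cache.get("example-key")
print(resp.bytes().to_bytes())  # b"decoded inline chunk bytes"
```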
virtualizarr/parsers/kerchunk/translator.py (36 additions & 3 deletions)
@@ -1,7 +1,9 @@
from __future__ import annotations

import base64
import uuid
from collections.abc import Iterable
from typing import cast
from typing import TYPE_CHECKING, cast

import numpy as np
import ujson
@@ -24,6 +26,11 @@
)
from virtualizarr.utils import determine_chunk_grid_shape

if TYPE_CHECKING:
from obstore.store import (
ObjectStore,
)


def from_kerchunk_refs(decoded_arr_refs_zarray, zattrs) -> "ArrayV3Metadata":
"""
@@ -67,6 +74,7 @@ def from_kerchunk_refs(decoded_arr_refs_zarray, zattrs) -> "ArrayV3Metadata":
codec_configs = [*filters, *(compressor if compressor is not None else [])]
numcodec_configs = [zarr_codec_config_to_v3(config) for config in codec_configs]
dimension_names = decoded_arr_refs_zarray["dimension_names"]

return create_v3_array_metadata(
chunk_shape=tuple(decoded_arr_refs_zarray["chunks"]),
data_type=dtype,
@@ -83,6 +91,7 @@ def manifestgroup_from_kerchunk_refs(
group: str | None = None,
fs_root: str | None = None,
skip_variables: Iterable[str] | None = None,
cache: ObjectStore | None = None,
) -> ManifestGroup:
"""
Construct a ManifestGroup from a dictionary of kerchunk references.
@@ -98,6 +107,8 @@ def manifestgroup_from_kerchunk_refs(
Required if any paths are relative in order to turn them into absolute paths (which virtualizarr requires).
skip_variables
Variables to ignore when creating the ManifestGroup.
cache
        ObjectStore to use for caching inlined chunk data.

Returns
-------
@@ -114,7 +125,9 @@

# TODO support iterating over multiple nested groups
marrs = {
arr_name: manifestarray_from_kerchunk_refs(refs, arr_name, fs_root=fs_root)
arr_name: manifestarray_from_kerchunk_refs(
refs, arr_name, fs_root=fs_root, cache=cache
)
for arr_name in arr_names
}

Expand Down Expand Up @@ -168,6 +181,7 @@ def manifestarray_from_kerchunk_refs(
refs: KerchunkStoreRefs,
var_name: str,
fs_root: str | None = None,
cache: ObjectStore | None = None,
) -> ManifestArray:
"""Create a single ManifestArray by reading specific keys of a kerchunk references dict."""

@@ -177,7 +191,9 @@
chunk_dict, metadata, zattrs = parse_array_refs(arr_refs)
# we want to remove the _ARRAY_DIMENSIONS from the final variables' .attrs
if chunk_dict:
manifest = manifest_from_kerchunk_chunk_dict(chunk_dict, fs_root=fs_root)
manifest = manifest_from_kerchunk_chunk_dict(
chunk_dict, fs_root=fs_root, cache=cache
)
marr = ManifestArray(metadata=metadata, chunkmanifest=manifest)
elif len(metadata.shape) != 0:
# empty variables don't have physical chunks, but zarray shows that the variable
@@ -198,12 +214,29 @@
return marr


def manifest_from_inline_reference(
    kerchunk_chunk_dict: dict[ChunkKey, str | tuple[str] | tuple[str, int, int]],
    cache: ObjectStore,
) -> ChunkManifest:
    """Decode a single base64-inlined kerchunk chunk, store it in the cache, and return a ChunkManifest pointing at it."""
    data = base64.b64decode(kerchunk_chunk_dict["0"].removeprefix("base64:"))
    key = str(uuid.uuid4())
    cache.put(key, data)
    return ChunkManifest(
        {"0": {"path": f"memory://kerchunk/{key}", "offset": 0, "length": len(data)}}
    )


def manifest_from_kerchunk_chunk_dict(
kerchunk_chunk_dict: dict[ChunkKey, str | tuple[str] | tuple[str, int, int]],
fs_root: str | None = None,
cache: ObjectStore | None = None,
) -> ChunkManifest:
"""Create a single ChunkManifest from the mapping of keys to chunk information stored inside kerchunk array refs."""

if reference := kerchunk_chunk_dict.get("0"):
if isinstance(reference, str) and reference.startswith("base64:"):
return manifest_from_inline_reference(kerchunk_chunk_dict, cache)

chunk_entries: dict[ChunkKey, ChunkEntry] = {}
for k, v in kerchunk_chunk_dict.items():
if isinstance(v, (str, bytes)):
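A hedged end-to-end pass through the new inline path: decode a `base64:` reference, stash the bytes in a `MemoryStore`, and get back a manifest pointing at `memory://kerchunk/<uuid>`. The payload is invented:

```python
# Hedged round trip through the new inline-reference handling; the payload is
# invented and error handling is omitted.
import base64

from obstore.store import MemoryStore

from virtualizarr.parsers.kerchunk.translator import manifest_from_inline_reference

cache = MemoryStore()
payload = b"\x00\x00\x00\x00\x01\x00\x00\x00"
inline_ref = {"0": "base64:" + base64.b64encode(payload).decode()}

manifest = manifest_from_inline_reference(inline_ref, cache)
# The single entry now points at memory://kerchunk/<uuid> with offset 0 and
# length equal to the decoded payload size (8 bytes here).
```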
virtualizarr/tests/test_parsers/test_kerchunk.py (2 additions & 34 deletions)
@@ -276,31 +276,6 @@ def test_open_virtual_dataset_existing_kerchunk_refs(
assert set(vds.variables) == set(netcdf4_virtual_dataset.variables)


@requires_kerchunk
def test_notimplemented_read_inline_refs(tmp_path, netcdf4_inlined_ref, local_registry):
# For now, we raise a NotImplementedError if we read existing references that have inlined data
# https://github.com/zarr-developers/VirtualiZarr/pull/251#pullrequestreview-2361916932

ref_filepath = tmp_path / "ref.json"

import ujson

with open(ref_filepath, "w") as json_file:
ujson.dump(netcdf4_inlined_ref, json_file)

parser = KerchunkJSONParser()
with pytest.raises(
NotImplementedError,
match="Reading inlined reference data is currently not supported",
):
with open_virtual_dataset(
url=ref_filepath.as_posix(),
registry=local_registry,
parser=parser,
) as _:
pass


@pytest.mark.parametrize("skip_variables", ["a", ["a"]])
def test_skip_variables(refs_file_factory, skip_variables, local_registry):
refs_file = refs_file_factory()
@@ -314,16 +289,9 @@ def test_skip_variables(refs_file_factory, skip_variables, local_registry):


@requires_kerchunk
def test_load_manifest(tmp_path, netcdf4_file, netcdf4_virtual_dataset, local_registry):
refs = netcdf4_virtual_dataset.vz.to_kerchunk(format="dict")
ref_filepath = tmp_path / "ref.json"
with open(ref_filepath.as_posix(), "w") as json_file:
ujson.dump(refs, json_file)

def test_load_manifest(tmp_path, netcdf4_file, netcdf4_inlined_ref, local_registry):
parser = KerchunkJSONParser()
manifest_store = parser(
url=f"file://{ref_filepath.as_posix()}", registry=local_registry
)
manifest_store = parser(url=netcdf4_inlined_ref, registry=local_registry)
with (
xr.open_dataset(
netcdf4_file,
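Since the fixture now returns a ready-made `file://` URL, the scenario exercised by the deleted NotImplementedError test can be covered directly. A hedged sketch reusing the `open_virtual_dataset` call pattern from that deleted test; the import locations are assumptions based on how this module uses those names:

```python
# Hedged sketch reusing the call pattern from the deleted test above; the
# import locations for open_virtual_dataset and KerchunkJSONParser are
# assumptions based on how this test module uses those names.
import xarray as xr

from virtualizarr import open_virtual_dataset
from virtualizarr.parsers import KerchunkJSONParser


def test_reads_inlined_refs(netcdf4_inlined_ref, local_registry):
    # netcdf4_inlined_ref is a "file://.../ref.json" URL (see the conftest change)
    with open_virtual_dataset(
        url=netcdf4_inlined_ref,
        registry=local_registry,
        parser=KerchunkJSONParser(),
    ) as vds:
        assert isinstance(vds, xr.Dataset)
```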