Commit e707375

maxrjones, sharkinsspatial, and pre-commit-ci[bot] authored
Add a function to produce a ManifestStore from HDF5 files (#516)
* Add a function to produce a ManifestStore from HDF5 files
* Improve chunk index determination
* Allow specifying chunk key encoding
* Move obstore import
* Add docstring entry
* Fix typo
* Return empty ChunkManifests for empty HDF Datasets.
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* Fix type checking block.

---------

Co-authored-by: sharkinsspatial <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent e5fff19 commit e707375

File tree

4 files changed: +208 −41 lines changed

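To orient the diff below: the new `HDFVirtualBackend._create_manifest_store` helper builds a ManifestStore directly from an HDF5/NetCDF4 file, which can then be opened through xarray's zarr engine. A minimal sketch of that flow, mirroring the round-trip test added in this commit; the file path is hypothetical, and `_create_manifest_store` is a private helper whose signature may change:

import xarray as xr
from obstore.store import LocalStore

from virtualizarr.readers.hdf import HDFVirtualBackend

# Hypothetical local NetCDF4/HDF5 file; any path readable by the store works.
filepath = "/tmp/example.nc"

# Build a ManifestStore that records chunk locations instead of loading data.
store = HDFVirtualBackend._create_manifest_store(
    filepath=filepath, store=LocalStore(), prefix="file://"
)

# The ManifestStore is readable through the zarr v3 engine.
ds = xr.open_dataset(store, engine="zarr", consolidated=False, zarr_format=3)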

virtualizarr/readers/hdf/hdf.py

Lines changed: 140 additions & 31 deletions

@@ -4,6 +4,7 @@
 from pathlib import Path
 from typing import (
     TYPE_CHECKING,
+    Any,
     Dict,
     Hashable,
     Iterable,
@@ -27,6 +28,8 @@
     ChunkEntry,
     ChunkManifest,
     ManifestArray,
+    ManifestGroup,
+    ManifestStore,
 )
 from virtualizarr.manifests.manifest import validate_and_normalize_path_to_uri
 from virtualizarr.manifests.utils import create_v3_array_metadata
@@ -41,6 +44,7 @@
 if TYPE_CHECKING:
     from h5py import Dataset as H5Dataset
     from h5py import Group as H5Group
+    from obstore.store import ObjectStore

 FillValueType = Union[
     int,
@@ -58,6 +62,111 @@


 class HDFVirtualBackend(VirtualBackend):
+    @staticmethod
+    def _construct_manifest_array(
+        path: str,
+        dataset: H5Dataset,
+        group: str,
+    ) -> ManifestArray:
+        """
+        Construct a ManifestArray from an h5py dataset
+        Parameters
+        ----------
+        path: str
+            The path of the hdf5 file.
+        dataset : h5py.Dataset
+            An h5py dataset.
+        group : str
+            Name of the group containing this h5py.Dataset.
+        Returns
+        -------
+        ManifestArray
+        """
+        chunks = dataset.chunks if dataset.chunks else dataset.shape
+        codecs = codecs_from_dataset(dataset)
+        attrs = HDFVirtualBackend._extract_attrs(dataset)
+        dtype = dataset.dtype
+
+        codec_configs = [
+            numcodec_config_to_configurable(codec.get_config()) for codec in codecs
+        ]
+
+        fill_value = dataset.fillvalue.item()
+        dims = tuple(HDFVirtualBackend._dataset_dims(dataset, group=group))
+        metadata = create_v3_array_metadata(
+            shape=dataset.shape,
+            data_type=dtype,
+            chunk_shape=chunks,
+            fill_value=fill_value,
+            codecs=codec_configs,
+            dimension_names=dims,
+            attributes=attrs,
+        )
+
+        manifest = HDFVirtualBackend._dataset_chunk_manifest(path, dataset)
+        return ManifestArray(metadata=metadata, chunkmanifest=manifest)
+
+    @staticmethod
+    def _construct_manifest_group(
+        store: ObjectStore,
+        filepath: str,
+        *,
+        group: str | None = None,
+        drop_variables: Optional[List[str]] = None,
+    ) -> ManifestGroup:
+        """
+        Construct a virtual Group from a HDF dataset.
+        """
+        from virtualizarr.utils import ObstoreReader
+
+        if drop_variables is None:
+            drop_variables = []
+
+        reader = ObstoreReader(store=store, path=filepath)
+        f = h5py.File(reader, mode="r")
+
+        if group is not None and group != "":
+            g = f[group]
+            group_name = group
+            if not isinstance(g, h5py.Group):
+                raise ValueError("The provided group is not an HDF group")
+        else:
+            g = f["/"]
+            group_name = "/"
+
+        manifest_dict = {}
+        non_coordinate_dimesion_vars = HDFVirtualBackend._find_non_coord_dimension_vars(
+            group=g
+        )
+        drop_variables = list(set(drop_variables + non_coordinate_dimesion_vars))
+        attrs: dict[str, Any] = {}
+        for key in g.keys():
+            if key not in drop_variables:
+                if isinstance(g[key], h5py.Dataset):
+                    variable = HDFVirtualBackend._construct_manifest_array(
+                        path=filepath,
+                        dataset=g[key],
+                        group=group_name,
+                    )
+                    if variable is not None:
+                        manifest_dict[key] = variable
+        return ManifestGroup(arrays=manifest_dict, attributes=attrs)
+
+    @staticmethod
+    def _create_manifest_store(
+        filepath: str,
+        *,
+        prefix: str,
+        store: ObjectStore,
+        group: str | None = None,
+    ) -> ManifestStore:
+        # Create a group containing dataset level metadata and all the manifest arrays
+        manifest_group = HDFVirtualBackend._construct_manifest_group(
+            store=store, filepath=filepath, group=group
+        )
+        # Convert to a manifest store
+        return ManifestStore(stores={prefix: store}, group=manifest_group)
+
     @staticmethod
     def open_virtual_dataset(
         filepath: str,
@@ -119,7 +228,7 @@ def open_virtual_dataset(
     def _dataset_chunk_manifest(
         path: str,
         dataset: H5Dataset,
-    ) -> Optional[ChunkManifest]:
+    ) -> ChunkManifest:
         """
         Generate ChunkManifest for HDF5 dataset.

@@ -138,7 +247,7 @@ def _dataset_chunk_manifest(
         dsid = dataset.id
         if dataset.chunks is None:
             if dsid.get_offset() is None:
-                return None
+                chunk_manifest = ChunkManifest(entries={}, shape=dataset.shape)
             else:
                 key_list = [0] * (len(dataset.shape) or 1)
                 key = ".".join(map(str, key_list))
@@ -149,42 +258,42 @@ def _dataset_chunk_manifest(
                 chunk_key = ChunkKey(key)
                 chunk_entries = {chunk_key: chunk_entry}
                 chunk_manifest = ChunkManifest(entries=chunk_entries)
-                return chunk_manifest
         else:
             num_chunks = dsid.get_num_chunks()
             if num_chunks == 0:
-                raise ValueError("The dataset is chunked but contains no chunks")
-            shape = tuple(
-                math.ceil(a / b) for a, b in zip(dataset.shape, dataset.chunks)
-            )
-            paths = np.empty(shape, dtype=np.dtypes.StringDType)  # type: ignore
-            offsets = np.empty(shape, dtype=np.uint64)
-            lengths = np.empty(shape, dtype=np.uint64)
-
-            def get_key(blob):
-                return tuple(
-                    [a // b for a, b in zip(blob.chunk_offset, dataset.chunks)]
+                chunk_manifest = ChunkManifest(entries={}, shape=dataset.shape)
+            else:
+                shape = tuple(
+                    math.ceil(a / b) for a, b in zip(dataset.shape, dataset.chunks)
                 )
+                paths = np.empty(shape, dtype=np.dtypes.StringDType)  # type: ignore
+                offsets = np.empty(shape, dtype=np.uint64)
+                lengths = np.empty(shape, dtype=np.uint64)
+
+                def get_key(blob):
+                    return tuple(
+                        [a // b for a, b in zip(blob.chunk_offset, dataset.chunks)]
+                    )

-            def add_chunk_info(blob):
-                key = get_key(blob)
-                paths[key] = path
-                offsets[key] = blob.byte_offset
-                lengths[key] = blob.size
+                def add_chunk_info(blob):
+                    key = get_key(blob)
+                    paths[key] = path
+                    offsets[key] = blob.byte_offset
+                    lengths[key] = blob.size

-            has_chunk_iter = callable(getattr(dsid, "chunk_iter", None))
-            if has_chunk_iter:
-                dsid.chunk_iter(add_chunk_info)
-            else:
-                for index in range(num_chunks):
-                    add_chunk_info(dsid.get_chunk_info(index))
+                has_chunk_iter = callable(getattr(dsid, "chunk_iter", None))
+                if has_chunk_iter:
+                    dsid.chunk_iter(add_chunk_info)
+                else:
+                    for index in range(num_chunks):
+                        add_chunk_info(dsid.get_chunk_info(index))

-            chunk_manifest = ChunkManifest.from_arrays(
-                paths=paths,  # type: ignore
-                offsets=offsets,
-                lengths=lengths,
-            )
-            return chunk_manifest
+                chunk_manifest = ChunkManifest.from_arrays(
+                    paths=paths,  # type: ignore
+                    offsets=offsets,
+                    lengths=lengths,
+                )
+        return chunk_manifest

     @staticmethod
     def _dataset_dims(dataset: H5Dataset, group: str = "") -> List[str]:
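A note on the chunk-index determination reworked above: h5py reports each stored chunk's element offset within the dataset, the grid index is recovered by integer-dividing that offset by the chunk shape, and the grid's own shape is the ceiling of dataset shape over chunk shape. A standalone sketch with made-up numbers, not tied to any particular file:

import math

# Hypothetical dataset and chunk shapes, purely for illustration.
dataset_shape = (100, 100)
chunk_shape = (40, 25)

# Shape of the chunk grid, as computed in _dataset_chunk_manifest above.
grid_shape = tuple(math.ceil(a / b) for a, b in zip(dataset_shape, chunk_shape))
assert grid_shape == (3, 4)

# A chunk whose first element sits at offset (80, 50) lands at grid index (2, 2),
# mirroring get_key(): offset // chunk_shape along each axis.
chunk_offset = (80, 50)
grid_index = tuple(a // b for a, b in zip(chunk_offset, chunk_shape))
assert grid_index == (2, 2)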

virtualizarr/tests/test_readers/test_hdf/test_hdf.py

Lines changed: 8 additions & 9 deletions

@@ -16,19 +16,18 @@ class TestDatasetChunkManifest:
     def test_empty_chunks(self, empty_chunks_hdf5_file):
         f = h5py.File(empty_chunks_hdf5_file)
         ds = f["data"]
-        with pytest.raises(ValueError, match="chunked but contains no chunks"):
-            HDFVirtualBackend._dataset_chunk_manifest(
-                path=empty_chunks_hdf5_file, dataset=ds
-            )
+        manifest = HDFVirtualBackend._dataset_chunk_manifest(
+            path=empty_chunks_hdf5_file, dataset=ds
+        )
+        assert manifest.shape_chunk_grid == (0,)

-    @pytest.mark.skip("Need to differentiate non coordinate dimensions from empty")
     def test_empty_dataset(self, empty_dataset_hdf5_file):
         f = h5py.File(empty_dataset_hdf5_file)
         ds = f["data"]
-        with pytest.raises(ValueError, match="no space allocated in the file"):
-            HDFVirtualBackend._dataset_chunk_manifest(
-                path=empty_dataset_hdf5_file, dataset=ds
-            )
+        manifest = HDFVirtualBackend._dataset_chunk_manifest(
+            path=empty_dataset_hdf5_file, dataset=ds
+        )
+        assert manifest.shape_chunk_grid == (0,)

     def test_no_chunking(self, no_chunks_hdf5_file):
         f = h5py.File(no_chunks_hdf5_file)

Lines changed: 40 additions & 0 deletions

@@ -0,0 +1,40 @@
+import numpy as np
+import pytest
+import xarray as xr
+
+from virtualizarr.readers.hdf import HDFVirtualBackend
+from virtualizarr.tests import (
+    requires_hdf5plugin,
+    requires_obstore,
+)
+
+
+@pytest.fixture(name="basic_ds")
+def basic_ds():
+    x = np.arange(100)
+    y = np.arange(100)
+    temperature = 0.1 * x[:, None] + 0.1 * y[None, :]
+    ds = xr.Dataset(
+        {"temperature": (["x", "y"], temperature)},
+        coords={"x": np.arange(100), "y": np.arange(100)},
+    )
+    return ds
+
+
+@requires_hdf5plugin
+@requires_obstore
+class TestHDFManifestStore:
+    def test_rountrip_simple_virtualdataset(self, tmpdir, basic_ds):
+        from obstore.store import LocalStore
+
+        "Roundtrip a dataset to/from NetCDF with the HDF reader and ManifestStore"
+
+        filepath = f"{tmpdir}/basic_ds_roundtrip.nc"
+        basic_ds.to_netcdf(filepath, engine="h5netcdf")
+        store = HDFVirtualBackend._create_manifest_store(
+            filepath=filepath, store=LocalStore(), prefix="file://"
+        )
+        rountripped_ds = xr.open_dataset(
+            store, engine="zarr", consolidated=False, zarr_format=3
+        )
+        xr.testing.assert_allclose(basic_ds, rountripped_ds)

virtualizarr/utils.py

Lines changed: 20 additions & 1 deletion

@@ -2,6 +2,7 @@

 import importlib
 import io
+from dataclasses import dataclass, field
 from typing import TYPE_CHECKING, Any, Iterable, Optional, Union

 from zarr.abc.codec import ArrayArrayCodec, BytesBytesCodec
@@ -12,14 +13,32 @@
 if TYPE_CHECKING:
     import fsspec.core
     import fsspec.spec
+    from obstore import ReadableFile
+    from obstore.store import ObjectStore

 # See pangeo_forge_recipes.storage
 OpenFileType = Union[
     fsspec.core.OpenFile, fsspec.spec.AbstractBufferedFile, io.IOBase
 ]


-from dataclasses import dataclass, field
+class ObstoreReader:
+    _reader: ReadableFile
+
+    def __init__(self, store: ObjectStore, path: str) -> None:
+        import obstore as obs
+
+        self._reader = obs.open_reader(store, path)
+
+    def read(self, size: int, /) -> bytes:
+        return self._reader.read(size).to_bytes()
+
+    def seek(self, offset: int, whence: int = 0, /):
+        # TODO: Check on default for whence
+        return self._reader.seek(offset, whence)
+
+    def tell(self) -> int:
+        return self._reader.tell()


 @dataclass
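The `ObstoreReader` above only needs to expose `read`, `seek`, and `tell` for h5py to treat it as a file-like object, which is how `_construct_manifest_group` parses HDF5 metadata straight from an ObjectStore. A rough sketch of that wiring, assuming a hypothetical local file; other ObjectStore implementations should behave the same way:

import h5py
from obstore.store import LocalStore

from virtualizarr.utils import ObstoreReader

# Hypothetical local file; with LocalStore() the path is an absolute filesystem
# path, mirroring how the round-trip test above calls the reader.
store = LocalStore()
reader = ObstoreReader(store=store, path="/tmp/example.nc")

# h5py accepts any object with read/seek/tell, so the file's structure can be
# inspected without copying it out of the object store.
f = h5py.File(reader, mode="r")
print(list(f.keys()))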
