
Commit 2e527cc

feature(store): implement high level store
1 parent 6811c94 commit 2e527cc

File tree

12 files changed: +4780 -273 lines


HIGH_LEVEL_STORE.md

Lines changed: 955 additions & 0 deletions
Large diffs are not rendered by default.
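The design document itself is not rendered on this page. Based only on how the new API is exercised in the diffs further down, a minimal usage sketch might look like the following; the module path, constructor signature, and method names are taken from those call sites and should be read as illustrative, not as the documented API.

# Hypothetical sketch, inferred from call sites in this commit's diffs:
# HighLevelStore wraps a low-level Store and hands back parsed metadata
# objects instead of raw bytes.
import asyncio

import zarr
from zarr.core.metadata import ArrayV2Metadata, ArrayV3Metadata
from zarr.storage import MemoryStore
from zarr.storage._high_level import HighLevelStore  # new module in this commit


async def main() -> None:
    store = MemoryStore()
    zarr.create_array(store=store, name="a", shape=(10,), dtype="int32")

    # zarr_format can be pinned explicitly, as get_array_metadata does below.
    hl_store = HighLevelStore(store, zarr_format=3)

    # get_metadata returns a metadata object; branch on its type to tell
    # arrays from groups (it raises FileNotFoundError when nothing exists).
    metadata = await hl_store.get_metadata("a")
    if isinstance(metadata, ArrayV2Metadata | ArrayV3Metadata):
        print("array with shape", metadata.to_dict()["shape"])
    else:
        print("group")


asyncio.run(main())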

src/zarr/abc/store.py

Lines changed: 698 additions & 1 deletion
Large diffs are not rendered by default.

src/zarr/api/asynchronous.py

Lines changed: 44 additions & 21 deletions
@@ -3,7 +3,7 @@
 import asyncio
 import dataclasses
 import warnings
-from typing import TYPE_CHECKING, Any, Literal, NotRequired, TypedDict, cast
+from typing import TYPE_CHECKING, Any, Literal, NotRequired, TypedDict
 
 import numpy as np
 import numpy.typing as npt
@@ -16,8 +16,7 @@
     AsyncArray,
     CompressorLike,
     create_array,
-    from_array,
-    get_array_metadata,
+    from_array,  # TODO: deprecate?
 )
 from zarr.core.array_spec import ArrayConfigLike, parse_array_config
 from zarr.core.buffer import NDArrayLike
@@ -37,17 +36,17 @@
     GroupMetadata,
     create_hierarchy,
 )
-from zarr.core.metadata import ArrayMetadataDict, ArrayV2Metadata, ArrayV3Metadata
+from zarr.core.metadata import ArrayV2Metadata, ArrayV3Metadata
 from zarr.errors import (
     ArrayNotFoundError,
     GroupNotFoundError,
-    NodeTypeValidationError,
     ZarrDeprecationWarning,
     ZarrRuntimeWarning,
     ZarrUserWarning,
 )
 from zarr.storage import StorePath
 from zarr.storage._common import make_store_path
+from zarr.storage._high_level import HighLevelStore
 
 if TYPE_CHECKING:
     from collections.abc import Iterable
@@ -372,32 +371,56 @@ async def open(
         mode = "r"
     else:
         mode = "a"
-    store_path = await make_store_path(store, mode=mode, path=path, storage_options=storage_options)
+    store_path = await make_store_path(
+        store, mode=mode, path=path, storage_options=storage_options, zarr_format=zarr_format
+    )
+
+    hl_store = store_path.store
+    assert isinstance(hl_store, HighLevelStore)
+
+    # Import AsyncGroup for use in both code paths below
 
     # TODO: the mode check below seems wrong!
     if "shape" not in kwargs and mode in {"a", "r", "r+", "w"}:
+        # Use HighLevelStore to detect node type
+
         try:
-            metadata_dict = await get_array_metadata(store_path, zarr_format=zarr_format)
-            # TODO: remove this cast when we fix typing for array metadata dicts
-            _metadata_dict = cast("ArrayMetadataDict", metadata_dict)
-            # for v2, the above would already have raised an exception if not an array
-            zarr_format = _metadata_dict["zarr_format"]
-            is_v3_array = zarr_format == 3 and _metadata_dict.get("node_type") == "array"
-            if is_v3_array or zarr_format == 2:
+            # Get metadata and check node type
+            metadata_obj = await hl_store.get_metadata(store_path.path)
+
+            # Check if it's an array
+            if isinstance(metadata_obj, ArrayV2Metadata | ArrayV3Metadata):
                 return AsyncArray(
-                    store_path=store_path, metadata=_metadata_dict, config=kwargs.get("config")
+                    store_path=store_path,
+                    metadata=metadata_obj.to_dict(),
+                    config=kwargs.get("config"),
                 )
-        except (AssertionError, FileNotFoundError, NodeTypeValidationError):
+            else:
+                # It's a group - use open_group to properly handle consolidated metadata
+                return await open_group(
+                    store=store_path, zarr_format=zarr_format, mode=mode, **kwargs
+                )
+        except FileNotFoundError:
+            # Neither array nor group exists, fall through to create path
             pass
+
         return await open_group(store=store_path, zarr_format=zarr_format, mode=mode, **kwargs)
 
+    # User provided shape or other array-specific kwargs, try to create/open as array
+    # But first check if there's already a group at this location
     try:
-        return await open_array(store=store_path, zarr_format=zarr_format, mode=mode, **kwargs)
-    except (KeyError, NodeTypeValidationError):
-        # KeyError for a missing key
-        # NodeTypeValidationError for failing to parse node metadata as an array when it's
-        # actually a group
-        return await open_group(store=store_path, zarr_format=zarr_format, mode=mode, **kwargs)
+        # Check if something already exists at this path
+        metadata_obj = await hl_store.get_metadata(store_path.path)
+
+        # If it's a group, try to open as group (will fail with TypeError if kwargs invalid for group)
+        if not isinstance(metadata_obj, ArrayV2Metadata | ArrayV3Metadata):
+            return await open_group(store=store_path, zarr_format=zarr_format, mode=mode, **kwargs)
+        # If it's an array, fall through to open_array below
+    except FileNotFoundError:
+        # Nothing exists, fall through to create array
+        pass
+
+    return await open_array(store=store_path, zarr_format=zarr_format, mode=mode, **kwargs)
 
 
 async def open_consolidated(
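Read as prose, the new open() flow is: build a StorePath whose .store is a HighLevelStore, ask it for parsed metadata at the path, and dispatch on the metadata type instead of re-reading and re-parsing metadata keys. Below is a condensed, hypothetical sketch of that dispatch; names are copied from the hunk above, and the error handling and creation paths are trimmed.

# Condensed sketch of the dispatch used by open() above; not the full logic.
from zarr.core.metadata import ArrayV2Metadata, ArrayV3Metadata
from zarr.storage._common import make_store_path
from zarr.storage._high_level import HighLevelStore


async def classify_node(store, path, zarr_format=None, mode="a"):
    store_path = await make_store_path(store, mode=mode, path=path, zarr_format=zarr_format)

    # make_store_path now wraps the store, so this always holds.
    hl_store = store_path.store
    assert isinstance(hl_store, HighLevelStore)

    try:
        metadata = await hl_store.get_metadata(store_path.path)
    except FileNotFoundError:
        return "missing"  # open() falls through to its creation paths here
    if isinstance(metadata, ArrayV2Metadata | ArrayV3Metadata):
        return "array"  # open() constructs AsyncArray from metadata.to_dict()
    return "group"  # open() delegates to open_group()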

src/zarr/core/array.py

Lines changed: 84 additions & 65 deletions
@@ -1,8 +1,6 @@
 from __future__ import annotations
 
-import json
 import warnings
-from asyncio import gather
 from collections.abc import Iterable, Mapping
 from dataclasses import dataclass, field, replace
 from itertools import starmap
@@ -39,7 +37,6 @@
     NDBuffer,
     default_buffer_prototype,
 )
-from zarr.core.buffer.cpu import buffer_prototype as cpu_buffer_prototype
 from zarr.core.chunk_grids import RegularChunkGrid, _auto_partition, normalize_chunks
 from zarr.core.chunk_key_encodings import (
     ChunkKeyEncoding,
@@ -50,9 +47,6 @@
 )
 from zarr.core.common import (
     JSON,
-    ZARR_JSON,
-    ZARRAY_JSON,
-    ZATTRS_JSON,
     DimensionNames,
     MemoryOrder,
     ShapeLike,
@@ -116,11 +110,9 @@
     parse_compressor,
     parse_filters,
 )
-from zarr.core.metadata.v3 import parse_node_type_array
 from zarr.core.sync import sync
 from zarr.errors import (
     ArrayNotFoundError,
-    MetadataValidationError,
     ZarrDeprecationWarning,
     ZarrUserWarning,
 )
@@ -216,65 +208,51 @@ def create_codec_pipeline(metadata: ArrayMetadata, *, store: Store | None = None
 async def get_array_metadata(
     store_path: StorePath, zarr_format: ZarrFormat | None = 3
 ) -> dict[str, JSON]:
-    if zarr_format == 2:
-        zarray_bytes, zattrs_bytes = await gather(
-            (store_path / ZARRAY_JSON).get(prototype=cpu_buffer_prototype),
-            (store_path / ZATTRS_JSON).get(prototype=cpu_buffer_prototype),
-        )
-        if zarray_bytes is None:
+    """
+    Get array metadata using HighLevelStore.
+
+    This function uses HighLevelStore to handle all format-specific metadata
+    fetching and parsing, eliminating manual key access.
+
+    Note: If you need to specify zarr_format, it's better to pass it to make_store_path()
+    when creating the StorePath, rather than passing it here.
+    """
+    from zarr.storage._high_level import HighLevelStore
+
+    # Store is always a HighLevelStore (ensured by make_store_path)
+    hl_store = store_path.store
+    assert isinstance(hl_store, HighLevelStore)
+
+    # If format is specified and different from store's format, recreate HighLevelStore
+    # This is a legacy code path - prefer passing zarr_format to make_store_path()
+    if zarr_format is not None and (
+        not hl_store._format_detected or hl_store._zarr_format != zarr_format
+    ):
+        hl_store = HighLevelStore(hl_store.store, zarr_format=zarr_format)
+
+    try:
+        # Use HighLevelStore to fetch and parse array metadata
+        metadata_obj = await hl_store.get_array_metadata(store_path.path)
+        # Convert to dict for compatibility
+        return metadata_obj.to_dict()
+    except FileNotFoundError as e:
+        # Convert to ArrayNotFoundError for consistency with existing API
+        if zarr_format == 2:
             msg = (
                 "A Zarr V2 array metadata document was not found in store "
                 f"{store_path.store!r} at path {store_path.path!r}."
             )
-            raise ArrayNotFoundError(msg)
-    elif zarr_format == 3:
-        zarr_json_bytes = await (store_path / ZARR_JSON).get(prototype=cpu_buffer_prototype)
-        if zarr_json_bytes is None:
+        elif zarr_format == 3:
             msg = (
                 "A Zarr V3 array metadata document was not found in store "
                 f"{store_path.store!r} at path {store_path.path!r}."
             )
-            raise ArrayNotFoundError(msg)
-    elif zarr_format is None:
-        zarr_json_bytes, zarray_bytes, zattrs_bytes = await gather(
-            (store_path / ZARR_JSON).get(prototype=cpu_buffer_prototype),
-            (store_path / ZARRAY_JSON).get(prototype=cpu_buffer_prototype),
-            (store_path / ZATTRS_JSON).get(prototype=cpu_buffer_prototype),
-        )
-        if zarr_json_bytes is not None and zarray_bytes is not None:
-            # warn and favor v3
-            msg = f"Both zarr.json (Zarr format 3) and .zarray (Zarr format 2) metadata objects exist at {store_path}. Zarr v3 will be used."
-            warnings.warn(msg, category=ZarrUserWarning, stacklevel=1)
-        if zarr_json_bytes is None and zarray_bytes is None:
+        else:
             msg = (
                 f"Neither Zarr V3 nor Zarr V2 array metadata documents "
                 f"were found in store {store_path.store!r} at path {store_path.path!r}."
             )
-            raise ArrayNotFoundError(msg)
-        # set zarr_format based on which keys were found
-        if zarr_json_bytes is not None:
-            zarr_format = 3
-        else:
-            zarr_format = 2
-    else:
-        msg = f"Invalid value for 'zarr_format'. Expected 2, 3, or None. Got '{zarr_format}'."  # type: ignore[unreachable]
-        raise MetadataValidationError(msg)
-
-    metadata_dict: dict[str, JSON]
-    if zarr_format == 2:
-        # V2 arrays are comprised of a .zarray and .zattrs objects
-        assert zarray_bytes is not None
-        metadata_dict = json.loads(zarray_bytes.to_bytes())
-        zattrs_dict = json.loads(zattrs_bytes.to_bytes()) if zattrs_bytes is not None else {}
-        metadata_dict["attributes"] = zattrs_dict
-    else:
-        # V3 arrays are comprised of a zarr.json object
-        assert zarr_json_bytes is not None
-        metadata_dict = json.loads(zarr_json_bytes.to_bytes())
-
-    parse_node_type_array(metadata_dict.get("node_type"))
-
-    return metadata_dict
+        raise ArrayNotFoundError(msg) from e
 
 
 @dataclass(frozen=True)
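A caller-side view of the refactor above: the helper now returns parsed metadata as a dict and converts the store's FileNotFoundError into ArrayNotFoundError. The snippet below is a hypothetical usage example built on that described behavior, not taken from the commit's tests.

# Hypothetical usage of the refactored get_array_metadata helper above.
import asyncio

import zarr
from zarr.core.array import get_array_metadata
from zarr.errors import ArrayNotFoundError
from zarr.storage import MemoryStore
from zarr.storage._common import make_store_path


async def main() -> None:
    store = MemoryStore()
    zarr.create_array(store=store, name="a", shape=(4, 4), dtype="float64")

    # Preferred: fix the format when building the StorePath (see docstring above).
    store_path = await make_store_path(store, path="a", zarr_format=3)
    meta = await get_array_metadata(store_path, zarr_format=3)
    print(meta["shape"])  # (4, 4)

    # Missing nodes surface as ArrayNotFoundError, not FileNotFoundError.
    missing = await make_store_path(store, path="missing", zarr_format=3)
    try:
        await get_array_metadata(missing, zarr_format=3)
    except ArrayNotFoundError as err:
        print("not found:", err)


asyncio.run(main())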
@@ -1001,8 +979,34 @@ async def example():
         # <AsyncArray memory://... shape=(100, 100) dtype=int32>
         ```
         """
-        store_path = await make_store_path(store)
-        metadata_dict = await get_array_metadata(store_path, zarr_format=zarr_format)
+        # make_store_path creates StorePath with HighLevelStore configured for zarr_format
+        store_path = await make_store_path(store, zarr_format=zarr_format)
+
+        # Store is always a HighLevelStore (ensured by make_store_path)
+        from zarr.storage import HighLevelStore
+
+        hl_store = store_path.store
+        assert isinstance(hl_store, HighLevelStore)
+
+        try:
+            # Get array metadata using HighLevelStore
+            metadata_obj = await hl_store.get_array_metadata(store_path.path)
+            # Convert to dict for compatibility
+            metadata_dict = metadata_obj.to_dict()
+        except FileNotFoundError as e:
+            # Convert to ArrayNotFoundError for consistency with existing API
+            if zarr_format == 2:
+                msg = (
+                    "A Zarr V2 array metadata document was not found in store "
+                    f"{store_path.store!r} at path {store_path.path!r}."
+                )
+            else:
+                msg = (
+                    "A Zarr V3 array metadata document was not found in store "
+                    f"{store_path.store!r} at path {store_path.path!r}."
+                )
+            raise ArrayNotFoundError(msg) from e
+
         # TODO: remove this cast when we have better type hints
         _metadata_dict = cast("ArrayV3MetadataDict", metadata_dict)
         return cls(store_path=store_path, metadata=_metadata_dict)
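Several hunks in this commit repeat the same invariant: after make_store_path, StorePath.store is a HighLevelStore wrapping the caller's low-level store. Below is a minimal check of that invariant, assuming the attributes used in the diffs (.store for the wrapped store); it is a sketch, not part of the commit.

# Minimal check of the wrapping invariant asserted throughout this commit.
import asyncio

from zarr.storage import MemoryStore
from zarr.storage._common import make_store_path
from zarr.storage._high_level import HighLevelStore


async def main() -> None:
    low_level = MemoryStore()
    store_path = await make_store_path(low_level, zarr_format=3)

    # The wrapper is what AsyncArray.open, get_array_metadata, resize, etc. see...
    assert isinstance(store_path.store, HighLevelStore)
    # ...while the user's store remains reachable underneath (used by _info below).
    assert isinstance(store_path.store.store, MemoryStore)


asyncio.run(main())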
@@ -1844,15 +1848,21 @@ async def resize(self, new_shape: ShapeLike, delete_outside_chunks: bool = True)
         old_chunk_coords = set(self.metadata.chunk_grid.all_chunk_coords(self.metadata.shape))
         new_chunk_coords = set(self.metadata.chunk_grid.all_chunk_coords(new_shape))
 
-        async def _delete_key(key: str) -> None:
-            await (self.store_path / key).delete()
+        # Use HighLevelStore.delete_chunk for semantic chunk operations
+        from zarr.storage import HighLevelStore
+
+        # Store is always a HighLevelStore (ensured by make_store_path)
+        hl_store = self.store_path.store
+        assert isinstance(hl_store, HighLevelStore)
+
+        async def _delete_chunk(chunk_coords: tuple[int, ...]) -> None:
+            await hl_store.delete_chunk(
+                self.store_path.path, chunk_coords, metadata=self.metadata
+            )
 
         await concurrent_map(
-            [
-                (self.metadata.encode_chunk_key(chunk_coords),)
-                for chunk_coords in old_chunk_coords.difference(new_chunk_coords)
-            ],
-            _delete_key,
+            [(chunk_coords,) for chunk_coords in old_chunk_coords.difference(new_chunk_coords)],
+            _delete_chunk,
             zarr_config.get("async.concurrency"),
         )
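resize() now deletes out-of-range chunks by coordinate and lets the store translate coordinates into keys for the detected format. The sketch below restates that pattern outside the class; delete_chunk's signature is taken from its call site above, the shrunken shape is a made-up example, and the internal import paths for concurrent_map and the config object are assumptions about zarr's internals.

# Sketch of coordinate-based chunk deletion, following the resize() hunk above.
from zarr.core.common import concurrent_map
from zarr.core.config import config as zarr_config
from zarr.storage import HighLevelStore


async def drop_chunks_outside(array, new_shape) -> None:
    old_coords = set(array.metadata.chunk_grid.all_chunk_coords(array.metadata.shape))
    new_coords = set(array.metadata.chunk_grid.all_chunk_coords(new_shape))

    hl_store = array.store_path.store
    assert isinstance(hl_store, HighLevelStore)

    async def _delete_chunk(chunk_coords: tuple[int, ...]) -> None:
        # The store resolves (path, chunk coords, metadata) to the right chunk key.
        await hl_store.delete_chunk(array.store_path.path, chunk_coords, metadata=array.metadata)

    await concurrent_map(
        [(coords,) for coords in old_coords.difference(new_coords)],
        _delete_chunk,
        zarr_config.get("async.concurrency"),
    )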

@@ -2011,6 +2021,15 @@ async def info_complete(self) -> Any:
     def _info(
         self, count_chunks_initialized: int | None = None, count_bytes_stored: int | None = None
     ) -> Any:
+        # Get the store type - if it's a HighLevelStore, get the underlying store type
+        from zarr.storage import HighLevelStore
+
+        store = self.store_path.store
+        if isinstance(store, HighLevelStore):
+            store_type = type(store.store).__name__
+        else:
+            store_type = type(store).__name__
+
         return ArrayInfo(
             _zarr_format=self.metadata.zarr_format,
             _data_type=self._zdtype,
@@ -2023,7 +2042,7 @@ def _info(
             _compressors=self.compressors,
             _filters=self.filters,
             _serializer=self.serializer,
-            _store_type=type(self.store_path.store).__name__,
+            _store_type=store_type,
             _count_bytes=self.nbytes,
             _count_bytes_stored=count_bytes_stored,
             _count_chunks_initialized=count_chunks_initialized,
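Since the array now always sees the wrapper, reporting code unwraps it so info keeps showing the user's store class. The same logic as the hunk above, restated as a tiny standalone helper (the helper name is made up):

# Standalone restatement of the store-type logic used by _info above.
from zarr.abc.store import Store
from zarr.storage import HighLevelStore


def display_store_type(store: Store) -> str:
    # Report the wrapped store's class when given a HighLevelStore.
    if isinstance(store, HighLevelStore):
        return type(store.store).__name__
    return type(store).__name__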
