-
Notifications
You must be signed in to change notification settings - Fork 57
Updates zarr-parser to use obstore list_async instead of concurrent_map
#892
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
aa93b8b
37dff68
2fa25a7
626d0b9
9d6a312
bab147d
17e35cc
6cbb7c0
b400a34
19122a7
e22981f
fda8ce6
bbd6a1f
9cba9e8
f50b724
1be91cc
4ed8295
9114613
31c8ed0
e0ddfc2
d96d5c5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,7 @@ | |
| from pathlib import Path | ||
| from typing import TYPE_CHECKING, Any, cast | ||
|
|
||
| import numpy as np | ||
| import zarr | ||
| from obspec_utils.registry import ObjectStoreRegistry | ||
| from zarr.api.asynchronous import open_group as open_group_async | ||
|
|
@@ -19,8 +20,10 @@ | |
| ManifestGroup, | ||
| ManifestStore, | ||
| ) | ||
| from virtualizarr.manifests.manifest import validate_and_normalize_path_to_uri | ||
| from virtualizarr.vendor.zarr.core.common import _concurrent_map | ||
| from virtualizarr.manifests.manifest import ( | ||
| parse_manifest_index, | ||
| validate_and_normalize_path_to_uri, | ||
| ) | ||
|
|
||
| if TYPE_CHECKING: | ||
| import zarr | ||
|
|
@@ -90,33 +93,43 @@ async def _handle_scalar_array( | |
|
|
||
|
|
||
| async def _build_chunk_mapping( | ||
| chunk_keys: list[str], zarr_array: ZarrArrayType, path: str, prefix: str | ||
| zarr_array: ZarrArrayType, path: str, prefix: str | ||
| ) -> dict[str, dict[str, Any]]: | ||
| """ | ||
| Build chunk mapping from a list of chunk keys. | ||
| Build chunk mapping by listing the object store with obstore. | ||
|
|
||
| Uses obstore's list_async with Arrow output to get chunk paths and sizes | ||
| in a single Rust-level call, avoiding per-chunk getsize calls. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| chunk_keys | ||
| List of storage keys for chunks. | ||
| zarr_array | ||
| The Zarr array. | ||
| path | ||
| Base path for constructing chunk paths. | ||
| prefix | ||
| Prefix to strip from chunk keys. | ||
| Prefix to list and strip from chunk keys. | ||
|
|
||
| Returns | ||
| ------- | ||
| dict | ||
| Mapping of normalized chunk coordinates to storage locations. | ||
| """ | ||
|
|
||
| size_map: dict[str, int] = {} | ||
| stream = zarr_array.store.store.list_async(prefix=prefix, return_arrow=True) | ||
|
||
| async for batch in stream: | ||
| size_map.update( | ||
| zip(batch.column("path").to_pylist(), batch.column("size").to_pylist()) | ||
|
||
| ) | ||
|
|
||
| # filter out metadata files | ||
| chunk_keys = [k for k in size_map if not k.split("/")[-1].startswith(".")] | ||
|
|
||
| if not chunk_keys: | ||
| return {} | ||
|
|
||
| lengths = await _concurrent_map( | ||
| [(k,) for k in chunk_keys], zarr_array.store.getsize | ||
| ) | ||
| lengths = [size_map[k] for k in chunk_keys] | ||
|
||
| dict_keys = _normalize_chunk_keys(chunk_keys, prefix) | ||
| paths = [join_url(path, k) for k in chunk_keys] | ||
| offsets = [0] * len(lengths) | ||
|
|
@@ -158,24 +171,7 @@ async def get_chunk_mapping( | |
| scalar_key = f"{prefix}0" | ||
| return await _handle_scalar_array(zarr_array, path, scalar_key) | ||
|
|
||
| # List all keys under the array prefix, filtering out metadata files | ||
| prefix_keys = [(x,) async for x in zarr_array.store.list_prefix(prefix)] | ||
| if not prefix_keys: | ||
| return {} | ||
|
|
||
| metadata_files = {".zarray", ".zattrs", ".zgroup", ".zmetadata"} | ||
| chunk_keys = [] | ||
| for key_tuple in prefix_keys: | ||
| key = key_tuple[0] | ||
| file_name = ( | ||
| key[len(prefix) :] | ||
| if prefix and key.startswith(prefix) | ||
| else key.split("/")[-1] | ||
| ) | ||
| if file_name not in metadata_files: | ||
| chunk_keys.append(key) | ||
|
|
||
| return await _build_chunk_mapping(chunk_keys, zarr_array, path, prefix) | ||
| return await _build_chunk_mapping(zarr_array, path, prefix) | ||
|
|
||
| def get_metadata(self, zarr_array: ZarrArrayType) -> ArrayV3Metadata: | ||
| """Convert V2 metadata to V3 format.""" | ||
|
|
@@ -272,12 +268,7 @@ async def get_chunk_mapping( | |
|
|
||
| # List chunk keys under the c/ subdirectory | ||
| prefix = f"{name}/c/" if name else "c/" | ||
| prefix_keys = [(x,) async for x in zarr_array.store.list_prefix(prefix)] | ||
| if not prefix_keys: | ||
| return {} | ||
|
|
||
| chunk_keys = [x[0] for x in prefix_keys] | ||
| return await _build_chunk_mapping(chunk_keys, zarr_array, path, prefix) | ||
| return await _build_chunk_mapping(zarr_array, path, prefix) | ||
|
|
||
| def get_metadata(self, zarr_array: ZarrArrayType) -> ArrayV3Metadata: | ||
| """Return V3 metadata as-is (no conversion needed).""" | ||
|
|
@@ -322,17 +313,28 @@ async def build_chunk_manifest(zarr_array: ZarrArrayType, path: str) -> ChunkMan | |
| """ | ||
| strategy = get_strategy(zarr_array) | ||
| chunk_map = await strategy.get_chunk_mapping(zarr_array, path) | ||
| chunk_grid_shape = zarr_array._chunk_grid_shape | ||
|
|
||
| if not chunk_map: | ||
| import math | ||
|
|
||
| if zarr_array.shape and zarr_array.chunks: | ||
| chunk_grid_shape = tuple( | ||
| math.ceil(s / c) for s, c in zip(zarr_array.shape, zarr_array.chunks) | ||
| ) | ||
| return ChunkManifest(chunk_map, shape=chunk_grid_shape) | ||
|
|
||
| return ChunkManifest(chunk_map) | ||
| return ChunkManifest(chunk_map, shape=chunk_grid_shape) | ||
|
|
||
| # Pre-allocate N-D numpy arrays shaped like the chunk grid. | ||
| # Empty string paths indicate missing chunks (sparse arrays). | ||
| paths_arr = np.empty(shape=chunk_grid_shape, dtype=np.dtypes.StringDType()) | ||
| offsets_arr = np.zeros(shape=chunk_grid_shape, dtype=np.dtype("uint64")) | ||
| lengths_arr = np.zeros(shape=chunk_grid_shape, dtype=np.dtype("uint64")) | ||
|
|
||
| for key, entry in chunk_map.items(): | ||
| idx = parse_manifest_index(key) | ||
| paths_arr[idx] = entry["path"] | ||
| offsets_arr[idx] = entry["offset"] | ||
| lengths_arr[idx] = entry["length"] | ||
|
|
||
| return ChunkManifest.from_arrays( | ||
| paths=paths_arr, | ||
| offsets=offsets_arr, | ||
| lengths=lengths_arr, | ||
| ) | ||
|
|
||
|
|
||
| def get_metadata(zarr_array: ZarrArrayType) -> ArrayV3Metadata: | ||
|
|
||
This file was deleted.
Uh oh!
There was an error while loading. Please reload this page.