
Commit 3fede76

Altay Sansal (tasansal) authored and committed
Support for Zarr v3
1 parent 3c005df · commit 3fede76

6 files changed: +53 additions, -100 deletions


src/mdio/api/accessor.py

Lines changed: 6 additions & 6 deletions
@@ -186,13 +186,13 @@ def _validate_store(self, storage_options):
         if storage_options is None:
             storage_options = {}

-        self.store = process_url(
+        self.url = process_url(
             url=self.url,
-            mode=self.mode,
-            storage_options=storage_options,
-            memory_cache_size=self._memory_cache_size,
             disk_cache=self._disk_cache,
         )
+        self.store = zarr.open(
+            self.url, mode=self.mode, storage_options=storage_options
+        ).store

     def _connect(self):
         """Open the zarr root."""
@@ -375,12 +375,12 @@ def stats(self, value: dict) -> None:
     @property
     def _metadata_group(self) -> zarr.Group:
         """Get metadata zarr.Group handle."""
-        return self.root.metadata
+        return self.root["metadata"]

     @property
     def _data_group(self) -> zarr.Group:
         """Get data zarr.Group handle."""
-        return self.root.data
+        return self.root["data"]

     def __getitem__(self, item: int | tuple) -> npt.ArrayLike | da.Array | tuple:
         """Data getter."""

src/mdio/api/convenience.py

Lines changed: 3 additions & 1 deletion
@@ -5,8 +5,8 @@
 from typing import TYPE_CHECKING

 import zarr
+from numcodecs import Blosc
 from tqdm.auto import tqdm
-from zarr import Blosc

 from mdio.api.io_utils import process_url
 from mdio.core.indexing import ChunkIterator
@@ -21,6 +21,7 @@
     from mdio import MDIOReader


+# TODO(Altay): This is not implemented in Zarr v3 yet.
 def copy_mdio(  # noqa: PLR0913
     source: MDIOReader,
     dest_path_or_buffer: str,
@@ -242,6 +243,7 @@ def rechunk_batch(
     write_rechunked_values(source, suffix_list, *plan)


+# TODO(Altay): This needs to be validated
 def rechunk(
     source: MDIOAccessor,
     chunks: tuple[int, ...],
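
One practical note for downstream code: Zarr v3 stopped re-exporting codecs, so `from zarr import Blosc` becomes `from numcodecs import Blosc`, as the import hunk above shows. A small, self-contained sketch (codec parameters are illustrative):

    from numcodecs import Blosc

    # The same codec object that `from zarr import Blosc` provided under v2.
    compressor = Blosc(cname="zstd", clevel=5, shuffle=Blosc.SHUFFLE)
    print(compressor.get_config())  # {'id': 'blosc', 'cname': 'zstd', ...}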

src/mdio/api/io_utils.py

Lines changed: 8 additions & 44 deletions
@@ -2,40 +2,28 @@

 from __future__ import annotations

-from typing import Any
-
 import dask.array as da
 import zarr
-from zarr.storage import FSStore
+from zarr.storage._utils import normalize_path


 def process_url(
     url: str,
-    mode: str,
-    storage_options: dict[str, Any],
-    memory_cache_size: int,
     disk_cache: bool,
-) -> FSStore:
+) -> str:
     """Check read/write access to FSStore target and return FSStore with double caching.

-    It can use an in-memory Least Recently Used (LRU) cache implementation from
-    Zarr, and optionally, a file cache (`simplecache` protocol from FSSpec) that
-    is useful for remote stores.
-
-    File cache is only valid for remote stores. The LRU caching works
-    on both remote and local.
+    It can optionally use a file cache (`simplecache` protocol from fsspec) that
+    is useful for remote stores. File cache is only useful for remote stores.

     The `storage_options` argument represents a set of parameters to be passed
-    to the FSSpec backend. Note that the format of `storage_options` is
+    to the fsspec backend. Note that the format of `storage_options` is
     different if `disk_cache` is enabled or disabled, since `disk_cache`
     internally uses the simplecache protocol.

     Args:
-        url: FSSpec compliant url
-        mode: Toggle for overwriting existing store
-        storage_options: Storage options for the storage backend.
-        memory_cache_size: Maximum in memory LRU cache size in bytes.
-        disk_cache: This enables FSSpec's `simplecache` if True.
+        url: fsspec compliant url
+        disk_cache: This enables fsspec's `simplecache` if True.

     Returns:
         Store with augmentations like cache, write verification etc.
@@ -52,7 +40,6 @@ def process_url(
     ...     url="s3://bucket/key",
     ...     mode="r",
     ...     storage_options={"key": "my_key", "secret": "my_secret"},
-    ...     memory_cache_size=0,
     ...     disk_cache=False,
     ... )
@@ -64,7 +51,6 @@ def process_url(
     ...     url="s3://bucket/key",
     ...     mode="r",
     ...     storage_options={"s3": {"key": "my_key", "secret": "my_secret"}},
-    ...     memory_cache_size=0,
     ...     disk_cache=True,
     ... )
@@ -77,35 +63,13 @@ def process_url(
     ...         "s3": {"key": "my_key", "secret": "my_secret"},
     ...         "simplecache": {"cache_storage": "custom/local/cache/path"},
     ...     },
-    ...     memory_cache_size=0,
     ...     disk_cache=True,
     ... )
     """
     if disk_cache is True:
         url = "::".join(["simplecache", url])

-    # Strip whitespaces and slashes from end of string
-    url = url.rstrip("/ ")
-
-    # Flag for checking write access
-    check = True if mode == "w" else False
-
-    # TODO: Turning off write checking now because zarr has a bug.
-    # Get rid of this once bug is fixed.
-    check = False
-
-    store = FSStore(
-        url=url,
-        check=check,
-        create=check,
-        mode=mode,
-        **storage_options,
-    )
-
-    if memory_cache_size != 0:
-        store = zarr.storage.LRUStoreCache(store=store, max_size=memory_cache_size)
-
-    return store
+    return normalize_path(url)


 def open_zarr_array(group_handle: zarr.Group, name: str) -> zarr.Array:
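
With the FSStore and LRU-cache plumbing gone, process_url reduces to string handling. A rough functional equivalent, with a plain rstrip standing in for the private normalize_path helper (that substitution is an assumption, since zarr.storage._utils is internal API):

    def process_url_sketch(url: str, disk_cache: bool) -> str:
        """Approximate the new process_url: optionally chain fsspec's
        simplecache protocol, then tidy the path string."""
        if disk_cache:
            url = "::".join(["simplecache", url])
        # Stand-in for zarr.storage._utils.normalize_path (a private helper);
        # here we only strip trailing slashes and whitespace.
        return url.rstrip("/ ")


    print(process_url_sketch("s3://bucket/key/", disk_cache=True))
    # simplecache::s3://bucket/key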

src/mdio/converters/segy.py

Lines changed: 22 additions & 34 deletions
@@ -343,6 +343,8 @@ def segy_to_mdio(  # noqa: C901
     ...     grid_overrides={"HasDuplicates": True},
     ... )
     """
+    zarr.config.set({"default_zarr_format": 2, "write_empty_chunks": False})
+
     if index_names is None:
         index_names = [f"dim_{i}" for i in range(len(index_bytes))]

@@ -364,13 +366,8 @@ def segy_to_mdio(  # noqa: C901
     if storage_options_output is None:
         storage_options_output = {}

-    store = process_url(
-        url=mdio_path_or_buffer,
-        mode="w",
-        storage_options=storage_options_output,
-        memory_cache_size=0,  # Making sure disk caching is disabled,
-        disk_cache=False,  # Making sure disk caching is disabled
-    )
+    url = process_url(url=mdio_path_or_buffer, disk_cache=False)
+    root_group = zarr.open_group(url, mode="w", storage_options=storage_options_output)

     # Open SEG-Y with MDIO's SegySpec. Endianness will be inferred.
     mdio_spec = mdio_segy_spec()
@@ -406,42 +403,43 @@ def segy_to_mdio(  # noqa: C901
         logger.warning(f"Ingestion grid shape: {grid.shape}.")
         raise GridTraceCountError(np.sum(grid.live_mask), num_traces)

-    zarr_root = create_zarr_hierarchy(
-        store=store,
+    root_group = create_zarr_hierarchy(
+        root_group=root_group,
         overwrite=overwrite,
     )

     # Get UTC time, then add local timezone information offset.
     iso_datetime = datetime.now(timezone.utc).isoformat()

-    write_attribute(name="created", zarr_group=zarr_root, attribute=iso_datetime)
-    write_attribute(name="api_version", zarr_group=zarr_root, attribute=API_VERSION)
+    write_attribute(name="created", zarr_group=root_group, attribute=iso_datetime)
+    write_attribute(name="api_version", zarr_group=root_group, attribute=API_VERSION)

     dimensions_dict = [dim.to_dict() for dim in dimensions]
-    write_attribute(name="dimension", zarr_group=zarr_root, attribute=dimensions_dict)
+    write_attribute(name="dimension", zarr_group=root_group, attribute=dimensions_dict)

     # Write trace count
     trace_count = np.count_nonzero(grid.live_mask)
-    write_attribute(name="trace_count", zarr_group=zarr_root, attribute=trace_count)
+    write_attribute(name="trace_count", zarr_group=root_group, attribute=trace_count)

     # Note, live mask is not chunked since it's bool and small.
-    zarr_root["metadata"].create_dataset(
-        data=grid.live_mask,
+    live_mask_arr = root_group["metadata"].create_array(
         name="live_mask",
         shape=grid.shape[:-1],
-        chunks=-1,
-        dimension_separator="/",
+        chunks=grid.shape[:-1],
+        dtype="bool",
+        chunk_key_encoding={"name": "v2", "separator": "/"},
     )
+    live_mask_arr[...] = grid.live_mask[...]

     write_attribute(
         name="text_header",
-        zarr_group=zarr_root["metadata"],
+        zarr_group=root_group["metadata"],
         attribute=text_header.split("\n"),
     )

     write_attribute(
         name="binary_header",
-        zarr_group=zarr_root["metadata"],
+        zarr_group=root_group["metadata"],
         attribute=binary_header.to_dict(),
     )

@@ -470,8 +468,8 @@ def segy_to_mdio(  # noqa: C901
     stats = blocked_io.to_zarr(
         segy_file=segy,
         grid=grid,
-        data_root=zarr_root["data"],
-        metadata_root=zarr_root["metadata"],
+        data_root=root_group["data"],
+        metadata_root=root_group["metadata"],
         name="_".join(["chunked", suffix]),
         dtype="float32",
         chunks=chunksize,
@@ -480,17 +478,7 @@ def segy_to_mdio(  # noqa: C901
     )

     for key, value in stats.items():
-        write_attribute(name=key, zarr_group=zarr_root, attribute=value)
-
-    # Non-cached store for consolidating metadata.
-    # If caching is enabled the metadata may fall out of cache hence
-    # creating an incomplete `.zmetadata` file.
-    store_nocache = process_url(
-        url=mdio_path_or_buffer,
-        mode="r+",
-        storage_options=storage_options_output,
-        memory_cache_size=0,  # Making sure disk caching is disabled,
-        disk_cache=False,  # Making sure disk caching is disabled
-    )
+        write_attribute(name=key, zarr_group=root_group, attribute=value)

-    zarr.consolidate_metadata(store_nocache)
+    # Finalize Zarr for fast open
+    zarr.consolidate_metadata(root_group.store)
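
Taken together, the ingestion changes pin the on-disk format to Zarr v2 while running on the v3 library, reproduce v2's "/" chunk separator via chunk_key_encoding, and consolidate metadata from the group's own store handle. A condensed sketch of that flow, with a made-up path and shape:

    import numpy as np
    import zarr

    # Same settings the converter applies: v2 on-disk format, skip empty chunks.
    zarr.config.set({"default_zarr_format": 2, "write_empty_chunks": False})

    root = zarr.open_group("demo.mdio", mode="w")  # hypothetical output path
    live_mask = root.create_array(
        name="live_mask",
        shape=(16, 16),
        chunks=(16, 16),  # one chunk, mirroring chunks=grid.shape[:-1] above
        dtype="bool",
        chunk_key_encoding={"name": "v2", "separator": "/"},  # v2-style keys
    )
    live_mask[...] = np.ones((16, 16), dtype="bool")

    # Finalize for fast open, as the commit does with root_group.store.
    zarr.consolidate_metadata(root.store)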

src/mdio/segy/blocked_io.py

Lines changed: 5 additions & 7 deletions
@@ -11,9 +11,9 @@
 import numpy as np
 from dask.array import Array
 from dask.array import map_blocks
+from numcodecs import Blosc
 from psutil import cpu_count
 from tqdm.auto import tqdm
-from zarr import Blosc
 from zarr import Group

 from mdio.core import Grid
@@ -90,26 +90,24 @@ def to_zarr(
             "install 'zfpy' or install mdio with `--extras lossy`"
         )

-    trace_array = data_root.create_dataset(
+    trace_array = data_root.create_array(
         name=name,
         shape=grid.shape,
         compressor=trace_compressor,
         chunks=chunks,
-        dimension_separator="/",
-        write_empty_chunks=False,
+        chunk_key_encoding={"name": "v2", "separator": "/"},
         **kwargs,
     )

     # Get header dtype in native order (little-endian 99.9% of the time)
     header_dtype = segy_file.spec.trace.header.dtype.newbyteorder("=")
-    header_array = metadata_root.create_dataset(
+    header_array = metadata_root.create_array(
         name="_".join([name, "trace_headers"]),
         shape=grid.shape[:-1],  # Same spatial shape as data
         chunks=chunks[:-1],  # Same spatial chunks as data
+        chunk_key_encoding={"name": "v2", "separator": "/"},
         compressor=header_compressor,
         dtype=header_dtype,
-        dimension_separator="/",
-        write_empty_chunks=False,
     )

     # Initialize chunk iterator (returns next chunk slice indices each iteration)
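
The header array pairs a structured NumPy dtype with the same v2-style chunk keys. A scaled-down sketch with an invented two-field header dtype (the real dtype comes from segy_file.spec.trace.header); the compressor= keyword mirrors the commit and assumes a zarr version that still accepts the singular form for v2-format arrays:

    import numpy as np
    import zarr
    from numcodecs import Blosc

    zarr.config.set({"default_zarr_format": 2})

    # Invented stand-in for the SEG-Y trace-header dtype in native byte order.
    header_dtype = np.dtype([("inline", "i4"), ("crossline", "i4")])

    root = zarr.open_group("headers_demo.mdio", mode="w")  # hypothetical path
    header_array = root.create_array(
        name="chunked_012_trace_headers",  # hypothetical name
        shape=(8, 8),                      # spatial grid only, no sample axis
        chunks=(4, 4),
        dtype=header_dtype,
        compressor=Blosc(cname="zstd"),
        chunk_key_encoding={"name": "v2", "separator": "/"},
    )
    header_array[0, 0] = (10, 100)  # one header record per live trace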

src/mdio/segy/helpers_segy.py

Lines changed: 9 additions & 8 deletions
@@ -1,18 +1,21 @@
 """Helper functions for tinkering with SEG-Y related Zarr."""

-from zarr import Group
-from zarr import open_group
+from typing import TYPE_CHECKING
+
 from zarr.errors import ContainsGroupError
-from zarr.storage import FSStore

 from mdio.core.exceptions import MDIOAlreadyExistsError


-def create_zarr_hierarchy(store: FSStore, overwrite: bool) -> Group:
+if TYPE_CHECKING:
+    from zarr.core import Group
+
+
+def create_zarr_hierarchy(root_group: "Group", overwrite: bool) -> "Group":
     """Create `zarr` hierarchy for SEG-Y files.

     Args:
-        store: Output path where the converted output is written.
+        root_group: Output root group where data will be written.
         overwrite: Toggle for overwriting existing store.

     Returns:
@@ -21,14 +24,12 @@ def create_zarr_hierarchy(store: FSStore, overwrite: bool) -> Group:
     Raises:
         MDIOAlreadyExistsError: If a file with data already exists.
     """
-    root_group = open_group(store=store)
-
     try:
         root_group.create_group(name="data", overwrite=overwrite)
         root_group.create_group(name="metadata", overwrite=overwrite)
     except ContainsGroupError as e:
         msg = (
-            f"An MDIO file with data already exists at {store.path}. "
+            f"An MDIO file with data already exists at {root_group.store.path}. "
             "If this is intentional, please specify 'overwrite=True'."
        )
        raise MDIOAlreadyExistsError(msg) from e
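
The TYPE_CHECKING guard used above keeps Group available for annotations without importing zarr internals at runtime. The general pattern, shown here with the public zarr.Group as a stand-in for zarr.core.Group:

    from __future__ import annotations

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        # Seen only by static type checkers; never executed at runtime.
        from zarr import Group


    def group_summary(group: Group) -> str:
        """The annotation resolves for mypy/pyright with no runtime import."""
        return f"zarr group with members: {sorted(group.keys())}"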
