Skip to content

Commit d3a7da2

Browse files
dmitriyrepinDmitriy RepintasansalBrianMichell
authored
segy_to_mdio_v1 (TGSAI#577)
* Templates and TemplateRegistry * Fix pre-commit issues * Rever dev container changes * PR Review: address issues * PR Review: register default templates at registry initialization * Dockerfile.dev * segy_to_mdio_v1 * Clean up * Prototype review notes * Add dev comment * Add notes that will be deleted later * segy_to_mdio_v1 pass 1 * indexing_v1 and blocked_io_v1 * Remove DEV notes * Clean up * Document bug location * Work around IndexError * Clean temporary code * More clean up * Remove *_1 infrastructure files * Restore accidently removed dask.array * Created an issue reproducer * Make the required template properties public * Simplify type converter * Improve templates * Move test_type_converter.py * Move test_type_converter.py * Revert to use the original grid * Integrate segy_to_mdio_v1_customized, fix indexing * Add dimension coordinates in tem,plates * Write statistics to Zarr * Delete factory_v1.py * Complete integrationtest. Fix coordinates * Fir pre-commit errors * PR review: fix trace_worker docstring * Review: address some of the issue * Fix bug * dding todo for sum squares calculation * Refactor ChunkIterator * Refactor ChunkIterator into ChunkIteratorV1 * Remove segy_to_mdio_v1_customized, dataset_serializer.to_zarr * Add support for trace headers without using _FillValue * Use StorageLocation in trace_worker_v1 * Fix statistics attribute name * PR review changes * PR Improvements: do a single write * TODO: chunked write for non-dimensional coordinates and trace_mask * Update StorageLocation to use fsspec * Reformat with pre-commit * Use domain name in get_grid_plan * Fix non-dim coords and chunk_samples=False * Convert test_3d_import_v1 to V1 * Fix test_meta_dataset_read * remove whitespace * clean up comments * update deps in lockfile * simplify dim and non-dim coordinate handling after scan * remove compatibility tests * add filtering capability to header worker * accept subset filter to pass to workers * make v1 grid planner awesome * double to single underscores in test names * fix broken test harnesses due to incorrect Sequence import * clean up dev comment * clean up whitespace * use new module name * clean up segy_to_mdio_v1 * fix whitespace and remove unnecessary list call * these are defined as float64 in template Previous check was passing due to an error in assignment during creation of coordinate variables * fix missing dimension coordinate for vertical axis * fix incorrect dtype comparison for time variable * simplify and fix critical bugs * rename v1 out of things and get rid of old code * remove fixed todo * remove more v1 from names * rename chunk iterator * fix dimensionality in tests due to new (missing) vertical dimension coordinate * add todo for numpy ingestion * fix references to non-v1 naming * extract grid operations to its own function * fix typo Co-authored-by: Brian Michell <[email protected]> * add todo for simplifying storage location * Remove no_fill_var_names, add domain var to Seismic3DPreStackShotTemplate * Part 2 of the previous commit * pre-commit formatting * remove dev mount --------- Co-authored-by: Dmitriy Repin <[email protected]> Co-authored-by: Altay Sansal <[email protected]> Co-authored-by: Brian Michell <[email protected]>
1 parent d21278c commit d3a7da2

37 files changed

+2441
-1319
lines changed

src/mdio/api/convenience.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,11 @@ def copy_mdio( # noqa: PLR0913
7979

8080
writer.live_mask[:] = reader.live_mask[:]
8181

82-
iterator = ChunkIterator(reader._traces, chunk_samples=False)
82+
shape = reader._traces.shape
83+
chunks = reader._traces.chunks
84+
chunks = chunks[:-1] + (shape[-1],) # don't chunk samples
85+
86+
iterator = ChunkIterator(shape=shape, chunks=chunks)
8387
progress = tqdm(iterator, unit="block")
8488
progress.set_description(desc=f"Copying data for '{access_pattern=}'")
8589
for slice_ in progress:
@@ -177,7 +181,10 @@ def create_rechunk_plan(
177181

178182
n_dimension = len(data_array.shape)
179183
dummy_array = zarr.empty(shape=data_array.shape, chunks=(MAX_BUFFER,) * n_dimension)
180-
iterator = ChunkIterator(dummy_array)
184+
185+
shape = dummy_array.shape
186+
chunks = dummy_array.chunks
187+
iterator = ChunkIterator(shape=shape, chunks=chunks)
181188

182189
return metadata_arrs, data_arrs, live_mask, iterator
183190

src/mdio/converters/numpy.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import numpy as np
88

99
from mdio.api.accessor import MDIOWriter
10-
from mdio.converters.segy import get_compressor
1110
from mdio.core.dimension import Dimension
1211
from mdio.core.factory import MDIOCreateConfig
1312
from mdio.core.factory import MDIOVariableConfig
@@ -137,6 +136,11 @@ def numpy_to_mdio( # noqa: PLR0913
137136
suffix = [str(idx) for idx, value in enumerate(suffix) if value is not None]
138137
suffix = "".join(suffix)
139138

139+
# TODO(Dmitrit Repin): Implement Numpy converted in MDIO v1
140+
# https://github.com/TGSAI/mdio-python/issues/596
141+
def get_compressor(lossless: bool, tolerance: float) -> list[str]:
142+
pass
143+
140144
compressors = get_compressor(lossless, compression_tolerance)
141145
mdio_var = MDIOVariableConfig(
142146
name=f"chunked_{suffix}",

src/mdio/converters/segy.py

Lines changed: 256 additions & 351 deletions
Large diffs are not rendered by default.
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
"""A module for converting numpy dtypes to MDIO scalar and structured types."""
2+
3+
from numpy import dtype as np_dtype
4+
5+
from mdio.schemas.dtype import ScalarType
6+
from mdio.schemas.dtype import StructuredField
7+
from mdio.schemas.dtype import StructuredType
8+
9+
10+
def to_scalar_type(data_type: np_dtype) -> ScalarType:
11+
"""Convert numpy dtype to MDIO ScalarType.
12+
13+
Out of the 24 built-in numpy scalar type objects
14+
(see https://numpy.org/doc/stable/reference/arrays.dtypes.html)
15+
this function supports only a limited subset:
16+
ScalarType.INT8 <-> int8
17+
ScalarType.INT16 <-> int16
18+
ScalarType.INT32 <-> int32
19+
ScalarType.INT64 <-> int64
20+
ScalarType.UINT8 <-> uint8
21+
ScalarType.UINT16 <-> uint16
22+
ScalarType.UINT32 <-> uint32
23+
ScalarType.UINT64 <-> uint64
24+
ScalarType.FLOAT32 <-> float32
25+
ScalarType.FLOAT64 <-> float64
26+
ScalarType.COMPLEX64 <-> complex64
27+
ScalarType.COMPLEX128 <-> complex128
28+
ScalarType.BOOL <-> bool
29+
30+
Args:
31+
data_type: numpy dtype to convert
32+
33+
Returns:
34+
ScalarType: corresponding MDIO scalar type
35+
36+
Raises:
37+
ValueError: if dtype is not supported
38+
"""
39+
try:
40+
return ScalarType(data_type.name)
41+
except ValueError as exc:
42+
err = f"Unsupported numpy dtype '{data_type.name}' for conversion to ScalarType."
43+
raise ValueError(err) from exc
44+
45+
46+
def to_structured_type(data_type: np_dtype) -> StructuredType:
47+
"""Convert numpy dtype to MDIO StructuredType.
48+
49+
This function supports only a limited subset of structured types.
50+
In particular:
51+
It does not support nested structured types.
52+
It supports fields of only 13 out of 24 built-in numpy scalar types.
53+
(see `to_scalar_type` for details).
54+
55+
Args:
56+
data_type: numpy dtype to convert
57+
58+
Returns:
59+
StructuredType: corresponding MDIO structured type
60+
61+
Raises:
62+
ValueError: if dtype is not structured or has no fields
63+
64+
"""
65+
if data_type is None or len(data_type.names or []) == 0:
66+
err = "None or empty dtype provided, cannot convert to StructuredType."
67+
raise ValueError(err)
68+
69+
fields = []
70+
for field_name in data_type.names:
71+
field_dtype = data_type.fields[field_name][0]
72+
scalar_type = to_scalar_type(field_dtype)
73+
structured_field = StructuredField(name=field_name, format=scalar_type)
74+
fields.append(structured_field)
75+
return StructuredType(fields=fields)
76+
77+
78+
def to_numpy_dtype(data_type: ScalarType | StructuredType) -> np_dtype:
79+
"""Get the numpy dtype for a variable."""
80+
if isinstance(data_type, ScalarType):
81+
return np_dtype(data_type.value)
82+
if isinstance(data_type, StructuredType):
83+
return np_dtype([(f.name, f.format.value) for f in data_type.fields])
84+
msg = f"Expected ScalarType or StructuredType, got '{type(data_type).__name__}'"
85+
raise ValueError(msg)

src/mdio/core/indexing.py

Lines changed: 66 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -4,78 +4,83 @@
44
from math import ceil
55

66
import numpy as np
7-
from zarr import Array
87

98

109
class ChunkIterator:
11-
"""Iterator for traversing a Zarr array in chunks.
10+
"""Chunk iterator for multi-dimensional arrays.
1211
13-
This iterator yields tuples of slices corresponding to the chunk boundaries of a Zarr array.
14-
It supports chunking all dimensions or taking the full extent of the last dimension.
12+
This iterator takes an array shape and chunks and every time it is iterated, it returns
13+
a dictionary (if dimensions are provided) or a tuple of slices that align with
14+
chunk boundaries. When dimensions are provided, they are used as the dictionary keys.
1515
1616
Args:
17-
array: The Zarr array to iterate, providing shape and chunk sizes.
18-
chunk_samples: If True, chunks all dimensions. If False, takes the full extent of the
19-
last dimension. Defaults to True.
20-
21-
22-
Example:
23-
>>> import zarr
24-
>>> arr = zarr.array(np.zeros((10, 20)), chunks=(3, 4))
25-
>>> it = ChunkIterator(arr)
26-
>>> for slices in it:
27-
... print(slices)
28-
(slice(0, 3, None), slice(0, 4, None))
29-
(slice(0, 3, None), slice(4, 8, None))
30-
...
31-
>>> it = ChunkIterator(arr, chunk_samples=False)
32-
>>> for slices in it:
33-
... print(slices)
34-
(slice(0, 3, None), slice(0, 20, None))
35-
(slice(3, 6, None), slice(0, 20, None))
36-
...
17+
shape: The shape of the array.
18+
chunks: The chunk sizes for each dimension.
19+
dim_names: The names of the array dimensions, to be used with DataArray.isel().
20+
If the dim_names are not provided, a tuple of the slices will be returned.
21+
22+
Attributes: # noqa: DOC602
23+
arr_shape: Shape of the array.
24+
len_chunks: Length of chunks in each dimension.
25+
dim_chunks: Number of chunks in each dimension.
26+
num_chunks: Total number of chunks.
27+
28+
Examples:
29+
>> chunks = (3, 4, 5)
30+
>> shape = (5, 11, 19)
31+
>> dims = ["inline", "crossline", "depth"]
32+
>>
33+
>> iter = ChunkIterator(shape=shape, chunks=chunks, dim_names=dims)
34+
>> for i in range(13):
35+
>> region = iter.__next__()
36+
>> print(region)
37+
{ "inline": slice(3,6, None), "crossline": slice(0,4, None), "depth": slice(0,5, None) }
38+
39+
>> iter = ChunkIterator(shape=shape, chunks=chunks, dim_names=None)
40+
>> for i in range(13):
41+
>> region = iter.__next__()
42+
>> print(region)
43+
(slice(3,6,None), slice(0,4,None), slice(0,5,None))
3744
"""
3845

39-
def __init__(self, array: Array, chunk_samples: bool = True):
40-
self.arr_shape = array.shape
41-
self.len_chunks = array.chunks
42-
43-
# If chunk_samples is False, set the last dimension's chunk size to its full extent
44-
if not chunk_samples:
45-
self.len_chunks = self.len_chunks[:-1] + (self.arr_shape[-1],)
46-
47-
# Calculate the number of chunks per dimension
48-
self.dim_chunks = [
49-
ceil(len_dim / chunk)
50-
for len_dim, chunk in zip(self.arr_shape, self.len_chunks, strict=True)
51-
]
46+
def __init__(
47+
self, shape: tuple[int, ...], chunks: tuple[int, ...], dim_names: tuple[str, ...] = None
48+
):
49+
self.arr_shape = tuple(shape) # Deep copy to ensure immutability
50+
self.len_chunks = tuple(chunks) # Deep copy to ensure immutability
51+
self.dims = dim_names
52+
53+
# Compute number of chunks per dimension, and total number of chunks
54+
self.dim_chunks = tuple(
55+
[
56+
ceil(len_dim / chunk)
57+
for len_dim, chunk in zip(self.arr_shape, self.len_chunks, strict=True)
58+
]
59+
)
5260
self.num_chunks = np.prod(self.dim_chunks)
5361

54-
# Set up chunk index combinations using ranges for each dimension
62+
# Under the hood stuff for the iterator. This generates C-ordered
63+
# permutation of chunk indices.
5564
dim_ranges = [range(dim_len) for dim_len in self.dim_chunks]
5665
self._ranges = itertools.product(*dim_ranges)
5766
self._idx = 0
5867

5968
def __iter__(self) -> "ChunkIterator":
60-
"""Return the iterator object itself."""
69+
"""Iteration context."""
6170
return self
6271

6372
def __len__(self) -> int:
64-
"""Return the total number of chunks."""
73+
"""Get total number of chunks."""
6574
return self.num_chunks
6675

67-
def __next__(self) -> tuple[slice, ...]:
68-
"""Yield the next set of chunk slices.
69-
70-
Returns:
71-
A tuple of slice objects for each dimension.
72-
73-
Raises:
74-
StopIteration: When all chunks have been iterated over.
75-
"""
76-
if self._idx < self.num_chunks:
76+
def __next__(self) -> dict[str, slice]:
77+
"""Iteration logic."""
78+
if self._idx <= self.num_chunks:
79+
# We build slices here. It is dimension agnostic
7780
current_start = next(self._ranges)
7881

82+
# TODO (Dmitriy Repin): Enhance ChunkIterator to make the last slice, if needed, smaller
83+
# https://github.com/TGSAI/mdio-python/issues/586
7984
start_indices = tuple(
8085
dim * chunk for dim, chunk in zip(current_start, self.len_chunks, strict=True)
8186
)
@@ -88,7 +93,17 @@ def __next__(self) -> tuple[slice, ...]:
8893
slice(start, stop) for start, stop in zip(start_indices, stop_indices, strict=True)
8994
)
9095

96+
if self.dims: # noqa SIM108
97+
# Example
98+
# {"inline":slice(3,6,None), "crossline":slice(0,4,None), "depth":slice(0,5,None)}
99+
result = dict(zip(self.dims, slices, strict=False))
100+
else:
101+
# Example
102+
# (slice(3,6,None), slice(0,4,None), slice(0,5,None))
103+
result = slices
104+
91105
self._idx += 1
92-
return slices
106+
107+
return result
93108

94109
raise StopIteration

src/mdio/core/storage_location.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
"""StorageLocation class for managing local and cloud storage locations."""
2+
3+
from pathlib import Path
4+
from typing import Any
5+
6+
import fsspec
7+
8+
9+
# TODO(Dmitriy Repin): Reuse fsspec functions for some methods we implemented here
10+
# https://github.com/TGSAI/mdio-python/issues/597
11+
class StorageLocation:
12+
"""A class to represent a local or cloud storage location for SEG-Y or MDIO files.
13+
14+
This class abstracts the storage location, allowing for both local file paths and
15+
cloud storage URIs (e.g., S3, GCS). It uses fsspec to check existence and manage options.
16+
Note, we do not want to make it a dataclass because we want the uri and the options to
17+
be read-only immutable properties.
18+
19+
uri: The URI of the storage location (e.g., '/path/to/file', 'file:///path/to/file',
20+
's3://bucket/path', 'gs://bucket/path').
21+
options: Optional dictionary of options for the cloud, such as credentials.
22+
23+
"""
24+
25+
def __init__(self, uri: str = "", options: dict[str, Any] = None):
26+
self._uri = uri
27+
self._options = options or {}
28+
self._fs = None
29+
30+
if uri.startswith(("s3://", "gs://")):
31+
return
32+
33+
if uri.startswith("file://"):
34+
self._uri = self._uri.removeprefix("file://")
35+
# For local paths, ensure they are absolute and resolved
36+
self._uri = str(Path(self._uri).resolve())
37+
return
38+
39+
@property
40+
def uri(self) -> str:
41+
"""Get the URI (read-only)."""
42+
return self._uri
43+
44+
@property
45+
def options(self) -> dict[str, Any]:
46+
"""Get the options (read-only)."""
47+
# Return a copy to prevent external modification
48+
return self._options.copy()
49+
50+
@property
51+
def _filesystem(self) -> fsspec.AbstractFileSystem:
52+
"""Get the fsspec filesystem instance for this storage location."""
53+
if self._fs is None:
54+
self._fs = fsspec.filesystem(self._protocol, **self._options)
55+
return self._fs
56+
57+
@property
58+
def _path(self) -> str:
59+
"""Extract the path portion from the URI."""
60+
if "://" in self._uri:
61+
return self._uri.split("://", 1)[1]
62+
return self._uri # For local paths without file:// prefix
63+
64+
@property
65+
def _protocol(self) -> str:
66+
"""Extract the protocol/scheme from the URI."""
67+
if "://" in self._uri:
68+
return self._uri.split("://", 1)[0]
69+
return "file" # Default to file protocol
70+
71+
def exists(self) -> bool:
72+
"""Check if the storage location exists using fsspec."""
73+
try:
74+
return self._filesystem.exists(self._path)
75+
except Exception as e:
76+
# Log the error and return False for safety
77+
# In a production environment, you might want to use proper logging
78+
print(f"Error checking existence of {self._uri}: {e}")
79+
return False
80+
81+
def __str__(self) -> str:
82+
"""String representation of the storage location."""
83+
return self._uri
84+
85+
def __repr__(self) -> str:
86+
"""Developer representation of the storage location."""
87+
return f"StorageLocation(uri='{self._uri}', options={self._options})"

0 commit comments

Comments
 (0)