|
| 1 | +"""Conversion from Numpy to MDIO v1 format.""" |
| 2 | + |
| 3 | +from __future__ import annotations |
| 4 | + |
| 5 | +import logging |
| 6 | +from typing import TYPE_CHECKING |
| 7 | + |
| 8 | +import numpy as np |
| 9 | + |
| 10 | +from mdio.api.io import _normalize_path |
| 11 | +from mdio.api.io import to_mdio |
| 12 | +from mdio.builder.schemas.chunk_grid import RegularChunkGrid |
| 13 | +from mdio.builder.schemas.chunk_grid import RegularChunkShape |
| 14 | +from mdio.builder.schemas.compressors import Blosc |
| 15 | +from mdio.builder.schemas.compressors import BloscCname |
| 16 | +from mdio.builder.schemas.dtype import ScalarType |
| 17 | +from mdio.builder.schemas.v1.variable import VariableMetadata |
| 18 | +from mdio.converters.type_converter import to_scalar_type |
| 19 | +from mdio.builder.templates.base import AbstractDatasetTemplate |
| 20 | +from mdio.builder.xarray_builder import to_xarray_dataset |
| 21 | +from mdio.core.dimension import Dimension |
| 22 | +from mdio.core.grid import Grid |
| 23 | +from mdio.core.utils_write import MAX_COORDINATES_BYTES |
| 24 | +from mdio.core.utils_write import MAX_SIZE_LIVE_MASK |
| 25 | +from mdio.core.utils_write import get_constrained_chunksize |
| 26 | + |
| 27 | +if TYPE_CHECKING: |
| 28 | + from pathlib import Path |
| 29 | + from typing import Any |
| 30 | + |
| 31 | + from numpy.typing import DTypeLike |
| 32 | + from numpy.typing import NDArray |
| 33 | + from upath import UPath |
| 34 | + from xarray import Dataset as xr_Dataset |
| 35 | + |
| 36 | + from mdio.builder.schemas import Dataset |
| 37 | + |
| 38 | +logger = logging.getLogger(__name__) |
| 39 | + |
| 40 | + |
| 41 | +def get_compressor(lossless: bool, compression_tolerance: float) -> list: |
| 42 | + """Get compressor configuration based on compression settings.""" |
| 43 | + if lossless: |
| 44 | + return [Blosc(cname=BloscCname.zstd)] |
| 45 | + else: |
| 46 | + # For lossy compression, we would need ZFP, but let's keep it simple for now |
| 47 | + # and use lossless as fallback |
| 48 | + logger.warning("Lossy compression not yet implemented, using lossless") |
| 49 | + return [Blosc(cname=BloscCname.zstd)] |
| 50 | + |
| 51 | + |
| 52 | +def _prepare_inputs( |
| 53 | + array: NDArray, |
| 54 | + mdio_template: AbstractDatasetTemplate, |
| 55 | + chunksize: tuple[int, ...] | None, |
| 56 | + index_coords: dict[str, NDArray] | None, |
| 57 | +) -> tuple[tuple[int, ...], dict[str, NDArray]]: |
| 58 | + """Prepare inputs and set defaults for chunksize and coordinates.""" |
| 59 | + dim_names = mdio_template.dimension_names |
| 60 | + |
| 61 | + # Use template's chunk shape if not provided |
| 62 | + if chunksize is None: |
| 63 | + chunksize = mdio_template.full_chunk_shape |
| 64 | + |
| 65 | + # Create default coordinates if not provided |
| 66 | + if index_coords is None: |
| 67 | + index_coords = {} |
| 68 | + for name, size in zip(dim_names, array.shape, strict=True): |
| 69 | + index_coords[name] = np.arange(size, dtype=np.int32) |
| 70 | + |
| 71 | + return chunksize, index_coords |
| 72 | + |
| 73 | + |
| 74 | +def _build_grid_and_dataset( |
| 75 | + array: NDArray, |
| 76 | + mdio_template: AbstractDatasetTemplate, |
| 77 | + chunksize: tuple[int, ...], |
| 78 | + index_coords: dict[str, NDArray], |
| 79 | + lossless: bool, |
| 80 | + compression_tolerance: float, |
| 81 | + header_dtype: DTypeLike | None, |
| 82 | +) -> tuple[Grid, Dataset]: |
| 83 | + """Build the grid and dataset for the numpy array using the provided template.""" |
| 84 | + # Create dimensions |
| 85 | + dims = [Dimension(name=name, coords=index_coords[name]) for name in mdio_template.dimension_names] |
| 86 | + grid = Grid(dims=dims) |
| 87 | + |
| 88 | + # Get compressor |
| 89 | + compressors = get_compressor(lossless, compression_tolerance) |
| 90 | + |
| 91 | + # Convert numpy dtype to MDIO ScalarType |
| 92 | + data_type = to_scalar_type(array.dtype) |
| 93 | + |
| 94 | + # Build dataset |
| 95 | + mdio_ds: Dataset = mdio_template.build_dataset( |
| 96 | + name=mdio_template.name, |
| 97 | + sizes=array.shape, |
| 98 | + header_dtype=header_dtype, |
| 99 | + ) |
| 100 | + |
| 101 | + # Update the default variable with correct dtype and compressor |
| 102 | + var_index = next((i for i, v in enumerate(mdio_ds.variables) if v.name == mdio_template.default_variable_name), None) |
| 103 | + if var_index is not None: |
| 104 | + mdio_ds.variables[var_index].data_type = data_type |
| 105 | + mdio_ds.variables[var_index].compressor = compressors[0] |
| 106 | + |
| 107 | + # Set chunk grid for the data variable |
| 108 | + chunk_grid = RegularChunkGrid(configuration=RegularChunkShape(chunk_shape=chunksize)) |
| 109 | + if mdio_ds.variables[var_index].metadata is None: |
| 110 | + mdio_ds.variables[var_index].metadata = VariableMetadata() |
| 111 | + mdio_ds.variables[var_index].metadata.chunk_grid = chunk_grid |
| 112 | + |
| 113 | + # Dynamically chunk the trace_mask |
| 114 | + _chunk_variable(ds=mdio_ds, target_variable_name="trace_mask") |
| 115 | + |
| 116 | + # Dynamically chunk coordinate variables |
| 117 | + for coord in mdio_template.coordinate_names: |
| 118 | + _chunk_variable(ds=mdio_ds, target_variable_name=coord) |
| 119 | + |
| 120 | + return grid, mdio_ds |
| 121 | + |
| 122 | + |
| 123 | +def _chunk_variable(ds: Dataset, target_variable_name: str) -> None: |
| 124 | + """Determines and sets the chunking for a specific Variable in the Dataset.""" |
| 125 | + # Find variable index by name |
| 126 | + index = next((i for i, obj in enumerate(ds.variables) if obj.name == target_variable_name), None) |
| 127 | + if index is None: |
| 128 | + return |
| 129 | + |
| 130 | + def determine_target_size(var_type: str) -> int: |
| 131 | + """Determines the target size (in bytes) for a Variable based on its type.""" |
| 132 | + if var_type == "bool": |
| 133 | + return MAX_SIZE_LIVE_MASK |
| 134 | + return MAX_COORDINATES_BYTES |
| 135 | + |
| 136 | + # Create the chunk grid metadata |
| 137 | + var_type = ds.variables[index].data_type |
| 138 | + full_shape = tuple(dim.size for dim in ds.variables[index].dimensions) |
| 139 | + target_size = determine_target_size(var_type) |
| 140 | + |
| 141 | + chunk_shape = get_constrained_chunksize(full_shape, var_type, target_size) |
| 142 | + chunk_grid = RegularChunkGrid(configuration=RegularChunkShape(chunk_shape=chunk_shape)) |
| 143 | + |
| 144 | + # Create variable metadata if it doesn't exist |
| 145 | + if ds.variables[index].metadata is None: |
| 146 | + ds.variables[index].metadata = VariableMetadata() |
| 147 | + |
| 148 | + ds.variables[index].metadata.chunk_grid = chunk_grid |
| 149 | + |
| 150 | + |
| 151 | +def _populate_coordinates_and_write( |
| 152 | + xr_dataset: xr_Dataset, |
| 153 | + grid: Grid, |
| 154 | + output_path: UPath | Path, |
| 155 | + array: NDArray, |
| 156 | +) -> None: |
| 157 | + """Populate coordinates and write data to MDIO.""" |
| 158 | + # Populate dimension coordinates |
| 159 | + drop_vars_delayed = [] |
| 160 | + for dim in grid.dims: |
| 161 | + xr_dataset[dim.name].values[:] = dim.coords |
| 162 | + drop_vars_delayed.append(dim.name) |
| 163 | + |
| 164 | + # Set trace mask to all True (no missing data for numpy arrays) |
| 165 | + xr_dataset.trace_mask.data[:] = True |
| 166 | + drop_vars_delayed.append("trace_mask") |
| 167 | + |
| 168 | + # Create data dataset with the numpy array |
| 169 | + data_var_name = xr_dataset.attrs.get("defaultVariableName", "amplitude") |
| 170 | + data_ds = xr_dataset[[data_var_name]].copy() |
| 171 | + data_ds[data_var_name].data[:] = array |
| 172 | + |
| 173 | + # Combine all datasets |
| 174 | + full_ds = xr_dataset[drop_vars_delayed].merge(data_ds) |
| 175 | + |
| 176 | + # Write everything at once |
| 177 | + to_mdio(full_ds, output_path=output_path, mode="w", compute=True) |
| 178 | + |
| 179 | + |
| 180 | +def numpy_to_mdio( # noqa: PLR0913 |
| 181 | + array: NDArray, |
| 182 | + mdio_template: AbstractDatasetTemplate, |
| 183 | + output_path: UPath | Path | str, |
| 184 | + chunksize: tuple[int, ...] | None = None, |
| 185 | + index_coords: dict[str, NDArray] | None = None, |
| 186 | + header_dtype: DTypeLike | None = None, |
| 187 | + lossless: bool = True, |
| 188 | + compression_tolerance: float = 0.01, |
| 189 | + storage_options: dict[str, Any] | None = None, |
| 190 | + overwrite: bool = False, |
| 191 | +) -> None: |
| 192 | + """Convert a NumPy array to MDIO v1 format. |
| 193 | +
|
| 194 | + This function converts a NumPy array into the MDIO format following the same |
| 195 | + interface pattern as SEG-Y to MDIO conversion. |
| 196 | +
|
| 197 | + Args: |
| 198 | + array: Input NumPy array to be converted to MDIO format. |
| 199 | + mdio_template: The MDIO template to use for the conversion. The template defines |
| 200 | + the dataset structure, but coordinate information is ignored for NumPy arrays. |
| 201 | + output_path: The universal path for the output MDIO v1 file. |
| 202 | + chunksize: Tuple specifying the chunk sizes for each dimension of the array. If not |
| 203 | + provided, uses the template's default chunk shape. Must match the number of |
| 204 | + dimensions in the input array. |
| 205 | + index_coords: Dictionary mapping dimension names to their coordinate arrays. If not |
| 206 | + provided, defaults to sequential integers (0 to size-1) for each dimension. |
| 207 | + header_dtype: Data type for trace headers, if applicable. Defaults to None. |
| 208 | + lossless: If True, uses lossless Blosc compression with zstandard. If False, uses ZFP lossy |
| 209 | + compression (requires `zfpy` library). |
| 210 | + compression_tolerance: Tolerance for ZFP compression in lossy mode. Ignored if |
| 211 | + `lossless=True`. Default is 0.01, providing ~70% size reduction. |
| 212 | + storage_options: Dictionary of storage options for the MDIO output file (e.g., |
| 213 | + cloud credentials). Defaults to None (anonymous access). |
| 214 | + overwrite: If True, overwrites existing MDIO file at the specified path. |
| 215 | +
|
| 216 | + Raises: |
| 217 | + FileExistsError: If the output location already exists and overwrite is False. |
| 218 | +
|
| 219 | + Examples: |
| 220 | + Convert a 3D NumPy array to MDIO format using a template: |
| 221 | +
|
| 222 | + >>> import numpy as np |
| 223 | + >>> from mdio.converters import numpy_to_mdio |
| 224 | + >>> from mdio.builder.templates.seismic_3d_poststack import Seismic3DPostStackTemplate |
| 225 | + >>> |
| 226 | + >>> array = np.random.rand(100, 200, 300) |
| 227 | + >>> template = Seismic3DPostStackTemplate(data_domain="time") |
| 228 | + >>> numpy_to_mdio( |
| 229 | + ... array=array, |
| 230 | + ... mdio_template=template, |
| 231 | + ... output_path="output/file.mdio", |
| 232 | + ... chunksize=(64, 64, 64), |
| 233 | + ... ) |
| 234 | +
|
| 235 | + For a cloud-based output on AWS S3 with custom coordinates: |
| 236 | +
|
| 237 | + >>> coords = { |
| 238 | + ... "inline": np.arange(0, 100, 2), |
| 239 | + ... "crossline": np.arange(0, 200, 4), |
| 240 | + ... "time": np.linspace(0, 0.3, 300), |
| 241 | + ... } |
| 242 | + >>> numpy_to_mdio( |
| 243 | + ... array=array, |
| 244 | + ... mdio_template=template, |
| 245 | + ... output_path="s3://bucket/file.mdio", |
| 246 | + ... chunksize=(32, 32, 128), |
| 247 | + ... index_coords=coords, |
| 248 | + ... lossless=False, |
| 249 | + ... compression_tolerance=0.01, |
| 250 | + ... ) |
| 251 | +
|
| 252 | + Convert a 2D array with default indexing and lossless compression: |
| 253 | +
|
| 254 | + >>> from mdio.builder.templates.seismic_2d_poststack import Seismic2DPostStackTemplate |
| 255 | + >>> array_2d = np.random.rand(500, 1000) |
| 256 | + >>> template_2d = Seismic2DPostStackTemplate(data_domain="time") |
| 257 | + >>> numpy_to_mdio( |
| 258 | + ... array=array_2d, |
| 259 | + ... mdio_template=template_2d, |
| 260 | + ... output_path="output/file_2d.mdio", |
| 261 | + ... chunksize=(512, 512), |
| 262 | + ... ) |
| 263 | + """ |
| 264 | + storage_options = storage_options or {} |
| 265 | + |
| 266 | + # Prepare inputs and set defaults |
| 267 | + chunksize, index_coords = _prepare_inputs(array, mdio_template, chunksize, index_coords) |
| 268 | + |
| 269 | + # Normalize path |
| 270 | + output_path = _normalize_path(output_path) |
| 271 | + |
| 272 | + # Check if output exists |
| 273 | + if not overwrite and output_path.exists(): |
| 274 | + err = f"Output location '{output_path.as_posix()}' exists. Set `overwrite=True` if intended." |
| 275 | + raise FileExistsError(err) |
| 276 | + |
| 277 | + # Build grid and dataset |
| 278 | + grid, mdio_ds = _build_grid_and_dataset( |
| 279 | + array=array, |
| 280 | + mdio_template=mdio_template, |
| 281 | + chunksize=chunksize, |
| 282 | + index_coords=index_coords, |
| 283 | + lossless=lossless, |
| 284 | + compression_tolerance=compression_tolerance, |
| 285 | + header_dtype=header_dtype, |
| 286 | + ) |
| 287 | + |
| 288 | + # Convert to xarray dataset |
| 289 | + xr_dataset: xr_Dataset = to_xarray_dataset(mdio_ds=mdio_ds) |
| 290 | + |
| 291 | + # Populate coordinates and write data |
| 292 | + _populate_coordinates_and_write( |
| 293 | + xr_dataset=xr_dataset, |
| 294 | + grid=grid, |
| 295 | + output_path=output_path, |
| 296 | + array=array, |
| 297 | + ) |
0 commit comments