Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 22 additions & 25 deletions src/mdio/commands/segy.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
from click_params import JSON
from click_params import IntListParamType
from click_params import StringListParamType
from segy.schema import HeaderField
from segy.standards import get_segy_standard

from mdio.core.storage_location import StorageLocation
from mdio.schemas.v1.templates.template_registry import TemplateRegistry

SEGY_HELP = """
MDIO and SEG-Y conversion utilities. Below is general information about the SEG-Y format and MDIO
Expand Down Expand Up @@ -318,34 +323,29 @@ def segy_import( # noqa: PLR0913
# Lazy import to reduce CLI startup time
from mdio import segy_to_mdio # noqa: PLC0415

_ = (chunk_size, lossless, compression_tolerance, grid_overrides)

segy_spec = get_segy_standard(1.0)
index_names = header_names or [f"dim_{i}" for i in range(len(header_locations))]
index_types = header_types or ["int32"] * len(header_locations)
index_fields = [
HeaderField(name=name, byte=byte, format=format_)
for name, byte, format_ in zip(index_names, header_locations, index_types, strict=True)
]
segy_spec = segy_spec.customize(trace_header_fields=index_fields)

segy_to_mdio(
segy_path=segy_path,
mdio_path_or_buffer=mdio_path,
index_bytes=header_locations,
index_types=header_types,
index_names=header_names,
chunksize=chunk_size,
lossless=lossless,
compression_tolerance=compression_tolerance,
storage_options_input=storage_options_input,
storage_options_output=storage_options_output,
segy_spec=segy_spec,
mdio_template=TemplateRegistry().get("PostStack3DTime"),
input_location=StorageLocation(segy_path, storage_options_input),
output_location=StorageLocation(mdio_path, storage_options_output),
overwrite=overwrite,
grid_overrides=grid_overrides,
)


@cli.command(name="export")
@argument("mdio-file", type=STRING)
@argument("segy-path", type=Path(exists=False))
@option(
"-access",
"--access-pattern",
required=False,
default="012",
help="Access pattern of the file",
type=STRING,
show_default=True,
)
@option(
"-storage",
"--storage-options",
Expand All @@ -366,7 +366,6 @@ def segy_import( # noqa: PLR0913
def segy_export(
mdio_file: str,
segy_path: str,
access_pattern: str,
storage_options: dict[str, Any],
endian: str,
) -> None:
Expand All @@ -391,9 +390,7 @@ def segy_export(
from mdio import mdio_to_segy # noqa: PLC0415

mdio_to_segy(
mdio_path_or_buffer=mdio_file,
output_segy_path=segy_path,
access_pattern=access_pattern,
storage_options=storage_options,
input_location=StorageLocation(mdio_file, storage_options),
output_location=StorageLocation(str(segy_path)),
endian=endian,
)
89 changes: 51 additions & 38 deletions src/mdio/converters/mdio.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,22 @@
import os
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING

import numpy as np
import xarray as xr
from psutil import cpu_count
from tqdm.dask import TqdmCallback

from mdio import MDIOReader
from mdio.segy.blocked_io import to_segy
from mdio.segy.creation import concat_files
from mdio.segy.creation import get_required_segy_fields
from mdio.segy.creation import mdio_spec_to_segy
from mdio.segy.utilities import segy_export_rechunker

if TYPE_CHECKING:
from mdio.core.storage_location import StorageLocation

try:
import distributed
except ImportError:
Expand All @@ -26,15 +31,15 @@
NUM_CPUS = int(os.getenv("MDIO__EXPORT__CPU_COUNT", default_cpus))


def mdio_to_segy( # noqa: PLR0912, PLR0913
mdio_path_or_buffer: str,
output_segy_path: str,
def mdio_to_segy( # noqa: PLR0912, PLR0913, PLR0915
input_location: StorageLocation,
output_location: StorageLocation,
*,
endian: str = "big",
access_pattern: str = "012",
storage_options: dict = None,
new_chunks: tuple[int, ...] = None,
selection_mask: np.ndarray = None,
client: distributed.Client = None,
overwrite: bool = False,
) -> None:
"""Convert MDIO file to SEG-Y format.

Expand All @@ -47,20 +52,19 @@ def mdio_to_segy( # noqa: PLR0912, PLR0913
A `selection_mask` can be provided (same shape as spatial grid) to export a subset.

Args:
mdio_path_or_buffer: Input path where the MDIO is located.
output_segy_path: Path to the output SEG-Y file.
endian: Endianness of the input SEG-Y. Rev.2 allows little endian. Default is 'big'.
access_pattern: This specificies the chunk access pattern. Underlying zarr.Array must
exist. Examples: '012', '01'
storage_options: Storage options for the cloud storage backend. Default: None (anonymous)
input_location: Location of the input MDIO file.
output_location: Location of the output SEG-Y file.
endian: Endianness of the output SEG-Y. Rev.2 allows little endian. Default is "big".
new_chunks: Set manual chunksize. For development purposes only.
selection_mask: Array that lists the subset of traces
selection_mask: Array that lists the subset of traces.
client: Dask client. If `None` we will use local threaded scheduler. If `auto` is used we
will create multiple processes (with 8 threads each).
overwrite: Whether to overwrite the SEG-Y file if it already exists.

Raises:
ImportError: if distributed package isn't installed but requested.
ValueError: if cut mask is empty, i.e. no traces will be written.
FileExistsError: If the output location already exists and `overwrite` is False.
ImportError: If distributed package isn't installed but requested.
ValueError: If cut mask is empty, i.e. no traces will be written.

Examples:
To export an existing local MDIO file to SEG-Y we use the code snippet below. This will
Expand All @@ -69,40 +73,41 @@ def mdio_to_segy( # noqa: PLR0912, PLR0913

>>> from mdio import mdio_to_segy
>>>
>>>
>>> mdio_to_segy(
... mdio_path_or_buffer="prefix2/file.mdio",
... output_segy_path="prefix/file.segy",
... input_location=StorageLocation("prefix2/file.mdio"),
... output_location=StorageLocation("prefix/file.segy"),
... )

If we want to export this as an IEEE big-endian, using a selection mask, we would run:

>>> mdio_to_segy(
... mdio_path_or_buffer="prefix2/file.mdio",
... output_segy_path="prefix/file.segy",
... input_location=StorageLocation("prefix2/file.mdio"),
... output_location=StorageLocation("prefix/file.segy"),
... selection_mask=boolean_mask,
... )

"""
backend = "dask"

output_segy_path = Path(output_segy_path)
if not overwrite and output_location.exists():
err = f"Output location '{output_location.uri}' exists. Set `overwrite=True` if intended."
raise FileExistsError(err)

mdio = MDIOReader(
mdio_path_or_buffer=mdio_path_or_buffer,
access_pattern=access_pattern,
storage_options=storage_options,
)
output_segy_path = Path(output_location.uri)

if new_chunks is None:
new_chunks = segy_export_rechunker(mdio.chunks, mdio.shape, mdio._traces.dtype)
ds_tmp = xr.open_dataset(input_location.uri, engine="zarr", mask_and_scale=False)
amp = ds_tmp["amplitude"]
chunks = amp.encoding.get("chunks")
shape = amp.shape
dtype = amp.dtype
new_chunks = segy_export_rechunker(chunks, shape, dtype)
ds_tmp.close()

creation_args = [
mdio_path_or_buffer,
output_segy_path,
access_pattern,
input_location,
output_location,
endian,
storage_options,
new_chunks,
backend,
]
Expand All @@ -111,14 +116,16 @@ def mdio_to_segy( # noqa: PLR0912, PLR0913
if distributed is not None:
# This is in case we work with big data
feature = client.submit(mdio_spec_to_segy, *creation_args)
mdio, segy_factory = feature.result()
ds, segy_factory = feature.result()
else:
msg = "Distributed client was provided, but `distributed` is not installed"
raise ImportError(msg)
else:
mdio, segy_factory = mdio_spec_to_segy(*creation_args)
ds, segy_factory = mdio_spec_to_segy(*creation_args)

amp_da, headers_da, trace_mask_da, _, _ = get_required_segy_fields(ds)

live_mask = mdio.live_mask.compute()
live_mask = trace_mask_da.data.compute()

if selection_mask is not None:
live_mask = live_mask & selection_mask
Expand All @@ -138,12 +145,18 @@ def mdio_to_segy( # noqa: PLR0912, PLR0913
dim_slices += (slice(start, stop),)

# Lazily pull the data with limits now, and limit mask so its the same shape.
live_mask, headers, samples = mdio[dim_slices]
live_mask = live_mask.rechunk(headers.chunks)
trace_mask_da = trace_mask_da.data
headers = headers_da.data
samples = amp_da.data

live_mask_da = trace_mask_da[dim_slices]
headers = headers[dim_slices]
samples = samples[dim_slices]
live_mask_da = live_mask_da.rechunk(headers.chunks)

if selection_mask is not None:
selection_mask = selection_mask[dim_slices]
live_mask = live_mask & selection_mask
live_mask_da = live_mask_da & selection_mask

# tmp file root
out_dir = output_segy_path.parent
Expand All @@ -154,7 +167,7 @@ def mdio_to_segy( # noqa: PLR0912, PLR0913
block_records = to_segy(
samples=samples,
headers=headers,
live_mask=live_mask,
live_mask=live_mask_da,
segy_factory=segy_factory,
file_root=tmp_dir.name,
)
Expand Down
Loading
Loading