Skip to content

Commit 1b98255

Browse files
Environment variable flag to preserve unmodified, raw trace headers from SEG-Y as variable. (TGSAI#659)
* Reimplement disaster recovery logic * Ensure getting true raw bytes for DR array * Linting * Add v2 issue check * Fix pre-commit * Profiled disaster recovery array (#8) - Avoids duplicate read regression issue - Implements isolated and testable logic * Fix unclosed parenthesis * Linting * Test DR compatibility with all tested schemas * Fix missing test fixture error * Suppress unused linting error * Dr with modifications (#9) * Attempt to use view * Add hex-dump and MDIO output reproducer * Fixes * Cleanup * Provide clean disaster recovery interface * Begin work on tests * Fix flattening issue * Push for debugging * Numpy updates * Testing * Working end-to-end examples * Cleanup * Bandaid fix * linting pass 1 * Fix logic issue * Use wrapper class * Precommit * Remove external debugging code * Remove debug code * Remove errant numpy additon to pyproject toml * Fix uv lock to mainline * Pre-commit * Remove raw field additions. Depends on segy >= 0.5.1 * Removed raw byte inserts (#10) * Update Xarray api access (TGSAI#688) * Reimplement disaster recovery logic * Ensure getting true raw bytes for DR array * Linting * Add v2 issue check * Fix pre-commit * Profiled disaster recovery array (#8) - Avoids duplicate read regression issue - Implements isolated and testable logic * Fix unclosed parenthesis * Linting * Test DR compatibility with all tested schemas * Fix missing test fixture error * Suppress unused linting error * Attempt to use view * Add hex-dump and MDIO output reproducer * Fixes * Cleanup * Provide clean disaster recovery interface * Begin work on tests * Fix flattening issue * Push for debugging * Numpy updates * Testing * Working end-to-end examples * Cleanup * Bandaid fix * linting pass 1 * Fix logic issue * Use wrapper class * Precommit * Remove external debugging code * Remove debug code * Remove errant numpy additon to pyproject toml * Fix uv lock to mainline * Pre-commit * Removed raw byte inserts Removed the insertions of raw bytes into the raw bytes Variable. This issue will be addressed in tgsai/segy release >0.5.1 * Use new segy API calls * Updates to get working * Use released version * Linting * Linting * revert filter/raw stuff * rename env var for raw headers * simplify 240-byte scalar type * rename trace wrapper and do lazy decoding --------- Co-authored-by: Altay Sansal <[email protected]>
1 parent 48b949e commit 1b98255

File tree

7 files changed

+579
-8
lines changed

7 files changed

+579
-8
lines changed

src/mdio/builder/schemas/dtype.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class ScalarType(StrEnum):
3232
COMPLEX64 = "complex64"
3333
COMPLEX128 = "complex128"
3434
COMPLEX256 = "complex256"
35+
BYTES240 = "V240" # fixed-width 240-byte string, used for raw v0/1/2 trace headers
3536

3637

3738
class StructuredField(CamelCaseStrictModel):

src/mdio/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,5 @@ class ZarrFormat(IntEnum):
6464
ScalarType.COMPLEX64: complex(np.nan, np.nan),
6565
ScalarType.COMPLEX128: complex(np.nan, np.nan),
6666
ScalarType.COMPLEX256: complex(np.nan, np.nan),
67+
ScalarType.BYTES240: b"\x00" * 240,
6768
}

src/mdio/converters/segy.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,24 @@
77
from typing import TYPE_CHECKING
88

99
import numpy as np
10+
import zarr
1011
from segy import SegyFile
1112
from segy.config import SegySettings
1213
from segy.standards.codes import MeasurementSystem as segy_MeasurementSystem
1314
from segy.standards.fields.trace import Rev0 as TraceHeaderFieldsRev0
1415

1516
from mdio.api.io import _normalize_path
1617
from mdio.api.io import to_mdio
18+
from mdio.builder.schemas.chunk_grid import RegularChunkGrid
19+
from mdio.builder.schemas.chunk_grid import RegularChunkShape
20+
from mdio.builder.schemas.compressors import Blosc
21+
from mdio.builder.schemas.compressors import BloscCname
22+
from mdio.builder.schemas.dtype import ScalarType
1723
from mdio.builder.schemas.v1.units import LengthUnitEnum
1824
from mdio.builder.schemas.v1.units import LengthUnitModel
25+
from mdio.builder.schemas.v1.variable import VariableMetadata
1926
from mdio.builder.xarray_builder import to_xarray_dataset
27+
from mdio.constants import ZarrFormat
2028
from mdio.converters.exceptions import EnvironmentFormatError
2129
from mdio.converters.exceptions import GridTraceCountError
2230
from mdio.converters.exceptions import GridTraceSparsityError
@@ -333,6 +341,61 @@ def _add_grid_override_to_metadata(dataset: Dataset, grid_overrides: dict[str, A
333341
dataset.metadata.attributes["gridOverrides"] = grid_overrides
334342

335343

344+
def _add_raw_headers_to_template(mdio_template: AbstractDatasetTemplate) -> AbstractDatasetTemplate:
345+
"""Add raw headers capability to the MDIO template by monkey-patching its _add_variables method.
346+
347+
This function modifies the template's _add_variables method to also add a raw headers variable
348+
with the following characteristics:
349+
- Same rank as the Headers variable (all dimensions except vertical)
350+
- Name: "RawHeaders"
351+
- Type: ScalarType.HEADERS
352+
- No coordinates
353+
- zstd compressor
354+
- No additional metadata
355+
- Chunked the same as the Headers variable
356+
357+
Args:
358+
mdio_template: The MDIO template to mutate
359+
Returns:
360+
The mutated MDIO template
361+
"""
362+
# Check if raw headers enhancement has already been applied to avoid duplicate additions
363+
if hasattr(mdio_template, "_mdio_raw_headers_enhanced"):
364+
return mdio_template
365+
366+
# Store the original _add_variables method
367+
original_add_variables = mdio_template._add_variables
368+
369+
def enhanced_add_variables() -> None:
370+
# Call the original method first
371+
original_add_variables()
372+
373+
# Now add the raw headers variable
374+
chunk_shape = mdio_template._var_chunk_shape[:-1]
375+
376+
# Create chunk grid metadata
377+
chunk_metadata = RegularChunkGrid(configuration=RegularChunkShape(chunk_shape=chunk_shape))
378+
379+
# Add the raw headers variable using the builder's add_variable method
380+
mdio_template._builder.add_variable(
381+
name="raw_headers",
382+
long_name="Raw Headers",
383+
dimensions=mdio_template._dim_names[:-1], # All dimensions except vertical
384+
data_type=ScalarType.BYTES240,
385+
compressor=Blosc(cname=BloscCname.zstd),
386+
coordinates=None, # No coordinates as specified
387+
metadata=VariableMetadata(chunk_grid=chunk_metadata),
388+
)
389+
390+
# Replace the template's _add_variables method
391+
mdio_template._add_variables = enhanced_add_variables
392+
393+
# Mark the template as enhanced to prevent duplicate monkey-patching
394+
mdio_template._mdio_raw_headers_enhanced = True
395+
396+
return mdio_template
397+
398+
336399
def segy_to_mdio( # noqa PLR0913
337400
segy_spec: SegySpec,
338401
mdio_template: AbstractDatasetTemplate,
@@ -372,6 +435,14 @@ def segy_to_mdio( # noqa PLR0913
372435

373436
_, non_dim_coords = _get_coordinates(grid, segy_headers, mdio_template)
374437
header_dtype = to_structured_type(segy_spec.trace.header.dtype)
438+
439+
if os.getenv("MDIO__IMPORT__RAW_HEADERS") in ("1", "true", "yes", "on"):
440+
if zarr.config.get("default_zarr_format") == ZarrFormat.V2:
441+
logger.warning("Raw headers are only supported for Zarr v3. Skipping raw headers.")
442+
else:
443+
logger.warning("MDIO__IMPORT__RAW_HEADERS is experimental and expected to change or be removed.")
444+
mdio_template = _add_raw_headers_to_template(mdio_template)
445+
375446
horizontal_unit = _get_horizontal_coordinate_unit(segy_dimensions)
376447
mdio_ds: Dataset = mdio_template.build_dataset(
377448
name=mdio_template.name,
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
"""Consumer-side utility to get both raw and transformed header data with single filesystem read."""
2+
3+
from __future__ import annotations
4+
5+
from typing import TYPE_CHECKING
6+
7+
import numpy as np
8+
9+
if TYPE_CHECKING:
10+
from numpy.typing import NDArray
11+
from segy import SegyFile
12+
13+
14+
class SegyFileRawTraceWrapper:
15+
def __init__(self, segy_file: SegyFile, indices: int | list[int] | NDArray | slice):
16+
self.segy_file = segy_file
17+
self.indices = indices
18+
19+
self.idx = self.segy_file.trace.normalize_and_validate_query(self.indices)
20+
self.trace_buffer_array = self.segy_file.trace.fetch(self.idx, raw=True)
21+
22+
self.trace_view = self.trace_buffer_array.view(self.segy_file.spec.trace.dtype)
23+
24+
self.trace_decode_pipeline = self.segy_file.accessors.trace_decode_pipeline
25+
self.decoded_traces = None # decode later when not-raw header/sample is called
26+
27+
def _ensure_decoded(self) -> None:
28+
"""Apply trace decoding pipeline if not already done."""
29+
if self.decoded_traces is not None: # already done
30+
return
31+
self.decoded_traces = self.trace_decode_pipeline.apply(self.trace_view.copy())
32+
33+
@property
34+
def raw_header(self) -> NDArray:
35+
"""Get byte array view of the raw headers."""
36+
header_itemsize = self.segy_file.spec.trace.header.itemsize # should be 240
37+
return self.trace_view.header.view(np.dtype((np.void, header_itemsize)))
38+
39+
@property
40+
def header(self) -> NDArray:
41+
"""Get decoded header."""
42+
self._ensure_decoded() # decode when needed in-place to avoid copy.
43+
return self.decoded_traces.header
44+
45+
@property
46+
def sample(self) -> NDArray:
47+
"""Get decoded trace samples."""
48+
self._ensure_decoded() # decode when needed in-place to avoid copy.
49+
return self.decoded_traces.sample

src/mdio/segy/_workers.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
from mdio.api.io import to_mdio
1414
from mdio.builder.schemas.dtype import ScalarType
15+
from mdio.segy._raw_trace_wrapper import SegyFileRawTraceWrapper
1516

1617
if TYPE_CHECKING:
1718
from segy.arrays import HeaderArray
@@ -121,18 +122,38 @@ def trace_worker( # noqa: PLR0913
121122
zarr_config.set({"threading.max_workers": 1})
122123

123124
live_trace_indexes = local_grid_map[not_null].tolist()
124-
traces = segy_file.trace[live_trace_indexes]
125125

126126
header_key = "headers"
127+
raw_header_key = "raw_headers"
127128

128129
# Get subset of the dataset that has not yet been saved
129130
# The headers might not be present in the dataset
130131
worker_variables = [data_variable_name]
131132
if header_key in dataset.data_vars: # Keeping the `if` here to allow for more worker configurations
132133
worker_variables.append(header_key)
134+
if raw_header_key in dataset.data_vars:
135+
worker_variables.append(raw_header_key)
136+
137+
# traces = segy_file.trace[live_trace_indexes]
138+
# Raw headers are not intended to remain as a feature of the SEGY ingestion.
139+
# For that reason, we have wrapped the accessors to provide an interface that can be removed
140+
# and not require additional changes to the below code.
141+
# NOTE: The `raw_header_key` code block should be removed in full as it will become dead code.
142+
traces = SegyFileRawTraceWrapper(segy_file, live_trace_indexes)
133143

134144
ds_to_write = dataset[worker_variables]
135145

146+
if raw_header_key in worker_variables:
147+
tmp_raw_headers = np.zeros_like(dataset[raw_header_key])
148+
tmp_raw_headers[not_null] = traces.raw_header
149+
150+
ds_to_write[raw_header_key] = Variable(
151+
ds_to_write[raw_header_key].dims,
152+
tmp_raw_headers,
153+
attrs=ds_to_write[raw_header_key].attrs,
154+
encoding=ds_to_write[raw_header_key].encoding, # Not strictly necessary, but safer than not doing it.
155+
)
156+
136157
if header_key in worker_variables:
137158
# TODO(BrianMichell): Implement this better so that we can enable fill values without changing the code
138159
# https://github.com/TGSAI/mdio-python/issues/584

0 commit comments

Comments
 (0)