44
55import base64
66import logging
7+ import multiprocessing as mp
78import os
8- from dataclasses import dataclass
9+ from concurrent . futures import ProcessPoolExecutor
910from typing import TYPE_CHECKING
1011
1112import numpy as np
1213import zarr
13- from segy import SegyFile
1414from segy .config import SegyFileSettings
1515from segy .config import SegyHeaderOverrides
1616from segy .standards .codes import MeasurementSystem as SegyMeasurementSystem
3737from mdio .core .utils_write import MAX_SIZE_LIVE_MASK
3838from mdio .core .utils_write import get_constrained_chunksize
3939from mdio .segy import blocked_io
40+ from mdio .segy ._workers import SegyFileInfo
41+ from mdio .segy ._workers import info_worker
4042from mdio .segy .scalar import SCALE_COORDINATE_KEYS
4143from mdio .segy .scalar import _apply_coordinate_scalar
42- from mdio .segy .scalar import _get_coordinate_scalar
4344from mdio .segy .utilities import get_grid_plan
4445
4546if TYPE_CHECKING :
5455 from mdio .builder .schemas import Dataset
5556 from mdio .builder .templates .abstract_dataset_template import AbstractDatasetTemplate
5657 from mdio .core .dimension import Dimension
58+ from mdio .segy ._workers import SegyFileArguments
5759
5860logger = logging .getLogger (__name__ )
5961
@@ -135,37 +137,9 @@ def grid_density_qc(grid: Grid, num_traces: int) -> None:
135137 raise GridTraceSparsityError (grid .shape , num_traces , msg )
136138
137139
138- @dataclass
139- class SegyFileHeaderDump :
140- """Segy metadata information."""
141-
142- text_header : str
143- binary_header_dict : dict
144- raw_binary_headers : bytes
145-
146-
147- def _get_segy_file_header_dump (segy_file : SegyFile ) -> SegyFileHeaderDump :
148- """Reads information from a SEG-Y file."""
149- text_header = segy_file .text_header
150-
151- raw_binary_headers : bytes = segy_file .fs .read_block (
152- fn = segy_file .url ,
153- offset = segy_file .spec .binary_header .offset ,
154- length = segy_file .spec .binary_header .itemsize ,
155- )
156-
157- # We read here twice, but it's ok for now. Only 400-bytes.
158- binary_header_dict = segy_file .binary_header .to_dict ()
159-
160- return SegyFileHeaderDump (
161- text_header = text_header ,
162- binary_header_dict = binary_header_dict ,
163- raw_binary_headers = raw_binary_headers ,
164- )
165-
166-
167140def _scan_for_headers (
168- segy_file : SegyFile ,
141+ segy_file_kwargs : SegyFileArguments ,
142+ segy_file_info : SegyFileInfo ,
169143 template : AbstractDatasetTemplate ,
170144 grid_overrides : dict [str , Any ] | None = None ,
171145) -> tuple [list [Dimension ], SegyHeaderArray ]:
@@ -176,7 +150,8 @@ def _scan_for_headers(
176150 """
177151 full_chunk_size = template .full_chunk_size
178152 segy_dimensions , chunk_size , segy_headers = get_grid_plan (
179- segy_file = segy_file ,
153+ segy_file_kwargs = segy_file_kwargs ,
154+ segy_file_info = segy_file_info ,
180155 return_headers = True ,
181156 template = template ,
182157 chunksize = full_chunk_size ,
@@ -192,12 +167,29 @@ def _scan_for_headers(
192167 return segy_dimensions , segy_headers
193168
194169
195- def _build_and_check_grid (segy_dimensions : list [Dimension ], num_traces : int , segy_headers : SegyHeaderArray ) -> Grid :
def _read_segy_file_info(segy_file_kwargs: SegyFileArguments) -> SegyFileInfo:
    """Collect SEG-Y file information by running ``info_worker`` in a child process.

    The worker is submitted to a single-worker process pool created with the "spawn"
    multiprocessing context, and this function blocks until the worker's result is ready.

    Running in a separate process is a workaround for Zarr issue 3487: 'Explicitly using
    fsspec and zarr FsspecStore causes RuntimeError "Task attached to a different loop"'.

    Args:
        segy_file_kwargs: Arguments handed unchanged to ``info_worker``.

    Returns:
        The value produced by ``info_worker`` for the given arguments.
    """
    # TODO(Dmitriy Repin): when Zarr issue 3487 is resolved, we can remove this workaround
    # https://github.com/zarr-developers/zarr-python/issues/3487
    spawn_context = mp.get_context("spawn")
    with ProcessPoolExecutor(max_workers=1, mp_context=spawn_context) as pool:
        # Chained submit/result: wait for the single task before the pool shuts down.
        return pool.submit(info_worker, segy_file_kwargs).result()
181+
182+
183+ def _build_and_check_grid (
184+ segy_dimensions : list [Dimension ],
185+ segy_file_info : SegyFileInfo ,
186+ segy_headers : SegyHeaderArray ,
187+ ) -> Grid :
196188 """Build and check the grid from the SEG-Y headers and dimensions.
197189
198190 Args:
199191 segy_dimensions: List of all SEG-Y dimensions to build grid from.
200- num_traces: Number of traces in the SEG-Y file.
192+ segy_file_info: SegyFileInfo instance containing the SEG-Y file information .
201193 segy_headers: Headers read in from SEG-Y file for building the trace map.
202194
203195 Returns:
@@ -207,6 +199,7 @@ def _build_and_check_grid(segy_dimensions: list[Dimension], num_traces: int, seg
207199 GridTraceCountError: If number of traces in SEG-Y file does not match the parsed grid
208200 """
209201 grid = Grid (dims = segy_dimensions )
202+ num_traces = segy_file_info .num_traces
210203 grid_density_qc (grid , num_traces )
211204 grid .build_map (segy_headers )
212205 # Check grid validity by comparing trace numbers
@@ -303,9 +296,9 @@ def populate_non_dim_coordinates(
303296 return dataset , drop_vars_delayed
304297
305298
306- def _get_horizontal_coordinate_unit (segy_info : SegyFileHeaderDump ) -> LengthUnitModel | None :
299+ def _get_horizontal_coordinate_unit (segy_file_info : SegyFileInfo ) -> LengthUnitModel | None :
307300 """Get the coordinate unit from the SEG-Y headers."""
308- measurement_system_code = int (segy_info .binary_header_dict [MEASUREMENT_SYSTEM_KEY ])
301+ measurement_system_code = int (segy_file_info .binary_header_dict [MEASUREMENT_SYSTEM_KEY ])
309302
310303 if measurement_system_code not in (1 , 2 ):
311304 logger .warning (
@@ -359,19 +352,19 @@ def _populate_coordinates(
359352 return dataset , drop_vars_delayed
360353
361354
362- def _add_segy_file_headers (xr_dataset : xr_Dataset , segy_file_header_dump : SegyFileHeaderDump ) -> xr_Dataset :
355+ def _add_segy_file_headers (xr_dataset : xr_Dataset , segy_file_info : SegyFileInfo ) -> xr_Dataset :
363356 save_file_header = os .getenv ("MDIO__IMPORT__SAVE_SEGY_FILE_HEADER" , "" ) in ("1" , "true" , "yes" , "on" )
364357 if not save_file_header :
365358 return xr_dataset
366359
367360 expected_rows = 40
368361 expected_cols = 80
369362
370- text_header_rows = segy_file_header_dump .text_header .splitlines ()
363+ text_header_rows = segy_file_info .text_header .splitlines ()
371364 text_header_cols_bad = [len (row ) != expected_cols for row in text_header_rows ]
372365
373366 if len (text_header_rows ) != expected_rows :
374- err = f"Invalid text header count: expected { expected_rows } , got { len (segy_file_header_dump .text_header )} "
367+ err = f"Invalid text header count: expected { expected_rows } , got { len (segy_file_info .text_header )} "
375368 raise ValueError (err )
376369
377370 if any (text_header_cols_bad ):
@@ -381,12 +374,12 @@ def _add_segy_file_headers(xr_dataset: xr_Dataset, segy_file_header_dump: SegyFi
381374 xr_dataset ["segy_file_header" ] = ((), "" )
382375 xr_dataset ["segy_file_header" ].attrs .update (
383376 {
384- "textHeader" : segy_file_header_dump .text_header ,
385- "binaryHeader" : segy_file_header_dump .binary_header_dict ,
377+ "textHeader" : segy_file_info .text_header ,
378+ "binaryHeader" : segy_file_info .binary_header_dict ,
386379 }
387380 )
388381 if os .getenv ("MDIO__IMPORT__RAW_HEADERS" ) in ("1" , "true" , "yes" , "on" ):
389- raw_binary_base64 = base64 .b64encode (segy_file_header_dump .raw_binary_headers ).decode ("ascii" )
382+ raw_binary_base64 = base64 .b64encode (segy_file_info .raw_binary_headers ).decode ("ascii" )
390383 xr_dataset ["segy_file_header" ].attrs .update ({"rawBinaryHeader" : raw_binary_base64 })
391384
392385 return xr_dataset
@@ -482,6 +475,21 @@ def determine_target_size(var_type: str) -> int:
482475 ds .variables [index ].metadata .chunk_grid = chunk_grid
483476
484477
def _validate_spec_in_template(segy_spec: SegySpec, mdio_template: AbstractDatasetTemplate) -> None:
    """Ensure the SEG-Y spec declares every trace header field the template needs.

    The needed field names are every template dimension name except the last one,
    every template coordinate name, and ``coordinate_scalar``.

    Args:
        segy_spec: SEG-Y spec whose trace header field names are checked.
        mdio_template: Template supplying the dimension names, coordinate names and
            the template name used in the error message.

    Raises:
        ValueError: If any needed field name is absent from the spec's trace header fields.
    """
    # Names of all trace header fields declared by the spec.
    available = set()
    for header_field in segy_spec.trace.header.fields:
        available.add(header_field.name)

    # The last dimension name is excluded from the requirement.
    needed = {"coordinate_scalar", *mdio_template._dim_names[:-1], *mdio_template._coord_names}

    absent = needed.difference(available)
    if not absent:
        return

    # Sorted so the reported field list is deterministic.
    err = (
        f"Required fields {sorted(absent)} for template {mdio_template.name} "
        f"not found in the provided segy_spec"
    )
    raise ValueError(err)
491+
492+
485493def segy_to_mdio ( # noqa PLR0913
486494 segy_spec : SegySpec ,
487495 mdio_template : AbstractDatasetTemplate ,
@@ -507,6 +515,8 @@ def segy_to_mdio( # noqa PLR0913
507515 Raises:
508516 FileExistsError: If the output location already exists and overwrite is False.
509517 """
518+ _validate_spec_in_template (segy_spec , mdio_template )
519+
510520 input_path = _normalize_path (input_path )
511521 output_path = _normalize_path (output_path )
512522
@@ -515,17 +525,21 @@ def segy_to_mdio( # noqa PLR0913
515525 raise FileExistsError (err )
516526
517527 segy_settings = SegyFileSettings (storage_options = input_path .storage_options )
518- segy_file = SegyFile (
519- url = input_path .as_posix (),
520- spec = segy_spec ,
521- settings = segy_settings ,
522- header_overrides = segy_header_overrides ,
528+ segy_file_kwargs : SegyFileArguments = {
529+ "url" : input_path .as_posix (),
530+ "spec" : segy_spec ,
531+ "settings" : segy_settings ,
532+ "header_overrides" : segy_header_overrides ,
533+ }
534+ segy_file_info = _read_segy_file_info (segy_file_kwargs )
535+
536+ segy_dimensions , segy_headers = _scan_for_headers (
537+ segy_file_kwargs ,
538+ segy_file_info ,
539+ template = mdio_template ,
540+ grid_overrides = grid_overrides ,
523541 )
524- segy_info : SegyFileHeaderDump = _get_segy_file_header_dump (segy_file )
525-
526- segy_dimensions , segy_headers = _scan_for_headers (segy_file , mdio_template , grid_overrides )
527-
528- grid = _build_and_check_grid (segy_dimensions , segy_file .num_traces , segy_headers )
542+ grid = _build_and_check_grid (segy_dimensions , segy_file_info , segy_headers )
529543
530544 _ , non_dim_coords = _get_coordinates (grid , segy_headers , mdio_template )
531545 header_dtype = to_structured_type (segy_spec .trace .header .dtype )
@@ -537,7 +551,7 @@ def segy_to_mdio( # noqa PLR0913
537551 logger .warning ("MDIO__IMPORT__RAW_HEADERS is experimental and expected to change or be removed." )
538552 mdio_template = _add_raw_headers_to_template (mdio_template )
539553
540- horizontal_unit = _get_horizontal_coordinate_unit (segy_info )
554+ horizontal_unit = _get_horizontal_coordinate_unit (segy_file_info )
541555 mdio_ds : Dataset = mdio_template .build_dataset (
542556 name = mdio_template .name ,
543557 sizes = grid .shape ,
@@ -554,15 +568,14 @@ def segy_to_mdio( # noqa PLR0913
554568
555569 xr_dataset : xr_Dataset = to_xarray_dataset (mdio_ds = mdio_ds )
556570
557- coordinate_scalar = _get_coordinate_scalar (segy_file )
558571 xr_dataset , drop_vars_delayed = _populate_coordinates (
559572 dataset = xr_dataset ,
560573 grid = grid ,
561574 coords = non_dim_coords ,
562- horizontal_coordinate_scalar = coordinate_scalar ,
575+ horizontal_coordinate_scalar = segy_file_info . coordinate_scalar ,
563576 )
564577
565- xr_dataset = _add_segy_file_headers (xr_dataset , segy_info )
578+ xr_dataset = _add_segy_file_headers (xr_dataset , segy_file_info )
566579
567580 xr_dataset .trace_mask .data [:] = grid .live_mask
568581 # IMPORTANT: Do not drop the "trace_mask" here, as it will be used later in
@@ -583,7 +596,7 @@ def segy_to_mdio( # noqa PLR0913
583596 # This is a memory-expensive and time-consuming read-write operation
584597 # performed in chunks to save the memory
585598 blocked_io .to_zarr (
586- segy_file = segy_file ,
599+ segy_file_kwargs = segy_file_kwargs ,
587600 output_path = output_path ,
588601 grid_map = grid .map ,
589602 dataset = xr_dataset ,
0 commit comments