 import logging
 import os
 from collections.abc import Sequence
-from datetime import datetime
-from datetime import timezone
 from importlib import metadata
 from typing import Any
 
 from segy import SegyFile
 from segy.config import SegySettings
 from segy.schema import HeaderField
+from zarr import Blosc
 
 from mdio.api.io_utils import process_url
 from mdio.converters.exceptions import EnvironmentFormatError
 from mdio.converters.exceptions import GridTraceCountError
 from mdio.converters.exceptions import GridTraceSparsityError
 from mdio.core import Grid
-from mdio.core.grid import _calculate_live_mask_chunksize
+from mdio.core.factory import MDIOCreateConfig
+from mdio.core.factory import MDIOVariableConfig
+from mdio.core.factory import create_empty
 from mdio.core.utils_write import write_attribute
 from mdio.segy import blocked_io
 from mdio.segy.compat import mdio_segy_spec
-from mdio.segy.helpers_segy import create_zarr_hierarchy
 from mdio.segy.utilities import get_grid_plan
 
 
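+# Optional dependency: lossy compression requires zfpy and the ZFPY codec (see get_compressor).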
+try:
+    import zfpy  # Base library
+    from zarr import ZFPY  # Codec
+except ImportError:
+    ZFPY = None
+    zfpy = None
+
+
 logger = logging.getLogger(__name__)
 
 try:
@@ -104,6 +112,28 @@ def grid_density_qc(grid: Grid, num_traces: int) -> None:
         raise GridTraceSparsityError(grid.shape, num_traces, msg)
 
 
+def get_compressor(
+    lossless: bool, compression_tolerance: float = -1
+) -> Blosc | ZFPY | None:
+    """Get the appropriate compressor for the seismic traces."""
+    if lossless:
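+        # Lossless path: Blosc with the Zstandard (zstd) codec.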
+        compressor = Blosc("zstd")
+    else:
+        if zfpy is None or ZFPY is None:
+            msg = (
+                "Lossy compression requires the 'zfpy' library. It is "
+                "not installed in your environment. To proceed please "
+                "install 'zfpy' or install mdio with `--extras lossy`"
+            )
+            raise ImportError(msg)
+
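+        # ZFP fixed-accuracy mode bounds the absolute error of each sample by `tolerance`.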
+        compressor = ZFPY(
+            mode=zfpy.mode_fixed_accuracy,
+            tolerance=compression_tolerance,
+        )
+    return compressor
+
+
 def segy_to_mdio(  # noqa: C901
     segy_path: str,
     mdio_path_or_buffer: str,
@@ -365,14 +395,6 @@ def segy_to_mdio( # noqa: C901
     if storage_options_output is None:
         storage_options_output = {}
 
-    store = process_url(
-        url=mdio_path_or_buffer,
-        mode="w",
-        storage_options=storage_options_output,
-        memory_cache_size=0,  # Making sure disk caching is disabled,
-        disk_cache=False,  # Making sure disk caching is disabled
-    )
-
     # Open SEG-Y with MDIO's SegySpec. Endianness will be inferred.
     mdio_spec = mdio_segy_spec()
     segy_settings = SegySettings(storage_options=storage_options_input)
@@ -407,47 +429,6 @@ def segy_to_mdio( # noqa: C901
         logger.warning(f"Ingestion grid shape: {grid.shape}.")
         raise GridTraceCountError(np.sum(grid.live_mask), num_traces)
 
-    zarr_root = create_zarr_hierarchy(
-        store=store,
-        overwrite=overwrite,
-    )
-
-    # Get UTC time, then add local timezone information offset.
-    iso_datetime = datetime.now(timezone.utc).isoformat()
-
-    write_attribute(name="created", zarr_group=zarr_root, attribute=iso_datetime)
-    write_attribute(name="api_version", zarr_group=zarr_root, attribute=API_VERSION)
-
-    dimensions_dict = [dim.to_dict() for dim in dimensions]
-    write_attribute(name="dimension", zarr_group=zarr_root, attribute=dimensions_dict)
-
-    # Write trace count
-    trace_count = np.count_nonzero(grid.live_mask)
-    write_attribute(name="trace_count", zarr_group=zarr_root, attribute=trace_count)
-
-    live_mask_chunksize = _calculate_live_mask_chunksize(grid)
-
-    # Note, live mask is not chunked since it's bool and small.
-    zarr_root["metadata"].create_dataset(
-        data=grid.live_mask,
-        name="live_mask",
-        shape=grid.shape[:-1],
-        chunks=live_mask_chunksize,
-        dimension_separator="/",
-    )
-
-    write_attribute(
-        name="text_header",
-        zarr_group=zarr_root["metadata"],
-        attribute=text_header.split("\n"),
-    )
-
-    write_attribute(
-        name="binary_header",
-        zarr_group=zarr_root["metadata"],
-        attribute=binary_header.to_dict(),
-    )
-
     if chunksize is None:
         dim_count = len(index_names) + 1
         if dim_count == 2:
@@ -470,18 +451,59 @@ def segy_to_mdio( # noqa: C901
     suffix = [str(idx) for idx, value in enumerate(suffix) if value is not None]
     suffix = "".join(suffix)
 
+    compressor = get_compressor(lossless, compression_tolerance)
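+    # Keep trace headers in native byte order for the Zarr header array.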
+    header_dtype = segy.spec.trace.header.dtype.newbyteorder("=")
+    var_conf = MDIOVariableConfig(
+        name=f"chunked_{suffix}",
+        dtype="float32",
+        chunks=chunksize,
+        compressor=compressor,
+        header_dtype=header_dtype,
+    )
+    config = MDIOCreateConfig(path=mdio_path_or_buffer, grid=grid, variables=[var_conf])
+
+    zarr_root = create_empty(
+        config,
+        overwrite=overwrite,
+        storage_options=storage_options_output,
+        consolidate_meta=False,
+    )
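+    # create_empty builds the MDIO hierarchy up front (data/metadata groups, chunked data
+    # and trace-header arrays, and an empty live_mask); the live mask and header
+    # attributes are populated below.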
+    data_group, meta_group = zarr_root["data"], zarr_root["metadata"]
+    data_array = data_group[f"chunked_{suffix}"]
+    header_array = meta_group[f"chunked_{suffix}_trace_headers"]
+
+    # Write actual live mask and metadata to empty MDIO
+    meta_group["live_mask"][:] = grid.live_mask
+    write_attribute(
+        name="trace_count",
+        zarr_group=zarr_root,
+        attribute=np.count_nonzero(grid.live_mask),
+    )
+    write_attribute(
+        name="text_header",
+        zarr_group=zarr_root["metadata"],
+        attribute=text_header.split("\n"),
+    )
+    write_attribute(
+        name="binary_header",
+        zarr_group=zarr_root["metadata"],
+        attribute=binary_header.to_dict(),
+    )
+
+    # Write traces
     stats = blocked_io.to_zarr(
         segy_file=segy,
         grid=grid,
-        data_root=zarr_root["data"],
-        metadata_root=zarr_root["metadata"],
+        data_array=data_array,
+        header_array=header_array,
         name="_".join(["chunked", suffix]),
         dtype="float32",
         chunks=chunksize,
         lossless=lossless,
         compression_tolerance=compression_tolerance,
     )
 
+    # Write actual stats
     for key, value in stats.items():
         write_attribute(name=key, zarr_group=zarr_root, attribute=value)
 