55import  logging 
66import  os 
77from  collections .abc  import  Sequence 
8- from  datetime  import  datetime 
9- from  datetime  import  timezone 
108from  importlib  import  metadata 
119from  typing  import  Any 
1210
1513from  segy  import  SegyFile 
1614from  segy .config  import  SegySettings 
1715from  segy .schema  import  HeaderField 
16+ from  zarr  import  Blosc 
1817
1918from  mdio .api .io_utils  import  process_url 
2019from  mdio .converters .exceptions  import  EnvironmentFormatError 
2120from  mdio .converters .exceptions  import  GridTraceCountError 
2221from  mdio .converters .exceptions  import  GridTraceSparsityError 
2322from  mdio .core  import  Grid 
23+ from  mdio .core .factory  import  MDIOCreateConfig 
24+ from  mdio .core .factory  import  MDIOVariableConfig 
25+ from  mdio .core .factory  import  create_empty 
2426from  mdio .core .utils_write  import  write_attribute 
2527from  mdio .segy  import  blocked_io 
2628from  mdio .segy .compat  import  mdio_segy_spec 
27- from  mdio .segy .helpers_segy  import  create_zarr_hierarchy 
2829from  mdio .segy .utilities  import  get_grid_plan 
2930
3031
32+ try :
33+     import  zfpy   # Base library 
34+     from  zarr  import  ZFPY   # Codec 
35+ except  ImportError :
36+     ZFPY  =  None 
37+     zfpy  =  None 
38+ 
39+ 
3140logger  =  logging .getLogger (__name__ )
3241
3342try :
@@ -103,6 +112,28 @@ def grid_density_qc(grid: Grid, num_traces: int) -> None:
103112            raise  GridTraceSparsityError (grid .shape , num_traces , msg )
104113
105114
def get_compressor(
    lossless: bool, compression_tolerance: float = -1
) -> Blosc | ZFPY | None:
    """Select the compressor used for seismic trace data.

    Args:
        lossless: If True, return a Blosc "zstd" codec (lossless).
            Otherwise return a ZFP codec in fixed-accuracy (lossy) mode.
        compression_tolerance: Absolute error tolerance handed to ZFP
            when lossy compression is requested. Ignored when lossless.

    Returns:
        The configured compressor instance.

    Raises:
        ImportError: Lossy compression was requested but the optional
            'zfpy' dependency (and its zarr codec) is not installed.
    """
    # Lossless path: Blosc with zstd, no optional dependency needed.
    if lossless:
        return Blosc("zstd")

    # Lossy path requires the optional zfpy extra; both the base library
    # and the zarr ZFPY codec must have imported at module load time.
    if zfpy is None or ZFPY is None:
        msg = (
            "Lossy compression requires the 'zfpy' library. It is "
            "not installed in your environment. To proceed please "
            "install 'zfpy' or install mdio with `--extras lossy`"
        )
        raise ImportError(msg)

    return ZFPY(
        mode=zfpy.mode_fixed_accuracy,
        tolerance=compression_tolerance,
    )
135+ 
136+ 
106137def  segy_to_mdio (  # noqa: C901 
107138    segy_path : str ,
108139    mdio_path_or_buffer : str ,
@@ -364,14 +395,6 @@ def segy_to_mdio(  # noqa: C901
364395    if  storage_options_output  is  None :
365396        storage_options_output  =  {}
366397
367-     store  =  process_url (
368-         url = mdio_path_or_buffer ,
369-         mode = "w" ,
370-         storage_options = storage_options_output ,
371-         memory_cache_size = 0 ,  # Making sure disk caching is disabled, 
372-         disk_cache = False ,  # Making sure disk caching is disabled 
373-     )
374- 
375398    # Open SEG-Y with MDIO's SegySpec. Endianness will be inferred. 
376399    mdio_spec  =  mdio_segy_spec ()
377400    segy_settings  =  SegySettings (storage_options = storage_options_input )
@@ -406,45 +429,6 @@ def segy_to_mdio(  # noqa: C901
406429        logger .warning (f"Ingestion grid shape: { grid .shape }  ." )
407430        raise  GridTraceCountError (np .sum (grid .live_mask ), num_traces )
408431
409-     zarr_root  =  create_zarr_hierarchy (
410-         store = store ,
411-         overwrite = overwrite ,
412-     )
413- 
414-     # Get UTC time, then add local timezone information offset. 
415-     iso_datetime  =  datetime .now (timezone .utc ).isoformat ()
416- 
417-     write_attribute (name = "created" , zarr_group = zarr_root , attribute = iso_datetime )
418-     write_attribute (name = "api_version" , zarr_group = zarr_root , attribute = API_VERSION )
419- 
420-     dimensions_dict  =  [dim .to_dict () for  dim  in  dimensions ]
421-     write_attribute (name = "dimension" , zarr_group = zarr_root , attribute = dimensions_dict )
422- 
423-     # Write trace count 
424-     trace_count  =  np .count_nonzero (grid .live_mask )
425-     write_attribute (name = "trace_count" , zarr_group = zarr_root , attribute = trace_count )
426- 
427-     # Note, live mask is not chunked since it's bool and small. 
428-     zarr_root ["metadata" ].create_dataset (
429-         data = grid .live_mask ,
430-         name = "live_mask" ,
431-         shape = grid .shape [:- 1 ],
432-         chunks = - 1 ,
433-         dimension_separator = "/" ,
434-     )
435- 
436-     write_attribute (
437-         name = "text_header" ,
438-         zarr_group = zarr_root ["metadata" ],
439-         attribute = text_header .split ("\n " ),
440-     )
441- 
442-     write_attribute (
443-         name = "binary_header" ,
444-         zarr_group = zarr_root ["metadata" ],
445-         attribute = binary_header .to_dict (),
446-     )
447- 
448432    if  chunksize  is  None :
449433        dim_count  =  len (index_names ) +  1 
450434        if  dim_count  ==  2 :
@@ -467,18 +451,59 @@ def segy_to_mdio(  # noqa: C901
467451        suffix  =  [str (idx ) for  idx , value  in  enumerate (suffix ) if  value  is  not   None ]
468452        suffix  =  "" .join (suffix )
469453
454+     compressor  =  get_compressor (compression_tolerance , lossless )
455+     header_dtype  =  segy .spec .trace .header .dtype .newbyteorder ("=" )
456+     var_conf  =  MDIOVariableConfig (
457+         name = f"chunked_{ suffix }  " ,
458+         dtype = "float32" ,
459+         chunks = chunksize ,
460+         compressor = compressor ,
461+         header_dtype = header_dtype ,
462+     )
463+     config  =  MDIOCreateConfig (path = mdio_path_or_buffer , grid = grid , variables = [var_conf ])
464+ 
465+     zarr_root  =  create_empty (
466+         config ,
467+         overwrite = overwrite ,
468+         storage_options = storage_options_output ,
469+         consolidate_meta = False ,
470+     )
471+     data_group , meta_group  =  zarr_root ["data" ], zarr_root ["metadata" ]
472+     data_array  =  data_group [f"chunked_{ suffix }  " ]
473+     header_array  =  meta_group [f"chunked_{ suffix }  _trace_headers" ]
474+ 
475+     # Write actual live mask and metadata to empty MDIO 
476+     meta_group ["live_mask" ][:] =  grid .live_mask 
477+     write_attribute (
478+         name = "trace_count" ,
479+         zarr_group = zarr_root ,
480+         attribute = np .count_nonzero (grid .live_mask ),
481+     )
482+     write_attribute (
483+         name = "text_header" ,
484+         zarr_group = zarr_root ["metadata" ],
485+         attribute = text_header .split ("\n " ),
486+     )
487+     write_attribute (
488+         name = "binary_header" ,
489+         zarr_group = zarr_root ["metadata" ],
490+         attribute = binary_header .to_dict (),
491+     )
492+ 
493+     # Write traces 
470494    stats  =  blocked_io .to_zarr (
471495        segy_file = segy ,
472496        grid = grid ,
473-         data_root = zarr_root [ "data" ] ,
474-         metadata_root = zarr_root [ "metadata" ] ,
497+         data_array = data_array ,
498+         header_array = header_array ,
475499        name = "_" .join (["chunked" , suffix ]),
476500        dtype = "float32" ,
477501        chunks = chunksize ,
478502        lossless = lossless ,
479503        compression_tolerance = compression_tolerance ,
480504    )
481505
506+     # Write actual stats 
482507    for  key , value  in  stats .items ():
483508        write_attribute (name = key , zarr_group = zarr_root , attribute = value )
484509
0 commit comments