Skip to content

Commit 88d178d

Browse files
Wrap SegyFile to handle asyncio loop conflicts with Zarr. (TGSAI#712)
* SegyFileAsync * Fix test failing on windows * info_worker -> get_segy_file_info * revert ci/cd changes * replace `SegyFileAsync` with `SegyFileWrapper` and refactor SEG-Y file utilities --------- Co-authored-by: Altay Sansal <[email protected]>
1 parent bdebc7e commit 88d178d

File tree

9 files changed

+211
-107
lines changed

9 files changed

+211
-107
lines changed

src/mdio/converters/segy.py

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@
44

55
import base64
66
import logging
7-
import multiprocessing as mp
87
import os
9-
from concurrent.futures import ProcessPoolExecutor
108
from typing import TYPE_CHECKING
119

1210
import numpy as np
@@ -37,8 +35,7 @@
3735
from mdio.core.utils_write import MAX_SIZE_LIVE_MASK
3836
from mdio.core.utils_write import get_constrained_chunksize
3937
from mdio.segy import blocked_io
40-
from mdio.segy._workers import SegyFileInfo
41-
from mdio.segy._workers import info_worker
38+
from mdio.segy.file import get_segy_file_info
4239
from mdio.segy.scalar import SCALE_COORDINATE_KEYS
4340
from mdio.segy.scalar import _apply_coordinate_scalar
4441
from mdio.segy.utilities import get_grid_plan
@@ -55,7 +52,8 @@
5552
from mdio.builder.schemas import Dataset
5653
from mdio.builder.templates.abstract_dataset_template import AbstractDatasetTemplate
5754
from mdio.core.dimension import Dimension
58-
from mdio.segy._workers import SegyFileArguments
55+
from mdio.segy.file import SegyFileArguments
56+
from mdio.segy.file import SegyFileInfo
5957

6058
logger = logging.getLogger(__name__)
6159

@@ -167,19 +165,6 @@ def _scan_for_headers(
167165
return segy_dimensions, segy_headers
168166

169167

170-
def _read_segy_file_info(segy_file_kwargs: SegyFileArguments) -> SegyFileInfo:
171-
"""Read SEG-Y file in a separate process.
172-
173-
This is an ugly workaround for Zarr issues 3487 'Explicitly using fsspec and zarr FsspecStore causes
174-
RuntimeError "Task attached to a different loop"'
175-
"""
176-
# TODO (Dmitriy Repin): when Zarr issue 3487 is resolved, we can remove this workaround
177-
# https://github.com/zarr-developers/zarr-python/issues/3487
178-
with ProcessPoolExecutor(max_workers=1, mp_context=mp.get_context("spawn")) as executor:
179-
future = executor.submit(info_worker, segy_file_kwargs)
180-
return future.result()
181-
182-
183168
def _build_and_check_grid(
184169
segy_dimensions: list[Dimension],
185170
segy_file_info: SegyFileInfo,
@@ -531,7 +516,7 @@ def segy_to_mdio( # noqa PLR0913
531516
"settings": segy_settings,
532517
"header_overrides": segy_header_overrides,
533518
}
534-
segy_file_info = _read_segy_file_info(segy_file_kwargs)
519+
segy_file_info = get_segy_file_info(segy_file_kwargs)
535520

536521
segy_dimensions, segy_headers = _scan_for_headers(
537522
segy_file_kwargs,

src/mdio/segy/_workers.py

Lines changed: 4 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,17 @@
44

55
import logging
66
import os
7-
from dataclasses import dataclass
87
from typing import TYPE_CHECKING
9-
from typing import TypedDict
108

119
import numpy as np
12-
from segy import SegyFile
1310
from segy.arrays import HeaderArray
1411

1512
from mdio.api.io import _normalize_storage_options
1613
from mdio.segy._raw_trace_wrapper import SegyFileRawTraceWrapper
17-
from mdio.segy.scalar import _get_coordinate_scalar
14+
from mdio.segy.file import SegyFileArguments
15+
from mdio.segy.file import SegyFileWrapper
1816

1917
if TYPE_CHECKING:
20-
from segy.config import SegyFileSettings
21-
from segy.config import SegyHeaderOverrides
22-
from segy.schema import SegySpec
2318
from upath import UPath
2419
from zarr import Array as zarr_Array
2520

@@ -30,22 +25,9 @@
3025
from mdio.builder.schemas.v1.stats import SummaryStatistics
3126
from mdio.constants import fill_value_map
3227

33-
if TYPE_CHECKING:
34-
from numpy.typing import NDArray
35-
36-
3728
logger = logging.getLogger(__name__)
3829

3930

40-
class SegyFileArguments(TypedDict):
41-
"""Arguments to open SegyFile instance creation."""
42-
43-
url: str
44-
spec: SegySpec | None
45-
settings: SegyFileSettings | None
46-
header_overrides: SegyHeaderOverrides | None
47-
48-
4931
def header_scan_worker(
5032
segy_file_kwargs: SegyFileArguments,
5133
trace_range: tuple[int, int],
@@ -64,7 +46,7 @@ def header_scan_worker(
6446
Returns:
6547
HeaderArray parsed from SEG-Y library.
6648
"""
67-
segy_file = SegyFile(**segy_file_kwargs)
49+
segy_file = SegyFileWrapper(**segy_file_kwargs)
6850

6951
slice_ = slice(*trace_range)
7052

@@ -122,7 +104,7 @@ def trace_worker( # noqa: PLR0913
122104
return None
123105

124106
# Open the SEG-Y file in this process since the open file handles cannot be shared across processes.
125-
segy_file = SegyFile(**segy_file_kwargs)
107+
segy_file = SegyFileWrapper(**segy_file_kwargs)
126108

127109
# Setting the zarr config to 1 thread to ensure we honor the `MDIO__IMPORT__MAX_WORKERS` environment variable.
128110
# The Zarr 3 engine utilizes multiple threads. This can lead to resource contention and unpredictable memory usage.
@@ -185,52 +167,3 @@ def trace_worker( # noqa: PLR0913
185167
sum_squares=(np.ma.power(nonzero_samples, 2).sum(dtype="float64")),
186168
histogram=histogram,
187169
)
188-
189-
190-
@dataclass
191-
class SegyFileInfo:
192-
"""SEG-Y file header information."""
193-
194-
num_traces: int
195-
sample_labels: NDArray[np.int32]
196-
text_header: str
197-
binary_header_dict: dict
198-
raw_binary_headers: bytes
199-
coordinate_scalar: int
200-
201-
202-
def info_worker(segy_file_kwargs: SegyFileArguments) -> SegyFileInfo:
203-
"""Reads information from a SEG-Y file.
204-
205-
Args:
206-
segy_file_kwargs: Arguments to open SegyFile instance.
207-
208-
Returns:
209-
SegyFileInfo containing number of traces, sample labels, and header info.
210-
"""
211-
segy_file = SegyFile(**segy_file_kwargs)
212-
num_traces = segy_file.num_traces
213-
sample_labels = segy_file.sample_labels
214-
215-
text_header = segy_file.text_header
216-
217-
# Get header information directly
218-
raw_binary_headers = segy_file.fs.read_block(
219-
fn=segy_file.url,
220-
offset=segy_file.spec.binary_header.offset,
221-
length=segy_file.spec.binary_header.itemsize,
222-
)
223-
224-
# We read here twice, but it's ok for now. Only 400-bytes.
225-
binary_header_dict = segy_file.binary_header.to_dict()
226-
227-
coordinate_scalar = _get_coordinate_scalar(segy_file)
228-
229-
return SegyFileInfo(
230-
num_traces=num_traces,
231-
sample_labels=sample_labels,
232-
text_header=text_header,
233-
binary_header_dict=binary_header_dict,
234-
raw_binary_headers=raw_binary_headers,
235-
coordinate_scalar=coordinate_scalar,
236-
)

src/mdio/segy/blocked_io.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
from xarray import Dataset as xr_Dataset
3636
from zarr import Array as zarr_Array
3737

38-
from mdio.segy._workers import SegyFileArguments
38+
from mdio.segy.file import SegyFileArguments
3939

4040
default_cpus = cpu_count(logical=True)
4141

src/mdio/segy/file.py

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
"""SEG-Y async file support and utilities."""
2+
3+
from __future__ import annotations
4+
5+
import asyncio
6+
import atexit
7+
import logging
8+
import os
9+
import threading
10+
from dataclasses import dataclass
11+
from typing import TYPE_CHECKING
12+
from typing import TypedDict
13+
14+
import fsspec
15+
from fsspec.asyn import AsyncFileSystem
16+
from fsspec.utils import get_protocol
17+
from segy import SegyFile
18+
from segy.config import SegyFileSettings
19+
20+
from mdio.segy.scalar import _get_coordinate_scalar
21+
22+
if TYPE_CHECKING:
23+
from pathlib import Path
24+
25+
from numpy import int32
26+
from numpy.typing import NDArray
27+
from segy.config import SegyHeaderOverrides
28+
from segy.schema.segy import SegySpec
29+
30+
31+
logger = logging.getLogger(__name__)
32+
33+
# Timeout in seconds for stopping async event loop threads during cleanup
34+
MDIO_ASYNCIO_THREAD_STOP_TIMEOUT = 5.0
35+
36+
37+
class SegyFileArguments(TypedDict):
38+
"""Arguments to open SegyFile instance creation."""
39+
40+
url: Path | str
41+
spec: SegySpec | None
42+
settings: SegyFileSettings | None
43+
header_overrides: SegyHeaderOverrides | None
44+
45+
46+
@dataclass
47+
class SegyFileInfo:
48+
"""SEG-Y file header information."""
49+
50+
num_traces: int
51+
sample_labels: NDArray[int32]
52+
text_header: str
53+
binary_header_dict: dict
54+
raw_binary_headers: bytes
55+
coordinate_scalar: int
56+
57+
58+
def _start_asyncio_loop(segy_file_kwargs: SegyFileArguments) -> None:
59+
"""Start asyncio event loop for async filesystems.
60+
61+
If the filesystem is async (e.g., S3, GCS, Azure), creates a new event loop
62+
in a daemon thread and injects it into the storage options.
63+
64+
Args:
65+
segy_file_kwargs: SEG-Y file arguments that will be modified to include the loop.
66+
"""
67+
protocol = get_protocol(str(segy_file_kwargs["url"]))
68+
# Get the filesystem class without instantiating it
69+
fs_class = fsspec.get_filesystem_class(protocol)
70+
# Only create event loop for async filesystems
71+
is_async = issubclass(fs_class, AsyncFileSystem)
72+
if not is_async:
73+
return
74+
75+
# Create a new event loop and thread to run it in a daemon thread.
76+
loop_asyncio = asyncio.new_event_loop()
77+
th_asyncio = threading.Thread(
78+
target=loop_asyncio.run_forever,
79+
name=f"mdio-{os.getpid()}",
80+
daemon=True,
81+
)
82+
th_asyncio.start()
83+
84+
# Add the loop to the storage options to pass as a parameter to AsyncFileSystem.
85+
# Create a new settings object to avoid modifying the original (which may be shared).
86+
old_settings = segy_file_kwargs.get("settings") or SegyFileSettings()
87+
storage_options = {**(old_settings.storage_options or {}), "loop": loop_asyncio}
88+
segy_file_kwargs["settings"] = SegyFileSettings(
89+
endianness=old_settings.endianness,
90+
storage_options=storage_options,
91+
)
92+
93+
# Register a function to stop the event loop and join the thread.
94+
atexit.register(_stop_asyncio_loop, loop_asyncio, th_asyncio)
95+
96+
97+
def _stop_asyncio_loop(loop_asyncio: asyncio.AbstractEventLoop, th_asyncio: threading.Thread) -> None:
98+
"""Stop the asyncio event loop and join the thread.
99+
100+
Args:
101+
loop_asyncio: The asyncio event loop to stop.
102+
th_asyncio: The thread running the event loop.
103+
"""
104+
if loop_asyncio.is_running():
105+
loop_asyncio.call_soon_threadsafe(loop_asyncio.stop)
106+
107+
th_asyncio.join(timeout=MDIO_ASYNCIO_THREAD_STOP_TIMEOUT)
108+
109+
if th_asyncio.is_alive():
110+
# Thread did not terminate within timeout, but daemon threads will be
111+
# terminated by Python interpreter on exit anyway
112+
logger.warning(
113+
"Async event loop thread '%s' did not terminate within %s seconds",
114+
th_asyncio.name,
115+
MDIO_ASYNCIO_THREAD_STOP_TIMEOUT,
116+
)
117+
118+
119+
class SegyFileWrapper(SegyFile):
120+
"""SEG-Y file that can be instantiated side by side with Zarr for cloud access.
121+
122+
This is a workaround for Zarr issues 3487 'Explicitly using fsspec and zarr FsspecStore causes
123+
RuntimeError "Task attached to a different loop"'
124+
125+
# TODO (Dmitriy Repin): when Zarr issue 3487 is resolved, we can remove this workaround
126+
# https://github.com/zarr-developers/zarr-python/issues/3487
127+
128+
Args:
129+
url: Path to the SEG-Y file.
130+
spec: SEG-Y specification.
131+
settings: SEG-Y settings.
132+
header_overrides: SEG-Y header overrides.
133+
"""
134+
135+
def __init__(
136+
self,
137+
url: Path | str,
138+
spec: SegySpec | None = None,
139+
settings: SegyFileSettings | None = None,
140+
header_overrides: SegyHeaderOverrides | None = None,
141+
):
142+
args = SegyFileArguments(
143+
url=url,
144+
spec=spec,
145+
settings=settings,
146+
header_overrides=header_overrides,
147+
)
148+
_start_asyncio_loop(args)
149+
super().__init__(**args)
150+
151+
152+
def get_segy_file_info(segy_file_kwargs: SegyFileArguments) -> SegyFileInfo:
153+
"""Reads information from a SEG-Y file.
154+
155+
Args:
156+
segy_file_kwargs: Arguments to open SegyFile instance.
157+
158+
Returns:
159+
SegyFileInfo containing number of traces, sample labels, and header info.
160+
"""
161+
segy_file = SegyFileWrapper(**segy_file_kwargs)
162+
num_traces = segy_file.num_traces
163+
sample_labels = segy_file.sample_labels
164+
165+
text_header = segy_file.text_header
166+
167+
# Get header information directly
168+
raw_binary_headers = segy_file.fs.read_block(
169+
fn=segy_file.url,
170+
offset=segy_file.spec.binary_header.offset,
171+
length=segy_file.spec.binary_header.itemsize,
172+
)
173+
174+
# We read here twice, but it's ok for now. Only 400-bytes.
175+
binary_header_dict = segy_file.binary_header.to_dict()
176+
177+
coordinate_scalar = _get_coordinate_scalar(segy_file)
178+
179+
return SegyFileInfo(
180+
num_traces=num_traces,
181+
sample_labels=sample_labels,
182+
text_header=text_header,
183+
binary_header_dict=binary_header_dict,
184+
raw_binary_headers=raw_binary_headers,
185+
coordinate_scalar=coordinate_scalar,
186+
)

src/mdio/segy/parsers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
if TYPE_CHECKING:
1919
from segy.arrays import HeaderArray
2020

21-
from mdio.segy._workers import SegyFileArguments
21+
from mdio.segy.file import SegyFileArguments
2222

2323
default_cpus = cpu_count(logical=True)
2424

src/mdio/segy/utilities.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
from segy.arrays import HeaderArray
2020

2121
from mdio.builder.templates.abstract_dataset_template import AbstractDatasetTemplate
22-
from mdio.segy._workers import SegyFileArguments
23-
from mdio.segy._workers import SegyFileInfo
22+
from mdio.segy.file import SegyFileArguments
23+
from mdio.segy.file import SegyFileInfo
2424

2525
logger = logging.getLogger(__name__)
2626

0 commit comments

Comments
 (0)