Skip to content

Commit fa9761a

Browse files
committed
Remove unnecessary global SegyFile
1 parent 20dbfeb commit fa9761a

File tree

2 files changed

+7
-33
lines changed

2 files changed

+7
-33
lines changed

src/mdio/segy/_workers.py

Lines changed: 3 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from typing import TYPE_CHECKING
77

88
import numpy as np
9-
from segy import SegyFile
109
from segy.arrays import HeaderArray
1110

1211
from mdio.core.config import MDIOSettings
@@ -15,6 +14,7 @@
1514
from mdio.segy.file import SegyFileWrapper
1615

1716
if TYPE_CHECKING:
17+
from segy import SegyFile
1818
from zarr import Array as zarr_Array
1919

2020
from zarr.core.config import config as zarr_config
@@ -25,28 +25,6 @@
2525

2626
logger = logging.getLogger(__name__)
2727

28-
# Global variable to store opened segy file per worker process
29-
_worker_segy_file = None
30-
31-
32-
def _init_worker(segy_file_kwargs: SegyFileArguments) -> None:
33-
"""Initialize worker process with persistent segy file handle.
34-
35-
This function is called once per worker process to open the segy file,
36-
which is then reused across all tasks in that worker.
37-
38-
Args:
39-
segy_file_kwargs: Arguments to open SegyFile instance.
40-
"""
41-
global _worker_segy_file # noqa: PLW0603
42-
# TODO(BrianMichell): Diagnose and fix handles not being cleaned up on cloud2cloud ingesions.
43-
# https://github.com/TGSAI/mdio-python/pull/712
44-
# https://github.com/TGSAI/mdio-python/pull/701
45-
# The reason for having a global variable is to reduce the number of GET requests for opening the file.
46-
47-
# Open the SEG-Y file once per worker
48-
_worker_segy_file = SegyFile(**segy_file_kwargs)
49-
5028

5129
def header_scan_worker(
5230
segy_file_kwargs: SegyFileArguments,
@@ -91,6 +69,7 @@ def header_scan_worker(
9169

9270

9371
def trace_worker( # noqa: PLR0913
72+
segy_file: SegyFile,
9473
data_array: zarr_Array,
9574
header_array: zarr_Array | None,
9675
raw_header_array: zarr_Array | None,
@@ -99,9 +78,8 @@ def trace_worker( # noqa: PLR0913
9978
) -> SummaryStatistics | None:
10079
"""Writes a subset of traces from a region of the dataset of Zarr file.
10180
102-
Uses pre-opened segy file from _init_worker and receives zarr arrays directly.
103-
10481
Args:
82+
segy_file: The opened SEG-Y file.
10583
data_array: Zarr array for writing trace data.
10684
header_array: Zarr array for writing trace headers (or None if not needed).
10785
raw_header_array: Zarr array for writing raw headers (or None if not needed).
@@ -111,11 +89,6 @@ def trace_worker( # noqa: PLR0913
11189
Returns:
11290
SummaryStatistics object containing statistics about the written traces.
11391
"""
114-
global _worker_segy_file # noqa: PLW0602
115-
116-
# Use the pre-opened segy file from worker initialization
117-
segy_file = _worker_segy_file
118-
11992
# Setting the zarr config to 1 thread to ensure we honor the `MDIO__IMPORT__CPU_COUNT` environment variable.
12093
# The Zarr 3 engine utilizes multiple threads. This can lead to resource contention and unpredictable memory usage.
12194
zarr_config.set({"threading.max_workers": 1})

src/mdio/segy/blocked_io.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import zarr
1313
from dask.array import Array
1414
from dask.array import map_blocks
15+
from segy import SegyFile
1516
from tqdm.auto import tqdm
1617
from zarr import open_group as zarr_open_group
1718

@@ -21,7 +22,6 @@
2122
from mdio.constants import ZarrFormat
2223
from mdio.core.config import MDIOSettings
2324
from mdio.core.indexing import ChunkIterator
24-
from mdio.segy._workers import _init_worker
2525
from mdio.segy._workers import trace_worker
2626
from mdio.segy.creation import SegyPartRecord
2727
from mdio.segy.creation import concat_files
@@ -106,16 +106,17 @@ def to_zarr( # noqa: PLR0913, PLR0915
106106
executor = ProcessPoolExecutor(
107107
max_workers=num_workers,
108108
mp_context=context,
109-
initializer=_init_worker,
110-
initargs=(segy_file_kwargs,),
111109
)
112110

111+
segy_file = SegyFile(**segy_file_kwargs)
112+
113113
with executor:
114114
futures = []
115115
for region in chunk_iter:
116116
# Pass zarr array handles directly to workers
117117
future = executor.submit(
118118
trace_worker,
119+
segy_file,
119120
data_array,
120121
header_array,
121122
raw_header_array,

0 commit comments

Comments
 (0)