
Commit 616971e

Fix regression causing metadata request overload
1 parent 693bf0b commit 616971e

File tree

  src/mdio/segy/_workers.py
  src/mdio/segy/blocked_io.py

2 files changed: +21 -12 lines
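
Context for the fix: before this commit, `trace_worker` re-opened the output Zarr group from `output_path` on every call, so each worker issued its own store-metadata requests; `to_zarr` now opens the group once and hands the open `zarr.Group` to the workers. Below is a minimal sketch of the two patterns, assuming zarr-python 3; the store path, array name, and writes are illustrative only, not MDIO code.

import zarr


def worker_reopen(store_path: str, region: tuple[slice, ...]) -> None:
    # Pre-fix pattern: every worker call re-opens the group, triggering
    # metadata requests against the store each time.
    group = zarr.open_group(store_path, mode="r+")
    group["data"][region] = 1.0


def worker_shared(group: zarr.Group, region: tuple[slice, ...]) -> None:
    # Post-fix pattern: the already-open group is passed in, so no extra
    # open or metadata round-trips happen per chunk.
    group["data"][region] = 1.0


if __name__ == "__main__":
    # Hypothetical local store, recreated on each run.
    root = zarr.open_group("example_output.zarr", mode="w")
    root.create_array("data", shape=(4, 4), chunks=(2, 2), dtype="float32")
    worker_shared(root, (slice(0, 2), slice(0, 2)))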

src/mdio/segy/_workers.py

Lines changed: 4 additions & 11 deletions
@@ -9,16 +9,14 @@
 import numpy as np
 from segy.arrays import HeaderArray
 
-from mdio.api.io import _normalize_storage_options
 from mdio.segy._raw_trace_wrapper import SegyFileRawTraceWrapper
 from mdio.segy.file import SegyFileArguments
 from mdio.segy.file import SegyFileWrapper
 
 if TYPE_CHECKING:
-    from upath import UPath
     from zarr import Array as zarr_Array
+    from zarr import Group as zarr_Group
 
-from zarr import open_group as zarr_open_group
 from zarr.core.config import config as zarr_config
 
 from mdio.builder.schemas.v1.stats import CenteredBinHistogram
@@ -75,7 +73,7 @@ def header_scan_worker(
 
 def trace_worker(  # noqa: PLR0913
     segy_file_kwargs: SegyFileArguments,
-    output_path: UPath,
+    zarr_group: zarr_Group,
     data_variable_name: str,
     region: dict[str, slice],
     grid_map: zarr_Array,
@@ -84,9 +82,7 @@ def trace_worker(  # noqa: PLR0913
 
     Args:
         segy_file_kwargs: Arguments to open SegyFile instance.
-        output_path: Universal Path for the output Zarr dataset
-            (e.g. local file path or cloud storage URI) the location
-            also includes storage options for cloud storage.
+        zarr_group: Zarr group to write to.
         data_variable_name: Name of the data variable to write.
         region: Region of the dataset to write to.
         grid_map: Zarr array mapping live traces to their positions in the dataset.
@@ -108,14 +104,11 @@ def trace_worker(  # noqa: PLR0913
 
     # Setting the zarr config to 1 thread to ensure we honor the `MDIO__IMPORT__MAX_WORKERS` environment variable.
     # The Zarr 3 engine utilizes multiple threads. This can lead to resource contention and unpredictable memory usage.
+    # This remains set here to ensure that each worker does not use more than 1 thread.
     zarr_config.set({"threading.max_workers": 1})
 
     live_trace_indexes = local_grid_map[not_null].tolist()
 
-    # Open the zarr group to write directly
-    storage_options = _normalize_storage_options(output_path)
-    zarr_group = zarr_open_group(output_path.as_posix(), mode="r+", storage_options=storage_options)
-
     header_key = "headers"
     raw_header_key = "raw_headers"
 
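On the worker side, nothing else about the write path changes: zarr's internal threading stays pinned to one thread so `MDIO__IMPORT__MAX_WORKERS` remains the only parallelism knob, and the worker writes into the group object it received instead of re-opening the store. A rough stand-in for that write path is sketched below; the argument names and region type are simplified, not the actual worker signature.

import numpy as np
import zarr
from zarr.core.config import config as zarr_config


def write_chunk(
    zarr_group: zarr.Group,
    data_variable_name: str,
    index: tuple[slice, ...],
    traces: np.ndarray,
) -> None:
    # Keep zarr single-threaded inside the worker; the process-level worker
    # count provides the parallelism and bounds memory use.
    zarr_config.set({"threading.max_workers": 1})
    # Write directly through the group that was opened once by the parent.
    zarr_group[data_variable_name][index] = traces
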
src/mdio/segy/blocked_io.py

Lines changed: 17 additions & 1 deletion
@@ -74,6 +74,22 @@ def to_zarr(  # noqa: PLR0913, PLR0915
     """
     data = dataset[data_variable_name]
 
+    storage_options = _normalize_storage_options(output_path)
+    zarr_format = zarr.config.get("default_zarr_format")
+    # TODO(BrianMichell): This reverts changes made in #701
+    # https://github.com/TGSAI/mdio-python/pull/701
+    zarr_group = zarr_open_group(
+        output_path.as_posix(),
+        mode="r+",
+        storage_options=storage_options,
+        use_consolidated=zarr_format == ZarrFormat.V2,
+        zarr_version=zarr_format,
+        zarr_format=zarr_format,
+    )
+    # Setting the zarr_format and zarr_version explicitly to avoid searching for the
+    # correct version that has already been serialized.
+    # We know it's correct to get from the existing config because this function is only used during ingestion.
+
     final_stats = _create_stats()
 
     data_variable_chunks = data.encoding.get("chunks")
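
The parent-side open above is where the metadata savings come from: the Zarr format is taken from the config used during ingestion and passed explicitly, so zarr does not have to probe the store to discover the serialized format, and consolidated metadata is only requested where it exists (Zarr v2). A standalone illustration of that call shape follows; the path, credentials, and format value are placeholders, not MDIO's values.

import zarr

zarr_format = 2  # placeholder; MDIO takes this from zarr.config.get("default_zarr_format")

group = zarr.open_group(
    "s3://my-bucket/output.mdio",          # placeholder output URI
    mode="r+",
    storage_options={"anon": False},       # placeholder cloud storage options
    zarr_format=zarr_format,               # skip format-discovery requests
    use_consolidated=(zarr_format == 2),   # consolidated metadata only applies to v2 here
)
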
@@ -90,7 +106,7 @@ def to_zarr(  # noqa: PLR0913, PLR0915
 
     with executor:
         futures = []
-        common_args = (segy_file_kwargs, output_path, data_variable_name)
+        common_args = (segy_file_kwargs, zarr_group, data_variable_name)
         for region in chunk_iter:
             subset_args = (region, grid_map)
             future = executor.submit(trace_worker, *common_args, *subset_args)
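
With the group opened up front, the submit loop shares it across every chunk task; only the region differs per task. In rough outline (the executor type, worker signature, and helper name below are simplified, not lifted from blocked_io.py):

from concurrent.futures import Executor, Future


def submit_chunks(
    executor: Executor,
    trace_worker,
    segy_file_kwargs: dict,
    zarr_group,
    data_variable_name: str,
    regions,
    grid_map,
) -> list[Future]:
    # The open group is part of the shared arguments, so no task re-opens the store.
    common_args = (segy_file_kwargs, zarr_group, data_variable_name)
    futures = []
    for region in regions:
        futures.append(executor.submit(trace_worker, *common_args, region, grid_map))
    return futures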
