Skip to content

Commit 00beeea

Browse files
committed
Strip out local GCS fake server and revert some regressions
1 parent 20cca4b commit 00beeea

File tree

5 files changed

+19
-60
lines changed

5 files changed

+19
-60
lines changed

src/mdio/api/io.py

Lines changed: 11 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
from xarray.backends.writers import to_zarr as xr_to_zarr
1414

1515
from mdio.constants import ZarrFormat
16-
from mdio.core.config import MDIOSettings
1716
from mdio.core.zarr_io import zarr_warnings_suppress_unstable_structs_v3
1817

1918
if TYPE_CHECKING:
@@ -26,39 +25,13 @@
2625

2726

2827
def _normalize_path(path: UPath | Path | str) -> UPath:
29-
"""Normalize a path to a UPath."""
3028
return UPath(path)
3129

3230

3331
def _normalize_storage_options(path: UPath) -> dict[str, Any] | None:
34-
"""Normalize storage options from UPath."""
3532
return None if len(path.storage_options) == 0 else path.storage_options
3633

3734

38-
def _get_gcs_store(path: UPath) -> tuple[Any, dict[str, Any] | None]:
39-
"""Get store and storage options, using local fake GCS server if enabled.
40-
41-
Args:
42-
path: UPath pointing to storage location.
43-
44-
Returns:
45-
Tuple of (store, storage_options) where store is either a mapper or path string.
46-
"""
47-
settings = MDIOSettings()
48-
49-
if settings.local_gcs_server and str(path).startswith("gs://"):
50-
import gcsfs # noqa: PLC0415
51-
52-
fs = gcsfs.GCSFileSystem(
53-
endpoint_url="http://localhost:4443",
54-
token="anon", # noqa: S106
55-
)
56-
store = fs.get_mapper(path.as_posix().replace("gs://", ""))
57-
return store, None
58-
59-
return path.as_posix(), _normalize_storage_options(path)
60-
61-
6235
def open_mdio(input_path: UPath | Path | str, chunks: T_Chunks = None) -> xr_Dataset:
6336
"""Open a Zarr dataset from the specified universal file path.
6437
@@ -114,22 +87,17 @@ def to_mdio( # noqa: PLR0913
11487
the region of existing MDIO array(s) in which to write this dataset's data.
11588
"""
11689
output_path = _normalize_path(output_path)
90+
storage_options = _normalize_storage_options(output_path)
11791
zarr_format = zarr.config.get("default_zarr_format")
11892

119-
store, storage_options = _get_gcs_store(output_path)
120-
121-
kwargs = {
122-
"dataset": dataset,
123-
"store": store,
124-
"mode": mode,
125-
"compute": compute,
126-
"consolidated": zarr_format == ZarrFormat.V2,
127-
"region": region,
128-
"write_empty_chunks": False,
129-
}
130-
131-
if storage_options is not None and not isinstance(store, dict):
132-
kwargs["storage_options"] = storage_options
133-
13493
with zarr_warnings_suppress_unstable_structs_v3():
135-
xr_to_zarr(**kwargs)
94+
xr_to_zarr(
95+
dataset,
96+
store=output_path.as_posix(), # xarray doesn't like URI when file:// is protocol
97+
mode=mode,
98+
compute=compute,
99+
consolidated=zarr_format == ZarrFormat.V2, # on for v2, off for v3
100+
region=region,
101+
storage_options=storage_options,
102+
write_empty_chunks=False,
103+
)

src/mdio/converters/segy.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -597,6 +597,7 @@ def segy_to_mdio( # noqa PLR0913
597597

598598
# Now we can drop them to simplify chunked write of the data variable
599599
xr_dataset = xr_dataset.drop_vars(drop_vars_delayed)
600+
600601
# Write the headers and traces in chunks using grid_map to indicate dead traces
601602
default_variable_name = mdio_template.default_variable_name
602603
# This is a memory-expensive and time-consuming read-write operation

src/mdio/core/config.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,5 @@ class MDIOSettings(BaseSettings):
5656
description="Whether to ignore validation checks",
5757
alias="MDIO_IGNORE_CHECKS",
5858
)
59-
local_gcs_server: bool = Field(
60-
default=False,
61-
description="Whether to use local fake GCS server for testing (localhost:4443)",
62-
alias="MDIO__LOCAL_GCS_SERVER",
63-
)
6459

6560
model_config = SettingsConfigDict(case_sensitive=True)

src/mdio/segy/_workers.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def trace_worker( # noqa: PLR0913
9797
header_array: zarr_Array | None,
9898
raw_header_array: zarr_Array | None,
9999
region: dict[str, slice],
100-
grid_map_data: np.ndarray,
100+
grid_map: zarr_Array,
101101
) -> SummaryStatistics | None:
102102
"""Writes a subset of traces from a region of the dataset to the Zarr file.
103103
@@ -108,7 +108,7 @@ def trace_worker( # noqa: PLR0913
108108
header_array: Zarr array for writing trace headers (or None if not needed).
109109
raw_header_array: Zarr array for writing raw headers (or None if not needed).
110110
region: Region of the dataset to write to.
111-
grid_map_data: Numpy array mapping live traces to their positions in the dataset.
111+
grid_map: Zarr array mapping live traces to their positions in the dataset.
112112
113113
Returns:
114114
SummaryStatistics object containing statistics about the written traces.
@@ -119,7 +119,7 @@ def trace_worker( # noqa: PLR0913
119119
segy_file = _worker_segy_file
120120

121121
region_slices = tuple(region.values())
122-
local_grid_map = grid_map_data[region_slices[:-1]] # minus last (vertical) axis
122+
local_grid_map = grid_map[region_slices[:-1]] # minus last (vertical) axis
123123

124124
# The dtype.max is the sentinel value for the grid map.
125125
# Normally, this is uint32, but some grids need to be promoted to uint64.

src/mdio/segy/blocked_io.py

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from tqdm.auto import tqdm
1616
from zarr import open_group as zarr_open_group
1717

18-
from mdio.api.io import _get_gcs_store
18+
from mdio.api.io import _normalize_storage_options
1919
from mdio.builder.schemas.v1.stats import CenteredBinHistogram
2020
from mdio.builder.schemas.v1.stats import SummaryStatistics
2121
from mdio.constants import ZarrFormat
@@ -84,10 +84,9 @@ def to_zarr( # noqa: PLR0913, PLR0915
8484
zarr_format = zarr.config.get("default_zarr_format")
8585

8686
# Open zarr group once in main process
87-
store, storage_options = _get_gcs_store(output_path)
88-
87+
storage_options = _normalize_storage_options(output_path)
8988
zarr_group = zarr_open_group(
90-
store,
89+
output_path.as_posix(),
9190
mode="r+",
9291
storage_options=storage_options,
9392
use_consolidated=zarr_format == ZarrFormat.V2,
@@ -100,10 +99,6 @@ def to_zarr( # noqa: PLR0913, PLR0915
10099
header_array = zarr_group.get("headers")
101100
raw_header_array = zarr_group.get("raw_headers")
102101

103-
# Convert grid_map to numpy array for serialization
104-
# grid_map is an in-memory zarr array, so we can read it all at once
105-
grid_map_data = grid_map[:]
106-
107102
# For Unix async writes with s3fs/fsspec & multiprocessing, use 'spawn' instead of default
108103
# 'fork' to avoid deadlocks on cloud stores. Slower but necessary. Default on Windows.
109104
num_workers = min(num_chunks, settings.import_cpus)
@@ -127,7 +122,7 @@ def to_zarr( # noqa: PLR0913, PLR0915
127122
header_array,
128123
raw_header_array,
129124
region,
130-
grid_map_data,
125+
grid_map,
131126
)
132127
futures.append(future)
133128

0 commit comments

Comments
 (0)