Commit 6b5a584

fix(zephyr): route block_size/cache_type to file opener, not FS constructor (#3121)
Fixes #3117. Sole remaining CW canary blocker.

## Problem

`fsspec.open()` routes all `**kwargs` to the filesystem constructor, not to `fs.open()`. `S3FileSystem.__init__` has `default_block_size` (not `block_size`), so the kwarg leaks into `**kwargs` → `AioSession.__init__()`, which `aiobotocore 2.26.0` rejects:

```
AioSession.__init__() got an unexpected keyword argument 'block_size'
```

This means `block_size`, `cache_type`, and `maxblocks` were **never controlling S3 buffering** — they silently leaked to the session constructor, and older aiobotocore ignored them.

<details>
<summary>fsspec 2025.3.0 source trace</summary>

1. **`open()` (L491-500)**: passes `**kwargs` to `open_files()`.
2. **`open_files()` (L300)**: `get_fs_token_paths(urlpath, mode, storage_options=kwargs)` — all kwargs become filesystem constructor args.
3. **`open_files()` (L313-322)**: constructs `OpenFile(fs, path, mode, compression, ...)` — **no kwargs forwarded**.
4. **`OpenFile.__enter__()` (L105)**: `f = self.fs.open(self.path, mode=mode)` — **only path and mode** reach `fs.open()`.
</details>

## Solution

Replace `fsspec.open()` with `url_to_fs()` + `fs.open()` so that `block_size`, `cache_type`, and `cache_options` reach the file opener (`AbstractBufferedFile`) instead of the filesystem constructor. `AbstractFileSystem.open()` (spec.py L1310-1316) passes `block_size`, `cache_options`, and `**kwargs` directly to `_open()`, and handles `compression` at L1318-1324.
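The routing bug and the fix can be reproduced in miniature without S3 or fsspec. This is a minimal sketch with toy stand-ins (`ToySession`, `ToyFileSystem`, `toy_fsspec_open` are illustrative names, not fsspec code): a dispatcher that forwards all `**kwargs` to the constructor crashes when the constructor rejects unknowns, exactly as `AioSession` did.

```python
# Toy model of the kwarg-routing bug: all names here are hypothetical
# stand-ins, not real fsspec/aiobotocore classes.

class ToySession:
    def __init__(self):  # rejects any kwargs, like AioSession.__init__
        pass

class ToyFileSystem:
    def __init__(self, **session_kwargs):
        # Constructor kwargs leak into the session, as with S3FileSystem.
        self.session = ToySession(**session_kwargs)

    def open(self, path, mode="rb", block_size=None):
        return f"opened {path} with block_size={block_size}"

def toy_fsspec_open(path, mode="rb", **kwargs):
    # Mirrors fsspec.open(): kwargs become FS constructor args and
    # never reach fs.open().
    fs = ToyFileSystem(**kwargs)
    return fs.open(path, mode)

try:
    toy_fsspec_open("bucket/key", "rb", block_size=16_000_000)
except TypeError as e:
    print(e)  # prints the "unexpected keyword argument" error

# The fix: construct the filesystem separately, pass kwargs to open().
fs = ToyFileSystem()
print(fs.open("bucket/key", "rb", block_size=16_000_000))
```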
**`readers.py`** — `open_file()`:

```python
# Before: kwargs go to FS constructor → leak to AioSession
with fsspec.open(file_path, mode, compression=compression,
                 block_size=16_000_000, cache_type="background", maxblocks=2) as f:

# After: kwargs go to fs.open() → reach AbstractBufferedFile/S3File
fs, resolved_path = fsspec.core.url_to_fs(file_path)
with fs.open(resolved_path, mode, block_size=_READ_BLOCK_SIZE,
             cache_type=_READ_CACHE_TYPE,
             cache_options={"maxblocks": _READ_MAX_BLOCKS},
             compression=compression) as f:
```

**`writers.py`** — 4 call sites:

```python
# Before: block_size goes to FS constructor → AioSession crash
with fsspec.open(temp_path, "wb", block_size=64 * 1024 * 1024) as f:

# After: block_size goes to fs.open() → controls multipart upload part size
fs, resolved_temp = fsspec.core.url_to_fs(temp_path)
with fs.open(resolved_temp, "wb", block_size=_WRITE_BLOCK_SIZE) as f:
```

<details>
<summary>Backend routing</summary>

**S3**: `S3FileSystem._open(block_size=16M, cache_type="background", cache_options={"maxblocks": 2})` → `S3File` → `BackgroundBlockCache(blocksize=16M, maxblocks=2)`.

**Local**: `LocalFileSystem._open(block_size=16M, **kwargs)` — `block_size` is a named param (silently ignored), remaining kwargs absorbed by `LocalFileOpener(**kwargs)`. No crash, no effect.
</details>

<details>
<summary>Note: compresslevel=1 was also misrouted (pre-existing)</summary>

The `.gz` writer branch had `compresslevel=1` in `fsspec.open()`. This also leaked to the FS constructor — fsspec's compression wrapper calls `compress(f, mode=mode[0])` with no extra kwargs. Dropped in this PR; fixing gzip compression level is a separate concern.
</details>

## Safety

This change makes the buffering settings actually work for the first time. Previously, S3 reads used fsspec defaults (5MB blocks, "readahead" cache). After: 16MB blocks with background prefetch. S3 writes go from 50MB to 64MB multipart parts. Local file IO is unaffected.
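The `url_to_fs` + `fs.open` pattern can be exercised end-to-end against a local path. A minimal sketch, assuming `fsspec` is installed; since `LocalFileSystem` accepts and ignores `block_size`, this only verifies that the kwargs route cleanly through `fs.open()` without crashing, not the buffering itself (against `s3://` the same call would configure `S3File`):

```python
import os
import tempfile

import fsspec.core

def roundtrip(payload: bytes) -> bytes:
    """Write then read a file via url_to_fs + fs.open (illustrative helper)."""
    path = os.path.join(tempfile.mkdtemp(), "demo.bin")
    # Open-time kwargs go to fs.open(), not the FS constructor.
    fs, resolved = fsspec.core.url_to_fs(path)
    with fs.open(resolved, "wb", block_size=64 * 1024 * 1024) as f:
        f.write(payload)
    with fs.open(resolved, "rb", block_size=16_000_000) as f:
        return f.read()

print(roundtrip(b"hello"))
```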
## Testing

- [x] `test_backends.py` (9/9), full zephyr suite (351/351)
- [x] Manual R2 test: reproduced `AioSession` crash with old code, confirmed fix with new code
- [ ] Post-merge: re-run CW canary ferry
1 parent 8d752a7 commit 6b5a584

File tree

3 files changed, +30 -10 lines changed

lib/iris/src/iris/cluster/k8s/kubectl.py

Lines changed: 6 additions & 1 deletion

```diff
@@ -26,6 +26,7 @@
 import logging
 import os
 import subprocess
+import time
 from dataclasses import dataclass, field
 from datetime import datetime, timezone

@@ -115,9 +116,13 @@ def run(
             logger.info("kubectl: %s\n stdin=%s", " ".join(cmd), stdin[:2000])
         else:
             logger.info("kubectl: %s", " ".join(cmd))
+        t0 = time.monotonic()
         result = subprocess.run(cmd, input=stdin, capture_output=True, text=True, timeout=effective_timeout)
+        elapsed_ms = (time.monotonic() - t0) * 1000
         if result.returncode != 0:
-            logger.info("kubectl exit %d: stderr=%s", result.returncode, result.stderr.strip()[:500])
+            logger.info("kubectl exit %d: %dms stderr=%s", result.returncode, elapsed_ms, result.stderr.strip()[:500])
+        elif elapsed_ms > 2000:
+            logger.warning("kubectl slow: %dms cmd=%s", elapsed_ms, " ".join(args))
         return result

    def apply_json(self, manifest: dict) -> None:
```
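The timing pattern added to `run()` can be sketched standalone. A minimal sketch under stated assumptions: `timed_run` and `_SLOW_MS` are illustrative names (the real code is a method on the kubectl wrapper and also forwards stdin and a timeout), but the clock-and-log shape matches the diff.

```python
import logging
import subprocess
import sys
import time

logger = logging.getLogger("kubectl")

# Slow-call threshold; the diff above warns past 2000 ms.
_SLOW_MS = 2000

def timed_run(cmd: list[str]) -> subprocess.CompletedProcess:
    """Run a command, logging failures and slow calls (sketch of run())."""
    t0 = time.monotonic()
    result = subprocess.run(cmd, capture_output=True, text=True)
    elapsed_ms = (time.monotonic() - t0) * 1000
    if result.returncode != 0:
        logger.info("exit %d: %dms stderr=%s",
                    result.returncode, elapsed_ms, result.stderr.strip()[:500])
    elif elapsed_ms > _SLOW_MS:
        logger.warning("slow: %dms cmd=%s", elapsed_ms, " ".join(cmd))
    return result

result = timed_run([sys.executable, "-c", "print('ok')"])
```

Using `time.monotonic()` rather than `time.time()` keeps the measurement immune to wall-clock adjustments.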

lib/zephyr/src/zephyr/readers.py

Lines changed: 15 additions & 5 deletions

```diff
@@ -25,6 +25,11 @@

 logger = logging.getLogger(__name__)

+# 16 MB read blocks with background prefetch for S3/remote reads.
+_READ_BLOCK_SIZE = 16_000_000
+_READ_CACHE_TYPE = "background"
+_READ_MAX_BLOCKS = 2
+

 @dataclass
 class InputFileSpec:
@@ -77,13 +82,18 @@ def open_file(file_path: str, mode: str = "rb"):
     elif file_path.endswith(".xz"):
         compression = "xz"

-    with fsspec.open(
-        file_path,
+    # Use url_to_fs + fs.open so that block_size/cache_type reach the file
+    # opener (AbstractBufferedFile) rather than the filesystem constructor.
+    # fsspec.open() routes all **kwargs to the FS constructor, where S3's
+    # AioSession rejects unknown kwargs like block_size.
+    fs, resolved_path = fsspec.core.url_to_fs(file_path)
+    with fs.open(
+        resolved_path,
         mode,
+        block_size=_READ_BLOCK_SIZE,
+        cache_type=_READ_CACHE_TYPE,
+        cache_options={"maxblocks": _READ_MAX_BLOCKS},
         compression=compression,
-        block_size=16_000_000,
-        cache_type="background",
-        maxblocks=2,
     ) as f:
         yield f

```
lib/zephyr/src/zephyr/writers.py

Lines changed: 9 additions & 4 deletions

```diff
@@ -21,6 +21,9 @@

 logger = logging.getLogger(__name__)

+# 64 MB write blocks — controls S3 multipart upload part size.
+_WRITE_BLOCK_SIZE = 64 * 1024 * 1024
+

 def unique_temp_path(output_path: str) -> str:
     """Return a unique temporary path derived from ``output_path``.
@@ -81,22 +84,23 @@ def write_jsonl_file(records: Iterable, output_path: str) -> dict:
     encoder = msgspec.json.Encoder()

     with atomic_rename(output_path) as temp_path:
+        fs, resolved_temp = fsspec.core.url_to_fs(temp_path)
         if output_path.endswith(".zst"):
             import zstandard as zstd

             cctx = zstd.ZstdCompressor(level=2, threads=1)
-            with fsspec.open(temp_path, "wb", block_size=64 * 1024 * 1024) as raw_f:
+            with fs.open(resolved_temp, "wb", block_size=_WRITE_BLOCK_SIZE) as raw_f:
                 with cctx.stream_writer(raw_f) as f:
                     for record in records:
                         f.write(encoder.encode(record) + b"\n")
                         count += 1
         elif output_path.endswith(".gz"):
-            with fsspec.open(temp_path, "wb", compression="gzip", compresslevel=1, block_size=64 * 1024 * 1024) as f:
+            with fs.open(resolved_temp, "wb", block_size=_WRITE_BLOCK_SIZE, compression="gzip") as f:
                 for record in records:
                     f.write(encoder.encode(record) + b"\n")
                     count += 1
         else:
-            with fsspec.open(temp_path, "wb", block_size=64 * 1024 * 1024) as f:
+            with fs.open(resolved_temp, "wb", block_size=_WRITE_BLOCK_SIZE) as f:
                 for record in records:
                     f.write(encoder.encode(record) + b"\n")
                     count += 1
@@ -367,7 +371,8 @@ def write_binary_file(records: Iterable[bytes], output_path: str) -> dict:

     count = 0
     with atomic_rename(output_path) as temp_path:
-        with fsspec.open(temp_path, "wb", block_size=64 * 1024 * 1024) as f:
+        fs, resolved_temp = fsspec.core.url_to_fs(temp_path)
+        with fs.open(resolved_temp, "wb", block_size=_WRITE_BLOCK_SIZE) as f:
             for record in records:
                 f.write(record)
                 count += 1
```
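The write block size matters beyond throughput: S3 caps a multipart upload at 10,000 parts (a standard S3 limit, not something this PR changes), and each fsspec write block becomes one part, so `block_size` bounds the maximum object size. A quick check of the 50MB → 64MB change described under Safety:

```python
# S3 multipart uploads allow at most 10,000 parts; each part is one
# fsspec write block, so block_size caps the maximum object size.
MAX_PARTS = 10_000

def max_object_bytes(block_size: int) -> int:
    return block_size * MAX_PARTS

old = max_object_bytes(50 * 1024 * 1024)  # previous 50MB parts
new = max_object_bytes(64 * 1024 * 1024)  # new 64MB parts
print(old // 2**30, "GiB ->", new // 2**30, "GiB")  # → 488 GiB -> 625 GiB
```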
