Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion gcsfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1168,8 +1168,14 @@ def url(self, path):

async def _cat_file(self, path, start=None, end=None, **kwargs):
"""Simple one-shot get of file data"""
# if start and end are both provided and valid, but start >= end, return empty bytes
# Otherwise, _process_limits would generate an invalid HTTP range (e.g. "bytes=5-4"
# for start=5, end=5), causing the server to return the whole file instead of nothing.
if start is not None and end is not None and start >= end >= 0:
return b""
u2 = self.url(path)
if start or end:
# 'if start or end' fails when start=0 or end=0 because 0 is falsy.
if start is not None or end is not None:
head = {"Range": await self._process_limits(path, start, end)}
else:
head = {}
Expand Down Expand Up @@ -1993,6 +1999,11 @@ def __init__(
Timeout seconds for the asynchronous callback.
generation: str
Object generation.
supports_append: bool
If True, allows opening file in append mode. This is generally not supported
by GCS, but may be supported by subclasses (e.g. ZonalFile). This flag
should be set by subclasses that support append operations. Otherwise,
the mode will be overwritten to "wb" mode with a warning.
"""
bucket, key, path_generation = gcsfs.split_path(path)
if not key:
Expand Down
56 changes: 29 additions & 27 deletions gcsfs/extended_gcsfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import uuid
from enum import Enum
from glob import has_magic
from io import BytesIO

from fsspec import asyn
from fsspec.callbacks import NoOpCallback
Expand Down Expand Up @@ -201,17 +200,15 @@ async def _process_limits_to_offset_and_length(

Returns:
tuple: A tuple containing (offset, length).

Raises:
ValueError: If the calculated range is invalid.
"""
size = file_size

if start is None:
offset = 0
elif start < 0:
size = (await self._info(path))["size"] if size is None else size
offset = size + start
# If start is negative and its magnitude exceeds the file size, clamp the offset to 0.
offset = max(0, size + start)
else:
offset = start

Expand All @@ -224,14 +221,9 @@ async def _process_limits_to_offset_and_length(
else:
effective_end = end

if offset < 0:
raise ValueError(f"Calculated start offset ({offset}) cannot be negative.")
if effective_end < offset:
raise ValueError(
f"Calculated end position ({effective_end}) cannot be before start offset ({offset})."
)
elif effective_end == offset:
length = 0 # Handle zero-length slice
# If the requested end is at or before the start, return an empty range.
if effective_end <= offset:
return offset, 0
else:
length = effective_end - offset # Normal case
size = (await self._info(path))["size"] if size is None else size
Expand Down Expand Up @@ -279,7 +271,7 @@ async def _fetch_range_split(
file_size = size or mrd.persisted_size
if file_size is None:
logger.warning(
"AsyncMultiRangeDownloader (MRD) has no 'persisted_size'. "
f"AsyncMultiRangeDownloader (MRD) for {path} has no 'persisted_size'. "
"Falling back to _info() to get the file size."
)
file_size = (await self._info(path))["size"]
Expand All @@ -294,22 +286,13 @@ async def _fetch_range_split(
):
raise RuntimeError("Request not satisfiable.")

buffers = [] # To hold the results in order
read_ranges = [] # To pass to MRD

for length in chunk_lengths:
buf = BytesIO()
buffers.append(buf)

if length > 0:
read_ranges.append((current_offset, length, buf))

read_ranges.append((current_offset, length))
current_offset += length

if read_ranges:
await mrd.download_ranges(read_ranges)

return [b.getvalue() for b in buffers]
return await zb_hns_utils.download_ranges(read_ranges, mrd)
else:
end = kwargs.get("end")
offset, length = await self._process_limits_to_offset_and_length(
Expand Down Expand Up @@ -359,7 +342,7 @@ async def _cat_file(self, path, start=None, end=None, mrd=None, **kwargs):
file_size = mrd.persisted_size
if file_size is None:
logger.warning(
"AsyncMultiRangeDownloader (MRD) exists but has no 'persisted_size'. "
f"AsyncMultiRangeDownloader (MRD) for {path} has no 'persisted_size'. "
"Falling back to _info() to get the file size. "
"This may result in incorrect behavior for unfinalized objects."
)
Expand Down Expand Up @@ -1330,7 +1313,7 @@ async def _get_file(self, rpath, lpath, callback=None, **kwargs):
size = mrd.persisted_size
if size is None:
logger.warning(
"AsyncMultiRangeDownloader (MRD) has no 'persisted_size'. "
f"AsyncMultiRangeDownloader (MRD) for {rpath} has no 'persisted_size'. "
"Falling back to _info() to get the file size. "
"This may result in incorrect behavior for unfinalized objects."
)
Expand Down Expand Up @@ -1425,6 +1408,25 @@ async def _cp_file(self, path1, path2, acl=None, **kwargs):
"Zonal objects do not support rewrite."
)

async def _merge(self, path, paths, acl=None):
    """Concatenate objects within a single bucket.

    For Standard GCS buckets, falls back to the parent class's implementation.

    Zonal Bucket Support:
        Server-side compose is currently NOT supported for Zonal buckets, so
        a ``NotImplementedError`` is raised for zonal targets.
    """
    target_bucket = self.split_path(path)[0]

    # Standard buckets: delegate to the core compose implementation.
    if not await self._is_zonal_bucket(target_bucket):
        return await super()._merge(path, paths, acl=acl)

    raise NotImplementedError(
        "Server-side compose/merge is not supported for Zonal buckets."
    )

merge = asyn.sync_wrapper(_merge)


async def upload_chunk(fs, location, data, offset, size, content_type):
"""
Expand Down
4 changes: 4 additions & 0 deletions gcsfs/tests/test_extended_gcsfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ async def download_side_effect(read_requests, **kwargs):
side_effect=download_side_effect
)
mock_downloader.persisted_size = None
mock_downloader.bucket_name = "mock_bucket"
mock_downloader.object_name = "mock_object"

mock_create_mrd = mock.AsyncMock(return_value=mock_downloader)
with (
Expand Down Expand Up @@ -992,6 +994,8 @@ async def download_side_effect(read_requests, **kwargs):
downloader.download_ranges = mock.AsyncMock(side_effect=download_side_effect)
downloader.persisted_size = len(file_data)
downloader.close = mock.AsyncMock()
downloader.bucket_name = "bucket"
downloader.object_name = object_name
return downloader


Expand Down
88 changes: 66 additions & 22 deletions gcsfs/tests/test_extended_gcsfs_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,38 +92,53 @@ def test_open_uses_correct_blocksize_and_consistency_for_all_bucket_types(


@pytest.mark.parametrize(
"start,end,exp_offset,exp_length,exp_exc",
"start, end, exp_offset, exp_length",
[
(None, None, 0, file_size, None), # full file
(-10, None, file_size - 10, 10, None), # start negative
(10, -10, 10, file_size - 20, None), # end negative
(20, 20, 20, 0, None), # zero-length slice
(50, 40, None, None, ValueError), # end before start -> raises
(-200, None, None, None, ValueError), # offset negative -> raises
(file_size - 10, 200, file_size - 10, 10, None), # end > size clamps
# --- Standard Slicing ---
(None, None, 0, file_size), # Full file: data[:]
(10, 20, 10, 10), # Middle slice: data[10:20]
(None, 10, 0, 10), # Prefix: data[:10]
(10, None, 10, file_size - 10), # Suffix: data[10:]
# --- Negative Indices ---
(-10, None, file_size - 10, 10), # Last N bytes: data[-10:]
(None, -10, 0, file_size - 10), # All except last N: data[:-10]
(10, -10, 10, file_size - 20), # Positive start, Negative end: data[10:-10]
# --- Zero Length & Empty Reads ---
(20, 20, 20, 0), # Zero-length (Start == End): data[20:20]
(50, 40, 50, 0), # Crossover (Start > End): data[50:40] -> Empty
(
file_size + 10,
file_size + 20,
None,
file_size + 10,
0,
), # Start past EOF: data[110:] -> Empty
# --- Overshoot & Clamping ---
(
-file_size * 2,
None,
), # offset > size -> empty
0,
file_size,
), # Start Overshoot: data[-200:] -> Whole file
(
file_size - 10,
file_size + 100,
file_size - 10,
10,
), # End Overshoot: data[90:200] -> Last 10 bytes
],
)
def test_process_limits_parametrized(
extended_gcsfs, start, end, exp_offset, exp_length, exp_exc
extended_gcsfs, start, end, exp_offset, exp_length
):
if exp_exc is not None:
with pytest.raises(exp_exc):
extended_gcsfs.sync_process_limits_to_offset_and_length(
file_path, start, end
)
else:
offset, length = extended_gcsfs.sync_process_limits_to_offset_and_length(
file_path, start, end
)
assert offset == exp_offset
assert length == exp_length
"""
Verifies that start/end limits are correctly converted to offset/length
"""
offset, length = extended_gcsfs.sync_process_limits_to_offset_and_length(
file_path, start, end
)

assert offset == exp_offset
assert length == exp_length


def test_process_limits_when_file_size_passed(extended_gcsfs):
Expand Down Expand Up @@ -578,3 +593,32 @@ async def test_get_file_exception_cleanup(

# The local file should not exist after the failed download
assert not lpath.exists()


@pytest.mark.asyncio
async def test_merge_zonal_not_supported(async_gcs, zonal_write_mocks):
    """Test _merge for Zonal buckets raises NotImplementedError."""
    destination = f"{TEST_ZONAL_BUCKET}/merged_file"
    sources = [f"{TEST_ZONAL_BUCKET}/file{i}" for i in (1, 2)]

    expected_msg = "Server-side compose/merge is not supported for Zonal buckets."
    with pytest.raises(NotImplementedError, match=expected_msg):
        await async_gcs._merge(destination, sources)


@pytest.mark.asyncio
async def test_merge_delegates_to_core_for_non_zonal(async_gcs):
    """Test _merge delegates to core._merge when the bucket is not zonal."""
    destination = f"{TEST_BUCKET}/merged_file"
    sources = [f"{TEST_BUCKET}/file1", f"{TEST_BUCKET}/file2"]

    # Force the zonal check to report a standard bucket, and intercept the
    # parent-class merge so we can verify the delegation.
    patch_zonal = mock.patch.object(
        async_gcs, "_is_zonal_bucket", return_value=False
    )
    patch_core = mock.patch(
        "gcsfs.core.GCSFileSystem._merge", new_callable=mock.AsyncMock
    )
    with patch_zonal, patch_core as mock_core_merge:
        await async_gcs._merge(destination, sources, acl="public-read")

    mock_core_merge.assert_awaited_once_with(destination, sources, acl="public-read")
73 changes: 73 additions & 0 deletions gcsfs/tests/test_zb_hns_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,76 @@ async def test_close_mrd(caplog):
"Error closing AsyncMultiRangeDownloader for test-bucket/test-object: Close failed"
in caplog.text
)


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "ranges, expected_call_count",
    [
        ([(0, 5), (10, 3)], 1),  # Basic case
        ([(0, 4), (5, 0), (10, 3)], 1),  # Mixed empty (should filter middle)
        ([(0, 0), (10, 0)], 0),  # All empty (should not call MRD)
        ([], 0),  # Empty list
    ],
    ids=["basic", "mixed_empty", "all_empty", "empty_list"],
)
async def test_download_ranges_unified(ranges, expected_call_count):
    """Unified test for download_ranges success scenarios."""
    mock_mrd = mock.AsyncMock()

    async def fake_download(req_ranges):
        # Write a distinct, offset-derived payload per range so the mapping
        # from request to result position can be verified.
        for off, ln, sink in req_ranges:
            sink.write(f"{off}-{ln}".encode())

    mock_mrd.download_ranges.side_effect = fake_download

    results = await zb_hns_utils.download_ranges(ranges, mock_mrd)

    # 1. Verify results: zero-length requests map to empty bytes, others to
    # their encoded "{offset}-{length}" payload.
    assert results == [
        f"{off}-{ln}".encode() if ln > 0 else b"" for off, ln in ranges
    ]

    # 2. Verify MRD interaction: it is only invoked when real work exists.
    assert mock_mrd.download_ranges.call_count == expected_call_count

    if expected_call_count > 0:
        # The MRD must only receive the non-zero-length ranges, in order.
        passed_ranges = mock_mrd.download_ranges.call_args[0][0]
        expected_nonempty = [(off, ln) for off, ln in ranges if ln > 0]

        assert len(passed_ranges) == len(expected_nonempty)
        for (got_off, got_len, got_buf), (want_off, want_len) in zip(
            passed_ranges, expected_nonempty
        ):
            assert got_off == want_off
            assert got_len == want_len
            assert hasattr(got_buf, "write")


@pytest.mark.asyncio
async def test_download_ranges_exception():
    """Test exception propagation (Keep separate as it changes control flow)."""
    failing_mrd = mock.AsyncMock()
    failing_mrd.download_ranges.side_effect = ValueError("Fail")

    with pytest.raises(ValueError, match="Fail"):
        await zb_hns_utils.download_ranges([(0, 5)], failing_mrd)


@pytest.mark.asyncio
async def test_download_ranges_validation_limit():
    """
    Tests that download_ranges raises a ValueError if the number of ranges
    exceeds 1000.
    """
    mock_mrd = mock.AsyncMock()
    # 1001 ranges: one more than the documented maximum.
    too_many_ranges = [(offset, 10) for offset in range(1001)]

    expected_msg = "Invalid input - number of ranges cannot be more than 1000"
    with pytest.raises(ValueError, match=expected_msg):
        await zb_hns_utils.download_ranges(too_many_ranges, mock_mrd)
Loading
Loading