From cce7e995424404e541ca9962743681ba944bd5c8 Mon Sep 17 00:00:00 2001 From: Sunidhi Chandra Date: Mon, 17 Nov 2025 09:46:01 +0000 Subject: [PATCH 01/15] Support write mode in ZonalFile and override related methods --- gcsfs/zb_hns_utils.py | 15 ++++++++++ gcsfs/zonal_file.py | 70 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+) diff --git a/gcsfs/zb_hns_utils.py b/gcsfs/zb_hns_utils.py index 648974e2..6bbaece2 100644 --- a/gcsfs/zb_hns_utils.py +++ b/gcsfs/zb_hns_utils.py @@ -11,3 +11,18 @@ async def download_range(offset, length, mrd): buffer = BytesIO() await mrd.download_ranges([(offset, length, buffer)]) return buffer.getvalue() + + +async def init_aaow(grpc_client, bucket_name, object_name, generation=None): + """ + Creates and opens the AsyncAppendableObjectWriter. + """ + + writer = AsyncAppendableObjectWriter( + client=grpc_client, + bucket_name=bucket_name, + object_name=object_name, + generation=generation, + ) + await writer.open() + return writer diff --git a/gcsfs/zonal_file.py b/gcsfs/zonal_file.py index 93afb84c..07203bd1 100644 --- a/gcsfs/zonal_file.py +++ b/gcsfs/zonal_file.py @@ -22,6 +22,10 @@ def __init__(self, *args, **kwargs): self.mrd = asyn.sync( self.gcsfs.loop, self._init_mrd, self.bucket, self.key, self.generation ) + elif "w" in self.mode: + self.aaow = asyn.sync( + self.gcsfs.loop, self._init_aaow, self.bucket, self.key, self.generation + ) else: raise NotImplementedError( "Only read operations are currently supported for Zonal buckets." @@ -47,10 +51,76 @@ def _fetch_range(self, start, end): return b"" raise + async def _init_aaow( + self, bucket_name, object_name, generation=None, overwrite=True + ): + """ + Initializes the AsyncAppendableObjectWriter. + """ + if generation is None and await self.gcsfs._exists(self.path): + info = self.gcsfs.info(self.path) + generation = info.get("generation") + + return await zb_hns_utils.init_aaow( + self.gcsfs.grpc_client, bucket_name, object_name, generation, overwrite + ) + + def flush(self, force=False): + raise NotImplementedError( + "Write operations are not yet implemented for Zonal buckets." + ) + + def commit(self): + raise NotImplementedError( + "Write operations are not yet implemented for Zonal buckets." + ) + + def discard(self): + raise NotImplementedError( + "Write operations are not yet implemented for Zonal buckets." + ) + + async def initiate_upload( + fs, + bucket, + key, + content_type="application/octet-stream", + metadata=None, + fixed_key_metadata=None, + mode="overwrite", + kms_key_name=None, + ): + raise NotImplementedError( + "Write operations are not yet implemented for Zonal buckets." + ) + + async def simple_upload( + fs, + bucket, + key, + datain, + metadatain=None, + consistency=None, + content_type="application/octet-stream", + fixed_key_metadata=None, + mode="overwrite", + kms_key_name=None, + ): + raise NotImplementedError( + "Write operations are not yet implemented for Zonal buckets." + ) + + async def upload_chunk(fs, location, data, offset, size, content_type): + raise NotImplementedError( + "Write operations are not yet implemented for Zonal buckets." + ) + def close(self): """ Closes the ZonalFile and the underlying AsyncMultiRangeDownloader. 
""" if self.mrd: asyn.sync(self.gcsfs.loop, self.mrd.close) + if self.aaow: + asyn.sync(self.gcsfs.loop, self.aaow.close) super().close() From 1a855e06108d275b4075f2625adae71060f071b2 Mon Sep 17 00:00:00 2001 From: Sunidhi Chandra Date: Mon, 17 Nov 2025 10:39:51 +0000 Subject: [PATCH 02/15] Add unit test for init_aaow method in zb_hns_utils --- gcsfs/tests/test_zb_hns_utils.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/gcsfs/tests/test_zb_hns_utils.py b/gcsfs/tests/test_zb_hns_utils.py index a64e6793..77c287d1 100644 --- a/gcsfs/tests/test_zb_hns_utils.py +++ b/gcsfs/tests/test_zb_hns_utils.py @@ -4,6 +4,11 @@ from gcsfs import zb_hns_utils +mock_grpc_client = mock.Mock() +bucket_name = "test-bucket" +object_name = "test-object" +generation = "12345" + @pytest.mark.asyncio async def test_download_range(): @@ -27,3 +32,29 @@ async def mock_download_ranges(ranges): mock_mrd.download_ranges.assert_called_once_with([(offset, length, mock.ANY)]) assert result == expected_data + + +@pytest.mark.asyncio +async def test_init_aaow(): + """ + Tests that init_aaow calls the underlying AsyncAppendableObjectWriter.open + method and returns its result. + """ + mock_writer_instance = mock.AsyncMock() + with mock.patch( + "gcsfs.zb_hns_utils.AsyncAppendableObjectWriter", # The class to patch + new_callable=mock.Mock, # Use a regular Mock for the class + return_value=mock_writer_instance, + ) as mock_writer_class: + result = await zb_hns_utils.init_aaow( + mock_grpc_client, bucket_name, object_name, generation + ) + + mock_writer_class.assert_called_once_with( + client=mock_grpc_client, + bucket_name=bucket_name, + object_name=object_name, + generation=generation, + ) + mock_writer_instance.open.assert_awaited_once() + assert result is mock_writer_instance From c38f1ee9dc7dd76f67d454cba81f8c90caf3e94d Mon Sep 17 00:00:00 2001 From: Sunidhi Chandra Date: Mon, 17 Nov 2025 14:01:48 +0000 Subject: [PATCH 03/15] Implement zonal write methods --- gcsfs/zonal_file.py | 69 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 54 insertions(+), 15 deletions(-) diff --git a/gcsfs/zonal_file.py b/gcsfs/zonal_file.py index 07203bd1..ce635340 100644 --- a/gcsfs/zonal_file.py +++ b/gcsfs/zonal_file.py @@ -1,3 +1,5 @@ +import logging + from fsspec import asyn from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import ( AsyncMultiRangeDownloader, @@ -5,6 +7,8 @@ from gcsfs.core import GCSFile +logger = logging.getLogger("gcsfs.zonal_file") + class ZonalFile(GCSFile): """ @@ -65,19 +69,51 @@ async def _init_aaow( self.gcsfs.grpc_client, bucket_name, object_name, generation, overwrite ) + def write(self, data): + """ + Writes data using AsyncAppendableObjectWriter. + """ + if not self.writable(): + raise ValueError("File not in write mode") + if self.closed: + raise ValueError("I/O operation on closed file.") + if self.forced: + raise ValueError("This file has been force-flushed, can only close") + + asyn.sync(self.gcsfs.loop, self.aaow.append, data) + def flush(self, force=False): - raise NotImplementedError( - "Write operations are not yet implemented for Zonal buckets." - ) + """ + Flushes the AsyncAppendableObjectWriter, sending all buffered data + to the server. 
+ """ + if self.closed: + raise ValueError("Flush on closed file") + if force and self.forced: + raise ValueError("Force flush cannot be called more than once") + if force: + self.forced = True + + if self.readable(): + # no-op to flush on read-mode + return + + asyn.sync(self.gcsfs.loop, self.aaow.flush) def commit(self): - raise NotImplementedError( - "Write operations are not yet implemented for Zonal buckets." - ) + """ + Commits the write by finalizing the AsyncAppendableObjectWriter. + """ + if not self.writable(): + raise ValueError("File not in write mode") + self.autocommit = True + asyn.sync(self.gcsfs.loop, self.aaow.finalize) def discard(self): - raise NotImplementedError( - "Write operations are not yet implemented for Zonal buckets." + """Discard is not applicable for Zonal Buckets. Log a warning instead.""" + logger.warning( + "Discard is unavailable for Zonal Buckets. \ + Data is uploaded via streaming and cannot be cancelled." ) async def initiate_upload( @@ -91,7 +127,7 @@ async def initiate_upload( kms_key_name=None, ): raise NotImplementedError( - "Write operations are not yet implemented for Zonal buckets." + "Initiate_upload operation is not applicable for Zonal buckets. Please use write() instead." ) async def simple_upload( @@ -107,20 +143,23 @@ async def simple_upload( kms_key_name=None, ): raise NotImplementedError( - "Write operations are not yet implemented for Zonal buckets." + "Simple_upload operation is not applicable for Zonal buckets. Please use write() instead." ) async def upload_chunk(fs, location, data, offset, size, content_type): raise NotImplementedError( - "Write operations are not yet implemented for Zonal buckets." + "Upload_chunk operation is not applicable for Zonal buckets. Please use write() instead." ) def close(self): """ - Closes the ZonalFile and the underlying AsyncMultiRangeDownloader. + Closes the ZonalFile and the underlying AsyncMultiRangeDownloader and AsyncAppendableObjectWriter. + If in write mode, finalizes the write if autocommit is True. """ - if self.mrd: + if hasattr(self, "mrd") and self.mrd: asyn.sync(self.gcsfs.loop, self.mrd.close) - if self.aaow: - asyn.sync(self.gcsfs.loop, self.aaow.close) + if hasattr(self, "aaow") and self.aaow: + asyn.sync( + self.gcsfs.loop, self.aaow.close, finalize_on_close=self.autocommit + ) super().close() From c104f3df348ae32388dbac604e94fa13cae73e3d Mon Sep 17 00:00:00 2001 From: Sunidhi Chandra Date: Mon, 17 Nov 2025 15:44:20 +0000 Subject: [PATCH 04/15] Remove overwrite parameter since it is not added in AAOW. instead use generation=None to overwrite --- gcsfs/zonal_file.py | 9 +++------ requirements.txt | 2 +- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/gcsfs/zonal_file.py b/gcsfs/zonal_file.py index ce635340..6ff8ed18 100644 --- a/gcsfs/zonal_file.py +++ b/gcsfs/zonal_file.py @@ -28,7 +28,7 @@ def __init__(self, *args, **kwargs): ) elif "w" in self.mode: self.aaow = asyn.sync( - self.gcsfs.loop, self._init_aaow, self.bucket, self.key, self.generation + self.gcsfs.loop, self._init_aaow, self.bucket, self.key ) else: raise NotImplementedError( @@ -56,17 +56,14 @@ def _fetch_range(self, start, end): raise async def _init_aaow( - self, bucket_name, object_name, generation=None, overwrite=True + self, bucket_name, object_name, generation=None ): """ Initializes the AsyncAppendableObjectWriter. 
""" - if generation is None and await self.gcsfs._exists(self.path): - info = self.gcsfs.info(self.path) - generation = info.get("generation") return await zb_hns_utils.init_aaow( - self.gcsfs.grpc_client, bucket_name, object_name, generation, overwrite + self.gcsfs.grpc_client, bucket_name, object_name, generation ) def write(self, data): diff --git a/requirements.txt b/requirements.txt index 28e2cafa..54369238 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,6 +3,6 @@ decorator>4.1.2 fsspec==2025.10.0 google-auth>=1.2 google-auth-oauthlib -google-cloud-storage +git+https://github.com/googleapis/python-storage.git@bidi-writes-6#egg=google-cloud-storage google-cloud-storage-control requests From 8427749cfea3172cfa7353de7a35131bab3fdec3 Mon Sep 17 00:00:00 2001 From: Sunidhi Chandra Date: Wed, 19 Nov 2025 15:16:27 +0000 Subject: [PATCH 05/15] Remove redundant init_aaow method from zonal_file --- gcsfs/zb_hns_utils.py | 4 ++++ gcsfs/zonal_file.py | 18 ++++++------------ requirements.txt | 2 +- 3 files changed, 11 insertions(+), 13 deletions(-) diff --git a/gcsfs/zb_hns_utils.py b/gcsfs/zb_hns_utils.py index 6bbaece2..57d5d842 100644 --- a/gcsfs/zb_hns_utils.py +++ b/gcsfs/zb_hns_utils.py @@ -1,5 +1,9 @@ from io import BytesIO +from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + AsyncAppendableObjectWriter, +) + async def download_range(offset, length, mrd): """ diff --git a/gcsfs/zonal_file.py b/gcsfs/zonal_file.py index 6ff8ed18..7c28aab0 100644 --- a/gcsfs/zonal_file.py +++ b/gcsfs/zonal_file.py @@ -5,6 +5,7 @@ AsyncMultiRangeDownloader, ) +from gcsfs import zb_hns_utils from gcsfs.core import GCSFile logger = logging.getLogger("gcsfs.zonal_file") @@ -28,7 +29,11 @@ def __init__(self, *args, **kwargs): ) elif "w" in self.mode: self.aaow = asyn.sync( - self.gcsfs.loop, self._init_aaow, self.bucket, self.key + self.gcsfs.loop, + zb_hns_utils.init_aaow, + self.gcsfs.grpc_client, + self.bucket, + self.key, ) else: raise NotImplementedError( @@ -55,17 +60,6 @@ def _fetch_range(self, start, end): return b"" raise - async def _init_aaow( - self, bucket_name, object_name, generation=None - ): - """ - Initializes the AsyncAppendableObjectWriter. - """ - - return await zb_hns_utils.init_aaow( - self.gcsfs.grpc_client, bucket_name, object_name, generation - ) - def write(self, data): """ Writes data using AsyncAppendableObjectWriter. diff --git a/requirements.txt b/requirements.txt index 54369238..28e2cafa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,6 +3,6 @@ decorator>4.1.2 fsspec==2025.10.0 google-auth>=1.2 google-auth-oauthlib -git+https://github.com/googleapis/python-storage.git@bidi-writes-6#egg=google-cloud-storage +google-cloud-storage google-cloud-storage-control requests From 64f2513c2c3627d15f5b2d9fc567a9df6a8a3361 Mon Sep 17 00:00:00 2001 From: Sunidhi Chandra Date: Thu, 20 Nov 2025 11:38:19 +0000 Subject: [PATCH 06/15] update statement for NotImplementedErrors --- gcsfs/zonal_file.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gcsfs/zonal_file.py b/gcsfs/zonal_file.py index 7c28aab0..9a64fcfb 100644 --- a/gcsfs/zonal_file.py +++ b/gcsfs/zonal_file.py @@ -118,7 +118,7 @@ async def initiate_upload( kms_key_name=None, ): raise NotImplementedError( - "Initiate_upload operation is not applicable for Zonal buckets. Please use write() instead." + "Initiate_upload operation is not implemented yet for Zonal buckets. Please use write() instead." 
) async def simple_upload( @@ -134,12 +134,12 @@ async def simple_upload( kms_key_name=None, ): raise NotImplementedError( - "Simple_upload operation is not applicable for Zonal buckets. Please use write() instead." + "Simple_upload operation is not implemented yet for Zonal buckets. Please use write() instead." ) async def upload_chunk(fs, location, data, offset, size, content_type): raise NotImplementedError( - "Upload_chunk operation is not applicable for Zonal buckets. Please use write() instead." + "Upload_chunk operation is not implemented yet for Zonal buckets. Please use write() instead." ) def close(self): From feea4d36b46cff44808bc51977b4758239a029c8 Mon Sep 17 00:00:00 2001 From: Sunidhi Chandra Date: Thu, 20 Nov 2025 12:34:30 +0000 Subject: [PATCH 07/15] set autocommit=false as default for ZonalFile --- gcsfs/extended_gcsfs.py | 13 +++++++++++-- gcsfs/zonal_file.py | 17 ++++++++++++++--- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/gcsfs/extended_gcsfs.py b/gcsfs/extended_gcsfs.py index f4c643a4..041ff8cb 100644 --- a/gcsfs/extended_gcsfs.py +++ b/gcsfs/extended_gcsfs.py @@ -117,7 +117,7 @@ def _open( acl=None, consistency=None, metadata=None, - autocommit=True, + autocommit=None, fixed_key_metadata=None, generation=None, **kwargs, @@ -126,7 +126,16 @@ def _open( Open a file. """ bucket, _, _ = self.split_path(path) + + # Determine the final autocommit value based on bucket type. For zonal + # buckets, default autocommit is False; for others, it is True. + final_autocommit = autocommit bucket_type = self._sync_lookup_bucket_type(bucket) + if autocommit is None: + if bucket_type == BucketType.ZONAL_HIERARCHICAL: + final_autocommit = False + else: + final_autocommit = True return gcs_file_types[bucket_type]( self, path, @@ -136,7 +145,7 @@ def _open( consistency=consistency, metadata=metadata, acl=acl, - autocommit=autocommit, + autocommit=final_autocommit, fixed_key_metadata=fixed_key_metadata, generation=generation, **kwargs, diff --git a/gcsfs/zonal_file.py b/gcsfs/zonal_file.py index 9a64fcfb..0a105d84 100644 --- a/gcsfs/zonal_file.py +++ b/gcsfs/zonal_file.py @@ -10,6 +10,8 @@ logger = logging.getLogger("gcsfs.zonal_file") +DEFAULT_BLOCK_SIZE = 5 * 2**20 + class ZonalFile(GCSFile): """ @@ -17,11 +19,20 @@ class ZonalFile(GCSFile): Zonal buckets only using a high-performance gRPC path. """ - def __init__(self, *args, **kwargs): + def __init__( + self, + gcsfs, + path, + mode="rb", + block_size=DEFAULT_BLOCK_SIZE, + autocommit=False, + *args, + **kwargs, + ): """ Initializes the ZonalFile object. """ - super().__init__(*args, **kwargs) + super().__init__(gcsfs, path, mode, block_size, autocommit, *args, **kwargs) self.mrd = None if "r" in self.mode: self.mrd = asyn.sync( @@ -37,7 +48,7 @@ def __init__(self, *args, **kwargs): ) else: raise NotImplementedError( - "Only read operations are currently supported for Zonal buckets." + "Only 'r' and 'w' modes are currently supported for Zonal buckets." 
) async def _init_mrd(self, bucket_name, object_name, generation=None): From 0c719f5476b43f2c045650a0c2efdfe00d225b85 Mon Sep 17 00:00:00 2001 From: Sunidhi Chandra Date: Fri, 21 Nov 2025 08:46:01 +0000 Subject: [PATCH 08/15] add logic to route upload methods to zonal implementation --- gcsfs/core.py | 47 +++++++++++++++++++++++++++ gcsfs/extended_gcsfs.py | 38 ++++++++++++++++++++++ gcsfs/zonal_file.py | 70 +++++++++++++++++++++++------------------ 3 files changed, 125 insertions(+), 30 deletions(-) diff --git a/gcsfs/core.py b/gcsfs/core.py index 8fe06625..09b010d3 100644 --- a/gcsfs/core.py +++ b/gcsfs/core.py @@ -1864,6 +1864,7 @@ def __init__( self.bucket = bucket self.key = key self.acl = acl + self.consistency = consistency self.checker = get_consistency_checker(consistency) if "a" in self.mode: @@ -2073,6 +2074,18 @@ def _convert_fixed_key_metadata(metadata, *, from_google=False): async def upload_chunk(fs, location, data, offset, size, content_type): + from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + AsyncAppendableObjectWriter, + ) + + from .extended_gcsfs import ExtendedGcsFileSystem + from .extended_gcsfs import upload_chunk as ext_upload_chunk + + if isinstance(fs, ExtendedGcsFileSystem) and isinstance( + location, AsyncAppendableObjectWriter + ): + + return await ext_upload_chunk(fs, location, data, offset, size, content_type) head = {} l = len(data) range = "bytes %i-%i/%i" % (offset, offset + l - 1, size) @@ -2101,6 +2114,22 @@ async def initiate_upload( mode="overwrite", kms_key_name=None, ): + from .extended_gcsfs import ExtendedGcsFileSystem + from .extended_gcsfs import initiate_upload as ext_initiate_upload + + if isinstance(fs, ExtendedGcsFileSystem) and await fs._is_zonal_bucket(bucket): + + return await ext_initiate_upload( + fs, + bucket, + key, + content_type, + metadata, + fixed_key_metadata, + mode, + kms_key_name, + ) + j = {"name": key} if metadata: j["metadata"] = metadata @@ -2135,6 +2164,24 @@ async def simple_upload( mode="overwrite", kms_key_name=None, ): + from .extended_gcsfs import ExtendedGcsFileSystem + from .extended_gcsfs import simple_upload as ext_simple_upload + + if isinstance(fs, ExtendedGcsFileSystem) and await fs._is_zonal_bucket(bucket): + + return await ext_simple_upload( + fs, + bucket, + key, + datain, + metadatain, + consistency, + content_type, + fixed_key_metadata, + mode, + kms_key_name, + ) + checker = get_consistency_checker(consistency) path = f"{fs._location}/upload/storage/v1/b/{quote(bucket)}/o" metadata = {"name": key} diff --git a/gcsfs/extended_gcsfs.py b/gcsfs/extended_gcsfs.py index 041ff8cb..ff07571c 100644 --- a/gcsfs/extended_gcsfs.py +++ b/gcsfs/extended_gcsfs.py @@ -252,3 +252,41 @@ async def _cat_file(self, path, start=None, end=None, mrd=None, **kwargs): # Explicit cleanup if we created the MRD if mrd_created: await mrd.close() + + +async def upload_chunk(fs, location, data, offset, size, content_type): + raise NotImplementedError( + "upload_chunk is not implemented yet for ExtendedGcsFileSystem. Please use write() instead." + ) + + +async def initiate_upload( + fs, + bucket, + key, + content_type="application/octet-stream", + metadata=None, + fixed_key_metadata=None, + mode="overwrite", + kms_key_name=None, +): + raise NotImplementedError( + "initiate_upload is not implemented yet for ExtendedGcsFileSystem. Please use write() instead." 
+ ) + + +async def simple_upload( + fs, + bucket, + key, + datain, + metadatain=None, + consistency=None, + content_type="application/octet-stream", + fixed_key_metadata=None, + mode="overwrite", + kms_key_name=None, +): + raise NotImplementedError( + "simple_upload is not implemented yet for ExtendedGcsFileSystem. Please use write() instead." + ) diff --git a/gcsfs/zonal_file.py b/gcsfs/zonal_file.py index 0a105d84..5852541d 100644 --- a/gcsfs/zonal_file.py +++ b/gcsfs/zonal_file.py @@ -75,10 +75,10 @@ def write(self, data): """ Writes data using AsyncAppendableObjectWriter. """ - if not self.writable(): - raise ValueError("File not in write mode") if self.closed: raise ValueError("I/O operation on closed file.") + if not self.writable(): + raise ValueError("File not in write mode") if self.forced: raise ValueError("This file has been force-flushed, can only close") @@ -118,39 +118,49 @@ def discard(self): Data is uploaded via streaming and cannot be cancelled." ) - async def initiate_upload( - fs, - bucket, - key, - content_type="application/octet-stream", - metadata=None, - fixed_key_metadata=None, - mode="overwrite", - kms_key_name=None, - ): - raise NotImplementedError( - "Initiate_upload operation is not implemented yet for Zonal buckets. Please use write() instead." + def _initiate_upload(self): + """Initiates the upload for Zonal buckets using gRPC.""" + from gcsfs.extended_gcsfs import initiate_upload + + self.location = asyn.sync( + self.gcsfs.loop, + initiate_upload, + self.gcsfs, + self.bucket, + self.key, + self.content_type, + self.metadata, + self.fixed_key_metadata, + mode="create" if "x" in self.mode else "overwrite", + kms_key_name=self.kms_key_name, + timeout=self.timeout, ) - async def simple_upload( - fs, - bucket, - key, - datain, - metadatain=None, - consistency=None, - content_type="application/octet-stream", - fixed_key_metadata=None, - mode="overwrite", - kms_key_name=None, - ): - raise NotImplementedError( - "Simple_upload operation is not implemented yet for Zonal buckets. Please use write() instead." + def _simple_upload(self): + """Performs a simple upload for Zonal buckets using gRPC.""" + from gcsfs.extended_gcsfs import simple_upload + + self.buffer.seek(0) + data = self.buffer.read() + asyn.sync( + self.gcsfs.loop, + simple_upload, + self.gcsfs, + self.bucket, + self.key, + data, + self.metadata, + self.consistency, + self.content_type, + self.fixed_key_metadata, + mode="create" if "x" in self.mode else "overwrite", + kms_key_name=self.kms_key_name, + timeout=self.timeout, ) - async def upload_chunk(fs, location, data, offset, size, content_type): + def _upload_chunk(self, final=False): raise NotImplementedError( - "Upload_chunk operation is not implemented yet for Zonal buckets. Please use write() instead." + "_upload_chunk is not implemented yet for ZonalFile. Please use write() instead." 
) def close(self): From ca976d637d42d785213b2779d696b456d30a271f Mon Sep 17 00:00:00 2001 From: Sunidhi Chandra Date: Fri, 21 Nov 2025 10:40:11 +0000 Subject: [PATCH 09/15] Fixed auth issue when using token="anon" added unit tests for zonal file methods changed zonal_mocks to mock _get_bucket_type instead of mocking two methods --- gcsfs/extended_gcsfs.py | 7 +- gcsfs/tests/conftest.py | 45 ++++------ gcsfs/tests/test_extended_gcsfs.py | 26 ++---- gcsfs/tests/test_zonal_file.py | 140 +++++++++++++++++++++++++++++ 4 files changed, 171 insertions(+), 47 deletions(-) create mode 100644 gcsfs/tests/test_zonal_file.py diff --git a/gcsfs/extended_gcsfs.py b/gcsfs/extended_gcsfs.py index ff07571c..36329676 100644 --- a/gcsfs/extended_gcsfs.py +++ b/gcsfs/extended_gcsfs.py @@ -5,6 +5,7 @@ from google.api_core import exceptions as api_exceptions from google.api_core import gapic_v1 from google.api_core.client_info import ClientInfo +from google.auth.credentials import AnonymousCredentials from google.cloud import storage_control_v2 from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import ( @@ -48,6 +49,9 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.grpc_client = None self.storage_control_client = None + self.credential = self.credentials.credentials + if self.credentials.token == "anon": + self.credential = AnonymousCredentials() # initializing grpc and storage control client for Hierarchical and # zonal bucket operations self.grpc_client = asyn.sync(self.loop, self._create_grpc_client) @@ -59,6 +63,7 @@ def __init__(self, *args, **kwargs): async def _create_grpc_client(self): if self.grpc_client is None: return AsyncGrpcClient( + credentials=self.credential, client_info=ClientInfo(user_agent=f"{USER_AGENT}/{version}"), ).grpc_client else: @@ -71,7 +76,7 @@ async def _create_control_plane_client(self): user_agent=f"{USER_AGENT}/{version}" ) return storage_control_v2.StorageControlAsyncClient( - credentials=self.credentials.credentials, client_info=client_info + credentials=self.credential, client_info=client_info ) async def _lookup_bucket_type(self, bucket): diff --git a/gcsfs/tests/conftest.py b/gcsfs/tests/conftest.py index 5d5e0179..c713bb4c 100644 --- a/gcsfs/tests/conftest.py +++ b/gcsfs/tests/conftest.py @@ -3,8 +3,6 @@ import shlex import subprocess import time -from contextlib import nullcontext -from unittest.mock import patch import fsspec import pytest @@ -144,32 +142,23 @@ def extended_gcsfs(gcs_factory, populate=True): os.environ.get("STORAGE_EMULATOR_HOST") == "https://storage.googleapis.com" ) - # Mock authentication if not using a real GCS endpoint, - # since grpc client in extended_gcsfs does not work with anon access - mock_authentication_manager = ( - patch("google.auth.default", return_value=(None, "fake-project")) - if not is_real_gcs - else nullcontext() - ) - - with mock_authentication_manager: - extended_gcsfs = gcs_factory() - try: - # Only create/delete/populate the bucket if we are NOT using the real GCS endpoint - if not is_real_gcs: - try: - extended_gcsfs.rm(TEST_BUCKET, recursive=True) - except FileNotFoundError: - pass - extended_gcsfs.mkdir(TEST_BUCKET) - if populate: - extended_gcsfs.pipe( - {TEST_BUCKET + "/" + k: v for k, v in allfiles.items()} - ) - extended_gcsfs.invalidate_cache() - yield extended_gcsfs - finally: - _cleanup_gcs(extended_gcsfs, is_real_gcs) + extended_gcsfs = gcs_factory() + try: + # Only 
create/delete/populate the bucket if we are NOT using the real GCS endpoint + if not is_real_gcs: + try: + extended_gcsfs.rm(TEST_BUCKET, recursive=True) + except FileNotFoundError: + pass + extended_gcsfs.mkdir(TEST_BUCKET) + if populate: + extended_gcsfs.pipe( + {TEST_BUCKET + "/" + k: v for k, v in allfiles.items()} + ) + extended_gcsfs.invalidate_cache() + yield extended_gcsfs + finally: + _cleanup_gcs(extended_gcsfs, is_real_gcs) @pytest.fixture diff --git a/gcsfs/tests/test_extended_gcsfs.py b/gcsfs/tests/test_extended_gcsfs.py index 1ff2b653..9d256fa0 100644 --- a/gcsfs/tests/test_extended_gcsfs.py +++ b/gcsfs/tests/test_extended_gcsfs.py @@ -34,11 +34,8 @@ def _zonal_mocks_factory(file_data): if is_real_gcs: yield None return - patch_target_lookup_bucket_type = ( - "gcsfs.extended_gcsfs.ExtendedGcsFileSystem._lookup_bucket_type" - ) - patch_target_sync_lookup_bucket_type = ( - "gcsfs.extended_gcsfs.ExtendedGcsFileSystem._sync_lookup_bucket_type" + patch_target_get_bucket_type = ( + "gcsfs.extended_gcsfs.ExtendedGcsFileSystem._get_bucket_type" ) patch_target_create_mrd = ( "google.cloud.storage._experimental.asyncio.async_multi_range_downloader" @@ -63,20 +60,16 @@ async def download_side_effect(read_requests, **kwargs): mock_create_mrd = mock.AsyncMock(return_value=mock_downloader) with ( mock.patch( - patch_target_sync_lookup_bucket_type, - return_value=BucketType.ZONAL_HIERARCHICAL, - ) as mock_sync_lookup_bucket_type, - mock.patch( - patch_target_lookup_bucket_type, + patch_target_get_bucket_type, return_value=BucketType.ZONAL_HIERARCHICAL, - ), + ) as mock_get_bucket_type, mock.patch(patch_target_create_mrd, mock_create_mrd), mock.patch( patch_target_gcsfs_cat_file, new_callable=mock.AsyncMock ) as mock_cat_file, ): mocks = { - "sync_lookup_bucket_type": mock_sync_lookup_bucket_type, + "get_bucket_type": mock_get_bucket_type, "create_mrd": mock_create_mrd, "downloader": mock_downloader, "cat_file": mock_cat_file, @@ -119,9 +112,6 @@ def test_read_block_zb(extended_gcsfs, zonal_mocks, subtests): assert result == expected_data if mocks: - mocks["sync_lookup_bucket_type"].assert_called_once_with( - TEST_BUCKET - ) if expected_data: mocks["downloader"].download_ranges.assert_called_with( [(offset, mock.ANY, mock.ANY)] @@ -149,7 +139,7 @@ def test_read_small_zb(extended_gcsfs, zonal_mocks): # cache drop assert len(f.cache.cache) < len(out) if mocks: - mocks["sync_lookup_bucket_type"].assert_called_once_with(TEST_BUCKET) + mocks["get_bucket_type"].assert_called_once_with(TEST_BUCKET) def test_readline_zb(extended_gcsfs, zonal_mocks): @@ -194,7 +184,7 @@ def test_readline_empty_zb(extended_gcsfs, zonal_mocks): data = b"" if not extended_gcsfs.on_google: with mock.patch.object( - extended_gcsfs, "_sync_lookup_bucket_type", return_value=BucketType.UNKNOWN + extended_gcsfs, "_get_bucket_type", return_value=BucketType.UNKNOWN ): with extended_gcsfs.open(b, "wb") as f: f.write(data) @@ -208,7 +198,7 @@ def test_readline_blocksize_zb(extended_gcsfs, zonal_mocks): data = b"ab\n" + b"a" * (2**18) + b"\nab" if not extended_gcsfs.on_google: with mock.patch.object( - extended_gcsfs, "_sync_lookup_bucket_type", return_value=BucketType.UNKNOWN + extended_gcsfs, "_get_bucket_type", return_value=BucketType.UNKNOWN ): with extended_gcsfs.open(c, "wb") as f: f.write(data) diff --git a/gcsfs/tests/test_zonal_file.py b/gcsfs/tests/test_zonal_file.py new file mode 100644 index 00000000..5fb0781a --- /dev/null +++ b/gcsfs/tests/test_zonal_file.py @@ -0,0 +1,140 @@ +"""Tests for ZonalFile write 
operations.""" + +from unittest import mock + +import pytest + +from gcsfs.extended_gcsfs import BucketType +from gcsfs.tests.settings import TEST_BUCKET + +file_path = f"{TEST_BUCKET}/zonal-file-test" +test_data = b"hello world" + + +@pytest.fixture +def zonal_write_mocks(): + """A fixture for mocking Zonal bucket write functionality.""" + + patch_target_get_bucket_type = ( + "gcsfs.extended_gcsfs.ExtendedGcsFileSystem._get_bucket_type" + ) + patch_target_init_aaow = "gcsfs.zb_hns_utils.init_aaow" + + mock_aaow = mock.AsyncMock() + mock_init_aaow = mock.AsyncMock(return_value=mock_aaow) + with ( + mock.patch( + patch_target_get_bucket_type, + return_value=BucketType.ZONAL_HIERARCHICAL, + ), + mock.patch(patch_target_init_aaow, mock_init_aaow), + ): + mocks = { + "aaow": mock_aaow, + "init_aaow": mock_init_aaow, + } + yield mocks + + +@pytest.mark.parametrize( + "setup_action, error_match", + [ + (lambda f: setattr(f, "mode", "rb"), "File not in write mode"), + (lambda f: setattr(f, "closed", True), "I/O operation on closed file"), + ( + lambda f: setattr(f, "forced", True), + "This file has been force-flushed, can only close", + ), + ], + ids=["not_writable", "closed", "force_flushed"], +) +def test_zonal_file_write_value_errors( + extended_gcsfs, zonal_write_mocks, setup_action, error_match # noqa: F841 +): + """Test ZonalFile.write raises ValueError for invalid states.""" + with extended_gcsfs.open(file_path, "wb") as f: + setup_action(f) + with pytest.raises(ValueError, match=error_match): + f.write(test_data) + + +def test_zonal_file_write_success(extended_gcsfs, zonal_write_mocks): + """Test that writing to a ZonalFile calls the underlying writer's append method.""" + with extended_gcsfs.open(file_path, "wb") as f: + f.write(test_data) + + zonal_write_mocks["aaow"].append.assert_awaited_once_with(test_data) + + +def test_zonal_file_open_write_mode(extended_gcsfs, zonal_write_mocks): + """Test that opening a ZonalFile in write mode initializes the writer.""" + bucket, key, _ = extended_gcsfs.split_path(file_path) + with extended_gcsfs.open(file_path, "wb"): + pass + + zonal_write_mocks["init_aaow"].assert_called_once_with( + extended_gcsfs.grpc_client, bucket, key + ) + + +def test_zonal_file_flush(extended_gcsfs, zonal_write_mocks): + """Test that flush calls the underlying writer's flush method.""" + with extended_gcsfs.open(file_path, "wb") as f: + f.flush() + + zonal_write_mocks["aaow"].flush.assert_awaited() + + +def test_zonal_file_commit(extended_gcsfs, zonal_write_mocks): + """Test that commit finalizes the write and sets autocommit to True.""" + with extended_gcsfs.open(file_path, "wb") as f: + f.commit() + + zonal_write_mocks["aaow"].finalize.assert_awaited_once() + assert f.autocommit is True + + +def test_zonal_file_discard(extended_gcsfs, zonal_write_mocks): # noqa: F841 + """Test that discard on a ZonalFile logs a warning.""" + with mock.patch("gcsfs.zonal_file.logger") as mock_logger: + with extended_gcsfs.open(file_path, "wb") as f: + f.discard() + mock_logger.warning.assert_called_once() + assert ( + "Discard is unavailable for Zonal Buckets" + in mock_logger.warning.call_args[0][0] + ) + + +def test_zonal_file_close(extended_gcsfs, zonal_write_mocks): + """Test that close finalizes the write by default (autocommit=True).""" + with extended_gcsfs.open(file_path, "wb"): + pass + zonal_write_mocks["aaow"].close.assert_awaited_once_with(finalize_on_close=True) + + +def test_zonal_file_close_with_autocommit_false(extended_gcsfs, zonal_write_mocks): + """Test that close 
does not finalize the write when autocommit is False.""" + + with extended_gcsfs.open(file_path, "wb", autocommit=False): + pass # close is called on exit + + zonal_write_mocks["aaow"].close.assert_awaited_once_with(finalize_on_close=False) + + +@pytest.mark.parametrize( + "method_name", + [ + ("_initiate_upload"), + ("_simple_upload"), + ("_upload_chunk"), + ], +) +def test_zonal_file_not_implemented_methods( + extended_gcsfs, zonal_write_mocks, method_name # noqa: F841 +): + """Test that some GCSFile methods are not implemented for ZonalFile.""" + with extended_gcsfs.open(file_path, "wb") as f: + method_to_call = getattr(f, method_name) + with pytest.raises(NotImplementedError): + method_to_call() From 866d61e489f6d93270c72f93ef980e054fae8c40 Mon Sep 17 00:00:00 2001 From: Sunidhi Chandra Date: Fri, 21 Nov 2025 10:58:11 +0000 Subject: [PATCH 10/15] Revert "set autocommit=false as default for ZonalFile" This reverts commit d51c8c308975e516e678f85c37feae52470decff. --- gcsfs/extended_gcsfs.py | 13 ++----------- gcsfs/zonal_file.py | 17 +++-------------- 2 files changed, 5 insertions(+), 25 deletions(-) diff --git a/gcsfs/extended_gcsfs.py b/gcsfs/extended_gcsfs.py index 36329676..21277425 100644 --- a/gcsfs/extended_gcsfs.py +++ b/gcsfs/extended_gcsfs.py @@ -122,7 +122,7 @@ def _open( acl=None, consistency=None, metadata=None, - autocommit=None, + autocommit=True, fixed_key_metadata=None, generation=None, **kwargs, @@ -131,16 +131,7 @@ def _open( Open a file. """ bucket, _, _ = self.split_path(path) - - # Determine the final autocommit value based on bucket type. For zonal - # buckets, default autocommit is False; for others, it is True. - final_autocommit = autocommit bucket_type = self._sync_lookup_bucket_type(bucket) - if autocommit is None: - if bucket_type == BucketType.ZONAL_HIERARCHICAL: - final_autocommit = False - else: - final_autocommit = True return gcs_file_types[bucket_type]( self, path, @@ -150,7 +141,7 @@ def _open( consistency=consistency, metadata=metadata, acl=acl, - autocommit=final_autocommit, + autocommit=autocommit, fixed_key_metadata=fixed_key_metadata, generation=generation, **kwargs, diff --git a/gcsfs/zonal_file.py b/gcsfs/zonal_file.py index 5852541d..7869398f 100644 --- a/gcsfs/zonal_file.py +++ b/gcsfs/zonal_file.py @@ -10,8 +10,6 @@ logger = logging.getLogger("gcsfs.zonal_file") -DEFAULT_BLOCK_SIZE = 5 * 2**20 - class ZonalFile(GCSFile): """ @@ -19,20 +17,11 @@ class ZonalFile(GCSFile): Zonal buckets only using a high-performance gRPC path. """ - def __init__( - self, - gcsfs, - path, - mode="rb", - block_size=DEFAULT_BLOCK_SIZE, - autocommit=False, - *args, - **kwargs, - ): + def __init__(self, *args, **kwargs): """ Initializes the ZonalFile object. """ - super().__init__(gcsfs, path, mode, block_size, autocommit, *args, **kwargs) + super().__init__(*args, **kwargs) self.mrd = None if "r" in self.mode: self.mrd = asyn.sync( @@ -48,7 +37,7 @@ def __init__( ) else: raise NotImplementedError( - "Only 'r' and 'w' modes are currently supported for Zonal buckets." + "Only read operations are currently supported for Zonal buckets." 
) async def _init_mrd(self, bucket_name, object_name, generation=None): From 9fdcbe08a51e602d11fbc950ce32ecf01a24a2c2 Mon Sep 17 00:00:00 2001 From: Sunidhi Chandra Date: Fri, 21 Nov 2025 11:06:35 +0000 Subject: [PATCH 11/15] Update ci pipeline to run test_zonal_file with Extended tests only --- .github/workflows/ci.yml | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f9247e11..c525a01b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -37,20 +37,25 @@ jobs: pip install -e . - name: Run Standard Tests run: | + pip install "git+https://github.com/googleapis/python-storage@main#egg=google-cloud-storage" --force-reinstall --no-cache-dir --upgrade export GOOGLE_APPLICATION_CREDENTIALS=$(pwd)/gcsfs/tests/fake-secret.json pytest -vv -s \ --log-format="%(asctime)s %(levelname)s %(message)s" \ --log-date-format="%H:%M:%S" \ gcsfs/ \ - --ignore=gcsfs/tests/test_extended_gcsfs.py + --ignore=gcsfs/tests/test_extended_gcsfs.py \ + --ignore=gcsfs/tests/test_zonal_file.py + - name: Run Extended Tests run: | + pip install "git+https://github.com/googleapis/python-storage@main#egg=google-cloud-storage" --force-reinstall --no-cache-dir --upgrade export GOOGLE_APPLICATION_CREDENTIALS=$(pwd)/gcsfs/tests/fake-secret.json export GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT="true" pytest -vv -s \ --log-format="%(asctime)s %(levelname)s %(message)s" \ --log-date-format="%H:%M:%S" \ - gcsfs/tests/test_extended_gcsfs.py + gcsfs/tests/test_extended_gcsfs.py \ + gcsfs/tests/test_zonal_file.py lint: name: lint From 81fb615cad25ec55303daf29a4132777701b94e5 Mon Sep 17 00:00:00 2001 From: Mahalaxmibejugam <60227368+Mahalaxmibejugam@users.noreply.github.com> Date: Wed, 26 Nov 2025 12:49:06 +0530 Subject: [PATCH 12/15] Separate versioned and non-versioned tests to use different bucket --- docs/source/developer.rst | 7 +- gcsfs/tests/conftest.py | 74 ++++++++++++++++---- gcsfs/tests/settings.py | 3 +- gcsfs/tests/test_core.py | 64 ----------------- gcsfs/tests/test_core_versioned.py | 107 +++++++++++++++++++++++++++++ 5 files changed, 174 insertions(+), 81 deletions(-) create mode 100644 gcsfs/tests/test_core_versioned.py diff --git a/docs/source/developer.rst b/docs/source/developer.rst index 4841d42f..2c945194 100644 --- a/docs/source/developer.rst +++ b/docs/source/developer.rst @@ -18,9 +18,10 @@ real GCS. A small number of tests run differently or are skipped. If you want to actually test against real GCS, then you should set STORAGE_EMULATOR_HOST to "https://storage.googleapis.com" and also -provide appropriate GCSFS_TEST_BUCKET and GCSFS_TEST_PROJECT, as well -as setting your default google credentials (or providing them via the -fsspec config). +provide appropriate GCSFS_TEST_BUCKET, GCSFS_TEST_VERSIONED_BUCKET +(To use for tests that target GCS object versioning, this bucket must have object versioning enabled) +and GCSFS_TEST_PROJECT, as well as setting your default google +credentials (or providing them via the fsspec config). .. 
_fake-gcs-server: https://github.com/fsouza/fake-gcs-server diff --git a/gcsfs/tests/conftest.py b/gcsfs/tests/conftest.py index 5d5e0179..1d265c3a 100644 --- a/gcsfs/tests/conftest.py +++ b/gcsfs/tests/conftest.py @@ -9,9 +9,10 @@ import fsspec import pytest import requests +from google.cloud import storage from gcsfs import GCSFileSystem -from gcsfs.tests.settings import TEST_BUCKET +from gcsfs.tests.settings import TEST_BUCKET, TEST_VERSIONED_BUCKET files = { "test/accounts.1.json": ( @@ -176,21 +177,68 @@ def extended_gcsfs(gcs_factory, populate=True): def gcs_versioned(gcs_factory): gcs = gcs_factory() gcs.version_aware = True - try: - try: - gcs.rm(gcs.find(TEST_BUCKET, versions=True)) - except FileNotFoundError: - pass - - try: - gcs.mkdir(TEST_BUCKET, enable_versioning=True) - except Exception: - pass + is_real_gcs = ( + os.environ.get("STORAGE_EMULATOR_HOST") == "https://storage.googleapis.com" + ) + try: # ensure we're empty. + if is_real_gcs: + # For real GCS, we assume the bucket exists and only clean its contents. + try: + cleanup_versioned_bucket(gcs, TEST_VERSIONED_BUCKET) + except Exception as e: + logging.warning( + f"Failed to empty versioned bucket {TEST_VERSIONED_BUCKET}: {e}" + ) + else: + # For emulators, we delete and recreate the bucket for a clean state. + try: + gcs.rm(TEST_VERSIONED_BUCKET, recursive=True) + except FileNotFoundError: + pass + gcs.mkdir(TEST_VERSIONED_BUCKET, enable_versioning=True) gcs.invalidate_cache() yield gcs finally: try: - gcs.rm(gcs.find(TEST_BUCKET, versions=True)) - gcs.rm(TEST_BUCKET) + if not is_real_gcs: + gcs.rm(gcs.find(TEST_VERSIONED_BUCKET, versions=True)) + gcs.rm(TEST_VERSIONED_BUCKET) except: # noqa: E722 pass + + +def cleanup_versioned_bucket(gcs, bucket_name, prefix=None): + """ + Deletes all object versions in a bucket using the google-cloud-storage client, + ensuring it uses the same credentials as the gcsfs instance. + """ + # Define a retry policy for API calls to handle rate limiting. + # This can retry on 429 Too Many Requests errors, which can happen + # when deleting many object versions quickly. 
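+    # A note on the chosen settings (other Retry behaviour is left at the
+    # google.api_core defaults): attempt n backs off up to roughly
+    # initial * multiplier**n seconds (1.0, 1.2, 1.44, ...), capped at `maximum`.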
+ from google.api_core.retry import Retry + + retry_policy = Retry( + initial=1.0, # Initial delay in seconds + maximum=30.0, # Maximum delay in seconds + multiplier=1.2, # Backoff factor + ) + + client = storage.Client( + credentials=gcs.credentials.credentials, project=gcs.project + ) + + # List all blobs, including old versions + blobs_to_delete = list(client.list_blobs(bucket_name, versions=True, prefix=prefix)) + + if not blobs_to_delete: + logging.info("No object versions to delete in %s.", bucket_name) + return + + logging.info( + "Deleting %d object versions from %s.", len(blobs_to_delete), bucket_name + ) + time.sleep(2) + for blob in blobs_to_delete: + blob.delete(retry=retry_policy) + + logging.info("Successfully deleted %d object versions.", len(blobs_to_delete)) diff --git a/gcsfs/tests/settings.py b/gcsfs/tests/settings.py index 4df10283..9921edf4 100644 --- a/gcsfs/tests/settings.py +++ b/gcsfs/tests/settings.py @@ -1,8 +1,9 @@ import os TEST_BUCKET = os.getenv("GCSFS_TEST_BUCKET", "gcsfs_test") +TEST_VERSIONED_BUCKET = os.getenv("GCSFS_TEST_VERSIONED_BUCKET", "gcsfs_test_versioned") TEST_PROJECT = os.getenv("GCSFS_TEST_PROJECT", "project") -TEST_REQUESTER_PAYS_BUCKET = "gcsfs_test_req_pay" +TEST_REQUESTER_PAYS_BUCKET = f"{TEST_BUCKET}_req_pay" TEST_KMS_KEY = os.getenv( "GCSFS_TEST_KMS_KEY", f"projects/{TEST_PROJECT}/locations/us/keyRings/gcsfs_test/cryptKeys/gcsfs_test_key", diff --git a/gcsfs/tests/test_core.py b/gcsfs/tests/test_core.py index a93dbd4c..dae8454b 100644 --- a/gcsfs/tests/test_core.py +++ b/gcsfs/tests/test_core.py @@ -1392,70 +1392,6 @@ def test_deep_find_wthdirs(gcs): ] -def test_info_versioned(gcs_versioned): - with gcs_versioned.open(a, "wb") as wo: - wo.write(b"v1") - v1 = gcs_versioned.info(a)["generation"] - assert v1 is not None - with gcs_versioned.open(a, "wb") as wo: - wo.write(b"v2") - v2 = gcs_versioned.info(a)["generation"] - assert v2 is not None and v1 != v2 - assert gcs_versioned.info(f"{a}#{v1}")["generation"] == v1 - assert gcs_versioned.info(f"{a}?generation={v2}")["generation"] == v2 - - -def test_cat_versioned(gcs_versioned): - with gcs_versioned.open(a, "wb") as wo: - wo.write(b"v1") - v1 = gcs_versioned.info(a)["generation"] - assert v1 is not None - with gcs_versioned.open(a, "wb") as wo: - wo.write(b"v2") - gcs_versioned.cat(f"{a}#{v1}") == b"v1" - - -def test_cp_versioned(gcs_versioned): - with gcs_versioned.open(a, "wb") as wo: - wo.write(b"v1") - v1 = gcs_versioned.info(a)["generation"] - assert v1 is not None - with gcs_versioned.open(a, "wb") as wo: - wo.write(b"v2") - gcs_versioned.cp_file(f"{a}#{v1}", b) - assert gcs_versioned.cat(b) == b"v1" - - -def test_ls_versioned(gcs_versioned): - import posixpath - - with gcs_versioned.open(a, "wb") as wo: - wo.write(b"v1") - v1 = gcs_versioned.info(a)["generation"] - with gcs_versioned.open(a, "wb") as wo: - wo.write(b"v2") - v2 = gcs_versioned.info(a)["generation"] - dpath = posixpath.dirname(a) - versions = {f"{a}#{v1}", f"{a}#{v2}"} - assert versions == set(gcs_versioned.ls(dpath, versions=True)) - assert versions == { - entry["name"] for entry in gcs_versioned.ls(dpath, detail=True, versions=True) - } - assert gcs_versioned.ls(TEST_BUCKET, versions=True) == ["gcsfs_test/tmp"] - - -def test_find_versioned(gcs_versioned): - with gcs_versioned.open(a, "wb") as wo: - wo.write(b"v1") - v1 = gcs_versioned.info(a)["generation"] - with gcs_versioned.open(a, "wb") as wo: - wo.write(b"v2") - v2 = gcs_versioned.info(a)["generation"] - versions = {f"{a}#{v1}", f"{a}#{v2}"} - assert versions 
== set(gcs_versioned.find(a, versions=True)) - assert versions == set(gcs_versioned.find(a, detail=True, versions=True)) - - def test_cp_directory_recursive(gcs): src = TEST_BUCKET + "/src" src_file = src + "/file" diff --git a/gcsfs/tests/test_core_versioned.py b/gcsfs/tests/test_core_versioned.py new file mode 100644 index 00000000..ad549ce0 --- /dev/null +++ b/gcsfs/tests/test_core_versioned.py @@ -0,0 +1,107 @@ +import os +import posixpath + +import pytest +from google.cloud import storage + +from gcsfs import GCSFileSystem +from gcsfs.tests.settings import TEST_VERSIONED_BUCKET + +a = TEST_VERSIONED_BUCKET + "/tmp/test/a" +b = TEST_VERSIONED_BUCKET + "/tmp/test/b" + + +def is_versioning_enabled(): + """ + Helper function to check if the test bucket has versioning enabled. + Returns a tuple of (bool, reason_string). + """ + # Don't skip when using an emulator, as we create the versioned bucket ourselves. + if os.environ.get("STORAGE_EMULATOR_HOST") != "https://storage.googleapis.com": + return True, "" + try: + gcs = GCSFileSystem(project=os.getenv("GCSFS_TEST_PROJECT", "project")) + client = storage.Client( + credentials=gcs.credentials.credentials, project=gcs.project + ) + bucket = client.get_bucket(TEST_VERSIONED_BUCKET) + if bucket.versioning_enabled: + return True, "" + return ( + False, + f"Bucket '{TEST_VERSIONED_BUCKET}' does not have versioning enabled.", + ) + except Exception as e: + return ( + False, + f"Could not verify versioning status for bucket '{TEST_VERSIONED_BUCKET}': {e}", + ) + + +pytestmark = pytest.mark.skipif( + not is_versioning_enabled()[0], reason=is_versioning_enabled()[1] +) + + +def test_info_versioned(gcs_versioned): + with gcs_versioned.open(a, "wb") as wo: + wo.write(b"v1") + v1 = gcs_versioned.info(a)["generation"] + assert v1 is not None + with gcs_versioned.open(a, "wb") as wo: + wo.write(b"v2") + v2 = gcs_versioned.info(a)["generation"] + assert v2 is not None and v1 != v2 + assert gcs_versioned.info(f"{a}#{v1}")["generation"] == v1 + assert gcs_versioned.info(f"{a}?generation={v2}")["generation"] == v2 + + +def test_cat_versioned(gcs_versioned): + with gcs_versioned.open(b, "wb") as wo: + wo.write(b"v1") + v1 = gcs_versioned.info(b)["generation"] + assert v1 is not None + with gcs_versioned.open(b, "wb") as wo: + wo.write(b"v2") + assert gcs_versioned.cat(f"{b}#{v1}") == b"v1" + + +def test_cp_versioned(gcs_versioned): + with gcs_versioned.open(a, "wb") as wo: + wo.write(b"v1") + v1 = gcs_versioned.info(a)["generation"] + assert v1 is not None + with gcs_versioned.open(a, "wb") as wo: + wo.write(b"v2") + gcs_versioned.cp_file(f"{a}#{v1}", b) + assert gcs_versioned.cat(b) == b"v1" + + +def test_ls_versioned(gcs_versioned): + with gcs_versioned.open(b, "wb") as wo: + wo.write(b"v1") + v1 = gcs_versioned.info(b)["generation"] + with gcs_versioned.open(b, "wb") as wo: + wo.write(b"v2") + v2 = gcs_versioned.info(b)["generation"] + dpath = posixpath.dirname(b) + versions = {f"{b}#{v1}", f"{b}#{v2}"} + assert versions == set(gcs_versioned.ls(dpath, versions=True)) + assert versions == { + entry["name"] for entry in gcs_versioned.ls(dpath, detail=True, versions=True) + } + assert gcs_versioned.ls(TEST_VERSIONED_BUCKET, versions=True) == [ + f"{TEST_VERSIONED_BUCKET}/tmp" + ] + + +def test_find_versioned(gcs_versioned): + with gcs_versioned.open(a, "wb") as wo: + wo.write(b"v1") + v1 = gcs_versioned.info(a)["generation"] + with gcs_versioned.open(a, "wb") as wo: + wo.write(b"v2") + v2 = gcs_versioned.info(a)["generation"] + versions = {f"{a}#{v1}", 
f"{a}#{v2}"} + assert versions == set(gcs_versioned.find(a, versions=True)) + assert versions == set(gcs_versioned.find(a, detail=True, versions=True)) From 51122764f34c0a357c190694942932be5a36c079 Mon Sep 17 00:00:00 2001 From: Mahalaxmibejugam <60227368+Mahalaxmibejugam@users.noreply.github.com> Date: Mon, 1 Dec 2025 12:32:49 +0530 Subject: [PATCH 13/15] Update cleanup logic in tests to empty the bucket instead of deleting the bucket --- docs/source/developer.rst | 6 +- gcsfs/tests/conftest.py | 133 ++++++++++++++++++-------- gcsfs/tests/derived/gcsfs_fixtures.py | 30 +++--- gcsfs/tests/settings.py | 1 + gcsfs/tests/test_core_versioned.py | 12 +++ gcsfs/tests/test_extended_gcsfs.py | 20 ++-- 6 files changed, 134 insertions(+), 68 deletions(-) diff --git a/docs/source/developer.rst b/docs/source/developer.rst index 2c945194..421ab455 100644 --- a/docs/source/developer.rst +++ b/docs/source/developer.rst @@ -19,9 +19,9 @@ real GCS. A small number of tests run differently or are skipped. If you want to actually test against real GCS, then you should set STORAGE_EMULATOR_HOST to "https://storage.googleapis.com" and also provide appropriate GCSFS_TEST_BUCKET, GCSFS_TEST_VERSIONED_BUCKET -(To use for tests that target GCS object versioning, this bucket must have object versioning enabled) -and GCSFS_TEST_PROJECT, as well as setting your default google -credentials (or providing them via the fsspec config). +(To use for tests that target GCS object versioning, this bucket must have versioning enabled), +GCSFS_ZONAL_TEST_BUCKET(To use for testing Rapid storage features) and GCSFS_TEST_PROJECT, +as well as setting your default google credentials (or providing them via the fsspec config). .. _fake-gcs-server: https://github.com/fsouza/fake-gcs-server diff --git a/gcsfs/tests/conftest.py b/gcsfs/tests/conftest.py index 1d265c3a..1b118816 100644 --- a/gcsfs/tests/conftest.py +++ b/gcsfs/tests/conftest.py @@ -12,7 +12,7 @@ from google.cloud import storage from gcsfs import GCSFileSystem -from gcsfs.tests.settings import TEST_BUCKET, TEST_VERSIONED_BUCKET +from gcsfs.tests.settings import TEST_BUCKET, TEST_VERSIONED_BUCKET, TEST_ZONAL_BUCKET files = { "test/accounts.1.json": ( @@ -60,7 +60,7 @@ def stop_docker(container): subprocess.call(["docker", "rm", "-f", "-v", cid]) -@pytest.fixture(scope="module") +@pytest.fixture(scope="session") def docker_gcs(): if "STORAGE_EMULATOR_HOST" in os.environ: # assume using real API or otherwise have a server already set up @@ -91,7 +91,7 @@ def docker_gcs(): stop_docker(container) -@pytest.fixture +@pytest.fixture(scope="session") def gcs_factory(docker_gcs): params["endpoint_url"] = docker_gcs @@ -102,44 +102,83 @@ def factory(**kwargs): return factory +@pytest.fixture(scope="session") +def buckets_to_delete(): + """A set to keep track of buckets created during the test session.""" + return set() + + @pytest.fixture -def gcs(gcs_factory, populate=True): +def gcs(gcs_factory, buckets_to_delete, populate=True): gcs = gcs_factory() - try: - # ensure we're empty. - try: - gcs.rm(TEST_BUCKET, recursive=True) - except FileNotFoundError: - pass - try: + try: # ensure we're empty. + # Create the bucket if it doesn't exist, otherwise clean it. 
+ if not gcs.exists(TEST_BUCKET): gcs.mkdir(TEST_BUCKET) - except Exception: - pass + buckets_to_delete.add(TEST_BUCKET) + else: + try: + gcs.rm(gcs.find(TEST_BUCKET)) + except Exception as e: + logging.warning(f"Failed to empty bucket {TEST_BUCKET}: {e}") if populate: gcs.pipe({TEST_BUCKET + "/" + k: v for k, v in allfiles.items()}) gcs.invalidate_cache() yield gcs finally: - try: - gcs.rm(gcs.find(TEST_BUCKET)) - gcs.rm(TEST_BUCKET) - except: # noqa: E722 - pass + _cleanup_gcs(gcs) -def _cleanup_gcs(gcs, is_real_gcs): - """Only remove the bucket/contents if we are NOT using the real GCS, logging a warning on failure.""" - if is_real_gcs: - return +def _cleanup_gcs(gcs): + """Clean the bucket contents, logging a warning on failure.""" try: - gcs.rm(TEST_BUCKET, recursive=True) + gcs.rm(gcs.find(TEST_BUCKET)) except Exception as e: logging.warning(f"Failed to clean up GCS bucket {TEST_BUCKET}: {e}") +@pytest.fixture(scope="session", autouse=True) +def final_cleanup(gcs_factory, buckets_to_delete): + """A session-scoped fixture to delete the test buckets after all tests are run.""" + yield + # This code runs after the entire test session finishes + use_extended_gcs = os.getenv( + "GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT", "false" + ).lower() in ( + "true", + "1", + ) + + if use_extended_gcs: + is_real_gcs = ( + os.environ.get("STORAGE_EMULATOR_HOST") == "https://storage.googleapis.com" + ) + mock_authentication_manager = ( + patch("google.auth.default", return_value=(None, "fake-project")) + if not is_real_gcs + else nullcontext() + ) + else: + mock_authentication_manager = nullcontext() + + with mock_authentication_manager: + gcs = gcs_factory() + for bucket in buckets_to_delete: + # For real GCS, only delete if created by the test suite. + # For emulators, always delete. + try: + if gcs.exists(bucket): + gcs.rm(bucket, recursive=True) + logging.info(f"Cleaned up bucket: {bucket}") + except Exception as e: + logging.warning( + f"Failed to perform final cleanup for bucket {bucket}: {e}" + ) + + @pytest.fixture -def extended_gcsfs(gcs_factory, populate=True): +def extended_gcsfs(gcs_factory, buckets_to_delete, populate=True): # Check if we are running against a real GCS endpoint is_real_gcs = ( os.environ.get("STORAGE_EMULATOR_HOST") == "https://storage.googleapis.com" @@ -159,52 +198,62 @@ def extended_gcsfs(gcs_factory, populate=True): # Only create/delete/populate the bucket if we are NOT using the real GCS endpoint if not is_real_gcs: try: - extended_gcsfs.rm(TEST_BUCKET, recursive=True) + extended_gcsfs.rm(TEST_ZONAL_BUCKET, recursive=True) except FileNotFoundError: pass - extended_gcsfs.mkdir(TEST_BUCKET) + extended_gcsfs.mkdir(TEST_ZONAL_BUCKET) + buckets_to_delete.add(TEST_ZONAL_BUCKET) if populate: extended_gcsfs.pipe( - {TEST_BUCKET + "/" + k: v for k, v in allfiles.items()} + {TEST_ZONAL_BUCKET + "/" + k: v for k, v in allfiles.items()} ) extended_gcsfs.invalidate_cache() yield extended_gcsfs finally: - _cleanup_gcs(extended_gcsfs, is_real_gcs) + _cleanup_gcs(extended_gcsfs) @pytest.fixture -def gcs_versioned(gcs_factory): +def gcs_versioned(gcs_factory, buckets_to_delete): gcs = gcs_factory() gcs.version_aware = True is_real_gcs = ( os.environ.get("STORAGE_EMULATOR_HOST") == "https://storage.googleapis.com" ) try: # ensure we're empty. + # The versioned bucket might be created by `is_versioning_enabled` + # in test_core_versioned.py. We must register it for cleanup only if + # it was created by this test run. 
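+        # Imported lazily (and guarded below) so conftest does not hard-depend
+        # on the test_core_versioned module being importable.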
+ try: + from gcsfs.tests.test_core_versioned import ( + _VERSIONED_BUCKET_CREATED_BY_TESTS, + ) + + if _VERSIONED_BUCKET_CREATED_BY_TESTS: + buckets_to_delete.add(TEST_VERSIONED_BUCKET) + except ImportError: + pass # test_core_versioned is not being run if is_real_gcs: - # For real GCS, we assume the bucket exists and only clean its contents. - try: - cleanup_versioned_bucket(gcs, TEST_VERSIONED_BUCKET) - except Exception as e: - logging.warning( - f"Failed to empty versioned bucket {TEST_VERSIONED_BUCKET}: {e}" - ) + cleanup_versioned_bucket(gcs, TEST_VERSIONED_BUCKET) else: - # For emulators, we delete and recreate the bucket for a clean state. + # For emulators, we delete and recreate the bucket for a clean state try: gcs.rm(TEST_VERSIONED_BUCKET, recursive=True) except FileNotFoundError: pass gcs.mkdir(TEST_VERSIONED_BUCKET, enable_versioning=True) + buckets_to_delete.add(TEST_VERSIONED_BUCKET) gcs.invalidate_cache() yield gcs finally: + # Ensure the bucket is empty after the test. try: - if not is_real_gcs: - gcs.rm(gcs.find(TEST_VERSIONED_BUCKET, versions=True)) - gcs.rm(TEST_VERSIONED_BUCKET) - except: # noqa: E722 - pass + if is_real_gcs: + cleanup_versioned_bucket(gcs, TEST_VERSIONED_BUCKET) + except Exception as e: + logging.warning( + f"Failed to clean up versioned bucket {TEST_VERSIONED_BUCKET} after test: {e}" + ) def cleanup_versioned_bucket(gcs, bucket_name, prefix=None): diff --git a/gcsfs/tests/derived/gcsfs_fixtures.py b/gcsfs/tests/derived/gcsfs_fixtures.py index 21ce2431..f963262c 100644 --- a/gcsfs/tests/derived/gcsfs_fixtures.py +++ b/gcsfs/tests/derived/gcsfs_fixtures.py @@ -1,37 +1,35 @@ +import logging + import fsspec import pytest from fsspec.tests.abstract import AbstractFixtures from gcsfs.core import GCSFileSystem -from gcsfs.tests.conftest import allfiles +from gcsfs.tests.conftest import _cleanup_gcs, allfiles from gcsfs.tests.settings import TEST_BUCKET class GcsfsFixtures(AbstractFixtures): @pytest.fixture(scope="class") - def fs(self, docker_gcs): + def fs(self, docker_gcs, buckets_to_delete): GCSFileSystem.clear_instance_cache() gcs = fsspec.filesystem("gcs", endpoint_url=docker_gcs) - try: - # ensure we're empty. - try: - gcs.rm(TEST_BUCKET, recursive=True) - except FileNotFoundError: - pass - try: + try: # ensure we're empty. + # Create the bucket if it doesn't exist, otherwise clean it. 
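+            # Same reuse-then-empty strategy as the `gcs` fixture in
+            # conftest.py.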
+            if not gcs.exists(TEST_BUCKET):
+                buckets_to_delete.add(TEST_BUCKET)
                 gcs.mkdir(TEST_BUCKET)
-            except Exception:
-                pass
+            else:
+                try:
+                    gcs.rm(gcs.find(TEST_BUCKET))
+                except Exception as e:
+                    logging.warning(f"Failed to empty bucket {TEST_BUCKET}: {e}")
             gcs.pipe({TEST_BUCKET + "/" + k: v for k, v in allfiles.items()})
             gcs.invalidate_cache()
             yield gcs
         finally:
-            try:
-                gcs.rm(gcs.find(TEST_BUCKET))
-                gcs.rm(TEST_BUCKET)
-            except:  # noqa: E722
-                pass
+            _cleanup_gcs(gcs)

     @pytest.fixture
     def fs_path(self):
diff --git a/gcsfs/tests/settings.py b/gcsfs/tests/settings.py
index 9921edf4..c3d25905 100644
--- a/gcsfs/tests/settings.py
+++ b/gcsfs/tests/settings.py
@@ -2,6 +2,7 @@

 TEST_BUCKET = os.getenv("GCSFS_TEST_BUCKET", "gcsfs_test")
 TEST_VERSIONED_BUCKET = os.getenv("GCSFS_TEST_VERSIONED_BUCKET", "gcsfs_test_versioned")
+TEST_ZONAL_BUCKET = os.getenv("GCSFS_ZONAL_TEST_BUCKET", "gcsfs_zonal_test")
 TEST_PROJECT = os.getenv("GCSFS_TEST_PROJECT", "project")
 TEST_REQUESTER_PAYS_BUCKET = f"{TEST_BUCKET}_req_pay"
 TEST_KMS_KEY = os.getenv(
diff --git a/gcsfs/tests/test_core_versioned.py b/gcsfs/tests/test_core_versioned.py
index ad549ce0..24821114 100644
--- a/gcsfs/tests/test_core_versioned.py
+++ b/gcsfs/tests/test_core_versioned.py
@@ -1,3 +1,4 @@
+import logging
 import os
 import posixpath

@@ -10,6 +11,9 @@
 a = TEST_VERSIONED_BUCKET + "/tmp/test/a"
 b = TEST_VERSIONED_BUCKET + "/tmp/test/b"

+# Flag to track if the bucket was created by this test run.
+_VERSIONED_BUCKET_CREATED_BY_TESTS = False
+

 def is_versioning_enabled():
     """
@@ -17,10 +21,18 @@ def is_versioning_enabled():
     Returns a tuple of (bool, reason_string).
     """
     # Don't skip when using an emulator, as we create the versioned bucket ourselves.
+    global _VERSIONED_BUCKET_CREATED_BY_TESTS
     if os.environ.get("STORAGE_EMULATOR_HOST") != "https://storage.googleapis.com":
         return True, ""
     try:
         gcs = GCSFileSystem(project=os.getenv("GCSFS_TEST_PROJECT", "project"))
+        if not gcs.exists(TEST_VERSIONED_BUCKET):
+            logging.info(
+                f"Creating versioned bucket for tests: {TEST_VERSIONED_BUCKET}"
+            )
+            gcs.mkdir(TEST_VERSIONED_BUCKET, enable_versioning=True)
+            _VERSIONED_BUCKET_CREATED_BY_TESTS = True
+
         client = storage.Client(
             credentials=gcs.credentials.credentials, project=gcs.project
         )
diff --git a/gcsfs/tests/test_extended_gcsfs.py b/gcsfs/tests/test_extended_gcsfs.py
index b352846a..28b4ba89 100644
--- a/gcsfs/tests/test_extended_gcsfs.py
+++ b/gcsfs/tests/test_extended_gcsfs.py
@@ -11,17 +11,21 @@
 from google.cloud.storage.exceptions import DataCorruption

 from gcsfs.extended_gcsfs import BucketType
-from gcsfs.tests.conftest import a, b, c, csv_files, files, text_files
-from gcsfs.tests.settings import TEST_BUCKET
+from gcsfs.tests.conftest import csv_files, files, text_files
+from gcsfs.tests.settings import TEST_ZONAL_BUCKET

 file = "test/accounts.1.json"
-file_path = f"{TEST_BUCKET}/{file}"
+file_path = f"{TEST_ZONAL_BUCKET}/{file}"
 json_data = files[file]
 lines = io.BytesIO(json_data).readlines()
 file_size = len(json_data)

 REQUIRED_ENV_VAR = "GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT"

+a = TEST_ZONAL_BUCKET + "/tmp/test/a"
+b = TEST_ZONAL_BUCKET + "/tmp/test/b"
+c = TEST_ZONAL_BUCKET + "/tmp/test/c"
+
 # If the condition is True, only then tests in this file are run.
 should_run = os.getenv(REQUIRED_ENV_VAR, "false").lower() in (
     "true",
     "1",
 )
@@ -131,7 +135,7 @@ def test_read_block_zb(extended_gcsfs, zonal_mocks, subtests):
                     assert result == expected_data
                     if mocks:
                         mocks["sync_lookup_bucket_type"].assert_called_once_with(
-                            TEST_BUCKET
+                            TEST_ZONAL_BUCKET
                         )
                     if expected_data:
                         mocks["downloader"].download_ranges.assert_called_with(
@@ -143,7 +147,7 @@ def test_read_block_zb(extended_gcsfs, zonal_mocks, subtests):

 def test_read_small_zb(extended_gcsfs, zonal_mocks):
     csv_file = "2014-01-01.csv"
-    csv_file_path = f"{TEST_BUCKET}/{csv_file}"
+    csv_file_path = f"{TEST_ZONAL_BUCKET}/{csv_file}"
     csv_data = csv_files[csv_file]

     with zonal_mocks(csv_data) as mocks:
@@ -160,7 +164,9 @@ def test_read_small_zb(extended_gcsfs, zonal_mocks):
             # cache drop
             assert len(f.cache.cache) < len(out)
             if mocks:
-                mocks["sync_lookup_bucket_type"].assert_called_once_with(TEST_BUCKET)
+                mocks["sync_lookup_bucket_type"].assert_called_once_with(
+                    TEST_ZONAL_BUCKET
+                )


 def test_readline_zb(extended_gcsfs, zonal_mocks):
@@ -169,7 +175,7 @@ def test_readline_zb(extended_gcsfs, zonal_mocks):
     )
     for k, data in all_items:
         with zonal_mocks(data):
-            with extended_gcsfs.open("/".join([TEST_BUCKET, k]), "rb") as f:
+            with extended_gcsfs.open("/".join([TEST_ZONAL_BUCKET, k]), "rb") as f:
                 result = f.readline()
                 expected = data.split(b"\n")[0] + (b"\n" if data.count(b"\n") else b"")
                 assert result == expected

From 83581b24487b0dbd0519cf4b202b3c2c33e66746 Mon Sep 17 00:00:00 2001
From: Sunidhi Chandra
Date: Wed, 3 Dec 2025 07:55:44 +0000
Subject: [PATCH 14/15] Add logic to skip tests in test_zonal_file.py itself

---
 gcsfs/tests/test_extended_gcsfs.py |  4 +---
 gcsfs/tests/test_zonal_file.py     | 12 ++++++++++++
 2 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/gcsfs/tests/test_extended_gcsfs.py b/gcsfs/tests/test_extended_gcsfs.py
index b3cbb315..f6a176a9 100644
--- a/gcsfs/tests/test_extended_gcsfs.py
+++ b/gcsfs/tests/test_extended_gcsfs.py
@@ -154,9 +154,7 @@ def test_read_small_zb(extended_gcsfs, zonal_mocks):
             # cache drop
             assert len(f.cache.cache) < len(out)
             if mocks:
-                mocks["get_bucket_type"].assert_called_once_with(
-                    TEST_ZONAL_BUCKET
-                )
+                mocks["get_bucket_type"].assert_called_once_with(TEST_ZONAL_BUCKET)


 def test_readline_zb(extended_gcsfs, zonal_mocks):
diff --git a/gcsfs/tests/test_zonal_file.py b/gcsfs/tests/test_zonal_file.py
index 5fb0781a..fa1552b6 100644
--- a/gcsfs/tests/test_zonal_file.py
+++ b/gcsfs/tests/test_zonal_file.py
@@ -1,5 +1,6 @@
 """Tests for ZonalFile write operations."""

+import os
 from unittest import mock

 import pytest
@@ -10,6 +11,19 @@
 file_path = f"{TEST_BUCKET}/zonal-file-test"
 test_data = b"hello world"

+REQUIRED_ENV_VAR = "GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT"
+
+# Tests in this file run only when this condition is true.
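+# (Illustrative: export GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT=true in the
+# environment before invoking pytest to enable them.)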
+should_run = os.getenv(REQUIRED_ENV_VAR, "false").lower() in (
+    "true",
+    "1",
+)
+pytestmark = pytest.mark.skipif(
+    not should_run, reason=f"Skipping tests: {REQUIRED_ENV_VAR} env variable is not set"
+)
+

 @pytest.fixture
 def zonal_write_mocks():

From bed5bc10704cb0fe5c65b763cb6f910dc92647c4 Mon Sep 17 00:00:00 2001
From: Sunidhi Chandra
Date: Thu, 4 Dec 2025 05:03:11 +0000
Subject: [PATCH 15/15] Add comments for clarity

---
 gcsfs/core.py           | 13 +++++++++++++
 gcsfs/extended_gcsfs.py |  6 +++---
 gcsfs/zonal_file.py     |  5 ++++-
 3 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/gcsfs/core.py b/gcsfs/core.py
index 09b010d3..9e5b9578 100644
--- a/gcsfs/core.py
+++ b/gcsfs/core.py
@@ -2074,6 +2074,10 @@ def _convert_fixed_key_metadata(metadata, *, from_google=False):


 async def upload_chunk(fs, location, data, offset, size, content_type):
+    """
+    Uploads a chunk of data. Includes a conditional path that lets Zonal
+    buckets append data using gRPC (experimental).
+    """
     from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
         AsyncAppendableObjectWriter,
     )
@@ -2114,6 +2118,11 @@ async def initiate_upload(
     mode="overwrite",
     kms_key_name=None,
 ):
+    """
+    Initiates a resumable upload. Includes a conditional path that supports
+    the experimental gRPC append flow for Zonal buckets, returning an
+    "AsyncAppendableObjectWriter" instance as the upload location.
+    """
     from .extended_gcsfs import ExtendedGcsFileSystem
     from .extended_gcsfs import initiate_upload as ext_initiate_upload

@@ -2164,6 +2173,10 @@ async def simple_upload(
     mode="overwrite",
     kms_key_name=None,
 ):
+    """
+    Performs a simple, single-request upload. Includes a conditional path
+    that lets Zonal buckets upload data using gRPC (experimental).
+    """
     from .extended_gcsfs import ExtendedGcsFileSystem
     from .extended_gcsfs import simple_upload as ext_simple_upload

diff --git a/gcsfs/extended_gcsfs.py b/gcsfs/extended_gcsfs.py
index 21277425..133932f0 100644
--- a/gcsfs/extended_gcsfs.py
+++ b/gcsfs/extended_gcsfs.py
@@ -252,7 +252,7 @@ async def _cat_file(self, path, start=None, end=None, mrd=None, **kwargs):

 async def upload_chunk(fs, location, data, offset, size, content_type):
     raise NotImplementedError(
-        "upload_chunk is not implemented yet for ExtendedGcsFileSystem. Please use write() instead."
+        "upload_chunk is not implemented yet for the Zonal experimental feature. Please use write() instead."
     )


@@ -267,7 +267,7 @@ async def initiate_upload(
     kms_key_name=None,
 ):
     raise NotImplementedError(
-        "initiate_upload is not implemented yet for ExtendedGcsFileSystem. Please use write() instead."
+        "initiate_upload is not implemented yet for the Zonal experimental feature. Please use write() instead."
     )


@@ -284,5 +284,5 @@ async def simple_upload(
     kms_key_name=None,
 ):
     raise NotImplementedError(
-        "simple_upload is not implemented yet for ExtendedGcsFileSystem. Please use write() instead."
+        "simple_upload is not implemented yet for the Zonal experimental feature. Please use write() instead."
     )
diff --git a/gcsfs/zonal_file.py b/gcsfs/zonal_file.py
index 7869398f..b705766b 100644
--- a/gcsfs/zonal_file.py
+++ b/gcsfs/zonal_file.py
@@ -63,6 +63,16 @@ def _fetch_range(self, start, end):
     def write(self, data):
         """
         Writes data using AsyncAppendableObjectWriter.
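+
+        A minimal usage sketch, assuming fs is a filesystem with experimental
+        zonal support and an illustrative bucket/object path::
+
+            with fs.open("my-zonal-bucket/logs/app.log", "wb") as f:
+                f.write(b"first chunk")
+                f.write(b"second chunk")  # appended after the first chunk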
+
+        For more details, see the documentation for AsyncAppendableObjectWriter:
+        https://github.com/googleapis/python-storage/blob/9e6fefdc24a12a9189f7119bc9119e84a061842f/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py#L38
         """
         if self.closed:
             raise ValueError("I/O operation on closed file.")
@@ -103,7 +113,7 @@ def commit(self):
     def discard(self):
         """Discard is not applicable for Zonal Buckets. Log a warning instead."""
         logger.warning(
-            "Discard is unavailable for Zonal Buckets. \
+            "Discard is not applicable for Zonal Buckets. \
             Data is uploaded via streaming and cannot be cancelled."
         )
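
Note on the fixtures in the earlier patches: they register bucket names in a
shared buckets_to_delete fixture that is not shown in this excerpt. A minimal
sketch of what such a session-scoped registry presumably looks like, inferred
from how the fixtures use it:

    import pytest

    @pytest.fixture(scope="session")
    def buckets_to_delete():
        # Session-wide set of bucket names: fixtures add() the buckets they
        # create, and final_cleanup deletes each one after the test session.
        return set()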