diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index e62989e1..1b6b5a08 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -37,6 +37,7 @@ jobs:
         pip install -e .
     - name: Run Standard Tests
       run: |
+        pip install "git+https://github.com/googleapis/python-storage@main#egg=google-cloud-storage" --force-reinstall --no-cache-dir --upgrade
         export GOOGLE_APPLICATION_CREDENTIALS=$(pwd)/gcsfs/tests/fake-secret.json
         pytest -vv -s \
           --log-format="%(asctime)s %(levelname)s %(message)s" \
@@ -44,12 +45,14 @@ jobs:
           gcsfs/
     - name: Run Extended Tests
       run: |
+        pip install "git+https://github.com/googleapis/python-storage@main#egg=google-cloud-storage" --force-reinstall --no-cache-dir --upgrade
         export GOOGLE_APPLICATION_CREDENTIALS=$(pwd)/gcsfs/tests/fake-secret.json
         export GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT="true"
         pytest -vv -s \
           --log-format="%(asctime)s %(levelname)s %(message)s" \
           --log-date-format="%H:%M:%S" \
-          gcsfs/tests/test_extended_gcsfs.py
+          gcsfs/tests/test_extended_gcsfs.py \
+          gcsfs/tests/test_zonal_file.py

   lint:
     name: lint
diff --git a/gcsfs/core.py b/gcsfs/core.py
index 8fe06625..9e5b9578 100644
--- a/gcsfs/core.py
+++ b/gcsfs/core.py
@@ -1864,6 +1864,7 @@ def __init__(
         self.bucket = bucket
         self.key = key
         self.acl = acl
+        self.consistency = consistency
         self.checker = get_consistency_checker(consistency)

         if "a" in self.mode:
@@ -2073,6 +2074,22 @@ def _convert_fixed_key_metadata(metadata, *, from_google=False):


 async def upload_chunk(fs, location, data, offset, size, content_type):
+    """
+    Uploads a chunk of data. This function has a conditional path to support
+    experimental features for Zonal buckets to append data using gRPC.
+    """
+    from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
+        AsyncAppendableObjectWriter,
+    )
+
+    from .extended_gcsfs import ExtendedGcsFileSystem
+    from .extended_gcsfs import upload_chunk as ext_upload_chunk
+
+    if isinstance(fs, ExtendedGcsFileSystem) and isinstance(
+        location, AsyncAppendableObjectWriter
+    ):
+
+        return await ext_upload_chunk(fs, location, data, offset, size, content_type)
     head = {}
     l = len(data)
     range = "bytes %i-%i/%i" % (offset, offset + l - 1, size)
@@ -2101,6 +2118,27 @@ async def initiate_upload(
     mode="overwrite",
     kms_key_name=None,
 ):
+    """
+    Initiates a resumable upload. This function has a conditional path to support
+    experimental features for Zonal buckets to append data using gRPC, returning an
+    "AsyncAppendableObjectWriter" instance as the location.
+    """
+    from .extended_gcsfs import ExtendedGcsFileSystem
+    from .extended_gcsfs import initiate_upload as ext_initiate_upload
+
+    if isinstance(fs, ExtendedGcsFileSystem) and await fs._is_zonal_bucket(bucket):
+
+        return await ext_initiate_upload(
+            fs,
+            bucket,
+            key,
+            content_type,
+            metadata,
+            fixed_key_metadata,
+            mode,
+            kms_key_name,
+        )
+
     j = {"name": key}
     if metadata:
         j["metadata"] = metadata
@@ -2135,6 +2173,28 @@ async def simple_upload(
     mode="overwrite",
     kms_key_name=None,
 ):
+    """
+    Performs a simple, single-request upload. This function has a conditional path to support
+    experimental features for Zonal buckets to upload data using gRPC.
+ """ + from .extended_gcsfs import ExtendedGcsFileSystem + from .extended_gcsfs import simple_upload as ext_simple_upload + + if isinstance(fs, ExtendedGcsFileSystem) and await fs._is_zonal_bucket(bucket): + + return await ext_simple_upload( + fs, + bucket, + key, + datain, + metadatain, + consistency, + content_type, + fixed_key_metadata, + mode, + kms_key_name, + ) + checker = get_consistency_checker(consistency) path = f"{fs._location}/upload/storage/v1/b/{quote(bucket)}/o" metadata = {"name": key} diff --git a/gcsfs/extended_gcsfs.py b/gcsfs/extended_gcsfs.py index f4c643a4..133932f0 100644 --- a/gcsfs/extended_gcsfs.py +++ b/gcsfs/extended_gcsfs.py @@ -5,6 +5,7 @@ from google.api_core import exceptions as api_exceptions from google.api_core import gapic_v1 from google.api_core.client_info import ClientInfo +from google.auth.credentials import AnonymousCredentials from google.cloud import storage_control_v2 from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import ( @@ -48,6 +49,9 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.grpc_client = None self.storage_control_client = None + self.credential = self.credentials.credentials + if self.credentials.token == "anon": + self.credential = AnonymousCredentials() # initializing grpc and storage control client for Hierarchical and # zonal bucket operations self.grpc_client = asyn.sync(self.loop, self._create_grpc_client) @@ -59,6 +63,7 @@ def __init__(self, *args, **kwargs): async def _create_grpc_client(self): if self.grpc_client is None: return AsyncGrpcClient( + credentials=self.credential, client_info=ClientInfo(user_agent=f"{USER_AGENT}/{version}"), ).grpc_client else: @@ -71,7 +76,7 @@ async def _create_control_plane_client(self): user_agent=f"{USER_AGENT}/{version}" ) return storage_control_v2.StorageControlAsyncClient( - credentials=self.credentials.credentials, client_info=client_info + credentials=self.credential, client_info=client_info ) async def _lookup_bucket_type(self, bucket): @@ -243,3 +248,41 @@ async def _cat_file(self, path, start=None, end=None, mrd=None, **kwargs): # Explicit cleanup if we created the MRD if mrd_created: await mrd.close() + + +async def upload_chunk(fs, location, data, offset, size, content_type): + raise NotImplementedError( + "upload_chunk is not implemented yet for Zonal experimental feature. Please use write() instead." + ) + + +async def initiate_upload( + fs, + bucket, + key, + content_type="application/octet-stream", + metadata=None, + fixed_key_metadata=None, + mode="overwrite", + kms_key_name=None, +): + raise NotImplementedError( + "initiate_upload is not implemented yet for Zonal experimental feature. Please use write() instead." + ) + + +async def simple_upload( + fs, + bucket, + key, + datain, + metadatain=None, + consistency=None, + content_type="application/octet-stream", + fixed_key_metadata=None, + mode="overwrite", + kms_key_name=None, +): + raise NotImplementedError( + "simple_upload is not implemented yet for Zonal experimental feature. Please use write() instead." 
+    )
diff --git a/gcsfs/tests/conftest.py b/gcsfs/tests/conftest.py
index 2134130c..074f5f99 100644
--- a/gcsfs/tests/conftest.py
+++ b/gcsfs/tests/conftest.py
@@ -3,8 +3,6 @@
 import shlex
 import subprocess
 import time
-from contextlib import nullcontext
-from unittest.mock import patch

 import fsspec
 import pytest
@@ -159,41 +157,18 @@ def final_cleanup(gcs_factory, buckets_to_delete):
     """
     yield
     # This code runs after the entire test session finishes
-    use_extended_gcs = os.getenv(
-        "GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT", "false"
-    ).lower() in (
-        "true",
-        "1",
-    )
-    if use_extended_gcs:
-        is_real_gcs = (
-            os.environ.get("STORAGE_EMULATOR_HOST") == "https://storage.googleapis.com"
-        )
-        # Mock authentication if not using a real GCS endpoint,
-        # since grpc client in extended_gcsfs does not work with anon access
-        mock_authentication_manager = (
-            patch("google.auth.default", return_value=(None, "fake-project"))
-            if not is_real_gcs
-            else nullcontext()
-        )
-    else:
-        mock_authentication_manager = nullcontext()
-
-    with mock_authentication_manager:
-        gcs = gcs_factory()
-        for bucket in buckets_to_delete:
-            # The cleanup logic attempts to delete every bucket that was
-            # added to the set during the session. For real GCS, only delete if
-            # created by the test suite.
-            try:
-                if gcs.exists(bucket):
-                    gcs.rm(bucket, recursive=True)
-                    logging.info(f"Cleaned up bucket: {bucket}")
-            except Exception as e:
-                logging.warning(
-                    f"Failed to perform final cleanup for bucket {bucket}: {e}"
-                )
+    gcs = gcs_factory()
+    for bucket in buckets_to_delete:
+        # The cleanup logic attempts to delete every bucket that was
+        # added to the set during the session. For real GCS, only delete if
+        # created by the test suite.
+        try:
+            if gcs.exists(bucket):
+                gcs.rm(bucket, recursive=True)
+                logging.info(f"Cleaned up bucket: {bucket}")
+        except Exception as e:
+            logging.warning(f"Failed to perform final cleanup for bucket {bucket}: {e}")


 @pytest.fixture
@@ -203,35 +178,24 @@ def extended_gcsfs(gcs_factory, buckets_to_delete, populate=True):
         os.environ.get("STORAGE_EMULATOR_HOST") == "https://storage.googleapis.com"
     )

-    # Mock authentication if not using a real GCS endpoint,
-    # since grpc client in extended_gcsfs does not work with anon access
-    mock_authentication_manager = (
-        patch("google.auth.default", return_value=(None, "fake-project"))
-        if not is_real_gcs
-        else nullcontext()
-    )
-
-    with mock_authentication_manager:
-        extended_gcsfs = gcs_factory()
-        try:
-            # Only create/delete/populate the bucket if we are NOT using the real GCS endpoint
-            if not is_real_gcs:
-                try:
-                    extended_gcsfs.rm(TEST_ZONAL_BUCKET, recursive=True)
-                except FileNotFoundError:
-                    pass
-                extended_gcsfs.mkdir(TEST_ZONAL_BUCKET)
-                # When running against the emulator, the zonal test bucket is
-                # always created and added to the set for guaranteed cleanup.
-                buckets_to_delete.add(TEST_ZONAL_BUCKET)
-            if populate:
-                extended_gcsfs.pipe(
-                    {TEST_ZONAL_BUCKET + "/" + k: v for k, v in allfiles.items()}
-                )
-            extended_gcsfs.invalidate_cache()
-            yield extended_gcsfs
-        finally:
-            _cleanup_gcs(extended_gcsfs)
+    extended_gcsfs = gcs_factory()
+    try:
+        # Only create/delete/populate the bucket if we are NOT using the real GCS endpoint
+        if not is_real_gcs:
+            try:
+                extended_gcsfs.rm(TEST_ZONAL_BUCKET, recursive=True)
+            except FileNotFoundError:
+                pass
+            extended_gcsfs.mkdir(TEST_ZONAL_BUCKET)
+            buckets_to_delete.add(TEST_ZONAL_BUCKET)
+        if populate:
+            extended_gcsfs.pipe(
+                {TEST_ZONAL_BUCKET + "/" + k: v for k, v in allfiles.items()}
+            )
+        extended_gcsfs.invalidate_cache()
+        yield extended_gcsfs
+    finally:
+        _cleanup_gcs(extended_gcsfs)


 @pytest.fixture
diff --git a/gcsfs/tests/test_extended_gcsfs.py b/gcsfs/tests/test_extended_gcsfs.py
index 28b4ba89..f6a176a9 100644
--- a/gcsfs/tests/test_extended_gcsfs.py
+++ b/gcsfs/tests/test_extended_gcsfs.py
@@ -49,11 +49,8 @@ def _zonal_mocks_factory(file_data):
     if is_real_gcs:
         yield None
         return
-    patch_target_lookup_bucket_type = (
-        "gcsfs.extended_gcsfs.ExtendedGcsFileSystem._lookup_bucket_type"
-    )
-    patch_target_sync_lookup_bucket_type = (
-        "gcsfs.extended_gcsfs.ExtendedGcsFileSystem._sync_lookup_bucket_type"
+    patch_target_get_bucket_type = (
+        "gcsfs.extended_gcsfs.ExtendedGcsFileSystem._get_bucket_type"
     )
     patch_target_create_mrd = (
         "google.cloud.storage._experimental.asyncio.async_multi_range_downloader"
@@ -78,20 +75,16 @@ async def download_side_effect(read_requests, **kwargs):
     mock_create_mrd = mock.AsyncMock(return_value=mock_downloader)
     with (
         mock.patch(
-            patch_target_sync_lookup_bucket_type,
-            return_value=BucketType.ZONAL_HIERARCHICAL,
-        ) as mock_sync_lookup_bucket_type,
-        mock.patch(
-            patch_target_lookup_bucket_type,
+            patch_target_get_bucket_type,
             return_value=BucketType.ZONAL_HIERARCHICAL,
-        ),
+        ) as mock_get_bucket_type,
         mock.patch(patch_target_create_mrd, mock_create_mrd),
         mock.patch(
             patch_target_gcsfs_cat_file, new_callable=mock.AsyncMock
         ) as mock_cat_file,
     ):
         mocks = {
-            "sync_lookup_bucket_type": mock_sync_lookup_bucket_type,
+            "get_bucket_type": mock_get_bucket_type,
             "create_mrd": mock_create_mrd,
             "downloader": mock_downloader,
             "cat_file": mock_cat_file,
@@ -134,9 +127,6 @@ def test_read_block_zb(extended_gcsfs, zonal_mocks, subtests):
                assert result == expected_data

                if mocks:
-                    mocks["sync_lookup_bucket_type"].assert_called_once_with(
-                        TEST_ZONAL_BUCKET
-                    )
                    if expected_data:
                        mocks["downloader"].download_ranges.assert_called_with(
                            [(offset, mock.ANY, mock.ANY)]
@@ -164,9 +154,7 @@ def test_read_small_zb(extended_gcsfs, zonal_mocks):
         # cache drop
         assert len(f.cache.cache) < len(out)
     if mocks:
-        mocks["sync_lookup_bucket_type"].assert_called_once_with(
-            TEST_ZONAL_BUCKET
-        )
+        mocks["get_bucket_type"].assert_called_once_with(TEST_ZONAL_BUCKET)


 def test_readline_zb(extended_gcsfs, zonal_mocks):
@@ -211,7 +199,7 @@ def test_readline_empty_zb(extended_gcsfs, zonal_mocks):
     data = b""
     if not extended_gcsfs.on_google:
         with mock.patch.object(
-            extended_gcsfs, "_sync_lookup_bucket_type", return_value=BucketType.UNKNOWN
+            extended_gcsfs, "_get_bucket_type", return_value=BucketType.UNKNOWN
         ):
             with extended_gcsfs.open(b, "wb") as f:
                 f.write(data)
@@ -225,7 +213,7 @@ def test_readline_blocksize_zb(extended_gcsfs, zonal_mocks):
     data = b"ab\n" + b"a" * (2**18) + b"\nab"
     if not extended_gcsfs.on_google:
         with mock.patch.object(
-            extended_gcsfs, "_sync_lookup_bucket_type", return_value=BucketType.UNKNOWN
+            extended_gcsfs, "_get_bucket_type", return_value=BucketType.UNKNOWN
         ):
             with extended_gcsfs.open(c, "wb") as f:
                 f.write(data)
diff --git a/gcsfs/tests/test_zb_hns_utils.py b/gcsfs/tests/test_zb_hns_utils.py
index a64e6793..77c287d1 100644
--- a/gcsfs/tests/test_zb_hns_utils.py
+++ b/gcsfs/tests/test_zb_hns_utils.py
@@ -4,6 +4,11 @@
 from gcsfs import zb_hns_utils

+mock_grpc_client = mock.Mock()
+bucket_name = "test-bucket"
+object_name = "test-object"
+generation = "12345"
+

 @pytest.mark.asyncio
 async def test_download_range():
@@ -27,3 +32,29 @@ async def mock_download_ranges(ranges):
     mock_mrd.download_ranges.assert_called_once_with([(offset, length, mock.ANY)])

     assert result == expected_data
+
+
+@pytest.mark.asyncio
+async def test_init_aaow():
+    """
+    Tests that init_aaow calls the underlying AsyncAppendableObjectWriter.open
+    method and returns its result.
+    """
+    mock_writer_instance = mock.AsyncMock()
+    with mock.patch(
+        "gcsfs.zb_hns_utils.AsyncAppendableObjectWriter",  # The class to patch
+        new_callable=mock.Mock,  # Use a regular Mock for the class
+        return_value=mock_writer_instance,
+    ) as mock_writer_class:
+        result = await zb_hns_utils.init_aaow(
+            mock_grpc_client, bucket_name, object_name, generation
+        )
+
+        mock_writer_class.assert_called_once_with(
+            client=mock_grpc_client,
+            bucket_name=bucket_name,
+            object_name=object_name,
+            generation=generation,
+        )
+        mock_writer_instance.open.assert_awaited_once()
+        assert result is mock_writer_instance
diff --git a/gcsfs/tests/test_zonal_file.py b/gcsfs/tests/test_zonal_file.py
new file mode 100644
index 00000000..38f1961e
--- /dev/null
+++ b/gcsfs/tests/test_zonal_file.py
@@ -0,0 +1,152 @@
+"""Tests for ZonalFile write operations."""
+
+import os
+from unittest import mock
+
+import pytest
+
+from gcsfs.extended_gcsfs import BucketType
+from gcsfs.tests.settings import TEST_BUCKET
+
+file_path = f"{TEST_BUCKET}/zonal-file-test"
+test_data = b"hello world"
+
+REQUIRED_ENV_VAR = "GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT"
+
+# Tests in this file run only when this condition is True.
+should_run = os.getenv(REQUIRED_ENV_VAR, "false").lower() in (
+    "true",
+    "1",
+)
+pytestmark = pytest.mark.skipif(
+    not should_run, reason=f"Skipping tests: {REQUIRED_ENV_VAR} env variable is not set"
+)
+
+
+@pytest.fixture
+def zonal_write_mocks():
+    """A fixture for mocking Zonal bucket write functionality."""
+
+    patch_target_get_bucket_type = (
+        "gcsfs.extended_gcsfs.ExtendedGcsFileSystem._get_bucket_type"
+    )
+    patch_target_init_aaow = "gcsfs.zb_hns_utils.init_aaow"
+
+    mock_aaow = mock.AsyncMock()
+    mock_init_aaow = mock.AsyncMock(return_value=mock_aaow)
+    with (
+        mock.patch(
+            patch_target_get_bucket_type,
+            return_value=BucketType.ZONAL_HIERARCHICAL,
+        ),
+        mock.patch(patch_target_init_aaow, mock_init_aaow),
+    ):
+        mocks = {
+            "aaow": mock_aaow,
+            "init_aaow": mock_init_aaow,
+        }
+        yield mocks
+
+
+@pytest.mark.parametrize(
+    "setup_action, error_match",
+    [
+        (lambda f: setattr(f, "mode", "rb"), "File not in write mode"),
+        (lambda f: setattr(f, "closed", True), "I/O operation on closed file"),
+        (
+            lambda f: setattr(f, "forced", True),
+            "This file has been force-flushed, can only close",
+        ),
+    ],
+    ids=["not_writable", "closed", "force_flushed"],
+)
+def test_zonal_file_write_value_errors(
+    extended_gcsfs, zonal_write_mocks, setup_action, error_match  # noqa: F841
+):
+    """Test ZonalFile.write raises ValueError for invalid states."""
+    with extended_gcsfs.open(file_path, "wb") as f:
+        setup_action(f)
+        with pytest.raises(ValueError, match=error_match):
+            f.write(test_data)
+
+
+def test_zonal_file_write_success(extended_gcsfs, zonal_write_mocks):
+    """Test that writing to a ZonalFile calls the underlying writer's append method."""
+    with extended_gcsfs.open(file_path, "wb") as f:
+        f.write(test_data)
+
+    zonal_write_mocks["aaow"].append.assert_awaited_once_with(test_data)
+
+
+def test_zonal_file_open_write_mode(extended_gcsfs, zonal_write_mocks):
+    """Test that opening a ZonalFile in write mode initializes the writer."""
+    bucket, key, _ = extended_gcsfs.split_path(file_path)
+    with extended_gcsfs.open(file_path, "wb"):
+        pass
+
+    zonal_write_mocks["init_aaow"].assert_called_once_with(
+        extended_gcsfs.grpc_client, bucket, key
+    )
+
+
+def test_zonal_file_flush(extended_gcsfs, zonal_write_mocks):
+    """Test that flush calls the underlying writer's flush method."""
+    with extended_gcsfs.open(file_path, "wb") as f:
+        f.flush()
+
+    zonal_write_mocks["aaow"].flush.assert_awaited()
+
+
+def test_zonal_file_commit(extended_gcsfs, zonal_write_mocks):
+    """Test that commit finalizes the write and sets autocommit to True."""
+    with extended_gcsfs.open(file_path, "wb") as f:
+        f.commit()
+
+    zonal_write_mocks["aaow"].finalize.assert_awaited_once()
+    assert f.autocommit is True
+
+
+def test_zonal_file_discard(extended_gcsfs, zonal_write_mocks):  # noqa: F841
+    """Test that discard on a ZonalFile logs a warning."""
+    with mock.patch("gcsfs.zonal_file.logger") as mock_logger:
+        with extended_gcsfs.open(file_path, "wb") as f:
+            f.discard()
+        mock_logger.warning.assert_called_once()
+        assert (
+            "Discard is not applicable for Zonal Buckets"
+            in mock_logger.warning.call_args[0][0]
+        )
+
+
+def test_zonal_file_close(extended_gcsfs, zonal_write_mocks):
+    """Test that close finalizes the write by default (autocommit=True)."""
+    with extended_gcsfs.open(file_path, "wb"):
+        pass
+    zonal_write_mocks["aaow"].close.assert_awaited_once_with(finalize_on_close=True)
+
+
+def test_zonal_file_close_with_autocommit_false(extended_gcsfs, zonal_write_mocks):
+    """Test that close does not finalize the write when autocommit is False."""
+
+    with extended_gcsfs.open(file_path, "wb", autocommit=False):
+        pass  # close is called on exit
+
+    zonal_write_mocks["aaow"].close.assert_awaited_once_with(finalize_on_close=False)
+
+
+@pytest.mark.parametrize(
+    "method_name",
+    [
+        ("_initiate_upload"),
+        ("_simple_upload"),
+        ("_upload_chunk"),
+    ],
+)
+def test_zonal_file_not_implemented_methods(
+    extended_gcsfs, zonal_write_mocks, method_name  # noqa: F841
+):
+    """Test that some GCSFile methods are not implemented for ZonalFile."""
+    with extended_gcsfs.open(file_path, "wb") as f:
+        method_to_call = getattr(f, method_name)
+        with pytest.raises(NotImplementedError):
+            method_to_call()
diff --git a/gcsfs/zb_hns_utils.py b/gcsfs/zb_hns_utils.py
index 648974e2..57d5d842 100644
--- a/gcsfs/zb_hns_utils.py
+++ b/gcsfs/zb_hns_utils.py
@@ -1,5 +1,9 @@
 from io import BytesIO

+from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
+    AsyncAppendableObjectWriter,
+)
+

 async def download_range(offset, length, mrd):
     """
@@ -11,3 +15,18 @@ async def download_range(offset, length, mrd):
     buffer = BytesIO()
     await mrd.download_ranges([(offset, length, buffer)])
     return buffer.getvalue()
+
+
+async def init_aaow(grpc_client, bucket_name, object_name, generation=None):
+    """
+    Creates and opens the AsyncAppendableObjectWriter.
+    """
+
+    writer = AsyncAppendableObjectWriter(
+        client=grpc_client,
+        bucket_name=bucket_name,
+        object_name=object_name,
+        generation=generation,
+    )
+    await writer.open()
+    return writer
diff --git a/gcsfs/zonal_file.py b/gcsfs/zonal_file.py
index 93afb84c..b705766b 100644
--- a/gcsfs/zonal_file.py
+++ b/gcsfs/zonal_file.py
@@ -1,10 +1,15 @@
+import logging
+
 from fsspec import asyn
 from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
     AsyncMultiRangeDownloader,
 )

+from gcsfs import zb_hns_utils
 from gcsfs.core import GCSFile

+logger = logging.getLogger("gcsfs.zonal_file")
+

 class ZonalFile(GCSFile):
     """
@@ -22,6 +27,14 @@ def __init__(self, *args, **kwargs):
             self.mrd = asyn.sync(
                 self.gcsfs.loop, self._init_mrd, self.bucket, self.key, self.generation
             )
+        elif "w" in self.mode:
+            self.aaow = asyn.sync(
+                self.gcsfs.loop,
+                zb_hns_utils.init_aaow,
+                self.gcsfs.grpc_client,
+                self.bucket,
+                self.key,
+            )
         else:
             raise NotImplementedError(
                 "Only read operations are currently supported for Zonal buckets."
@@ -47,10 +60,110 @@ def _fetch_range(self, start, end):
                 return b""
             raise

+    def write(self, data):
+        """
+        Writes data using AsyncAppendableObjectWriter.
+
+        For more details, see the documentation for AsyncAppendableObjectWriter:
+        https://github.com/googleapis/python-storage/blob/9e6fefdc24a12a9189f7119bc9119e84a061842f/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py#L38
+        """
+        if self.closed:
+            raise ValueError("I/O operation on closed file.")
+        if not self.writable():
+            raise ValueError("File not in write mode")
+        if self.forced:
+            raise ValueError("This file has been force-flushed, can only close")
+
+        asyn.sync(self.gcsfs.loop, self.aaow.append, data)
+
+    def flush(self, force=False):
+        """
+        Flushes the AsyncAppendableObjectWriter, sending all buffered data
+        to the server.
+ """ + if self.closed: + raise ValueError("Flush on closed file") + if force and self.forced: + raise ValueError("Force flush cannot be called more than once") + if force: + self.forced = True + + if self.readable(): + # no-op to flush on read-mode + return + + asyn.sync(self.gcsfs.loop, self.aaow.flush) + + def commit(self): + """ + Commits the write by finalizing the AsyncAppendableObjectWriter. + """ + if not self.writable(): + raise ValueError("File not in write mode") + self.autocommit = True + asyn.sync(self.gcsfs.loop, self.aaow.finalize) + + def discard(self): + """Discard is not applicable for Zonal Buckets. Log a warning instead.""" + logger.warning( + "Discard is not applicable for Zonal Buckets. \ + Data is uploaded via streaming and cannot be cancelled." + ) + + def _initiate_upload(self): + """Initiates the upload for Zonal buckets using gRPC.""" + from gcsfs.extended_gcsfs import initiate_upload + + self.location = asyn.sync( + self.gcsfs.loop, + initiate_upload, + self.gcsfs, + self.bucket, + self.key, + self.content_type, + self.metadata, + self.fixed_key_metadata, + mode="create" if "x" in self.mode else "overwrite", + kms_key_name=self.kms_key_name, + timeout=self.timeout, + ) + + def _simple_upload(self): + """Performs a simple upload for Zonal buckets using gRPC.""" + from gcsfs.extended_gcsfs import simple_upload + + self.buffer.seek(0) + data = self.buffer.read() + asyn.sync( + self.gcsfs.loop, + simple_upload, + self.gcsfs, + self.bucket, + self.key, + data, + self.metadata, + self.consistency, + self.content_type, + self.fixed_key_metadata, + mode="create" if "x" in self.mode else "overwrite", + kms_key_name=self.kms_key_name, + timeout=self.timeout, + ) + + def _upload_chunk(self, final=False): + raise NotImplementedError( + "_upload_chunk is not implemented yet for ZonalFile. Please use write() instead." + ) + def close(self): """ - Closes the ZonalFile and the underlying AsyncMultiRangeDownloader. + Closes the ZonalFile and the underlying AsyncMultiRangeDownloader and AsyncAppendableObjectWriter. + If in write mode, finalizes the write if autocommit is True. """ - if self.mrd: + if hasattr(self, "mrd") and self.mrd: asyn.sync(self.gcsfs.loop, self.mrd.close) + if hasattr(self, "aaow") and self.aaow: + asyn.sync( + self.gcsfs.loop, self.aaow.close, finalize_on_close=self.autocommit + ) super().close()