Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,25 @@ jobs:
pip install -e .
- name: Run Standard Tests
run: |
pip install "git+https://github.com/googleapis/python-storage@main#egg=google-cloud-storage" --force-reinstall --no-cache-dir --upgrade
export GOOGLE_APPLICATION_CREDENTIALS=$(pwd)/gcsfs/tests/fake-secret.json
pytest -vv -s \
--log-format="%(asctime)s %(levelname)s %(message)s" \
--log-date-format="%H:%M:%S" \
gcsfs/ \
--ignore=gcsfs/tests/test_extended_gcsfs.py
--ignore=gcsfs/tests/test_extended_gcsfs.py \
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of adding an ignore on CI, please add the same check in the tests themselves, as was done in main.
Reason: fsspec also runs all gcsfs tests; rather than adding --ignore to every CI rule, the skip should be contained in the tests themselves.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Added check for experimental env variable in the test file

--ignore=gcsfs/tests/test_zonal_file.py

- name: Run Extended Tests
run: |
pip install "git+https://github.com/googleapis/python-storage@main#egg=google-cloud-storage" --force-reinstall --no-cache-dir --upgrade
export GOOGLE_APPLICATION_CREDENTIALS=$(pwd)/gcsfs/tests/fake-secret.json
export GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT="true"
pytest -vv -s \
--log-format="%(asctime)s %(levelname)s %(message)s" \
--log-date-format="%H:%M:%S" \
gcsfs/tests/test_extended_gcsfs.py
gcsfs/tests/test_extended_gcsfs.py \
gcsfs/tests/test_zonal_file.py

lint:
name: lint
Expand Down
47 changes: 47 additions & 0 deletions gcsfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1864,6 +1864,7 @@ def __init__(
self.bucket = bucket
self.key = key
self.acl = acl
self.consistency = consistency
self.checker = get_consistency_checker(consistency)

if "a" in self.mode:
Expand Down Expand Up @@ -2073,6 +2074,18 @@ def _convert_fixed_key_metadata(metadata, *, from_google=False):


async def upload_chunk(fs, location, data, offset, size, content_type):
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
AsyncAppendableObjectWriter,
)

from .extended_gcsfs import ExtendedGcsFileSystem
from .extended_gcsfs import upload_chunk as ext_upload_chunk

if isinstance(fs, ExtendedGcsFileSystem) and isinstance(
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets add clear documentation here whats the new behaviour, condition is added instead of overriding functionality in ExtendedFileSystem to avoid change in imports etc. Same for similar methods

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.
Conditional logic is used here since these methods don't belong to a class, so override was not possible.

location, AsyncAppendableObjectWriter
):

return await ext_upload_chunk(fs, location, data, offset, size, content_type)
head = {}
l = len(data)
range = "bytes %i-%i/%i" % (offset, offset + l - 1, size)
Expand Down Expand Up @@ -2101,6 +2114,22 @@ async def initiate_upload(
mode="overwrite",
kms_key_name=None,
):
from .extended_gcsfs import ExtendedGcsFileSystem
from .extended_gcsfs import initiate_upload as ext_initiate_upload

if isinstance(fs, ExtendedGcsFileSystem) and await fs._is_zonal_bucket(bucket):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is isinstance needed here? Isn't the is_zonal check sufficient to trigger the new functionality?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The is_zonal_bucket method belongs to ExtendedGcsFileSystem, so the first isinstance check is there to make sure the is_zonal_bucket method exists on the class before it is called.


return await ext_initiate_upload(
fs,
bucket,
key,
content_type,
metadata,
fixed_key_metadata,
mode,
kms_key_name,
)

j = {"name": key}
if metadata:
j["metadata"] = metadata
Expand Down Expand Up @@ -2135,6 +2164,24 @@ async def simple_upload(
mode="overwrite",
kms_key_name=None,
):
from .extended_gcsfs import ExtendedGcsFileSystem
from .extended_gcsfs import simple_upload as ext_simple_upload

if isinstance(fs, ExtendedGcsFileSystem) and await fs._is_zonal_bucket(bucket):

return await ext_simple_upload(
fs,
bucket,
key,
datain,
metadatain,
consistency,
content_type,
fixed_key_metadata,
mode,
kms_key_name,
)

checker = get_consistency_checker(consistency)
path = f"{fs._location}/upload/storage/v1/b/{quote(bucket)}/o"
metadata = {"name": key}
Expand Down
45 changes: 44 additions & 1 deletion gcsfs/extended_gcsfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from google.api_core import exceptions as api_exceptions
from google.api_core import gapic_v1
from google.api_core.client_info import ClientInfo
from google.auth.credentials import AnonymousCredentials
from google.cloud import storage_control_v2
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
Expand Down Expand Up @@ -48,6 +49,9 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.grpc_client = None
self.storage_control_client = None
self.credential = self.credentials.credentials
if self.credentials.token == "anon":
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add documentation so the reader is clear why only anon has to be handled differently — something like: anon is used in tests to bypass credentials.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is separate from writes, does it make sense to segregate the PRs so this can be merged easily? Also verify whether all existing tests also run with experimental set to true; AFAIK credentials were the only issue.

self.credential = AnonymousCredentials()
# initializing grpc and storage control client for Hierarchical and
# zonal bucket operations
self.grpc_client = asyn.sync(self.loop, self._create_grpc_client)
Expand All @@ -59,6 +63,7 @@ def __init__(self, *args, **kwargs):
async def _create_grpc_client(self):
if self.grpc_client is None:
return AsyncGrpcClient(
credentials=self.credential,
client_info=ClientInfo(user_agent=f"{USER_AGENT}/{version}"),
).grpc_client
else:
Expand All @@ -71,7 +76,7 @@ async def _create_control_plane_client(self):
user_agent=f"{USER_AGENT}/{version}"
)
return storage_control_v2.StorageControlAsyncClient(
credentials=self.credentials.credentials, client_info=client_info
credentials=self.credential, client_info=client_info
)

async def _lookup_bucket_type(self, bucket):
Expand Down Expand Up @@ -243,3 +248,41 @@ async def _cat_file(self, path, start=None, end=None, mrd=None, **kwargs):
# Explicit cleanup if we created the MRD
if mrd_created:
await mrd.close()


async def upload_chunk(fs, location, data, offset, size, content_type):
    """Zonal-bucket variant of ``upload_chunk``.

    ``gcsfs.core.upload_chunk`` dispatches here when ``location`` is an
    ``AsyncAppendableObjectWriter`` (i.e. the target is a zonal bucket).
    Chunked uploads are not supported yet for the zonal experimental
    feature, so this always raises.

    Raises:
        NotImplementedError: always; callers should use ``write()`` instead.
    """
    raise NotImplementedError(
        "upload_chunk is not implemented yet for the zonal experimental "
        "feature. Please use write() instead."
    )


async def initiate_upload(
    fs,
    bucket,
    key,
    content_type="application/octet-stream",
    metadata=None,
    fixed_key_metadata=None,
    mode="overwrite",
    kms_key_name=None,
):
    """Zonal-bucket variant of ``initiate_upload``.

    ``gcsfs.core.initiate_upload`` dispatches here when ``bucket`` is a
    zonal bucket. Resumable-upload initiation is not supported yet for the
    zonal experimental feature, so this always raises.

    Raises:
        NotImplementedError: always; callers should use ``write()`` instead.
    """
    raise NotImplementedError(
        "initiate_upload is not implemented yet for the zonal experimental "
        "feature. Please use write() instead."
    )


async def simple_upload(
    fs,
    bucket,
    key,
    datain,
    metadatain=None,
    consistency=None,
    content_type="application/octet-stream",
    fixed_key_metadata=None,
    mode="overwrite",
    kms_key_name=None,
):
    """Zonal-bucket variant of ``simple_upload``.

    ``gcsfs.core.simple_upload`` dispatches here when ``bucket`` is a
    zonal bucket. One-shot uploads are not supported yet for the zonal
    experimental feature, so this always raises.

    Raises:
        NotImplementedError: always; callers should use ``write()`` instead.
    """
    raise NotImplementedError(
        "simple_upload is not implemented yet for the zonal experimental "
        "feature. Please use write() instead."
    )
45 changes: 17 additions & 28 deletions gcsfs/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
import shlex
import subprocess
import time
from contextlib import nullcontext
from unittest.mock import patch

import fsspec
import pytest
Expand Down Expand Up @@ -144,32 +142,23 @@ def extended_gcsfs(gcs_factory, populate=True):
os.environ.get("STORAGE_EMULATOR_HOST") == "https://storage.googleapis.com"
)

# Mock authentication if not using a real GCS endpoint,
# since grpc client in extended_gcsfs does not work with anon access
mock_authentication_manager = (
patch("google.auth.default", return_value=(None, "fake-project"))
if not is_real_gcs
else nullcontext()
)

with mock_authentication_manager:
extended_gcsfs = gcs_factory()
try:
# Only create/delete/populate the bucket if we are NOT using the real GCS endpoint
if not is_real_gcs:
try:
extended_gcsfs.rm(TEST_BUCKET, recursive=True)
except FileNotFoundError:
pass
extended_gcsfs.mkdir(TEST_BUCKET)
if populate:
extended_gcsfs.pipe(
{TEST_BUCKET + "/" + k: v for k, v in allfiles.items()}
)
extended_gcsfs.invalidate_cache()
yield extended_gcsfs
finally:
_cleanup_gcs(extended_gcsfs, is_real_gcs)
extended_gcsfs = gcs_factory()
try:
# Only create/delete/populate the bucket if we are NOT using the real GCS endpoint
if not is_real_gcs:
try:
extended_gcsfs.rm(TEST_BUCKET, recursive=True)
except FileNotFoundError:
pass
extended_gcsfs.mkdir(TEST_BUCKET)
if populate:
extended_gcsfs.pipe(
{TEST_BUCKET + "/" + k: v for k, v in allfiles.items()}
)
extended_gcsfs.invalidate_cache()
yield extended_gcsfs
finally:
_cleanup_gcs(extended_gcsfs, is_real_gcs)


@pytest.fixture
Expand Down
26 changes: 8 additions & 18 deletions gcsfs/tests/test_extended_gcsfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,8 @@ def _zonal_mocks_factory(file_data):
if is_real_gcs:
yield None
return
patch_target_lookup_bucket_type = (
"gcsfs.extended_gcsfs.ExtendedGcsFileSystem._lookup_bucket_type"
)
patch_target_sync_lookup_bucket_type = (
"gcsfs.extended_gcsfs.ExtendedGcsFileSystem._sync_lookup_bucket_type"
patch_target_get_bucket_type = (
"gcsfs.extended_gcsfs.ExtendedGcsFileSystem._get_bucket_type"
)
patch_target_create_mrd = (
"google.cloud.storage._experimental.asyncio.async_multi_range_downloader"
Expand All @@ -63,20 +60,16 @@ async def download_side_effect(read_requests, **kwargs):
mock_create_mrd = mock.AsyncMock(return_value=mock_downloader)
with (
mock.patch(
patch_target_sync_lookup_bucket_type,
return_value=BucketType.ZONAL_HIERARCHICAL,
) as mock_sync_lookup_bucket_type,
mock.patch(
patch_target_lookup_bucket_type,
patch_target_get_bucket_type,
return_value=BucketType.ZONAL_HIERARCHICAL,
),
) as mock_get_bucket_type,
mock.patch(patch_target_create_mrd, mock_create_mrd),
mock.patch(
patch_target_gcsfs_cat_file, new_callable=mock.AsyncMock
) as mock_cat_file,
):
mocks = {
"sync_lookup_bucket_type": mock_sync_lookup_bucket_type,
"get_bucket_type": mock_get_bucket_type,
"create_mrd": mock_create_mrd,
"downloader": mock_downloader,
"cat_file": mock_cat_file,
Expand Down Expand Up @@ -119,9 +112,6 @@ def test_read_block_zb(extended_gcsfs, zonal_mocks, subtests):

assert result == expected_data
if mocks:
mocks["sync_lookup_bucket_type"].assert_called_once_with(
TEST_BUCKET
)
if expected_data:
mocks["downloader"].download_ranges.assert_called_with(
[(offset, mock.ANY, mock.ANY)]
Expand Down Expand Up @@ -149,7 +139,7 @@ def test_read_small_zb(extended_gcsfs, zonal_mocks):
# cache drop
assert len(f.cache.cache) < len(out)
if mocks:
mocks["sync_lookup_bucket_type"].assert_called_once_with(TEST_BUCKET)
mocks["get_bucket_type"].assert_called_once_with(TEST_BUCKET)


def test_readline_zb(extended_gcsfs, zonal_mocks):
Expand Down Expand Up @@ -194,7 +184,7 @@ def test_readline_empty_zb(extended_gcsfs, zonal_mocks):
data = b""
if not extended_gcsfs.on_google:
with mock.patch.object(
extended_gcsfs, "_sync_lookup_bucket_type", return_value=BucketType.UNKNOWN
extended_gcsfs, "_get_bucket_type", return_value=BucketType.UNKNOWN
):
with extended_gcsfs.open(b, "wb") as f:
f.write(data)
Expand All @@ -208,7 +198,7 @@ def test_readline_blocksize_zb(extended_gcsfs, zonal_mocks):
data = b"ab\n" + b"a" * (2**18) + b"\nab"
if not extended_gcsfs.on_google:
with mock.patch.object(
extended_gcsfs, "_sync_lookup_bucket_type", return_value=BucketType.UNKNOWN
extended_gcsfs, "_get_bucket_type", return_value=BucketType.UNKNOWN
):
with extended_gcsfs.open(c, "wb") as f:
f.write(data)
Expand Down
31 changes: 31 additions & 0 deletions gcsfs/tests/test_zb_hns_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@

from gcsfs import zb_hns_utils

# Shared test fixtures for this module: a stand-in gRPC client and the
# bucket/object identifiers passed to zb_hns_utils in the tests below.
mock_grpc_client = mock.Mock()
bucket_name = "test-bucket"
object_name = "test-object"
generation = "12345"


@pytest.mark.asyncio
async def test_download_range():
Expand All @@ -27,3 +32,29 @@ async def mock_download_ranges(ranges):

mock_mrd.download_ranges.assert_called_once_with([(offset, length, mock.ANY)])
assert result == expected_data


@pytest.mark.asyncio
async def test_init_aaow():
    """
    Verifies that init_aaow builds an AsyncAppendableObjectWriter with the
    supplied arguments, awaits its open() method, and returns the writer.
    """
    writer = mock.AsyncMock()
    patched_cls = mock.Mock(return_value=writer)
    with mock.patch(
        "gcsfs.zb_hns_utils.AsyncAppendableObjectWriter",  # class under patch
        patched_cls,
    ):
        returned = await zb_hns_utils.init_aaow(
            mock_grpc_client, bucket_name, object_name, generation
        )

    # The writer must be constructed exactly once with keyword arguments.
    patched_cls.assert_called_once_with(
        client=mock_grpc_client,
        bucket_name=bucket_name,
        object_name=object_name,
        generation=generation,
    )
    # init_aaow is expected to open the writer and hand it back unchanged.
    writer.open.assert_awaited_once()
    assert returned is writer
Loading
Loading