
Commit 47c2d76

ankitaluthra1, suni72, and Mahalaxmibejugam authored
Feat: Introduce ExtendedGcsFileSystem for Zonal Bucket gRPC Read Path (#707)
Co-authored-by: Sunidhi Chandra <[email protected]>
Co-authored-by: Mahalaxmi <[email protected]>
1 parent 5315c70 commit 47c2d76

13 files changed (+805, -6 lines)

.github/workflows/ci.yml

Lines changed: 11 additions & 2 deletions

@@ -35,13 +35,22 @@ jobs:
       - name: install
         run: |
           pip install -e .
-      - name: Run tests
+      - name: Run Standard Tests
         run: |
           export GOOGLE_APPLICATION_CREDENTIALS=$(pwd)/gcsfs/tests/fake-secret.json
           pytest -vv -s \
             --log-format="%(asctime)s %(levelname)s %(message)s" \
             --log-date-format="%H:%M:%S" \
-            gcsfs/
+            gcsfs/ \
+            --ignore=gcsfs/tests/test_extended_gcsfs.py
+      - name: Run Extended Tests
+        run: |
+          export GOOGLE_APPLICATION_CREDENTIALS=$(pwd)/gcsfs/tests/fake-secret.json
+          export GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT="true"
+          pytest -vv -s \
+            --log-format="%(asctime)s %(levelname)s %(message)s" \
+            --log-date-format="%H:%M:%S" \
+            gcsfs/tests/test_extended_gcsfs.py
 
   lint:
     name: lint

.isort.cfg

Lines changed: 1 addition & 0 deletions

@@ -1,2 +1,3 @@
 [settings]
+profile = black
 known_third_party = aiohttp,click,decorator,fsspec,fuse,google,google_auth_oauthlib,pytest,requests,setuptools

environment_gcsfs.yaml

Lines changed: 2 additions & 0 deletions

@@ -13,9 +13,11 @@ dependencies:
   - google-auth-oauthlib
   - google-cloud-core
   - google-cloud-storage
+  - grpcio
   - pytest
   - pytest-timeout
   - pytest-asyncio
+  - pytest-subtests
   - requests
   - ujson
   - pip:

gcsfs/__init__.py

Lines changed: 19 additions & 0 deletions

@@ -1,10 +1,29 @@
+import logging
+import os
+
 from ._version import get_versions
 
+logger = logging.getLogger(__name__)
 __version__ = get_versions()["version"]
 del get_versions
 from .core import GCSFileSystem
 from .mapping import GCSMap
 
+if os.getenv("GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT", "false").lower() in ("true", "1"):
+    try:
+        from .extended_gcsfs import ExtendedGcsFileSystem as GCSFileSystem
+
+        logger.info(
+            "gcsfs experimental features enabled via GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT."
+        )
+    except ImportError as e:
+        logger.warning(
+            f"GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT is set, but failed to import experimental features: {e}"
+        )
+        # Fall back to the core GCSFileSystem; do not register here
+
+# TODO: GCSMap still refers to the original GCSFileSystem. This will be
+# addressed in a future update.
 __all__ = ["GCSFileSystem", "GCSMap"]
 
 from . import _version
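
This gate swaps the exported class at import time, so the environment variable must be set before gcsfs is first imported. A minimal sketch of opting in (the type check at the end is illustrative, not part of the commit):

import os

# Assumption: this must run before the first `import gcsfs`, because the
# class swap happens in gcsfs/__init__.py at import time.
os.environ["GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT"] = "true"

import gcsfs

fs = gcsfs.GCSFileSystem()
# Prints "ExtendedGcsFileSystem" when the flag is set and the experimental
# imports succeed; otherwise the core "GCSFileSystem" is kept.
print(type(fs).__name__)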

gcsfs/extended_gcsfs.py

Lines changed: 245 additions & 0 deletions (new file)

import logging
from enum import Enum

from fsspec import asyn
from google.api_core import exceptions as api_exceptions
from google.api_core import gapic_v1
from google.api_core.client_info import ClientInfo
from google.cloud import storage_control_v2
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
    AsyncMultiRangeDownloader,
)

from gcsfs import __version__ as version
from gcsfs import zb_hns_utils
from gcsfs.core import GCSFile, GCSFileSystem
from gcsfs.zonal_file import ZonalFile

logger = logging.getLogger("gcsfs")

USER_AGENT = "python-gcsfs"


class BucketType(Enum):
    ZONAL_HIERARCHICAL = "ZONAL_HIERARCHICAL"
    HIERARCHICAL = "HIERARCHICAL"
    NON_HIERARCHICAL = "NON_HIERARCHICAL"
    UNKNOWN = "UNKNOWN"


gcs_file_types = {
    BucketType.ZONAL_HIERARCHICAL: ZonalFile,
    BucketType.NON_HIERARCHICAL: GCSFile,
    BucketType.HIERARCHICAL: GCSFile,
    BucketType.UNKNOWN: GCSFile,
}


class ExtendedGcsFileSystem(GCSFileSystem):
    """
    This class is used when the GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT env variable is set to true.

    ExtendedGcsFileSystem is a subclass of GCSFileSystem that adds new logic for bucket types
    including zonal and hierarchical. For buckets without special properties, it forwards
    requests to the parent class GCSFileSystem for default processing.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.grpc_client = None
        self.storage_control_client = None
        # Initialize the gRPC and storage control clients for hierarchical
        # and zonal bucket operations
        self.grpc_client = asyn.sync(self.loop, self._create_grpc_client)
        self._storage_control_client = asyn.sync(
            self.loop, self._create_control_plane_client
        )
        self._storage_layout_cache = {}

    async def _create_grpc_client(self):
        if self.grpc_client is None:
            return AsyncGrpcClient(
                client_info=ClientInfo(user_agent=f"{USER_AGENT}/{version}"),
            ).grpc_client
        else:
            return self.grpc_client

    async def _create_control_plane_client(self):
        # Initialize the storage control plane client for bucket
        # metadata operations
        client_info = gapic_v1.client_info.ClientInfo(
            user_agent=f"{USER_AGENT}/{version}"
        )
        return storage_control_v2.StorageControlAsyncClient(
            credentials=self.credentials.credentials, client_info=client_info
        )

    async def _lookup_bucket_type(self, bucket):
        if bucket in self._storage_layout_cache:
            return self._storage_layout_cache[bucket]
        bucket_type = await self._get_bucket_type(bucket)
        # Don't cache the UNKNOWN type
        if bucket_type == BucketType.UNKNOWN:
            return BucketType.UNKNOWN
        self._storage_layout_cache[bucket] = bucket_type
        return self._storage_layout_cache[bucket]

    _sync_lookup_bucket_type = asyn.sync_wrapper(_lookup_bucket_type)

    async def _get_bucket_type(self, bucket):
        try:
            bucket_name_value = f"projects/_/buckets/{bucket}/storageLayout"
            response = await self._storage_control_client.get_storage_layout(
                name=bucket_name_value
            )

            if response.location_type == "zone":
                return BucketType.ZONAL_HIERARCHICAL
            else:
                # This should be updated to include HNS in the future
                return BucketType.NON_HIERARCHICAL
        except api_exceptions.NotFound:
            logger.warning(f"Error: Bucket {bucket} not found or you lack permissions.")
            return BucketType.UNKNOWN
        except Exception as e:
            logger.error(
                f"Could not determine bucket type for bucket name {bucket}: {e}"
            )
            # Default to UNKNOWN when the bucket type cannot be determined
            return BucketType.UNKNOWN

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        cache_options=None,
        acl=None,
        consistency=None,
        metadata=None,
        autocommit=True,
        fixed_key_metadata=None,
        generation=None,
        **kwargs,
    ):
        """
        Open a file.
        """
        bucket, _, _ = self.split_path(path)
        bucket_type = self._sync_lookup_bucket_type(bucket)
        return gcs_file_types[bucket_type](
            self,
            path,
            mode,
            block_size,
            cache_options=cache_options,
            consistency=consistency,
            metadata=metadata,
            acl=acl,
            autocommit=autocommit,
            fixed_key_metadata=fixed_key_metadata,
            generation=generation,
            **kwargs,
        )

    # Replacement for _process_limits to support the new params (offset and length) for MRD.
    async def _process_limits_to_offset_and_length(self, path, start, end):
        """
        Calculates the read offset and length from start and end parameters.

        Args:
            path (str): The path to the file.
            start (int | None): The starting byte position.
            end (int | None): The ending byte position.

        Returns:
            tuple: A tuple containing (offset, length).

        Raises:
            ValueError: If the calculated range is invalid.
        """
        size = None

        if start is None:
            offset = 0
        elif start < 0:
            size = (await self._info(path))["size"] if size is None else size
            offset = size + start
        else:
            offset = start

        if end is None:
            size = (await self._info(path))["size"] if size is None else size
            effective_end = size
        elif end < 0:
            size = (await self._info(path))["size"] if size is None else size
            effective_end = size + end
        else:
            effective_end = end

        if offset < 0:
            raise ValueError(f"Calculated start offset ({offset}) cannot be negative.")
        if effective_end < offset:
            raise ValueError(
                f"Calculated end position ({effective_end}) cannot be before start offset ({offset})."
            )
        elif effective_end == offset:
            length = 0  # Handle zero-length slice
        else:
            length = effective_end - offset  # Normal case
            size = (await self._info(path))["size"] if size is None else size
            if effective_end > size:
                length = max(0, size - offset)  # Clamp and ensure non-negative

        return offset, length

    sync_process_limits_to_offset_and_length = asyn.sync_wrapper(
        _process_limits_to_offset_and_length
    )

    async def _is_zonal_bucket(self, bucket):
        bucket_type = await self._lookup_bucket_type(bucket)
        return bucket_type == BucketType.ZONAL_HIERARCHICAL

    async def _cat_file(self, path, start=None, end=None, mrd=None, **kwargs):
        """Fetch a file's contents as bytes, with an optimized path for Zonal buckets.

        This method overrides the parent `_cat_file` to read objects in Zonal buckets using gRPC.

        Args:
            path (str): The full GCS path to the file (e.g., "bucket/object").
            start (int, optional): The starting byte position to read from.
            end (int, optional): The ending byte position to read to.
            mrd (AsyncMultiRangeDownloader, optional): An existing multi-range
                downloader instance. If not provided, a new one will be created for Zonal buckets.

        Returns:
            bytes: The content of the file or file range.
        """
        mrd = kwargs.pop("mrd", None)
        mrd_created = False

        # A new MRD is required when the read is done directly by the
        # GCSFileSystem class without creating a GCSFile object first.
        if mrd is None:
            bucket, object_name, generation = self.split_path(path)
            # Fall back to the default implementation if this is not a zonal bucket
            if not await self._is_zonal_bucket(bucket):
                return await super()._cat_file(path, start=start, end=end, **kwargs)

            mrd = await AsyncMultiRangeDownloader.create_mrd(
                self.grpc_client, bucket, object_name, generation
            )
            mrd_created = True

        offset, length = await self._process_limits_to_offset_and_length(
            path, start, end
        )
        try:
            return await zb_hns_utils.download_range(
                offset=offset, length=length, mrd=mrd
            )
        finally:
            # Explicit cleanup if we created the MRD
            if mrd_created:
                await mrd.close()
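
Taken together, the read path resolves the bucket layout once (and caches it), then either defers to the parent HTTP implementation or reads through the gRPC multi-range downloader. A hedged sketch of what a caller sees, with hypothetical bucket, object, and sizes:

import gcsfs

fs = gcsfs.GCSFileSystem()  # ExtendedGcsFileSystem when the flag is set

# On a zonal bucket this goes through the MRD path; on any other layout
# it falls back to GCSFileSystem._cat_file.
data = fs.cat_file("my-zonal-bucket/data.bin", start=-1024)  # last 1 KiB

# For a hypothetical 10_000-byte object, _process_limits_to_offset_and_length
# maps (start=-1024, end=None) to (offset=8976, length=1024) and
# (start=None, end=4096) to (offset=0, length=4096).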

gcsfs/tests/conftest.py

Lines changed: 50 additions & 3 deletions

@@ -1,7 +1,10 @@
+import logging
 import os
 import shlex
 import subprocess
 import time
+from contextlib import nullcontext
+from unittest.mock import patch
 
 import fsspec
 import pytest

@@ -91,10 +94,9 @@ def docker_gcs():
 def gcs_factory(docker_gcs):
     params["endpoint_url"] = docker_gcs
 
-    def factory(default_location=None):
+    def factory(**kwargs):
         GCSFileSystem.clear_instance_cache()
-        params["default_location"] = default_location
-        return fsspec.filesystem("gcs", **params)
+        return fsspec.filesystem("gcs", **params, **kwargs)
 
     return factory

@@ -125,6 +127,51 @@ def gcs(gcs_factory, populate=True):
         pass
 
 
+def _cleanup_gcs(gcs, is_real_gcs):
+    """Only remove the bucket/contents if we are NOT using the real GCS, logging a warning on failure."""
+    if is_real_gcs:
+        return
+    try:
+        gcs.rm(TEST_BUCKET, recursive=True)
+    except Exception as e:
+        logging.warning(f"Failed to clean up GCS bucket {TEST_BUCKET}: {e}")
+
+
+@pytest.fixture
+def extended_gcsfs(gcs_factory, populate=True):
+    # Check if we are running against a real GCS endpoint
+    is_real_gcs = (
+        os.environ.get("STORAGE_EMULATOR_HOST") == "https://storage.googleapis.com"
+    )
+
+    # Mock authentication if not using a real GCS endpoint,
+    # since grpc client in extended_gcsfs does not work with anon access
+    mock_authentication_manager = (
+        patch("google.auth.default", return_value=(None, "fake-project"))
+        if not is_real_gcs
+        else nullcontext()
+    )
+
+    with mock_authentication_manager:
+        extended_gcsfs = gcs_factory()
+        try:
+            # Only create/delete/populate the bucket if we are NOT using the real GCS endpoint
+            if not is_real_gcs:
+                try:
+                    extended_gcsfs.rm(TEST_BUCKET, recursive=True)
+                except FileNotFoundError:
+                    pass
+                extended_gcsfs.mkdir(TEST_BUCKET)
+                if populate:
+                    extended_gcsfs.pipe(
+                        {TEST_BUCKET + "/" + k: v for k, v in allfiles.items()}
+                    )
+            extended_gcsfs.invalidate_cache()
+            yield extended_gcsfs
+        finally:
+            _cleanup_gcs(extended_gcsfs, is_real_gcs)
+
+
 @pytest.fixture
 def gcs_versioned(gcs_factory):
     gcs = gcs_factory()
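
A test module can then request the new fixture by name. A minimal sketch of such a test, assuming TEST_BUCKET is importable from the tests' settings module (as conftest.py itself implies) and the emulator or real endpoint is configured as above:

from gcsfs.tests.settings import TEST_BUCKET


def test_roundtrip(extended_gcsfs):
    # The fixture yields a populated filesystem (emulator with mocked auth,
    # or the real endpoint) and cleans up afterwards.
    path = TEST_BUCKET + "/roundtrip.bin"
    extended_gcsfs.pipe(path, b"hello zonal")
    assert extended_gcsfs.cat(path) == b"hello zonal"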
