Skip to content
17 changes: 12 additions & 5 deletions gcsfs/extended_gcsfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from gcsfs import __version__ as version
from gcsfs import zb_hns_utils
from gcsfs.core import GCSFile, GCSFileSystem
from gcsfs.retry import execute_with_timebound_retry
from gcsfs.zonal_file import ZonalFile

logger = logging.getLogger("gcsfs")
Expand Down Expand Up @@ -130,7 +131,9 @@ async def _get_bucket_type(self, bucket):
client = await self._get_control_plane_client()
bucket_name_value = f"projects/_/buckets/{bucket}/storageLayout"
logger.debug(f"get_storage_layout request for name: {bucket_name_value}")
response = await client.get_storage_layout(name=bucket_name_value)
response = await execute_with_timebound_retry(
client.get_storage_layout, name=bucket_name_value
)

if response.location_type == "zone":
return BucketType.ZONAL_HIERARCHICAL
Expand Down Expand Up @@ -509,7 +512,9 @@ async def _mv(self, path1, path2, **kwargs):

logger.debug(f"rename_folder request: {request}")
client = await self._get_control_plane_client()
operation = await client.rename_folder(request=request)
operation = await execute_with_timebound_retry(
client.rename_folder, request=request
)
await operation.result()
self._update_dircache_after_rename(path1, path2)

Expand Down Expand Up @@ -622,7 +627,7 @@ async def _mkdir(
try:
logger.debug(f"create_folder request: {request}")
client = await self._get_control_plane_client()
await client.create_folder(request=request)
await execute_with_timebound_retry(client.create_folder, request=request)
# Instead of invalidating the parent cache, update it to add the new entry.
parent_path = self._parent(path)
if parent_path in self.dircache:
Expand Down Expand Up @@ -668,7 +673,9 @@ async def _get_directory_info(self, path, bucket, key, generation):

# Verify existence using get_folder API
client = await self._get_control_plane_client()
response = await client.get_folder(request=request)
response = await execute_with_timebound_retry(
client.get_folder, request=request
)

# If successful, return directory metadata
return {
Expand Down Expand Up @@ -741,7 +748,7 @@ async def _rmdir(self, path):

logger.debug(f"delete_folder request: {request}")
client = await self._get_control_plane_client()
await client.delete_folder(request=request)
await execute_with_timebound_retry(client.delete_folder, request=request)

# Remove the directory from the cache and from its parent's listing.
self.dircache.pop(path, None)
Expand Down
59 changes: 59 additions & 0 deletions gcsfs/retry.py
Copy link
Copy Markdown
Contributor

@jasha26 jasha26 Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of the custom asyncio.wait_for logic, using a library like tenacity would look like the snippet below. Also, did we evaluate google.api_core's AsyncRetry?

from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type

@retry(
    wait=wait_exponential(multiplier=1, min=2, max=32),
    stop=stop_after_attempt(6),
    retry=retry_if_exception_type((api_exceptions.ServiceUnavailable, asyncio.TimeoutError)),
    reraise=True
)
async def call_with_retry(func, *args, **kwargs):
    return await func(*args, **kwargs)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need asyncio.wait_for logic on the client side to make sure that we handle the request stalls and won't wait indefinitely for the call to return. Replaced the custom logic with tenacity as it provides in-built support for retries. AsyncRetry also provides the same functionality but would still need the asyncio.wait_for logic on the client side.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need another dependency?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a similar opinion, but @jasha26 recommended using Tenacity as it might help in future integrations like client-side throttling

AsyncRetry from google.api_core supports retries based on max_timeout instead of max_retries (which is what gcsfs follows for other JSON API retries). To keep the same retry behaviour for JSON APIs and storage control client calls (i.e., limiting the number of retries), I have implemented the custom logic. To use AsyncRetry with a constraint on the number of attempts, we would have to maintain a wrapper to track the number of attempts, which would be almost the same as the initial version of this PR without adding much benefit from using AsyncRetry.

So we can either have entirely custom implementation or use Tenacity if we want to maintain the max_attempts behaviour across GCSFS

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import google.auth.exceptions
import requests.exceptions
from decorator import decorator
from google.api_core import exceptions as api_exceptions

logger = logging.getLogger("gcsfs")

Expand Down Expand Up @@ -176,3 +177,61 @@ async def retry_request(func, retries=6, *args, **kwargs):
continue
logger.exception(f"{func.__name__} non-retriable exception: {e}")
raise e


async def execute_with_timebound_retry(
func, *args, retry_deadline=30.0, max_retries=6, **kwargs
Copy link
Copy Markdown
Contributor

@jasha26 jasha26 Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't really hard-code these values; we need the ability for them to be overridden via multiple mechanisms, such as call-site overrides, fsspec config overrides, etc.

So i'd recommend we do something like below:

from fsspec.config import conf

@dataclass
class RetryConfig:
    max_retries: int = 6
    min_delay: float = 2.0
    max_delay: float = 32.0
    retry_deadline: float = 30.0

def get_resolved_retry_config(call_kwargs) -> RetryConfig:
    """
    Resolves retry configuration with a clear hierarchy of overrides:
    1. Explicit call-site arguments (e.g., max_retries=10)
    2. fsspec.config settings (e.g., ~/.config/fsspec/conf.json)
    3. Hardcoded Defaults from the RetryConfig template
    """
    # 1. Start with the default template
    default = RetryConfig()

    # 2. Resolve parameters from Env Vars or fsspec.config, or use defaults
    resolved_max_retries = int(
        call_kwargs.get("max_retries")
        or conf.get("gcsfs.retry.max_retries", default.max_retries)
    )
    
    resolved_deadline = float(
        call_kwargs.get("retry_deadline")
        or conf.get("gcsfs.retry.deadline", default.retry_deadline)
    )

    return RetryConfig(
        max_retries=resolved_max_retries,
        retry_deadline=resolved_deadline,
        min_delay=default.min_delay,
        max_delay=default.max_delay
    )

async def with_retry(func, *args, **kwargs):
    config = get_resolved_retry_config(kwargs)
    
    # Define transient errors consistent with GCS client best practices.
    RETRYABLE_ERRORS = (
        api_exceptions.ServiceUnavailable,
        api_exceptions.DeadlineExceeded,
        api_exceptions.InternalServerError,
        api_exceptions.TooManyRequests,
        asyncio.TimeoutError,
    )

    # Replaces custom loop with a declarative tenacity decorator.
    @retry(
        stop=stop_after_attempt(config.max_retries),
        wait=wait_exponential(multiplier=1, min=config.min_delay, max=config.max_delay),
        retry=retry_if_exception_type(RETRYABLE_ERRORS),
        reraise=True
    )
    async def _wrapped_call():
        return await func(*args, **kwargs)
    
    return await _wrapped_call()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not a fan of adding all these environment variables. fsspec already has a way to specify instantiation kwargs using specially formatted environment variables or files ( https://filesystem-spec.readthedocs.io/en/latest/features.html#configuration ).

Since retries are useful in multiple backends and have the same concepts (number of times, backoff factor, max wait, etc.), we could even make a general fsspec class for this.

):
    """
    Execute a gRPC storage control API call with a strict per-attempt timeout
    and an overall maximum number of retries.

    Transient errors and timeouts trigger an exponential-backoff retry loop.

    :param func: Awaitable API method to invoke (e.g. ``client.get_folder``).
    :param retry_deadline: Per-attempt timeout in seconds. Forwarded to the
        call as ``timeout=`` and mirrored locally by ``asyncio.wait_for``.
    :param max_retries: Maximum total number of attempts before the last
        exception is re-raised; ``None`` means retry indefinitely.
    :raises: The last exception encountered, once it is classified as
        non-transient or ``max_retries`` attempts have been made.
    """
    attempt = 0
    while True:
        try:
            # We enforce a per-call timeout by passing `timeout=retry_deadline` to the API call.
            # asyncio.wait_for serves as a hard local fallback to cancel the task if the gRPC timeout fails to abort.
            # `retry=None` disables the client library's built-in retry so this
            # loop is the single owner of the retry policy (attempts are counted here).
            return await asyncio.wait_for(
                func(*args, timeout=retry_deadline, retry=None, **kwargs),
                timeout=retry_deadline + 1.0,
            )
        except Exception as e:
            # Determine if the exception is transient and should be retried.
            is_transient = isinstance(
                e,
                (
                    api_exceptions.RetryError,
                    api_exceptions.DeadlineExceeded,
                    api_exceptions.ServiceUnavailable,
                    api_exceptions.InternalServerError,
                    api_exceptions.TooManyRequests,
                    api_exceptions.ResourceExhausted,
                    api_exceptions.Unknown,
                    asyncio.TimeoutError,
                ),
            )

            # Workaround: retry on 401s / Unauthenticated during transient token lapses.
            # Only the "Invalid Credentials" flavour is treated as transient; other
            # 401s (e.g. genuinely bad credentials) are re-raised immediately.
            if (
                not is_transient
                and isinstance(e, api_exceptions.Unauthenticated)
                and "Invalid Credentials" in str(e)
            ):
                is_transient = True

            if not is_transient:
                raise e

            attempt += 1

            # Give up once the attempt budget is exhausted and surface the
            # last exception to the caller.
            if max_retries is not None and attempt >= max_retries:
                logger.exception(
                    f"{func.__name__} out of max retries ({max_retries}) on exception: {e}"
                )
                raise e

            # Jittered exponential backoff: random [0, 1) + 2^(attempt-1),
            # capped at 32 seconds.
            sleep_time = min(random.random() + 2 ** (attempt - 1), 32)
            logger.debug(
                f"{func.__name__} retrying (attempt {attempt}) after {sleep_time:.2f}s due to exception: {e}"
            )
            await asyncio.sleep(sleep_time)
34 changes: 34 additions & 0 deletions gcsfs/tests/integration/test_extended_hns.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,40 @@ def test_hns_empty_folder_rename_success(self, gcs_hns):
assert not gcsfs.exists(path1)
assert gcsfs.exists(path2)

def test_hns_folder_rename_idempotency_retry_integration(self, gcs_hns):
    """Verify retry-on-timeout behaviour of folder rename against real GCS.

    The first ``asyncio.wait_for`` call is forced to time out client-side
    while the underlying request is still allowed to reach GCS in the
    background, so the retry path must tolerate the server having already
    applied the rename (idempotency).
    """
    gcsfs = gcs_hns
    # Unique source/destination folder names so concurrent runs don't collide.
    path1 = f"{TEST_HNS_BUCKET}/integration_retry_old_dir_{uuid.uuid4().hex}"
    path2 = f"{TEST_HNS_BUCKET}/integration_retry_new_dir_{uuid.uuid4().hex}"

    gcsfs.pipe(f"{path1}/file.txt", b"data")

    import asyncio

    original_wait_for = asyncio.wait_for
    call_count = 0

    async def mocked_wait_for(coro, timeout):
        # Simulate a request stall: the first attempt times out locally while
        # the real network request keeps running in the background.
        nonlocal call_count
        call_count += 1
        if call_count == 1:
            # Schedule the actual GCS network request in the background
            asyncio.create_task(coro)
            # Intentionally timeout client-side before GCS can return the response
            await asyncio.sleep(0.01)
            raise asyncio.TimeoutError()

        # Subsequent attempts behave normally.
        return await original_wait_for(coro, timeout)

    from unittest import mock

    with mock.patch("gcsfs.retry.asyncio.wait_for", new=mocked_wait_for):
        gcsfs.mv(path1, path2)

    # At least one retry must have happened, and the rename must have taken
    # effect exactly once: source gone, destination present.
    assert call_count >= 2
    assert not gcsfs.exists(path1)
    assert gcsfs.exists(path2)

def test_file_rename_using_atomic_mv(
self,
gcs_hns,
Expand Down
Loading
Loading