2 changes: 1 addition & 1 deletion .isort.cfg
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[settings]
profile = black
known_third_party = aiohttp,click,decorator,fsspec,fuse,google,google_auth_oauthlib,numpy,prettytable,psutil,pytest,pytest_asyncio,requests,resource_monitor,setuptools,yaml
known_third_party = aiohttp,click,decorator,fsspec,fuse,google,google_auth_oauthlib,numpy,prettytable,psutil,pytest,pytest_asyncio,requests,resource_monitor,setuptools,tenacity,yaml
1 change: 1 addition & 0 deletions docs/source/index.rst
@@ -238,6 +238,7 @@ Contents
api
developer
hns_buckets
retries
rapid_storage_support
fuse
changelog
78 changes: 78 additions & 0 deletions docs/source/retries.rst
@@ -0,0 +1,78 @@
Robust Retries in GCSFS
=======================

To ensure resilience against transient network failures and Google Cloud Storage (GCS) server-side drops, ``gcsfs`` implements a robust, time-bound retry strategy for idempotent operations across different bucket types (standard, zonal, and Hierarchical Namespace).

This documentation covers the retry logic applied to different methods and bucket configurations.

Standard (Flat Namespace) Buckets
---------------------------------

For standard flat-namespace buckets, ``gcsfs`` relies on a default retry decorator ``retry_request`` (defined in ``gcsfs/retry.py``) for most operations.

This engine enforces:

1. **Max Retries**: Defaults to 6 attempts (1 initial + 5 retries).
2. **Exponential Backoff**: Retries transient HTTP errors (e.g., 408, 429, 5xx) with exponential backoff and jitter, sleeping ``min(random.random() + 2**(retry-1), 32)`` seconds between attempts.
3. **Exceptions Retried**:

   - ``HttpError`` (for specific retriable status codes)
   - ``requests.exceptions.RequestException``
   - ``google.auth.exceptions.GoogleAuthError``
   - ``ChecksumError``
   - ``aiohttp.client_exceptions.ClientError``

This default retry logic applies to standard file operations (e.g., ``cat``, ``put``, ``cp``, ``rm`` on standard buckets).
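The backoff schedule quoted above can be illustrated with a minimal sketch (an illustration of the formula, not the gcsfs implementation itself):

```python
import random


def backoff_sleep(retry: int) -> float:
    # Exponential backoff with up to 1s of jitter, capped at 32s,
    # mirroring min(random.random() + 2**(retry-1), 32).
    return min(random.random() + 2 ** (retry - 1), 32)


# Successive retries wait roughly 1s, 2s, 4s, 8s, 16s, then cap at 32s.
delays = [backoff_sleep(r) for r in range(1, 7)]
```

Because the jitter is at most one second, the sleep for retry ``n`` always falls in ``[2**(n-1), 2**(n-1) + 1)`` until the 32-second cap kicks in.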

Batch Operations
----------------

For multi-object deletions (batch deletion), ``gcsfs`` uses a custom retry loop of up to 5 attempts (1 initial + 4 retries) with exponential backoff and jitter. Objects that were deleted successfully are dropped from the batch between attempts, so retries target only the objects that failed.
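The shape of that loop could be sketched as follows; ``delete_batch`` is a hypothetical stand-in for the real GCS batch request, used purely for illustration:

```python
import random
import time


def batch_delete_with_retry(objects, delete_batch, max_attempts=5):
    """Sketch of the batch-deletion retry loop described above.

    ``delete_batch`` is a hypothetical callable that attempts to delete
    the given objects and returns the subset that failed.
    """
    pending = list(objects)
    for attempt in range(1, max_attempts + 1):
        pending = delete_batch(pending)  # only failures remain pending
        if not pending:
            return []  # everything deleted
        if attempt < max_attempts:
            # Exponential backoff with jitter before retrying failures only.
            time.sleep(min(random.random() + 2 ** (attempt - 1), 32))
    return pending  # objects still failing after all attempts
```

Shrinking ``pending`` between attempts is what keeps the retries focused on the failed objects rather than re-issuing the whole batch.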

Specialized Buckets (Zonal and Hierarchical Namespace)
------------------------------------------------------

For method calls routed to the **GCS Storage Control API** (used only for HNS and Zonal buckets), ``gcsfs`` uses a custom asynchronous retry wrapper: ``execute_with_timebound_retry``.

This engine enforces several failsafe constraints:

1. **Strict Per-Attempt Timeout**: Each individual gRPC call is bounded by a timeout (configured via ``retry_deadline``, defaulting to 30.0s). If the server fails to respond within this threshold, ``asyncio.wait_for`` cancels the stalled attempt.
2. **Grace Window**: A 1.0-second grace window is added to each attempt, so the client-side timeout is ``retry_deadline + 1.0``. This gives the gRPC transport time to raise its own timeout error (e.g., ``DeadlineExceeded``) before ``asyncio.wait_for`` cancels the task.
3. **Bounded Attempts**: The retry loop caps the total attempts at 6 (1 initial + 5 retries). Once the cap is reached, the last exception is propagated to the caller.
4. **Exponential Backoff and Jitter**: Transient gRPC exceptions trigger exponential backoff defined by ``min(random.random() + 2**(attempt-1), 32)`` seconds.
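Constraints 1 and 2 above can be sketched as a single-attempt wrapper (a simplified illustration, not the gcsfs code itself; it assumes, as GAPIC client methods do, that the wrapped coroutine accepts a ``timeout`` keyword):

```python
import asyncio


async def call_with_deadline(func, retry_deadline=30.0, **kwargs):
    """Run one RPC attempt under the two timeouts described above."""
    # The RPC itself receives the deadline via its `timeout` kwarg;
    # asyncio.wait_for adds a 1.0s grace window so gRPC can raise its
    # own error (e.g., DeadlineExceeded) before the event loop cancels
    # the stalled task.
    return await asyncio.wait_for(
        func(timeout=retry_deadline, **kwargs),
        timeout=retry_deadline + 1.0,
    )
```

The outer ``asyncio.wait_for`` is the client-side fail-safe: even if the network layer freezes and ignores the inner RPC deadline, the event loop never hangs indefinitely.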

**Exceptions Retried**:

The following transient gRPC exceptions are retried:

- ``google.api_core.exceptions.RetryError``
- ``google.api_core.exceptions.DeadlineExceeded``
- ``google.api_core.exceptions.ServiceUnavailable``
- ``google.api_core.exceptions.InternalServerError``
- ``google.api_core.exceptions.TooManyRequests``
- ``google.api_core.exceptions.ResourceExhausted``
- ``google.api_core.exceptions.Unknown``
- ``asyncio.TimeoutError``
- ``google.api_core.exceptions.Unauthenticated`` (when the message contains "Invalid Credentials")


Methods Supported by Time-Bound Retries
----------------------------------------

The following high-level methods are wrapped by the time-bound retry logic:

.. list-table:: **Supported Storage Control API Methods**
   :widths: 30 70
   :header-rows: 1

   * - High-Level Method
     - Underlying Storage Control API Call
   * - ``mkdir``
     - ``client.create_folder``
   * - ``rmdir`` / ``rm``
     - ``client.delete_folder``
   * - ``mv`` / ``rename``
     - ``client.rename_folder``
   * - ``info``
     - ``client.get_folder``
   * - ``_is_bucket_hns_enabled``
     - ``client.get_storage_layout``
17 changes: 12 additions & 5 deletions gcsfs/extended_gcsfs.py
@@ -20,6 +20,7 @@
from gcsfs import __version__ as version
from gcsfs import zb_hns_utils
from gcsfs.core import GCSFile, GCSFileSystem
from gcsfs.retry import execute_with_timebound_retry
from gcsfs.zonal_file import ZonalFile

logger = logging.getLogger("gcsfs")
@@ -129,7 +130,9 @@ async def _get_bucket_type(self, bucket):
client = await self._get_control_plane_client()
bucket_name_value = f"projects/_/buckets/{bucket}/storageLayout"
logger.debug(f"get_storage_layout request for name: {bucket_name_value}")
response = await client.get_storage_layout(name=bucket_name_value)
response = await execute_with_timebound_retry(
client.get_storage_layout, name=bucket_name_value
)

if response.location_type == "zone":
return BucketType.ZONAL_HIERARCHICAL
@@ -492,7 +495,9 @@ async def _mv(self, path1, path2, **kwargs):

logger.debug(f"rename_folder request: {request}")
client = await self._get_control_plane_client()
operation = await client.rename_folder(request=request)
operation = await execute_with_timebound_retry(
client.rename_folder, request=request
)
await operation.result()
self._update_dircache_after_rename(path1, path2)

@@ -657,7 +662,7 @@ async def _create_hns_folder(self, path, bucket, key, create_parents):
try:
logger.debug(f"create_folder request: {request}")
client = await self._get_control_plane_client()
await client.create_folder(request=request)
await execute_with_timebound_retry(client.create_folder, request=request)
# Instead of invalidating the parent cache, update it to add the new entry.
parent_path = self._parent(path)
if parent_path in self.dircache:
@@ -703,7 +708,9 @@ async def _get_directory_info(self, path, bucket, key, generation):

# Verify existence using get_folder API
client = await self._get_control_plane_client()
response = await client.get_folder(request=request)
response = await execute_with_timebound_retry(
client.get_folder, request=request
)

# If successful, return directory metadata
return {
@@ -776,7 +783,7 @@ async def _rmdir(self, path):

logger.debug(f"delete_folder request: {request}")
client = await self._get_control_plane_client()
await client.delete_folder(request=request)
await execute_with_timebound_retry(client.delete_folder, request=request)

# Remove the directory from the cache and from its parent's listing.
self.dircache.pop(path, None)
103 changes: 103 additions & 0 deletions gcsfs/retry.py
Contributor @jasha26 commented on Mar 25, 2026:
Instead of the custom asyncio.wait_for logic, using a library like tenacity would look like this. Also did we evaluate google.api_core AsyncRetry:

from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type

@retry(
    wait=wait_exponential(multiplier=1, min=2, max=32),
    stop=stop_after_attempt(6),
    retry=retry_if_exception_type((api_exceptions.ServiceUnavailable, asyncio.TimeoutError)),
    reraise=True
)
async def call_with_retry(func, *args, **kwargs):
    return await func(*args, **kwargs)

Contributor Author replied:
We need asyncio.wait_for logic on the client side to make sure that we handle the request stalls and won't wait indefinitely for the call to return. Replaced the custom logic with tenacity as it provides in-built support for retries. AsyncRetry also provides the same functionality but would still need the asyncio.wait_for logic on the client side.

Member commented:
Do we really need another dependency?

Contributor Author replied:
I had a similar opinion, but @jasha26 recommended using Tenacity as it might help in future integrations like client-side throttling

AsyncRetry from google.api_core supports retries based on max_timeout instead of max_retries (which is what gcsfs follows for other JSON API retries). To keep the same retry behaviour for JSON APIs and storage control client calls (i.e., limiting the number of retries) I have implemented the custom logic. To use AsyncRetry with a constraint on the number of attempts, we would have to maintain a wrapper to track the attempt count, which would be almost the same as the initial version of this PR without adding much benefit from using AsyncRetry.

So we can either have entirely custom implementation or use Tenacity if we want to maintain the max_attempts behaviour across GCSFS

@@ -2,14 +2,27 @@
import json
import logging
import random
from dataclasses import dataclass

import aiohttp.client_exceptions
import fsspec.config
import google.auth.exceptions
import requests.exceptions
from decorator import decorator
from google.api_core import exceptions as api_exceptions
from tenacity import AsyncRetrying, retry_if_exception, stop_after_attempt

logger = logging.getLogger("gcsfs")

DEFAULT_RETRY_DEADLINE = 30.0
DEFAULT_MAX_RETRIES = 6


@dataclass
class StorageControlRetryConfig:
retry_deadline: float = DEFAULT_RETRY_DEADLINE
max_retries: int = DEFAULT_MAX_RETRIES


class HttpError(Exception):
"""Holds the message and code from cloud errors."""
@@ -176,3 +189,93 @@ async def retry_request(func, retries=6, *args, **kwargs):
continue
logger.exception(f"{func.__name__} non-retriable exception: {e}")
raise e


def _custom_wait(retry_state):
attempt = retry_state.attempt_number
return min(random.random() + 2 ** (attempt - 1), 32)


def _is_transient_exception(exception):
is_transient = isinstance(
exception,
(
api_exceptions.RetryError,
api_exceptions.DeadlineExceeded,
api_exceptions.ServiceUnavailable,
api_exceptions.InternalServerError,
api_exceptions.TooManyRequests,
api_exceptions.ResourceExhausted,
api_exceptions.Unknown,
asyncio.TimeoutError,
),
)
if (
not is_transient
and isinstance(exception, api_exceptions.Unauthenticated)
and "Invalid Credentials" in str(exception)
):
is_transient = True
return is_transient


def _get_retry_config(
retry_deadline=None, max_retries=None
) -> StorageControlRetryConfig:
conf = fsspec.config.conf.get("gcs", {})
Member commented:
the purpose of the conf is to pass default values to the class constructor at instantiation, so there is not normally any need to query the conf directly, but the values should be passed in.


return StorageControlRetryConfig(
Member commented:
As a dataclass, this already has these defaults.
I think you would be better off passing kwargs around, like in other places in the codebase. The Config class isn't doing anything for you, except obscuring when the values are being set.

retry_deadline=(
retry_deadline
if retry_deadline is not None
else conf.get("retry_deadline", DEFAULT_RETRY_DEADLINE)
),
max_retries=(
max_retries
if max_retries is not None
else conf.get("max_retries", DEFAULT_MAX_RETRIES)
),
)


async def execute_with_timeout(func, timeout, *args, **kwargs):
"""
Executes a function with a strict timeout using asyncio.wait_for.
It passes timeout to the function itself and uses timeout + 1.0 for wait_for.
"""
return await asyncio.wait_for(
func(*args, timeout=timeout, **kwargs), timeout=timeout + 1.0
Member commented:
Where does the arbitrary + 1.0 come from?
How do we know that func takes a timeout parameter (and if it does, why do we need another one)?

Contributor Author @Mahalaxmibejugam replied on Mar 31, 2026:
The + 1.0 buffer: It gives the underlying gRPC library a 1-second grace period to cleanly abort and raise its own specific timeout error (e.g., DeadlineExceeded) before Python's asyncio aggressively kills the task.

How we know func takes timeout: This wrapper is specifically built for Google Cloud GAPIC (gRPC) client methods (like the ones in storage_control_v2 - create_folder, rename_folder). By standard design, all of these methods accept a timeout kwarg to set the RPC deadline.

Why we need both: The inner timeout is the actual RPC deadline sent to the server and gRPC core. The outer asyncio.wait_for is a hard fail-safe to ensure the Python event loop doesn't hang forever if the network layer freezes and fails to respect the inner deadline.

This is more specifically added based on the issues that were observed in GCSFuse related to request stalling.

)


async def execute_with_timebound_retry(
func, *args, retry_deadline=None, max_retries=None, **kwargs
):
"""
Executes a gRPC storage control API call with a strict per-attempt timeout and an overall
maximum number of retries. Transient errors and timeouts will trigger an exponential backoff loop.
"""
rc = _get_retry_config(retry_deadline, max_retries)

def _before_sleep(retry_state):
logger.debug(
f"{func.__name__} retrying (attempt {retry_state.attempt_number}) "
f"after {retry_state.next_action.sleep:.2f}s due to exception: {retry_state.outcome.exception()}"
)

retryer = AsyncRetrying(
stop=stop_after_attempt(rc.max_retries),
wait=_custom_wait,
retry=retry_if_exception(_is_transient_exception),
before_sleep=_before_sleep,
reraise=True,
)

return await retryer(
Member commented:
I don't see that we need an extra library for this one thing. One might argue that this retry logic belongs in fsspec, but it would be fine to write it out here.

Contributor Author replied:
Are you referring to remove entire Tenacity dependency and go back to custom implementation?

Contributor commented:
Hi @martindurant,

I completely understand the hesitation around adding new dependencies—I’m usually of the same mind when it comes to keeping the project's footprint light. You're absolutely right that this logic might eventually be better suited for fsspec globally.

The main reason I proposed tenacity is that it has become an industry standard for production-grade retries. It handles the nuances of exponential backoff with jitter and async/sync compatibility right out of the box, which can be surprisingly tricky to get 100% right in a custom implementation.

While we can certainly roll a simpler custom version as you suggested, I thought using a battle-tested library might save us from 'reinventing the wheel' and maintenance overhead later. However, if the priority is keeping the dependency list short, we're happy to strip it back.

What do you think is the best balance for the project?

execute_with_timeout,
func,
rc.retry_deadline,
*args,
retry=None,
**kwargs,
)
34 changes: 34 additions & 0 deletions gcsfs/tests/integration/test_extended_hns.py
@@ -105,6 +105,40 @@ def test_hns_empty_folder_rename_success(self, gcs_hns):
assert not gcsfs.exists(path1)
assert gcsfs.exists(path2)

def test_hns_folder_rename_idempotency_retry_integration(self, gcs_hns):
"""Test that retries explicitly work securely against real GCS by forcefully timing out the client."""
gcsfs = gcs_hns
path1 = f"{TEST_HNS_BUCKET}/integration_retry_old_dir_{uuid.uuid4().hex}"
path2 = f"{TEST_HNS_BUCKET}/integration_retry_new_dir_{uuid.uuid4().hex}"

gcsfs.pipe(f"{path1}/file.txt", b"data")

import asyncio

original_wait_for = asyncio.wait_for
call_count = 0

async def mocked_wait_for(coro, timeout):
nonlocal call_count
call_count += 1
if call_count == 1:
# Schedule the actual GCS network request in the background
asyncio.create_task(coro)
# Intentionally timeout client-side before GCS can return the response
await asyncio.sleep(0.01)
raise asyncio.TimeoutError()

return await original_wait_for(coro, timeout)

from unittest import mock

with mock.patch("gcsfs.retry.asyncio.wait_for", new=mocked_wait_for):
gcsfs.mv(path1, path2)

assert call_count >= 2
assert not gcsfs.exists(path1)
assert gcsfs.exists(path2)

def test_file_rename_using_atomic_mv(
self,
gcs_hns,