Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@

### Bug Fixes

- Fix a reported issue where `FilesExt` fails to retry if it receives certain status code from server.

### Documentation

### Internal Changes

- Refactor unit tests for `FilesExt` to improve its readability.

### API Changes
58 changes: 43 additions & 15 deletions databricks/sdk/mixins/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from .._property import _cached_property
from ..config import Config
from ..errors import AlreadyExists, NotFound
from ..errors.customizer import _RetryAfterCustomizer
from ..errors.mapper import _error_mapper
from ..retries import retried
from ..service import files
Expand Down Expand Up @@ -577,6 +576,27 @@ def __repr__(self) -> str:
return f"<_DbfsPath {self._path}>"


class _RetryableException(Exception):
"""Base class for retryable exceptions in DBFS operations."""

def __init__(self, message: str, http_status_code: int):
super().__init__()
self.message = message
self.http_status_code = http_status_code

def __str__(self) -> str:
return f"{self.message} (HTTP Status: {self.http_status_code})"

@staticmethod
def make_error(response: requests.Response) -> "_RetryableException":
"""Map the response to a retryable exception."""

return _RetryableException(
message=response.text,
http_status_code=response.status_code,
)


class DbfsExt(files.DbfsAPI):
__doc__ = files.DbfsAPI.__doc__

Expand Down Expand Up @@ -885,7 +905,7 @@ def perform():
timeout=self._config.multipart_upload_single_chunk_upload_timeout_seconds,
)

upload_response = self._retry_idempotent_operation(perform, rewind)
upload_response = self._retry_cloud_idempotent_operation(perform, rewind)

if upload_response.status_code in (200, 201):
# Chunk upload successful
Expand Down Expand Up @@ -1097,7 +1117,7 @@ def perform():
)

try:
return self._retry_idempotent_operation(perform)
return self._retry_cloud_idempotent_operation(perform)
except RequestException:
_LOG.warning("Failed to retrieve upload status")
return None
Expand All @@ -1116,7 +1136,7 @@ def perform():
# a 503 or 500 response, then you need to resume the interrupted upload from where it left off.

# Let's follow that for all potentially retryable status codes.
# Together with the catch block below we replicate the logic in _retry_idempotent_operation().
# Together with the catch block below we replicate the logic in _retry_databricks_idempotent_operation().
if upload_response.status_code in self._RETRYABLE_STATUS_CODES:
if retry_count < self._config.multipart_upload_max_retries:
retry_count += 1
Expand Down Expand Up @@ -1243,7 +1263,7 @@ def perform():
timeout=self._config.multipart_upload_single_chunk_upload_timeout_seconds,
)

abort_response = self._retry_idempotent_operation(perform)
abort_response = self._retry_cloud_idempotent_operation(perform)

if abort_response.status_code not in (200, 201):
raise ValueError(abort_response)
Expand All @@ -1265,7 +1285,7 @@ def perform():
timeout=self._config.multipart_upload_single_chunk_upload_timeout_seconds,
)

abort_response = self._retry_idempotent_operation(perform)
abort_response = self._retry_cloud_idempotent_operation(perform)

if abort_response.status_code not in (200, 201):
raise ValueError(abort_response)
Expand All @@ -1283,31 +1303,39 @@ def _create_cloud_provider_session(self):
session.mount("http://", http_adapter)
return session

def _retry_idempotent_operation(
def _retry_cloud_idempotent_operation(
self, operation: Callable[[], requests.Response], before_retry: Callable = None
) -> requests.Response:
"""Perform given idempotent operation with necessary retries. Since operation is idempotent it's
safe to retry it for response codes where server state might have changed.
"""Perform given idempotent operation with necessary retries for requests to non Databricks APIs.
For cloud APIs, we will retry on network errors and on server response codes.
Since operation is idempotent it's safe to retry it for response codes where server state might have changed.
"""

def delegate():
def delegate() -> requests.Response:
response = operation()
if response.status_code in self._RETRYABLE_STATUS_CODES:
attrs = {}
# this will assign "retry_after_secs" to the attrs, essentially making exception look retryable
_RetryAfterCustomizer().customize_error(response, attrs)
raise _error_mapper(response, attrs)
raise _RetryableException.make_error(response)
else:
return response

def extended_is_retryable(e: BaseException) -> Optional[str]:
retry_reason_from_base = _BaseClient._is_retryable(e)
if retry_reason_from_base is not None:
return retry_reason_from_base

if isinstance(e, _RetryableException):
# this is a retriable exception, but not a network error
return f"retryable exception (status_code:{e.http_status_code})"
return None

# following _BaseClient timeout
retry_timeout_seconds = self._config.retry_timeout_seconds or 300

return retried(
timeout=timedelta(seconds=retry_timeout_seconds),
# also retry on network errors (connection error, connection timeout)
# where we believe request didn't reach the server
is_retryable=_BaseClient._is_retryable,
is_retryable=extended_is_retryable,
before_retry=before_retry,
)(delegate)()

Expand Down
Loading
Loading