
Commit 9dd3215

Yuanjieding/experimental files ext (#1009)
## What changes are proposed in this pull request?

**WHAT**

- This PR fixes an issue where the old retry mechanism failed to identify retriable errors.
- It does so by providing a new mechanism that distinguishes retriable from non-retriable errors solely by the HTTP status code, which is the expected behavior when calling `_retry_cloud_idempotent_operation`.
- The old `_retry_idempotent_operation` is removed because it is no longer used and can be misleading.
- The PR also refactors `test_files`, the unit tests for the `FilesExt` class:
  - Added typing for all methods and argument lists.
  - Introduced `UploadTestCase` as a base class for upload test cases to reduce code duplication.
  - Renamed all `chunk` to `part` to align the terminology with the HTTP API.

**WHY**

- The previous implementation used error mappers designed for calling Databricks APIs (internal APIs), which the retry function relied on to decide whether a request is retriable. When uploading, however, the call may go to a CSP API (an external API), where retriability should be determined solely by the status code (see the sketch below).
- The unit tests are refactored to improve readability and maintainability.

## How is this tested?

Unit tests.
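As a rough illustration of the intended behavior, a cloud response is classified as retriable purely from its status code. This is a minimal sketch under stated assumptions: the helper name and the status-code set below are illustrative, not the SDK's exact internals (`FilesExt` keeps its own `_RETRYABLE_STATUS_CODES`).

```python
import requests

# Illustrative set; FilesExt defines its own _RETRYABLE_STATUS_CODES.
RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}


def is_retryable_cloud_response(response: requests.Response) -> bool:
    """Decide retriability from the status code alone.

    Cloud-provider (CSP) responses do not carry Databricks-style error
    payloads, so the internal error mappers cannot classify them reliably.
    """
    return response.status_code in RETRYABLE_STATUS_CODES
```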
1 parent 9548a18 commit 9dd3215

File tree

3 files changed: +660 -650 lines changed


NEXT_CHANGELOG.md

Lines changed: 4 additions & 0 deletions
@@ -6,8 +6,12 @@
 
 ### Bug Fixes
 
+- Fix a reported issue where `FilesExt` fails to retry if it receives certain status code from server.
+
 ### Documentation
 
 ### Internal Changes
 
+- Refactor unit tests for `FilesExt` to improve its readability.
+
 ### API Changes

databricks/sdk/mixins/files.py

Lines changed: 43 additions & 15 deletions
@@ -28,7 +28,6 @@
 from .._property import _cached_property
 from ..config import Config
 from ..errors import AlreadyExists, NotFound
-from ..errors.customizer import _RetryAfterCustomizer
 from ..errors.mapper import _error_mapper
 from ..retries import retried
 from ..service import files
@@ -577,6 +576,27 @@ def __repr__(self) -> str:
         return f"<_DbfsPath {self._path}>"
 
 
+class _RetryableException(Exception):
+    """Base class for retryable exceptions in DBFS operations."""
+
+    def __init__(self, message: str, http_status_code: int):
+        super().__init__()
+        self.message = message
+        self.http_status_code = http_status_code
+
+    def __str__(self) -> str:
+        return f"{self.message} (HTTP Status: {self.http_status_code})"
+
+    @staticmethod
+    def make_error(response: requests.Response) -> "_RetryableException":
+        """Map the response to a retryable exception."""
+
+        return _RetryableException(
+            message=response.text,
+            http_status_code=response.status_code,
+        )
+
+
 class DbfsExt(files.DbfsAPI):
     __doc__ = files.DbfsAPI.__doc__
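As a usage illustration (not part of the diff), `_RetryableException.make_error` simply wraps a cloud response so the retry predicate can later recognize it by type. The 503 response below is fabricated for the example; real responses come from the cloud storage API.

```python
import requests

# Fabricated response standing in for a cloud API reply.
response = requests.Response()
response.status_code = 503           # simulated "Service Unavailable"
response._content = b"server busy"   # simulated body text

err = _RetryableException.make_error(response)
print(err)  # -> server busy (HTTP Status: 503)
```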

@@ -885,7 +905,7 @@ def perform():
                     timeout=self._config.multipart_upload_single_chunk_upload_timeout_seconds,
                 )
 
-            upload_response = self._retry_idempotent_operation(perform, rewind)
+            upload_response = self._retry_cloud_idempotent_operation(perform, rewind)
 
             if upload_response.status_code in (200, 201):
                 # Chunk upload successful
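For context, the `perform`/`rewind` pair used here follows the usual pattern for retrying a streaming upload: re-issue the idempotent request, and seek the source stream back before every retry. A hedged standalone sketch follows; the URL, timeout, and variable names are illustrative, not the SDK's exact code.

```python
import io

import requests

session = requests.Session()
stream = io.BytesIO(b"...bytes of one upload part...")
start = stream.tell()  # position where this part begins


def perform() -> requests.Response:
    # Idempotent PUT of one part; safe to repeat when the previous attempt's outcome is unknown.
    return session.put("https://example.invalid/presigned-part-url", data=stream, timeout=60)


def rewind() -> None:
    # Reset the stream before each retry so the full part is sent again.
    stream.seek(start)

# FilesExt wires these together as: self._retry_cloud_idempotent_operation(perform, rewind)
```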
@@ -1097,7 +1117,7 @@ def perform():
             )
 
         try:
-            return self._retry_idempotent_operation(perform)
+            return self._retry_cloud_idempotent_operation(perform)
         except RequestException:
             _LOG.warning("Failed to retrieve upload status")
             return None
@@ -1116,7 +1136,7 @@ def perform():
             # a 503 or 500 response, then you need to resume the interrupted upload from where it left off.
 
             # Let's follow that for all potentially retryable status codes.
-            # Together with the catch block below we replicate the logic in _retry_idempotent_operation().
+            # Together with the catch block below we replicate the logic in _retry_databricks_idempotent_operation().
             if upload_response.status_code in self._RETRYABLE_STATUS_CODES:
                 if retry_count < self._config.multipart_upload_max_retries:
                     retry_count += 1
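The counter-based retry used in this resumable-upload path (as opposed to the time-bounded wrapper further down) can be pictured roughly as follows. Only the idea of bounding retries by `multipart_upload_max_retries` comes from the diff; the function, its arguments, and the status-code set are a simplified stand-in.

```python
from typing import Callable

import requests

RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}  # illustrative set


def resume_with_bounded_retries(do_upload: Callable[[], requests.Response],
                                max_retries: int) -> requests.Response:
    """Simplified stand-in for the FilesExt loop: on a retryable status the
    interrupted upload is resumed from where it left off, up to max_retries times."""
    retry_count = 0
    while True:
        response = do_upload()
        if response.status_code in RETRYABLE_STATUS_CODES and retry_count < max_retries:
            retry_count += 1
            continue  # resume the interrupted upload and try again
        return response
```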
@@ -1243,7 +1263,7 @@ def perform():
                 timeout=self._config.multipart_upload_single_chunk_upload_timeout_seconds,
             )
 
-        abort_response = self._retry_idempotent_operation(perform)
+        abort_response = self._retry_cloud_idempotent_operation(perform)
 
         if abort_response.status_code not in (200, 201):
             raise ValueError(abort_response)
@@ -1265,7 +1285,7 @@ def perform():
                 timeout=self._config.multipart_upload_single_chunk_upload_timeout_seconds,
             )
 
-        abort_response = self._retry_idempotent_operation(perform)
+        abort_response = self._retry_cloud_idempotent_operation(perform)
 
         if abort_response.status_code not in (200, 201):
             raise ValueError(abort_response)
@@ -1283,31 +1303,39 @@ def _create_cloud_provider_session(self):
         session.mount("http://", http_adapter)
         return session
 
-    def _retry_idempotent_operation(
+    def _retry_cloud_idempotent_operation(
         self, operation: Callable[[], requests.Response], before_retry: Callable = None
     ) -> requests.Response:
-        """Perform given idempotent operation with necessary retries. Since operation is idempotent it's
-        safe to retry it for response codes where server state might have changed.
+        """Perform given idempotent operation with necessary retries for requests to non Databricks APIs.
+        For cloud APIs, we will retry on network errors and on server response codes.
+        Since operation is idempotent it's safe to retry it for response codes where server state might have changed.
         """
 
-        def delegate():
+        def delegate() -> requests.Response:
             response = operation()
             if response.status_code in self._RETRYABLE_STATUS_CODES:
-                attrs = {}
-                # this will assign "retry_after_secs" to the attrs, essentially making exception look retryable
-                _RetryAfterCustomizer().customize_error(response, attrs)
-                raise _error_mapper(response, attrs)
+                raise _RetryableException.make_error(response)
             else:
                 return response
 
+        def extended_is_retryable(e: BaseException) -> Optional[str]:
+            retry_reason_from_base = _BaseClient._is_retryable(e)
+            if retry_reason_from_base is not None:
+                return retry_reason_from_base
+
+            if isinstance(e, _RetryableException):
+                # this is a retriable exception, but not a network error
+                return f"retryable exception (status_code:{e.http_status_code})"
+            return None
+
         # following _BaseClient timeout
         retry_timeout_seconds = self._config.retry_timeout_seconds or 300
 
         return retried(
             timeout=timedelta(seconds=retry_timeout_seconds),
             # also retry on network errors (connection error, connection timeout)
             # where we believe request didn't reach the server
-            is_retryable=_BaseClient._is_retryable,
+            is_retryable=extended_is_retryable,
             before_retry=before_retry,
         )(delegate)()
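Putting the pieces together: `delegate` converts retryable status codes into `_RetryableException`, and `extended_is_retryable` adds a type check on that exception on top of the base client's network-error check before handing everything to `retried`. Below is a compact, self-contained approximation of that flow; it assumes the databricks-sdk package is installed and that the `_RetryableException` class from the diff is in scope, and the flaky operation is fabricated for the example.

```python
from datetime import timedelta
from typing import Optional

from databricks.sdk.retries import retried  # same helper the diff composes with

attempts = {"count": 0}


def flaky_cloud_call() -> str:
    """Fails twice with a retryable error, then succeeds (simulated)."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise _RetryableException("simulated 503 from a cloud API", 503)
    return "ok"


def is_retryable(e: BaseException) -> Optional[str]:
    # Same convention as extended_is_retryable: return a reason string to retry, None to give up.
    if isinstance(e, _RetryableException):
        return f"retryable exception (status_code:{e.http_status_code})"
    return None


result = retried(timeout=timedelta(seconds=300), is_retryable=is_retryable)(flaky_cloud_call)()
print(result, attempts["count"])  # -> ok 3
```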
