Skip to content

Commit f1f3e26

Browse files
committed
Improve retry logic, add more test cases and comments
1 parent 8c97a49 commit f1f3e26

File tree

3 files changed

+212
-102
lines changed

3 files changed

+212
-102
lines changed

databricks/sdk/config.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,12 @@ class Config:
134134
# but a maximum time between consecutive data reception events (even 1 byte) from the server
135135
multipart_upload_single_chunk_upload_timeout_seconds: float = 60
136136

137-
# Limit of retries during multipart upload.
138-
# Retry counter is reset when progressing along the stream.
137+
# Cap on the number of custom retries during incremental uploads:
138+
# 1) multipart: the upload part URL has expired, so new upload URLs must be requested to continue the upload
139+
# 2) resumable: chunk upload produced a retryable response (or exception), so upload status must be
140+
# retrieved to continue the upload.
141+
# In these two cases standard SDK retries (which are capped by the `retry_timeout_seconds` option) are not used.
142+
# Note that retry counter is reset when upload is successfully resumed.
139143
multipart_upload_max_retries = 3
140144

141145
def __init__(

databricks/sdk/mixins/files.py

Lines changed: 81 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
from datetime import timedelta
1717
from io import BytesIO
1818
from types import TracebackType
19-
from typing import (TYPE_CHECKING, AnyStr, BinaryIO, Generator, Iterable,
20-
Optional, Type, Union)
19+
from typing import (TYPE_CHECKING, AnyStr, BinaryIO, Callable, Generator,
20+
Iterable, Optional, Type, Union)
2121
from urllib import parse
2222

2323
import requests
@@ -26,9 +26,9 @@
2626

2727
from .._base_client import _BaseClient, _RawResponse, _StreamingResponse
2828
from .._property import _cached_property
29-
from ..clock import Clock, RealClock
3029
from ..config import Config
3130
from ..errors import AlreadyExists, NotFound
31+
from ..errors.customizer import _RetryAfterCustomizer
3232
from ..errors.mapper import _error_mapper
3333
from ..retries import retried
3434
from ..service import files
@@ -693,10 +693,12 @@ def delete(self, path: str, *, recursive=False):
693693
class FilesExt(files.FilesAPI):
694694
__doc__ = files.FilesAPI.__doc__
695695

696-
def __init__(self, api_client, config: Config, clock: Clock = None):
696+
# note that these error codes are retryable only for idempotent operations
697+
_RETRYABLE_STATUS_CODES = [408, 429, 500, 502, 503, 504]
698+
699+
def __init__(self, api_client, config: Config):
697700
super().__init__(api_client)
698701
self._config = config.copy()
699-
self._clock = clock or RealClock()
700702
self._multipart_upload_read_ahead_bytes = 1
701703

702704
def download(self, file_path: str) -> DownloadResponse:
@@ -867,25 +869,15 @@ def rewind():
867869
chunk.seek(0, os.SEEK_SET)
868870

869871
def perform():
870-
result = cloud_provider_session.request(
872+
return cloud_provider_session.request(
871873
"PUT",
872874
url,
873875
headers=headers,
874876
data=chunk,
875877
timeout=self._config.multipart_upload_single_chunk_upload_timeout_seconds,
876878
)
877-
return result
878-
879-
# following _BaseClient timeout
880-
retry_timeout_seconds = self._config.retry_timeout_seconds or 300
881879

882-
# Uploading same data to the same URL is an idempotent operation, safe to retry.
883-
upload_response = retried(
884-
timeout=timedelta(seconds=retry_timeout_seconds),
885-
is_retryable=_BaseClient._is_retryable,
886-
clock=self._clock,
887-
before_retry=rewind,
888-
)(perform)()
880+
upload_response = self._retry_idempotent_operation(perform, rewind)
889881

890882
if upload_response.status_code in (200, 201):
891883
# Chunk upload successful
@@ -1068,38 +1060,55 @@ def _resumable_upload(
10681060
_LOG.debug(f"Uploading chunk: {content_range_header}")
10691061
headers["Content-Range"] = content_range_header
10701062

1063+
def retrieve_upload_status() -> Optional[requests.Response]:
1064+
def perform():
1065+
return cloud_provider_session.request(
1066+
"PUT",
1067+
resumable_upload_url,
1068+
headers={"Content-Range": "bytes */*"},
1069+
data=b"",
1070+
timeout=self._config.multipart_upload_single_chunk_upload_timeout_seconds,
1071+
)
1072+
1073+
try:
1074+
return self._retry_idempotent_operation(perform)
1075+
except RequestException:
1076+
_LOG.warning("Failed to retrieve upload status")
1077+
return None
1078+
10711079
try:
1072-
# We're not retrying this single request as we don't know where to rewind to.
1073-
# Instead, in case of retryable failure we'll re-request the current offset
1074-
# from the server and resume upload from there in the main upload loop.
1075-
upload_response: requests.Response = cloud_provider_session.request(
1080+
upload_response = cloud_provider_session.request(
10761081
"PUT",
10771082
resumable_upload_url,
10781083
headers=headers,
10791084
data=BytesIO(buffer[:actual_chunk_length]),
10801085
timeout=self._config.multipart_upload_single_chunk_upload_timeout_seconds,
10811086
)
1082-
retry_count = 0 # reset retry count when progressing along the stream
1087+
1088+
# https://cloud.google.com/storage/docs/performing-resumable-uploads#resume-upload
1089+
# If an upload request is terminated before receiving a response, or if you receive
1090+
# a 503 or 500 response, then you need to resume the interrupted upload from where it left off.
1091+
1092+
# Let's follow that for all potentially retryable status codes.
1093+
if upload_response.status_code in self._RETRYABLE_STATUS_CODES:
1094+
if retry_count < self._config.multipart_upload_max_retries:
1095+
retry_count += 1
1096+
# let original upload_response be handled as an error
1097+
upload_response = retrieve_upload_status() or upload_response
1098+
else:
1099+
# we received non-retryable response, reset retry count
1100+
retry_count = 0
1101+
10831102
except RequestException as e:
1084-
_LOG.warning(f"Failure during upload request: {sys.exc_info()}")
1103+
# Let's do the same for retryable network errors
10851104
if _BaseClient._is_retryable(e) and retry_count < self._config.multipart_upload_max_retries:
10861105
retry_count += 1
1087-
# Chunk upload threw an error, try to retrieve the current received offset
1088-
try:
1089-
# https://cloud.google.com/storage/docs/performing-resumable-uploads#status-check
1090-
headers["Content-Range"] = "bytes */*"
1091-
upload_response = cloud_provider_session.request(
1092-
"PUT",
1093-
resumable_upload_url,
1094-
headers=headers,
1095-
data=b"",
1096-
timeout=self._config.multipart_upload_single_chunk_upload_timeout_seconds,
1097-
)
1098-
except RequestException:
1099-
# status check failed, abort the upload
1106+
upload_response = retrieve_upload_status()
1107+
if not upload_response:
1108+
# rethrow original exception
11001109
raise e from None
11011110
else:
1102-
# error is not retryable, abort the upload
1111+
# rethrow original exception
11031112
raise e from None
11041113

11051114
if upload_response.status_code in (200, 201):
@@ -1138,23 +1147,16 @@ def _resumable_upload(
11381147
uploaded_bytes_count = next_chunk_offset - chunk_offset
11391148
chunk_offset = next_chunk_offset
11401149

1141-
elif upload_response.status_code == 400:
1142-
# Expecting response body to be small to be safely logged
1143-
mapped_error = _error_mapper(upload_response, {})
1144-
raise mapped_error or ValueError(
1145-
f"Failed to upload (status: {upload_response.status_code}): {upload_response.text}"
1146-
)
1147-
11481150
elif upload_response.status_code == 412 and not overwrite:
11491151
# Assuming this is only possible reason
11501152
# Full message in this case: "At least one of the pre-conditions you specified did not hold."
11511153
raise AlreadyExists("The file being created already exists.")
1152-
else:
1153-
if _LOG.isEnabledFor(logging.DEBUG):
1154-
_LOG.debug(f"Failed to upload (status: {upload_response.status_code}): {upload_response.text}")
11551154

1155+
else:
1156+
message = f"Unsuccessful chunk upload. Response status: {upload_response.status_code}, body: {upload_response.content}"
1157+
_LOG.warning(message)
11561158
mapped_error = _error_mapper(upload_response, {})
1157-
raise mapped_error or ValueError(f"Failed to upload: {upload_response}")
1159+
raise mapped_error or ValueError(message)
11581160

11591161
except Exception as e:
11601162
_LOG.info(f"Aborting resumable upload on error: {e}")
@@ -1204,22 +1206,15 @@ def _abort_multipart_upload(self, target_path: str, session_token: str, cloud_pr
12041206
headers[h["name"]] = h["value"]
12051207

12061208
def perform():
1207-
result = cloud_provider_session.request(
1209+
return cloud_provider_session.request(
12081210
"DELETE",
12091211
abort_url,
12101212
headers=headers,
12111213
data=b"",
12121214
timeout=self._config.multipart_upload_single_chunk_upload_timeout_seconds,
12131215
)
1214-
return result
12151216

1216-
# following _BaseClient timeout
1217-
retry_timeout_seconds = self._config.retry_timeout_seconds or 300
1218-
1219-
# Aborting upload is an idempotent operation, safe to retry.
1220-
abort_response = retried(
1221-
timeout=timedelta(seconds=retry_timeout_seconds), is_retryable=_BaseClient._is_retryable, clock=self._clock
1222-
)(perform)()
1217+
abort_response = self._retry_idempotent_operation(perform)
12231218

12241219
if abort_response.status_code not in (200, 201):
12251220
raise ValueError(abort_response)
@@ -1232,21 +1227,15 @@ def _abort_resumable_upload(
12321227
headers[h["name"]] = h["value"]
12331228

12341229
def perform():
1235-
result = cloud_provider_session.request(
1230+
return cloud_provider_session.request(
12361231
"DELETE",
12371232
resumable_upload_url,
12381233
headers=headers,
12391234
data=b"",
12401235
timeout=self._config.multipart_upload_single_chunk_upload_timeout_seconds,
12411236
)
1242-
return result
12431237

1244-
# following _BaseClient timeout
1245-
retry_timeout_seconds = self._config.retry_timeout_seconds or 300
1246-
# Aborting upload is an idempotent operation, safe to retry.
1247-
abort_response = retried(
1248-
timeout=timedelta(seconds=retry_timeout_seconds), is_retryable=_BaseClient._is_retryable, clock=self._clock
1249-
)(perform)()
1238+
abort_response = self._retry_idempotent_operation(perform)
12501239

12511240
if abort_response.status_code not in (200, 201):
12521241
raise ValueError(abort_response)
@@ -1265,6 +1254,30 @@ def _create_cloud_provider_session(self):
12651254
session.mount("http://", http_adapter)
12661255
return session
12671256

1257+
def _retry_idempotent_operation(
    self, operation: Callable[[], requests.Response], before_retry: Optional[Callable[[], None]] = None
) -> requests.Response:
    """Execute an idempotent HTTP operation, retrying while the outcome looks retryable.

    :param operation: zero-argument callable that issues the request; must be safe to repeat,
        since it may be invoked multiple times.
    :param before_retry: optional callback invoked before each retry attempt
        (e.g. to rewind a request body stream).
    :returns: the first response whose status code is not in ``_RETRYABLE_STATUS_CODES``.
    :raises: the SDK-mapped error for the last retryable response once the retry timeout
        elapses, or the underlying network exception when it is not retryable.
    """

    def delegate() -> requests.Response:
        response = operation()
        if response.status_code in self._RETRYABLE_STATUS_CODES:
            attrs = {}
            # This assigns "retry_after_secs" to attrs, essentially making the exception look retryable.
            _RetryAfterCustomizer().customize_error(response, attrs)
            raise _error_mapper(response, attrs)
        else:
            return response

    # Following the _BaseClient timeout default.
    retry_timeout_seconds = self._config.retry_timeout_seconds or 300

    return retried(
        timeout=timedelta(seconds=retry_timeout_seconds),
        # Also retry on network errors (connection error, connection timeout)
        # where we believe the request didn't reach the server.
        is_retryable=_BaseClient._is_retryable,
        before_retry=before_retry,
    )(delegate)()
1280+
12681281
def _download_raw_stream(
12691282
self, file_path: str, start_byte_offset: int, if_unmodified_since_timestamp: Optional[str] = None
12701283
) -> DownloadResponse:
@@ -1302,12 +1315,12 @@ def _download_raw_stream(
13021315

13031316
return result
13041317

1305-
def _wrap_stream(self, file_path: str, download_response: DownloadResponse):
    """Wrap the raw stream of a download response into a resilient response.

    The returned ``_ResilientResponse`` starts at offset 0 and holds the raw response
    extracted from ``download_response`` so the stream can be recovered on failure.
    """
    return _ResilientResponse(
        self,
        file_path,
        download_response.last_modified,
        offset=0,
        underlying_response=_ResilientIterator._extract_raw_response(download_response),
    )

0 commit comments

Comments
 (0)