Skip to content

Commit 1592c4e

Browse files
Introducing fallback mechanism for all exceptions in Files API upload
1 parent 6210fc8 commit 1592c4e

File tree

2 files changed

+86
-58
lines changed

2 files changed

+86
-58
lines changed

databricks/sdk/mixins/files.py

Lines changed: 65 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1404,20 +1404,47 @@ def _parallel_multipart_upload_from_file(
14041404
part_size = ctx.part_size
14051405
num_parts = (file_size + part_size - 1) // part_size
14061406
_LOG.debug(f"Uploading file of size {file_size} bytes in {num_parts} parts using {ctx.parallelism} threads")
1407+
cloud_provider_session = self._create_cloud_provider_session()
1408+
1409+
# Upload one part to verify the upload can proceed.
1410+
with open(ctx.source_file_path, "rb") as f:
1411+
f.seek(0)
1412+
first_part_size = min(part_size, file_size)
1413+
first_part_buffer = f.read(first_part_size)
1414+
try:
1415+
etag = self._do_upload_one_part(
1416+
ctx,
1417+
cloud_provider_session,
1418+
1,
1419+
0,
1420+
first_part_size,
1421+
session_token,
1422+
BytesIO(first_part_buffer),
1423+
is_first_part=True,
1424+
)
1425+
except FallbackToUploadUsingFilesApi as e:
1426+
raise FallbackToUploadUsingFilesApi(None, "Falling back to single-shot upload with Files API") from e
1427+
if num_parts == 1:
1428+
self._complete_multipart_upload(ctx, {1: etag}, session_token)
1429+
return
14071430

14081431
# Create queues and worker threads.
14091432
task_queue = Queue()
14101433
etags_result_queue = Queue()
1434+
etags_result_queue.put_nowait((1, etag))
14111435
exception_queue = Queue()
14121436
aborted = Event()
14131437
workers = [
1414-
Thread(target=self._upload_file_consumer, args=(task_queue, etags_result_queue, exception_queue, aborted))
1438+
Thread(
1439+
target=self._upload_file_consumer,
1440+
args=(cloud_provider_session, task_queue, etags_result_queue, exception_queue, aborted),
1441+
)
14151442
for _ in range(ctx.parallelism)
14161443
]
14171444
_LOG.debug(f"Starting {len(workers)} worker threads for parallel upload")
14181445

14191446
# Enqueue all parts. Since the task queue is populated before starting the workers, we don't need to signal completion.
1420-
for part_index in range(1, num_parts + 1):
1447+
for part_index in range(2, num_parts + 1):
14211448
part_offset = (part_index - 1) * part_size
14221449
part_size = min(part_size, file_size - part_offset)
14231450
part = self._MultipartUploadPart(ctx, part_index, part_offset, part_size, session_token)
@@ -1466,7 +1493,14 @@ def _parallel_multipart_upload_from_stream(
14661493
)
14671494
try:
14681495
etag = self._do_upload_one_part(
1469-
ctx, cloud_provider_session, 1, 0, len(pre_read_buffer), session_token, BytesIO(pre_read_buffer)
1496+
ctx,
1497+
cloud_provider_session,
1498+
1,
1499+
0,
1500+
len(pre_read_buffer),
1501+
session_token,
1502+
BytesIO(pre_read_buffer),
1503+
is_first_part=True,
14701504
)
14711505
etags_result_queue.put((1, etag))
14721506
except FallbackToUploadUsingFilesApi as e:
@@ -1551,12 +1585,12 @@ def _complete_multipart_upload(self, ctx, etags, session_token):
15511585

15521586
def _upload_file_consumer(
15531587
self,
1588+
cloud_provider_session: requests.Session,
15541589
task_queue: Queue[FilesExt._MultipartUploadPart],
15551590
etags_queue: Queue[tuple[int, str]],
15561591
exception_queue: Queue[Exception],
15571592
aborted: Event,
15581593
) -> None:
1559-
cloud_provider_session = self._create_cloud_provider_session()
15601594
while not aborted.is_set():
15611595
try:
15621596
part = task_queue.get(block=False)
@@ -1627,6 +1661,7 @@ def _do_upload_one_part(
16271661
part_size: int,
16281662
session_token: str,
16291663
part_content: BinaryIO,
1664+
is_first_part: bool = False,
16301665
) -> str:
16311666
retry_count = 0
16321667

@@ -1648,18 +1683,14 @@ def _do_upload_one_part(
16481683
upload_part_urls_response = self._api.do(
16491684
"POST", "/api/2.0/fs/create-upload-part-urls", headers=headers, body=body
16501685
)
1651-
except PermissionDenied as e:
1652-
if self._is_presigned_urls_disabled_error(e):
1653-
raise FallbackToUploadUsingFilesApi(None, "Presigned URLs are disabled")
1654-
else:
1655-
raise e from None
1656-
except InternalError as e:
1657-
if self._is_presigned_urls_network_zone_error(e):
1686+
except Exception as e:
1687+
if is_first_part:
16581688
raise FallbackToUploadUsingFilesApi(
1659-
None, "Presigned URLs are not supported in the current network zone"
1689+
None,
1690+
f"Failed to obtain upload URL for part {part_index}: {e}, falling back to single shot upload",
16601691
)
16611692
else:
1662-
raise e from None
1693+
raise e
16631694

16641695
upload_part_urls = upload_part_urls_response.get("upload_part_urls", [])
16651696
if len(upload_part_urls) == 0:
@@ -1699,8 +1730,11 @@ def perform_upload() -> requests.Response:
16991730
continue
17001731
else:
17011732
raise ValueError(f"Unsuccessful chunk upload: upload URL expired after {retry_count} retries")
1702-
elif upload_response.status_code == 403:
1733+
elif upload_response.status_code == 403 and is_first_part:
17031734
raise FallbackToUploadUsingFilesApi(None, f"Direct upload forbidden: {upload_response.content}")
1735+
elif is_first_part:
1736+
message = f"Unsuccessful chunk upload. Response status: {upload_response.status_code}, body: {upload_response.content}"
1737+
raise FallbackToUploadUsingFilesApi(None, message)
17041738
else:
17051739
message = f"Unsuccessful chunk upload. Response status: {upload_response.status_code}, body: {upload_response.content}"
17061740
_LOG.warning(message)
@@ -1765,18 +1799,13 @@ def _perform_multipart_upload(
17651799
upload_part_urls_response = self._api.do(
17661800
"POST", "/api/2.0/fs/create-upload-part-urls", headers=headers, body=body
17671801
)
1768-
except PermissionDenied as e:
1769-
if chunk_offset == 0 and self._is_presigned_urls_disabled_error(e):
1770-
raise FallbackToUploadUsingFilesApi(buffer, "Presigned URLs are disabled")
1771-
else:
1772-
raise e from None
1773-
except InternalError as e:
1774-
if chunk_offset == 0 and self._is_presigned_urls_network_zone_error(e):
1802+
except Exception as e:
1803+
if chunk_offset == 0:
17751804
raise FallbackToUploadUsingFilesApi(
1776-
buffer, "Presigned URLs are not supported in the current network zone"
1777-
)
1805+
buffer, f"Failed to obtain upload URLs: {e}, falling back to single shot upload"
1806+
) from e
17781807
else:
1779-
raise e from None
1808+
raise e
17801809

17811810
upload_part_urls = upload_part_urls_response.get("upload_part_urls", [])
17821811
if len(upload_part_urls) == 0:
@@ -1847,7 +1876,14 @@ def perform():
18471876
# Let's fallback to using Files API which might be allowlisted to upload, passing
18481877
# currently buffered (but not yet uploaded) part of the stream.
18491878
raise FallbackToUploadUsingFilesApi(buffer, f"Direct upload forbidden: {upload_response.content}")
1850-
1879+
elif chunk_offset == 0:
1880+
# We got an upload failure when uploading the very first chunk.
1881+
# Let's fallback to using Files API which might be more reliable in this case,
1882+
# passing currently buffered (but not yet uploaded) part of the stream.
1883+
raise FallbackToUploadUsingFilesApi(
1884+
buffer,
1885+
f"Unsuccessful chunk upload: {upload_response.status_code}, falling back to single shot upload",
1886+
)
18511887
else:
18521888
message = f"Unsuccessful chunk upload. Response status: {upload_response.status_code}, body: {upload_response.content}"
18531889
_LOG.warning(message)
@@ -1985,18 +2021,10 @@ def _perform_resumable_upload(
19852021
resumable_upload_url_response = self._api.do(
19862022
"POST", "/api/2.0/fs/create-resumable-upload-url", headers=headers, body=body
19872023
)
1988-
except PermissionDenied as e:
1989-
if self._is_presigned_urls_disabled_error(e):
1990-
raise FallbackToUploadUsingFilesApi(pre_read_buffer, "Presigned URLs are disabled")
1991-
else:
1992-
raise e from None
1993-
except InternalError as e:
1994-
if self._is_presigned_urls_network_zone_error(e):
1995-
raise FallbackToUploadUsingFilesApi(
1996-
pre_read_buffer, "Presigned URLs are not supported in the current network zone"
1997-
)
1998-
else:
1999-
raise e from None
2024+
except Exception as e:
2025+
raise FallbackToUploadUsingFilesApi(
2026+
pre_read_buffer, f"Failed to obtain resumable upload URL: {e}, falling back to single shot upload"
2027+
) from e
20002028

20012029
resumable_upload_url_node = resumable_upload_url_response.get("resumable_upload_url")
20022030
if not resumable_upload_url_node:

tests/test_files.py

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1921,40 +1921,40 @@ def to_string(test_case: "MultipartUploadTestCase") -> str:
19211921
),
19221922
# -------------------------- failures on "create upload URL" --------------------------
19231923
MultipartUploadTestCase(
1924-
"Create upload URL: 400 response is not retried",
1924+
"Create upload URL: 400 response should fallback",
19251925
content_size=1024 * 1024,
19261926
custom_response_on_create_multipart_url=CustomResponse(
19271927
code=400,
19281928
# 1 failure is enough
19291929
only_invocation=1,
19301930
),
1931-
expected_exception_type=BadRequest,
19321931
expected_multipart_upload_aborted=True,
1932+
expected_single_shot_upload=True,
19331933
),
19341934
MultipartUploadTestCase(
1935-
"Create upload URL: 403 response is not retried",
1935+
"Create upload URL: 403 response should fallback",
19361936
content_size=1024 * 1024,
19371937
custom_response_on_create_multipart_url=CustomResponse(
19381938
code=403,
19391939
# 1 failure is enough
19401940
only_invocation=1,
19411941
),
1942-
expected_exception_type=PermissionDenied,
19431942
expected_multipart_upload_aborted=True,
1943+
expected_single_shot_upload=True,
19441944
),
19451945
MultipartUploadTestCase(
1946-
"Create upload URL: internal error is not retried",
1946+
"Create upload URL: internal error should fallback",
19471947
content_size=1024 * 1024,
19481948
custom_response_on_create_multipart_url=CustomResponse(code=500, only_invocation=1),
1949-
expected_exception_type=InternalError,
19501949
expected_multipart_upload_aborted=True,
1950+
expected_single_shot_upload=True,
19511951
),
19521952
MultipartUploadTestCase(
1953-
"Create upload URL: non-JSON response is not retried",
1953+
"Create upload URL: non-JSON response should fallback",
19541954
content_size=1024 * 1024,
19551955
custom_response_on_create_multipart_url=CustomResponse(body="this is not a JSON", only_invocation=1),
1956-
expected_exception_type=requests.exceptions.JSONDecodeError,
19571956
expected_multipart_upload_aborted=True,
1957+
expected_single_shot_upload=True,
19581958
),
19591959
MultipartUploadTestCase(
19601960
"Create upload URL: meaningless JSON response is not retried",
@@ -1980,11 +1980,11 @@ def to_string(test_case: "MultipartUploadTestCase") -> str:
19801980
expected_multipart_upload_aborted=True,
19811981
),
19821982
MultipartUploadTestCase(
1983-
"Create upload URL: permanent retryable exception",
1983+
"Create upload URL: permanent retryable exception should fallback",
19841984
content_size=1024 * 1024,
19851985
custom_response_on_create_multipart_url=CustomResponse(exception=requests.ConnectionError),
1986-
expected_exception_type=TimeoutError,
19871986
expected_multipart_upload_aborted=True,
1987+
expected_single_shot_upload=True,
19881988
),
19891989
MultipartUploadTestCase(
19901990
"Create upload URL: intermittent retryable exception",
@@ -2221,8 +2221,8 @@ def to_string(test_case: "MultipartUploadTestCase") -> str:
22212221
content_size=1024 * 1024,
22222222
custom_response_on_create_multipart_url=CustomResponse(code=403, only_invocation=1),
22232223
custom_response_on_create_abort_url=CustomResponse(code=429, first_invocation=1, last_invocation=3),
2224-
expected_exception_type=PermissionDenied, # original error
22252224
expected_multipart_upload_aborted=True, # abort successfully called after abort URL creation is retried
2225+
expected_single_shot_upload=True,
22262226
),
22272227
MultipartUploadTestCase(
22282228
"Abort: exception",
@@ -2233,12 +2233,12 @@ def to_string(test_case: "MultipartUploadTestCase") -> str:
22332233
# this allows to change the server state to "aborted"
22342234
exception_happened_before_processing=False,
22352235
),
2236-
expected_exception_type=PermissionDenied, # original error is reported
22372236
expected_multipart_upload_aborted=True,
2237+
expected_single_shot_upload=True,
22382238
),
22392239
# -------------------------- Parallel Upload for Streams --------------------------
22402240
MultipartUploadTestCase(
2241-
"Multipart parallel upload for stream: Upload errors are not retried",
2241+
"Multipart parallel upload for stream: Upload errors are not retried but fallback",
22422242
content_size=10 * 1024 * 1024,
22432243
multipart_upload_part_size=1024 * 1024,
22442244
source_type=[UploadSourceType.SEEKABLE_STREAM],
@@ -2557,22 +2557,22 @@ def to_string(test_case: "ResumableUploadTestCase") -> str:
25572557
[
25582558
# ------------------ failures on creating resumable upload URL ------------------
25592559
ResumableUploadTestCase(
2560-
"Create resumable URL: 400 response is not retried",
2560+
"Create resumable URL: 400 response is not retried and should fallback",
25612561
stream_size=1024 * 1024,
25622562
custom_response_on_create_resumable_url=CustomResponse(
25632563
code=400,
25642564
# 1 failure is enough
25652565
only_invocation=1,
25662566
),
2567-
expected_exception_type=BadRequest,
25682567
expected_multipart_upload_aborted=False, # upload didn't start
2568+
expected_single_shot_upload=True,
25692569
),
25702570
ResumableUploadTestCase(
2571-
"Create resumable URL: 403 response is not retried",
2571+
"Create resumable URL: 403 response is not retried and should fallback",
25722572
stream_size=1024 * 1024,
25732573
custom_response_on_create_resumable_url=CustomResponse(code=403, only_invocation=1),
2574-
expected_exception_type=PermissionDenied,
25752574
expected_multipart_upload_aborted=False, # upload didn't start
2575+
expected_single_shot_upload=True,
25762576
),
25772577
ResumableUploadTestCase(
25782578
"Create resumable URL: fallback to single-shot upload when presigned URLs are disabled",
@@ -2595,18 +2595,18 @@ def to_string(test_case: "ResumableUploadTestCase") -> str:
25952595
expected_single_shot_upload=True,
25962596
),
25972597
ResumableUploadTestCase(
2598-
"Create resumable URL: 500 response is not retried",
2598+
"Create resumable URL: 500 response is not retried and should fallback",
25992599
stream_size=1024 * 1024,
26002600
custom_response_on_create_resumable_url=CustomResponse(code=500, only_invocation=1),
2601-
expected_exception_type=InternalError,
26022601
expected_multipart_upload_aborted=False, # upload didn't start
2602+
expected_single_shot_upload=True,
26032603
),
26042604
ResumableUploadTestCase(
26052605
"Create resumable URL: non-JSON response is not retried",
26062606
stream_size=1024 * 1024,
26072607
custom_response_on_create_resumable_url=CustomResponse(body="Foo bar", only_invocation=1),
2608-
expected_exception_type=requests.exceptions.JSONDecodeError,
26092608
expected_multipart_upload_aborted=False, # upload didn't start
2609+
expected_single_shot_upload=True,
26102610
),
26112611
ResumableUploadTestCase(
26122612
"Create resumable URL: meaningless JSON response is not retried",
@@ -2621,8 +2621,8 @@ def to_string(test_case: "ResumableUploadTestCase") -> str:
26212621
"Create resumable URL: permanent retryable status code",
26222622
stream_size=1024 * 1024,
26232623
custom_response_on_create_resumable_url=CustomResponse(code=429),
2624-
expected_exception_type=TimeoutError,
26252624
expected_multipart_upload_aborted=False, # upload didn't start
2625+
expected_single_shot_upload=True,
26262626
),
26272627
ResumableUploadTestCase(
26282628
"Create resumable URL: intermittent retryable exception is retried",

0 commit comments

Comments
 (0)