Skip to content

Commit a26962a

Browse files
Add fall back to all exceptions encountered when presigned URL is not available for upload and download in FilesExt (#1115)
## What changes are proposed in this pull request? **WHAT** - Add fall back to all exceptions encountered when presigned URL is not available for upload and download in `FilesExt` **WHY** - This is to eliminate the possibility of any potential regression introduced by upload and download through presigned URL. ## How is this tested? Unit tests are updated to reflect the change. --------- Co-authored-by: Parth Bansal <[email protected]>
1 parent 2df6609 commit a26962a

File tree

3 files changed

+100
-78
lines changed

3 files changed

+100
-78
lines changed

NEXT_CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
### Bug Fixes
1111

12+
- Fixed an issue where download from Shared Volumes could fail by falling back to Files API whenever Presigned URLs are not available.
13+
1214
### Documentation
1315

1416
### Internal Changes

databricks/sdk/mixins/files.py

Lines changed: 75 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -964,6 +964,8 @@ def _parallel_download_presigned_url(self, remote_path: str, destination: str, p
964964

965965
cloud_session = self._create_cloud_provider_session()
966966
url_distributor = _PresignedUrlDistributor(lambda: self._create_download_url(remote_path))
967+
# An event to indicate if any download chunk has succeeded. If any chunk succeeds, we do not fall back to Files API.
968+
any_success = Event()
967969

968970
def download_chunk(additional_headers: dict[str, str]) -> BinaryIO:
969971
retry_count = 0
@@ -982,10 +984,14 @@ def get_content() -> requests.Response:
982984
url_distributor.invalidate_url(version)
983985
retry_count += 1
984986
continue
985-
elif raw_resp.status_code == 403:
987+
elif raw_resp.status_code == 403 and not any_success.is_set():
986988
raise FallbackToDownloadUsingFilesApi("Received 403 Forbidden from presigned URL")
989+
elif not any_success.is_set():
990+
# For other errors, we raise a retryable exception to trigger retry logic.
991+
raise FallbackToDownloadUsingFilesApi(f"Received {raw_resp.status_code} from presigned URL")
987992

988993
raw_resp.raise_for_status()
994+
any_success.set()
989995
return BytesIO(raw_resp.content)
990996
raise ValueError("Exceeded maximum retries for downloading with presigned URL: URL expired too many times")
991997

@@ -1404,20 +1410,47 @@ def _parallel_multipart_upload_from_file(
14041410
part_size = ctx.part_size
14051411
num_parts = (file_size + part_size - 1) // part_size
14061412
_LOG.debug(f"Uploading file of size {file_size} bytes in {num_parts} parts using {ctx.parallelism} threads")
1413+
cloud_provider_session = self._create_cloud_provider_session()
1414+
1415+
# Upload one part to verify the upload can proceed.
1416+
with open(ctx.source_file_path, "rb") as f:
1417+
f.seek(0)
1418+
first_part_size = min(part_size, file_size)
1419+
first_part_buffer = f.read(first_part_size)
1420+
try:
1421+
etag = self._do_upload_one_part(
1422+
ctx,
1423+
cloud_provider_session,
1424+
1,
1425+
0,
1426+
first_part_size,
1427+
session_token,
1428+
BytesIO(first_part_buffer),
1429+
is_first_part=True,
1430+
)
1431+
except FallbackToUploadUsingFilesApi as e:
1432+
raise FallbackToUploadUsingFilesApi(None, "Falling back to single-shot upload with Files API") from e
1433+
if num_parts == 1:
1434+
self._complete_multipart_upload(ctx, {1: etag}, session_token)
1435+
return
14071436

14081437
# Create queues and worker threads.
14091438
task_queue = Queue()
14101439
etags_result_queue = Queue()
1440+
etags_result_queue.put_nowait((1, etag))
14111441
exception_queue = Queue()
14121442
aborted = Event()
14131443
workers = [
1414-
Thread(target=self._upload_file_consumer, args=(task_queue, etags_result_queue, exception_queue, aborted))
1444+
Thread(
1445+
target=self._upload_file_consumer,
1446+
args=(cloud_provider_session, task_queue, etags_result_queue, exception_queue, aborted),
1447+
)
14151448
for _ in range(ctx.parallelism)
14161449
]
14171450
_LOG.debug(f"Starting {len(workers)} worker threads for parallel upload")
14181451

14191452
# Enqueue all parts. Since the task queue is populated before starting the workers, we don't need to signal completion.
1420-
for part_index in range(1, num_parts + 1):
1453+
for part_index in range(2, num_parts + 1):
14211454
part_offset = (part_index - 1) * part_size
14221455
part_size = min(part_size, file_size - part_offset)
14231456
part = self._MultipartUploadPart(ctx, part_index, part_offset, part_size, session_token)
@@ -1466,7 +1499,14 @@ def _parallel_multipart_upload_from_stream(
14661499
)
14671500
try:
14681501
etag = self._do_upload_one_part(
1469-
ctx, cloud_provider_session, 1, 0, len(pre_read_buffer), session_token, BytesIO(pre_read_buffer)
1502+
ctx,
1503+
cloud_provider_session,
1504+
1,
1505+
0,
1506+
len(pre_read_buffer),
1507+
session_token,
1508+
BytesIO(pre_read_buffer),
1509+
is_first_part=True,
14701510
)
14711511
etags_result_queue.put((1, etag))
14721512
except FallbackToUploadUsingFilesApi as e:
@@ -1551,12 +1591,12 @@ def _complete_multipart_upload(self, ctx, etags, session_token):
15511591

15521592
def _upload_file_consumer(
15531593
self,
1594+
cloud_provider_session: requests.Session,
15541595
task_queue: Queue[FilesExt._MultipartUploadPart],
15551596
etags_queue: Queue[tuple[int, str]],
15561597
exception_queue: Queue[Exception],
15571598
aborted: Event,
15581599
) -> None:
1559-
cloud_provider_session = self._create_cloud_provider_session()
15601600
while not aborted.is_set():
15611601
try:
15621602
part = task_queue.get(block=False)
@@ -1627,6 +1667,7 @@ def _do_upload_one_part(
16271667
part_size: int,
16281668
session_token: str,
16291669
part_content: BinaryIO,
1670+
is_first_part: bool = False,
16301671
) -> str:
16311672
retry_count = 0
16321673

@@ -1648,18 +1689,14 @@ def _do_upload_one_part(
16481689
upload_part_urls_response = self._api.do(
16491690
"POST", "/api/2.0/fs/create-upload-part-urls", headers=headers, body=body
16501691
)
1651-
except PermissionDenied as e:
1652-
if self._is_presigned_urls_disabled_error(e):
1653-
raise FallbackToUploadUsingFilesApi(None, "Presigned URLs are disabled")
1654-
else:
1655-
raise e from None
1656-
except InternalError as e:
1657-
if self._is_presigned_urls_network_zone_error(e):
1692+
except Exception as e:
1693+
if is_first_part:
16581694
raise FallbackToUploadUsingFilesApi(
1659-
None, "Presigned URLs are not supported in the current network zone"
1695+
None,
1696+
f"Failed to obtain upload URL for part {part_index}: {e}, falling back to single shot upload",
16601697
)
16611698
else:
1662-
raise e from None
1699+
raise e
16631700

16641701
upload_part_urls = upload_part_urls_response.get("upload_part_urls", [])
16651702
if len(upload_part_urls) == 0:
@@ -1699,8 +1736,11 @@ def perform_upload() -> requests.Response:
16991736
continue
17001737
else:
17011738
raise ValueError(f"Unsuccessful chunk upload: upload URL expired after {retry_count} retries")
1702-
elif upload_response.status_code == 403:
1739+
elif upload_response.status_code == 403 and is_first_part:
17031740
raise FallbackToUploadUsingFilesApi(None, f"Direct upload forbidden: {upload_response.content}")
1741+
elif is_first_part:
1742+
message = f"Unsuccessful chunk upload. Response status: {upload_response.status_code}, body: {upload_response.content}"
1743+
raise FallbackToUploadUsingFilesApi(None, message)
17041744
else:
17051745
message = f"Unsuccessful chunk upload. Response status: {upload_response.status_code}, body: {upload_response.content}"
17061746
_LOG.warning(message)
@@ -1765,18 +1805,13 @@ def _perform_multipart_upload(
17651805
upload_part_urls_response = self._api.do(
17661806
"POST", "/api/2.0/fs/create-upload-part-urls", headers=headers, body=body
17671807
)
1768-
except PermissionDenied as e:
1769-
if chunk_offset == 0 and self._is_presigned_urls_disabled_error(e):
1770-
raise FallbackToUploadUsingFilesApi(buffer, "Presigned URLs are disabled")
1771-
else:
1772-
raise e from None
1773-
except InternalError as e:
1774-
if chunk_offset == 0 and self._is_presigned_urls_network_zone_error(e):
1808+
except Exception as e:
1809+
if chunk_offset == 0:
17751810
raise FallbackToUploadUsingFilesApi(
1776-
buffer, "Presigned URLs are not supported in the current network zone"
1777-
)
1811+
buffer, f"Failed to obtain upload URLs: {e}, falling back to single shot upload"
1812+
) from e
17781813
else:
1779-
raise e from None
1814+
raise e
17801815

17811816
upload_part_urls = upload_part_urls_response.get("upload_part_urls", [])
17821817
if len(upload_part_urls) == 0:
@@ -1847,7 +1882,14 @@ def perform():
18471882
# Let's fallback to using Files API which might be allowlisted to upload, passing
18481883
# currently buffered (but not yet uploaded) part of the stream.
18491884
raise FallbackToUploadUsingFilesApi(buffer, f"Direct upload forbidden: {upload_response.content}")
1850-
1885+
elif chunk_offset == 0:
1886+
# We got an upload failure when uploading the very first chunk.
1887+
# Let's fallback to using Files API which might be more reliable in this case,
1888+
# passing currently buffered (but not yet uploaded) part of the stream.
1889+
raise FallbackToUploadUsingFilesApi(
1890+
buffer,
1891+
f"Unsuccessful chunk upload: {upload_response.status_code}, falling back to single shot upload",
1892+
)
18511893
else:
18521894
message = f"Unsuccessful chunk upload. Response status: {upload_response.status_code}, body: {upload_response.content}"
18531895
_LOG.warning(message)
@@ -1985,18 +2027,10 @@ def _perform_resumable_upload(
19852027
resumable_upload_url_response = self._api.do(
19862028
"POST", "/api/2.0/fs/create-resumable-upload-url", headers=headers, body=body
19872029
)
1988-
except PermissionDenied as e:
1989-
if self._is_presigned_urls_disabled_error(e):
1990-
raise FallbackToUploadUsingFilesApi(pre_read_buffer, "Presigned URLs are disabled")
1991-
else:
1992-
raise e from None
1993-
except InternalError as e:
1994-
if self._is_presigned_urls_network_zone_error(e):
1995-
raise FallbackToUploadUsingFilesApi(
1996-
pre_read_buffer, "Presigned URLs are not supported in the current network zone"
1997-
)
1998-
else:
1999-
raise e from None
2030+
except Exception as e:
2031+
raise FallbackToUploadUsingFilesApi(
2032+
pre_read_buffer, f"Failed to obtain resumable upload URL: {e}, falling back to single shot upload"
2033+
) from e
20002034

20012035
resumable_upload_url_node = resumable_upload_url_response.get("resumable_upload_url")
20022036
if not resumable_upload_url_node:
@@ -2376,16 +2410,8 @@ def _create_download_url(self, file_path: str) -> CreateDownloadUrlResponse:
23762410
)
23772411

23782412
return CreateDownloadUrlResponse.from_dict(raw_response)
2379-
except PermissionDenied as e:
2380-
if self._is_presigned_urls_disabled_error(e):
2381-
raise FallbackToDownloadUsingFilesApi(f"Presigned URLs are disabled")
2382-
else:
2383-
raise e from None
2384-
except InternalError as e:
2385-
if self._is_presigned_urls_network_zone_error(e):
2386-
raise FallbackToDownloadUsingFilesApi("Presigned URLs are not supported in the current network zone")
2387-
else:
2388-
raise e from None
2413+
except Exception as e:
2414+
raise FallbackToDownloadUsingFilesApi(f"Failed to create download URL: {e}") from e
23892415

23902416
def _init_download_response_presigned_api(self, file_path: str, added_headers: dict[str, str]) -> DownloadResponse:
23912417
"""
@@ -2428,17 +2454,11 @@ def perform() -> requests.Response:
24282454
contents=_StreamingResponse(csp_response, self._config.files_ext_client_download_streaming_chunk_size),
24292455
)
24302456
return resp
2431-
elif csp_response.status_code == 403:
2432-
# We got 403 failure when downloading the file. This might happen due to Azure firewall enabled for the customer bucket.
2433-
# Let's fallback to using Files API which might be allowlisted to download.
2434-
raise FallbackToDownloadUsingFilesApi(f"Direct download forbidden: {csp_response.content}")
24352457
else:
24362458
message = (
24372459
f"Unsuccessful download. Response status: {csp_response.status_code}, body: {csp_response.content}"
24382460
)
2439-
_LOG.warning(message)
2440-
mapped_error = _error_mapper(csp_response, {})
2441-
raise mapped_error or ValueError(message)
2461+
raise FallbackToDownloadUsingFilesApi(message)
24422462

24432463
def _init_download_response_mode_csp_with_fallback(
24442464
self, file_path: str, headers: dict[str, str], response_headers: list[str]

0 commit comments

Comments
 (0)