Commit e890378

Fix FilesExt upload fails when content size is zero (#1088)
## What changes are proposed in this pull request?

- **WHAT** - Fix the `FilesExt` issue where an exception is thrown when the content size is 0, by checking whether the size of the content is below a certain threshold. If it is, upload using a single-shot request instead of a multipart upload.
- **WHY** - The `upload` interface needs to support uploading data of zero size.

## How is this tested?

Unit test cases are updated to cover the behavior change.
1 parent b4ffd99 commit e890378
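
In short, the fix gates the parallel multipart path on a minimum content size. Below is a minimal sketch of that decision rule, assuming a standalone helper and a hard-coded threshold standing in for the SDK's `files_ext_multipart_upload_min_stream_size` config value; the names are illustrative, not the SDK's actual API:

```python
from typing import Optional

# Assumed threshold, standing in for config.files_ext_multipart_upload_min_stream_size.
MIN_STREAM_SIZE = 100 * 1024 * 1024


def should_use_multipart(use_parallel: bool, content_length: Optional[int]) -> bool:
    """Mirrors the commit's dispatch rule: take the parallel multipart path only
    when it was requested AND the content is of unknown size or at least the
    minimum stream size. Known-small content (including zero bytes) falls
    through to a single-shot upload, which is what fixes empty uploads."""
    if not use_parallel:
        return False
    return content_length is None or content_length >= MIN_STREAM_SIZE


assert not should_use_multipart(True, 0)  # empty content -> single-shot
assert should_use_multipart(True, None)  # unknown size -> multipart (may fall back)
```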

File tree

3 files changed (+44, −18 lines)


NEXT_CHANGELOG.md

Lines changed: 2 additions & 0 deletions
```diff
@@ -7,6 +7,8 @@
 ### Bug Fixes
 - Fix `FilesExt` can fail to upload and download data when Presigned URLs are not available in certain environments (e.g. Serverless GPU clusters).
 
+- Fix `FilesExt.upload` and `FilesExt.upload_from` would fail when the source content is empty and `use_parallel=True`.
+
 ### Documentation
 
 ### Internal Changes
```

databricks/sdk/mixins/files.py

Lines changed: 7 additions & 4 deletions
```diff
@@ -1134,7 +1134,9 @@ def upload(
             f"Upload context: part_size={ctx.part_size}, batch_size={ctx.batch_size}, content_length={ctx.content_length}"
         )
 
-        if ctx.use_parallel:
+        if ctx.use_parallel and (
+            ctx.content_length is None or ctx.content_length >= self._config.files_ext_multipart_upload_min_stream_size
+        ):
             self._parallel_upload_from_stream(ctx, contents)
             return UploadStreamResult()
         elif ctx.content_length is not None:
@@ -1206,7 +1208,7 @@ def upload_from(
             use_parallel=use_parallel,
             parallelism=parallelism,
         )
-        if ctx.use_parallel:
+        if ctx.use_parallel and ctx.content_length >= self._config.files_ext_multipart_upload_min_stream_size:
             self._parallel_upload_from_file(ctx)
             return UploadFileResult()
         else:
@@ -1459,8 +1461,9 @@ def _parallel_multipart_upload_from_stream(
         # Do the first part read ahead
         pre_read_buffer = content.read(ctx.part_size)
         if not pre_read_buffer:
-            self._complete_multipart_upload(ctx, {}, session_token)
-            return
+            raise FallbackToUploadUsingFilesApi(
+                b"", "Falling back to single-shot upload with Files API due to empty input stream"
+            )
         try:
             etag = self._do_upload_one_part(
                 ctx, cloud_provider_session, 1, 0, len(pre_read_buffer), session_token, BytesIO(pre_read_buffer)
```
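
Taken together, the two size guards plus the new `FallbackToUploadUsingFilesApi` path mean a zero-byte upload now completes via the single-shot Files API. A hedged usage sketch follows; the volume path is illustrative, and while `use_parallel` is the keyword shown in this diff, the exact public signature may differ by SDK version:

```python
import io

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()  # assumes credentials are configured in the environment

# Previously this could raise under use_parallel=True, because the parallel
# multipart path read an empty first part. With this commit the call is
# routed to (or falls back to) a single-shot upload instead.
w.files.upload(
    "/Volumes/main/default/my_volume/empty.txt",  # illustrative path
    io.BytesIO(b""),
    overwrite=True,
    use_parallel=True,
)
```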

tests/test_files.py

Lines changed: 35 additions & 14 deletions
```diff
@@ -31,7 +31,7 @@
 from databricks.sdk.mixins.files_utils import CreateDownloadUrlResponse
 from tests.clock import FakeClock
 
-from .test_files_utils import Utils
+from .test_files_utils import NonSeekableBuffer, Utils
 
 logger = logging.getLogger(__name__)
 
@@ -1286,8 +1286,8 @@ def __init__(
         custom_response_on_single_shot_upload: CustomResponse,
         # exception which is expected to be thrown (so upload is expected to have failed)
         expected_exception_type: Optional[Type[BaseException]],
-        # if abort is expected to be called for multipart/resumable upload
-        expected_multipart_upload_aborted: bool,
+        # Whether abort is expected to be called for multipart/resumable upload, set to None if we don't care.
+        expected_multipart_upload_aborted: Optional[bool],
         expected_single_shot_upload: bool,
     ):
         self.name = name
@@ -1303,7 +1303,7 @@ def __init__(
         self.multipart_upload_max_retries = multipart_upload_max_retries
         self.custom_response_on_single_shot_upload = custom_response_on_single_shot_upload
         self.expected_exception_type = expected_exception_type
-        self.expected_multipart_upload_aborted: bool = expected_multipart_upload_aborted
+        self.expected_multipart_upload_aborted: Optional[bool] = expected_multipart_upload_aborted
         self.expected_single_shot_upload = expected_single_shot_upload
 
         self.path = "/test.txt"
@@ -1323,16 +1323,20 @@ def clear_state(self) -> None:
                 logger.warning("Failed to remove temp file: %s", file_path)
         self.created_temp_files = []
 
-    def get_upload_file(self, content: bytes, source_type: "UploadSourceType") -> Union[str, io.BytesIO]:
+    def get_upload_file(
+        self, content: bytes, source_type: "UploadSourceType"
+    ) -> Union[str, io.BytesIO, NonSeekableBuffer]:
         """Returns a file or stream to upload based on the source type."""
         if source_type == UploadSourceType.FILE:
             with NamedTemporaryFile(mode="wb", delete=False) as f:
                 f.write(content)
                 file_path = f.name
             self.created_temp_files.append(file_path)
             return file_path
-        elif source_type == UploadSourceType.STREAM:
+        elif source_type == UploadSourceType.SEEKABLE_STREAM:
             return io.BytesIO(content)
+        elif source_type == UploadSourceType.NONSEEKABLE_STREAM:
+            return NonSeekableBuffer(content)
         else:
             raise ValueError(f"Unknown source type: {source_type}")
 
@@ -1446,7 +1450,8 @@ def upload() -> None:
             ), "Single-shot upload should not have succeeded"
 
             assert (
-                multipart_server_state.aborted == self.expected_multipart_upload_aborted
+                self.expected_multipart_upload_aborted is None
+                or multipart_server_state.aborted == self.expected_multipart_upload_aborted
             ), "Multipart upload aborted state mismatch"
 
         finally:
@@ -1462,7 +1467,8 @@ class UploadSourceType(Enum):
     """Source type for the upload. Used to determine how to upload the file."""
 
     FILE = "file"  # upload from a file on disk
-    STREAM = "stream"  # upload from a stream (e.g. BytesIO)
+    SEEKABLE_STREAM = "seekable_stream"  # upload from a seekable stream (e.g. BytesIO)
+    NONSEEKABLE_STREAM = "nonseekable_stream"  # upload from a non-seekable stream (e.g. network stream)
 
 
 class MultipartUploadTestCase(UploadTestCase):
@@ -1570,15 +1576,16 @@ def __init__(
         # if abort is expected to be called
         # expected part size
         expected_part_size: Optional[int] = None,
-        expected_multipart_upload_aborted: bool = False,
+        expected_multipart_upload_aborted: Optional[bool] = False,
         expected_single_shot_upload: bool = False,
     ):
         super().__init__(
             name,
            content_size,
            cloud,
            overwrite,
-            source_type or [UploadSourceType.FILE, UploadSourceType.STREAM],
+            source_type
+            or [UploadSourceType.FILE, UploadSourceType.SEEKABLE_STREAM, UploadSourceType.NONSEEKABLE_STREAM],
            use_parallel or [False, True],
            parallelism,
            multipart_upload_min_stream_size,
@@ -1710,6 +1717,7 @@ def processor() -> list:
             request_json = request.json()
             etags = {}
 
+            assert len(request_json["parts"]) > 0
             for part in request_json["parts"]:
                 etags[part["part_number"]] = part["etag"]
 
@@ -1786,10 +1794,22 @@ def to_string(test_case: "MultipartUploadTestCase") -> str:
     [
         # -------------------------- happy cases --------------------------
         MultipartUploadTestCase(
-            "Multipart upload successful: single part",
+            "Multipart upload successful: single part because of small file",
             content_size=1024 * 1024,  # less than part size
-            multipart_upload_part_size=10 * 1024 * 1024,
+            multipart_upload_min_stream_size=10 * 1024 * 1024,
+            source_type=[
+                UploadSourceType.FILE,
+                UploadSourceType.SEEKABLE_STREAM,
+            ],  # non-seekable streams always use multipart upload
             expected_part_size=1024 * 1024,  # chunk size is used
+            expected_single_shot_upload=True,
+        ),
+        MultipartUploadTestCase(
+            "Multipart upload successful: empty file or empty seekable stream",
+            content_size=0,  # content with zero length
+            multipart_upload_min_stream_size=100 * 1024 * 1024,  # all files smaller than 100M goes to single-shot
+            expected_single_shot_upload=True,
+            expected_multipart_upload_aborted=None,
         ),
         MultipartUploadTestCase(
             "Multipart upload successful: multiple parts (aligned)",
@@ -2221,7 +2241,7 @@ def to_string(test_case: "MultipartUploadTestCase") -> str:
             "Multipart parallel upload for stream: Upload errors are not retried",
             content_size=10 * 1024 * 1024,
             multipart_upload_part_size=1024 * 1024,
-            source_type=[UploadSourceType.STREAM],
+            source_type=[UploadSourceType.SEEKABLE_STREAM],
             use_parallel=[True],
             parallelism=1,
             custom_response_on_upload=CustomResponse(
@@ -2385,7 +2405,8 @@ def __init__(
             stream_size,
             cloud,
             overwrite,
-            source_type or [UploadSourceType.FILE, UploadSourceType.STREAM],
+            source_type
+            or [UploadSourceType.FILE, UploadSourceType.SEEKABLE_STREAM, UploadSourceType.NONSEEKABLE_STREAM],
             use_parallel
             or [True, False],  # Resumable Upload doesn't support parallel uploading of parts, but fallback should work
             parallelism,
```
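
The `NonSeekableBuffer` helper imported at the top of the test file lives in `tests/test_files_utils.py` and is not shown in this diff. A hypothetical minimal equivalent, for illustration only, is a read-only stream that reports `seekable() == False`, which forces the SDK away from seek-based single-shot handling and onto the streaming multipart path the new test cases exercise:

```python
import io


class NonSeekableBuffer(io.RawIOBase):
    """Hypothetical stand-in for the helper in tests/test_files_utils.py:
    a read-only byte stream that cannot be seeked."""

    def __init__(self, data: bytes):
        self._inner = io.BytesIO(data)

    def readable(self) -> bool:
        return True

    def seekable(self) -> bool:
        return False  # seek()/tell() raise io.UnsupportedOperation

    def readinto(self, b) -> int:
        # RawIOBase derives read()/readall() from this method.
        chunk = self._inner.read(len(b))
        b[: len(chunk)] = chunk
        return len(chunk)
```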
