Skip to content

Commit c6c2892

Browse files
Fix FilesExt upload failing when content size is zero
1 parent 49eb17b commit c6c2892

File tree

3 files changed

+44
-18
lines changed

3 files changed

+44
-18
lines changed

NEXT_CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
### Bug Fixes
88

9+
- Fix `FilesExt.upload` and `FilesExt.upload_from` failing when the source content is empty and `use_parallel=True`.
10+
911
### Documentation
1012

1113
### Internal Changes

databricks/sdk/mixins/files.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1134,7 +1134,9 @@ def upload(
11341134
f"Upload context: part_size={ctx.part_size}, batch_size={ctx.batch_size}, content_length={ctx.content_length}"
11351135
)
11361136

1137-
if ctx.use_parallel:
1137+
if ctx.use_parallel and (
1138+
ctx.content_length is None or ctx.content_length >= self._config.files_ext_multipart_upload_min_stream_size
1139+
):
11381140
self._parallel_upload_from_stream(ctx, content)
11391141
return UploadStreamResult()
11401142
elif ctx.content_length is not None:
@@ -1206,7 +1208,7 @@ def upload_from(
12061208
use_parallel=use_parallel,
12071209
parallelism=parallelism,
12081210
)
1209-
if ctx.use_parallel:
1211+
if ctx.use_parallel and ctx.content_length >= self._config.files_ext_multipart_upload_min_stream_size:
12101212
self._parallel_upload_from_file(ctx)
12111213
return UploadFileResult()
12121214
else:
@@ -1459,8 +1461,9 @@ def _parallel_multipart_upload_from_stream(
14591461
# Do the first part read ahead
14601462
pre_read_buffer = content.read(ctx.part_size)
14611463
if not pre_read_buffer:
1462-
self._complete_multipart_upload(ctx, {}, session_token)
1463-
return
1464+
raise FallbackToUploadUsingFilesApi(
1465+
b"", "Falling back to single-shot upload with Files API due to empty input stream"
1466+
)
14641467
try:
14651468
etag = self._do_upload_one_part(
14661469
ctx, cloud_provider_session, 1, 0, len(pre_read_buffer), session_token, BytesIO(pre_read_buffer)

tests/test_files.py

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
from databricks.sdk.mixins.files_utils import CreateDownloadUrlResponse
3232
from tests.clock import FakeClock
3333

34-
from .test_files_utils import Utils
34+
from .test_files_utils import NonSeekableBuffer, Utils
3535

3636
logger = logging.getLogger(__name__)
3737

@@ -1257,8 +1257,8 @@ def __init__(
12571257
custom_response_on_single_shot_upload: CustomResponse,
12581258
# exception which is expected to be thrown (so upload is expected to have failed)
12591259
expected_exception_type: Optional[Type[BaseException]],
1260-
# if abort is expected to be called for multipart/resumable upload
1261-
expected_multipart_upload_aborted: bool,
1260+
# Whether abort is expected to be called for multipart/resumable upload, set to None if we don't care.
1261+
expected_multipart_upload_aborted: Optional[bool],
12621262
expected_single_shot_upload: bool,
12631263
):
12641264
self.name = name
@@ -1274,7 +1274,7 @@ def __init__(
12741274
self.multipart_upload_max_retries = multipart_upload_max_retries
12751275
self.custom_response_on_single_shot_upload = custom_response_on_single_shot_upload
12761276
self.expected_exception_type = expected_exception_type
1277-
self.expected_multipart_upload_aborted: bool = expected_multipart_upload_aborted
1277+
self.expected_multipart_upload_aborted: Optional[bool] = expected_multipart_upload_aborted
12781278
self.expected_single_shot_upload = expected_single_shot_upload
12791279

12801280
self.path = "/test.txt"
@@ -1294,16 +1294,20 @@ def clear_state(self) -> None:
12941294
logger.warning("Failed to remove temp file: %s", file_path)
12951295
self.created_temp_files = []
12961296

1297-
def get_upload_file(self, content: bytes, source_type: "UploadSourceType") -> Union[str, io.BytesIO]:
1297+
def get_upload_file(
1298+
self, content: bytes, source_type: "UploadSourceType"
1299+
) -> Union[str, io.BytesIO, NonSeekableBuffer]:
12981300
"""Returns a file or stream to upload based on the source type."""
12991301
if source_type == UploadSourceType.FILE:
13001302
with NamedTemporaryFile(mode="wb", delete=False) as f:
13011303
f.write(content)
13021304
file_path = f.name
13031305
self.created_temp_files.append(file_path)
13041306
return file_path
1305-
elif source_type == UploadSourceType.STREAM:
1307+
elif source_type == UploadSourceType.SEEKABLE_STREAM:
13061308
return io.BytesIO(content)
1309+
elif source_type == UploadSourceType.NONSEEKABLE_STREAM:
1310+
return NonSeekableBuffer(content)
13071311
else:
13081312
raise ValueError(f"Unknown source type: {source_type}")
13091313

@@ -1417,7 +1421,8 @@ def upload() -> None:
14171421
), "Single-shot upload should not have succeeded"
14181422

14191423
assert (
1420-
multipart_server_state.aborted == self.expected_multipart_upload_aborted
1424+
self.expected_multipart_upload_aborted is None
1425+
or multipart_server_state.aborted == self.expected_multipart_upload_aborted
14211426
), "Multipart upload aborted state mismatch"
14221427

14231428
finally:
@@ -1433,7 +1438,8 @@ class UploadSourceType(Enum):
14331438
"""Source type for the upload. Used to determine how to upload the file."""
14341439

14351440
FILE = "file" # upload from a file on disk
1436-
STREAM = "stream" # upload from a stream (e.g. BytesIO)
1441+
SEEKABLE_STREAM = "seekable_stream" # upload from a seekable stream (e.g. BytesIO)
1442+
NONSEEKABLE_STREAM = "nonseekable_stream" # upload from a non-seekable stream (e.g. network stream)
14371443

14381444

14391445
class MultipartUploadTestCase(UploadTestCase):
@@ -1522,15 +1528,16 @@ def __init__(
15221528
# if abort is expected to be called
15231529
# expected part size
15241530
expected_part_size: Optional[int] = None,
1525-
expected_multipart_upload_aborted: bool = False,
1531+
expected_multipart_upload_aborted: Optional[bool] = False,
15261532
expected_single_shot_upload: bool = False,
15271533
):
15281534
super().__init__(
15291535
name,
15301536
content_size,
15311537
cloud,
15321538
overwrite,
1533-
source_type or [UploadSourceType.FILE, UploadSourceType.STREAM],
1539+
source_type
1540+
or [UploadSourceType.FILE, UploadSourceType.SEEKABLE_STREAM, UploadSourceType.NONSEEKABLE_STREAM],
15341541
use_parallel or [False, True],
15351542
parallelism,
15361543
multipart_upload_min_stream_size,
@@ -1662,6 +1669,7 @@ def processor() -> list:
16621669
request_json = request.json()
16631670
etags = {}
16641671

1672+
assert len(request_json["parts"]) > 0
16651673
for part in request_json["parts"]:
16661674
etags[part["part_number"]] = part["etag"]
16671675

@@ -1738,10 +1746,22 @@ def to_string(test_case: "MultipartUploadTestCase") -> str:
17381746
[
17391747
# -------------------------- happy cases --------------------------
17401748
MultipartUploadTestCase(
1741-
"Multipart upload successful: single part",
1749+
"Multipart upload successful: single part because of small file",
17421750
content_size=1024 * 1024, # less than part size
1743-
multipart_upload_part_size=10 * 1024 * 1024,
1751+
multipart_upload_min_stream_size=10 * 1024 * 1024,
1752+
source_type=[
1753+
UploadSourceType.FILE,
1754+
UploadSourceType.SEEKABLE_STREAM,
1755+
], # non-seekable streams always use multipart upload
17441756
expected_part_size=1024 * 1024, # chunk size is used
1757+
expected_single_shot_upload=True,
1758+
),
1759+
MultipartUploadTestCase(
1760+
"Multipart upload successful: empty file or empty seekable stream",
1761+
content_size=0,  # empty content
1762+
multipart_upload_min_stream_size=100 * 1024 * 1024,  # all files smaller than 100 MiB go to single-shot upload
1763+
expected_single_shot_upload=True,
1764+
expected_multipart_upload_aborted=None,
17451765
),
17461766
MultipartUploadTestCase(
17471767
"Multipart upload successful: multiple parts (aligned)",
@@ -2161,7 +2181,7 @@ def to_string(test_case: "MultipartUploadTestCase") -> str:
21612181
"Multipart parallel upload for stream: Upload errors are not retried",
21622182
content_size=10 * 1024 * 1024,
21632183
multipart_upload_part_size=1024 * 1024,
2164-
source_type=[UploadSourceType.STREAM],
2184+
source_type=[UploadSourceType.SEEKABLE_STREAM],
21652185
use_parallel=[True],
21662186
parallelism=1,
21672187
custom_response_on_upload=CustomResponse(
@@ -2325,7 +2345,8 @@ def __init__(
23252345
stream_size,
23262346
cloud,
23272347
overwrite,
2328-
source_type or [UploadSourceType.FILE, UploadSourceType.STREAM],
2348+
source_type
2349+
or [UploadSourceType.FILE, UploadSourceType.SEEKABLE_STREAM, UploadSourceType.NONSEEKABLE_STREAM],
23292350
use_parallel
23302351
or [True, False], # Resumable Upload doesn't support parallel uploading of parts, but fallback should work
23312352
parallelism,

0 commit comments

Comments
 (0)