From 6295a3540d4808ddf499d4ea69f37eeb62e5311b Mon Sep 17 00:00:00 2001
From: Yuanjie Ding
Date: Mon, 27 Oct 2025 14:50:29 +0100
Subject: [PATCH 1/2] Fix FilesExt upload failure when content size is zero

---
 NEXT_CHANGELOG.md              |  2 ++
 databricks/sdk/mixins/files.py | 11 +++++---
 tests/test_files.py            | 49 ++++++++++++++++++++++++----------
 3 files changed, 44 insertions(+), 18 deletions(-)

diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md
index 2e7b46f0a..e2c01b386 100644
--- a/NEXT_CHANGELOG.md
+++ b/NEXT_CHANGELOG.md
@@ -7,6 +7,8 @@
 ### Bug Fixes
 
 - Fix `FilesExt` can fail to upload and download data when Presigned URLs are not available in certain environments (e.g. Serverless GPU clusters).
+- Fix `FilesExt.upload` and `FilesExt.upload_from` failing when the source content is empty and `use_parallel=True`.
+
 ### Documentation
 
 ### Internal Changes
diff --git a/databricks/sdk/mixins/files.py b/databricks/sdk/mixins/files.py
index ca5f56130..819e14d4b 100644
--- a/databricks/sdk/mixins/files.py
+++ b/databricks/sdk/mixins/files.py
@@ -1134,7 +1134,9 @@ def upload(
             f"Upload context: part_size={ctx.part_size}, batch_size={ctx.batch_size}, content_length={ctx.content_length}"
         )
 
-        if ctx.use_parallel:
+        if ctx.use_parallel and (
+            ctx.content_length is None or ctx.content_length >= self._config.files_ext_multipart_upload_min_stream_size
+        ):
             self._parallel_upload_from_stream(ctx, contents)
             return UploadStreamResult()
         elif ctx.content_length is not None:
@@ -1206,7 +1208,7 @@ def upload_from(
             use_parallel=use_parallel,
             parallelism=parallelism,
         )
-        if ctx.use_parallel:
+        if ctx.use_parallel and ctx.content_length >= self._config.files_ext_multipart_upload_min_stream_size:
             self._parallel_upload_from_file(ctx)
             return UploadFileResult()
         else:
@@ -1459,8 +1461,9 @@ def _parallel_multipart_upload_from_stream(
         # Do the first part read ahead
         pre_read_buffer = content.read(ctx.part_size)
         if not pre_read_buffer:
-            self._complete_multipart_upload(ctx, {}, session_token)
-            return
+            raise FallbackToUploadUsingFilesApi(
+                b"", "Falling back to single-shot upload with Files API due to empty input stream"
+            )
         try:
             etag = self._do_upload_one_part(
                 ctx, cloud_provider_session, 1, 0, len(pre_read_buffer), session_token, BytesIO(pre_read_buffer)
diff --git a/tests/test_files.py b/tests/test_files.py
index 2a7450269..762771471 100644
--- a/tests/test_files.py
+++ b/tests/test_files.py
@@ -31,7 +31,7 @@
 from databricks.sdk.mixins.files_utils import CreateDownloadUrlResponse
 from tests.clock import FakeClock
 
-from .test_files_utils import Utils
+from .test_files_utils import NonSeekableBuffer, Utils
 
 logger = logging.getLogger(__name__)
 
@@ -1286,8 +1286,8 @@ def __init__(
         custom_response_on_single_shot_upload: CustomResponse,
         # exception which is expected to be thrown (so upload is expected to have failed)
         expected_exception_type: Optional[Type[BaseException]],
-        # if abort is expected to be called for multipart/resumable upload
-        expected_multipart_upload_aborted: bool,
+        # Whether abort is expected to be called for multipart/resumable upload; set to None if we don't care.
+        expected_multipart_upload_aborted: Optional[bool],
         expected_single_shot_upload: bool,
     ):
         self.name = name
@@ -1303,7 +1303,7 @@ def __init__(
         self.multipart_upload_max_retries = multipart_upload_max_retries
         self.custom_response_on_single_shot_upload = custom_response_on_single_shot_upload
         self.expected_exception_type = expected_exception_type
-        self.expected_multipart_upload_aborted: bool = expected_multipart_upload_aborted
+        self.expected_multipart_upload_aborted: Optional[bool] = expected_multipart_upload_aborted
         self.expected_single_shot_upload = expected_single_shot_upload
 
         self.path = "/test.txt"
@@ -1323,7 +1323,9 @@ def clear_state(self) -> None:
                 logger.warning("Failed to remove temp file: %s", file_path)
         self.created_temp_files = []
 
-    def get_upload_file(self, content: bytes, source_type: "UploadSourceType") -> Union[str, io.BytesIO]:
+    def get_upload_file(
+        self, content: bytes, source_type: "UploadSourceType"
+    ) -> Union[str, io.BytesIO, NonSeekableBuffer]:
         """Returns a file or stream to upload based on the source type."""
         if source_type == UploadSourceType.FILE:
             with NamedTemporaryFile(mode="wb", delete=False) as f:
@@ -1331,8 +1333,10 @@ def get_upload_file(self, content: bytes, source_type: "UploadSourceType") -> Un
                 file_path = f.name
                 self.created_temp_files.append(file_path)
             return file_path
-        elif source_type == UploadSourceType.STREAM:
+        elif source_type == UploadSourceType.SEEKABLE_STREAM:
             return io.BytesIO(content)
+        elif source_type == UploadSourceType.NONSEEKABLE_STREAM:
+            return NonSeekableBuffer(content)
         else:
             raise ValueError(f"Unknown source type: {source_type}")
 
@@ -1446,7 +1450,8 @@ def upload() -> None:
                 ), "Single-shot upload should not have succeeded"
 
                 assert (
-                    multipart_server_state.aborted == self.expected_multipart_upload_aborted
+                    self.expected_multipart_upload_aborted is None
+                    or multipart_server_state.aborted == self.expected_multipart_upload_aborted
                 ), "Multipart upload aborted state mismatch"
 
             finally:
@@ -1462,7 +1467,8 @@ class UploadSourceType(Enum):
     """Source type for the upload. Used to determine how to upload the file."""
 
     FILE = "file"  # upload from a file on disk
-    STREAM = "stream"  # upload from a stream (e.g. BytesIO)
+    SEEKABLE_STREAM = "seekable_stream"  # upload from a seekable stream (e.g. BytesIO)
+    NONSEEKABLE_STREAM = "nonseekable_stream"  # upload from a non-seekable stream (e.g. network stream)
 
 
 class MultipartUploadTestCase(UploadTestCase):
@@ -1570,7 +1576,7 @@ def __init__(
         # if abort is expected to be called
         # expected part size
         expected_part_size: Optional[int] = None,
-        expected_multipart_upload_aborted: bool = False,
+        expected_multipart_upload_aborted: Optional[bool] = False,
         expected_single_shot_upload: bool = False,
     ):
         super().__init__(
@@ -1578,7 +1584,8 @@ def __init__(
             content_size,
             cloud,
             overwrite,
-            source_type or [UploadSourceType.FILE, UploadSourceType.STREAM],
+            source_type
+            or [UploadSourceType.FILE, UploadSourceType.SEEKABLE_STREAM, UploadSourceType.NONSEEKABLE_STREAM],
             use_parallel or [False, True],
             parallelism,
             multipart_upload_min_stream_size,
@@ -1710,6 +1717,7 @@ def processor() -> list:
                 request_json = request.json()
 
                 etags = {}
+                assert len(request_json["parts"]) > 0
                 for part in request_json["parts"]:
                     etags[part["part_number"]] = part["etag"]
 
@@ -1786,10 +1794,22 @@ def to_string(test_case: "MultipartUploadTestCase") -> str:
     [
         # -------------------------- happy cases --------------------------
         MultipartUploadTestCase(
-            "Multipart upload successful: single part",
+            "Multipart upload successful: single part because of small file",
             content_size=1024 * 1024,  # less than part size
-            multipart_upload_part_size=10 * 1024 * 1024,
+            multipart_upload_min_stream_size=10 * 1024 * 1024,
+            source_type=[
+                UploadSourceType.FILE,
+                UploadSourceType.SEEKABLE_STREAM,
+            ],  # non-seekable streams always use multipart upload
             expected_part_size=1024 * 1024,  # chunk size is used
+            expected_single_shot_upload=True,
+        ),
+        MultipartUploadTestCase(
+            "Multipart upload successful: empty file or empty seekable stream",
+            content_size=0,  # less than part size
+            multipart_upload_min_stream_size=100 * 1024 * 1024,  # all files smaller than 100M go to single-shot
+            expected_single_shot_upload=True,
+            expected_multipart_upload_aborted=None,
         ),
         MultipartUploadTestCase(
             "Multipart upload successful: multiple parts (aligned)",
@@ -2221,7 +2241,7 @@ def to_string(test_case: "MultipartUploadTestCase") -> str:
             "Multipart parallel upload for stream: Upload errors are not retried",
             content_size=10 * 1024 * 1024,
             multipart_upload_part_size=1024 * 1024,
-            source_type=[UploadSourceType.STREAM],
+            source_type=[UploadSourceType.SEEKABLE_STREAM],
             use_parallel=[True],
             parallelism=1,
             custom_response_on_upload=CustomResponse(
@@ -2385,7 +2405,8 @@ def __init__(
             stream_size,
             cloud,
             overwrite,
-            source_type or [UploadSourceType.FILE, UploadSourceType.STREAM],
+            source_type
+            or [UploadSourceType.FILE, UploadSourceType.SEEKABLE_STREAM, UploadSourceType.NONSEEKABLE_STREAM],
             use_parallel
             or [True, False],  # Resumable Upload doesn't support parallel uploading of parts, but fallback should work
             parallelism,

From 504f30cb5dc80a9373b43a99d032c52afa226fcf Mon Sep 17 00:00:00 2001
From: Yuanjie Ding
Date: Wed, 29 Oct 2025 13:47:54 +0100
Subject: [PATCH 2/2] Address PR comments

---
 tests/test_files.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/test_files.py b/tests/test_files.py
index 762771471..64947e9b8 100644
--- a/tests/test_files.py
+++ b/tests/test_files.py
@@ -1806,7 +1806,7 @@ def to_string(test_case: "MultipartUploadTestCase") -> str:
         ),
         MultipartUploadTestCase(
             "Multipart upload successful: empty file or empty seekable stream",
-            content_size=0,  # less than part size
+            content_size=0,  # content with zero length
             multipart_upload_min_stream_size=100 * 1024 * 1024,  # all files smaller than 100M go to single-shot
             expected_single_shot_upload=True,
             expected_multipart_upload_aborted=None,
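
Editor's note, not part of the patches: the two commits implement one rule. A parallel upload only takes the multipart path when the content length is unknown (a stream) or at least `files_ext_multipart_upload_min_stream_size`; and if the first-part read-ahead returns nothing, the SDK now raises `FallbackToUploadUsingFilesApi` instead of completing a multipart upload with zero parts. Below is a minimal, self-contained sketch of that rule for reviewers. `choose_upload_strategy` and `read_first_part` are hypothetical names, and `ValueError` stands in for the SDK-internal exception so the sketch runs on its own; only the dispatch condition and the read-ahead behavior are taken from the diffs above.

    from typing import BinaryIO, Optional


    def choose_upload_strategy(use_parallel: bool, content_length: Optional[int], min_stream_size: int) -> str:
        """Mirror of the dispatch added in patch 1: parallel multipart is used
        only when the size is unknown or large enough to be worth splitting."""
        if use_parallel and (content_length is None or content_length >= min_stream_size):
            return "parallel multipart upload"
        return "single-shot or sequential upload"


    def read_first_part(content: BinaryIO, part_size: int) -> bytes:
        """Mirror of the read-ahead change: an empty first read abandons the
        multipart path instead of completing an upload with zero parts."""
        pre_read_buffer = content.read(part_size)
        if not pre_read_buffer:
            # The SDK raises FallbackToUploadUsingFilesApi here; ValueError is
            # a stand-in so this sketch is runnable without SDK internals.
            raise ValueError("empty input stream: fall back to single-shot upload")
        return pre_read_buffer


    # The zero-length case from the new test: with use_parallel=True and
    # content_size=0, the upload is expected to take the single-shot path.
    assert choose_upload_strategy(True, 0, 100 * 1024 * 1024) == "single-shot or sequential upload"
    assert choose_upload_strategy(True, None, 100 * 1024 * 1024) == "parallel multipart upload"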