diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index f4b5104ec..c530be8a7 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -6,8 +6,12 @@ ### Bug Fixes +- Fix a reported issue where `FilesExt` fails to retry when it receives certain status codes from the server. + ### Documentation ### Internal Changes +- Refactor unit tests for `FilesExt` to improve their readability. + ### API Changes diff --git a/databricks/sdk/mixins/files.py b/databricks/sdk/mixins/files.py index 8d9923b4f..2cdaf3532 100644 --- a/databricks/sdk/mixins/files.py +++ b/databricks/sdk/mixins/files.py @@ -28,7 +28,6 @@ from .._property import _cached_property from ..config import Config from ..errors import AlreadyExists, NotFound -from ..errors.customizer import _RetryAfterCustomizer from ..errors.mapper import _error_mapper from ..retries import retried from ..service import files @@ -577,6 +576,27 @@ def __repr__(self) -> str: return f"<_DbfsPath {self._path}>" +class _RetryableException(Exception): + """Base class for retryable exceptions in DBFS operations.""" + + def __init__(self, message: str, http_status_code: int): + super().__init__() + self.message = message + self.http_status_code = http_status_code + + def __str__(self) -> str: + return f"{self.message} (HTTP Status: {self.http_status_code})" + + @staticmethod + def make_error(response: requests.Response) -> "_RetryableException": + """Map the response to a retryable exception.""" + + return _RetryableException( + message=response.text, + http_status_code=response.status_code, + ) + + class DbfsExt(files.DbfsAPI): __doc__ = files.DbfsAPI.__doc__ @@ -885,7 +905,7 @@ def perform(): timeout=self._config.multipart_upload_single_chunk_upload_timeout_seconds, ) - upload_response = self._retry_idempotent_operation(perform, rewind) + upload_response = self._retry_cloud_idempotent_operation(perform, rewind) if upload_response.status_code in (200, 201): # Chunk upload successful @@ -1097,7 +1117,7 @@ def perform(): ) try: - return self._retry_idempotent_operation(perform) + return self._retry_cloud_idempotent_operation(perform) except RequestException: _LOG.warning("Failed to retrieve upload status") return None @@ -1116,7 +1136,7 @@ def perform(): # a 503 or 500 response, then you need to resume the interrupted upload from where it left off. # Let's follow that for all potentially retryable status codes. - # Together with the catch block below we replicate the logic in _retry_idempotent_operation(). + # Together with the catch block below we replicate the logic in _retry_databricks_idempotent_operation(). 
if upload_response.status_code in self._RETRYABLE_STATUS_CODES: if retry_count < self._config.multipart_upload_max_retries: retry_count += 1 @@ -1243,7 +1263,7 @@ def perform(): timeout=self._config.multipart_upload_single_chunk_upload_timeout_seconds, ) - abort_response = self._retry_idempotent_operation(perform) + abort_response = self._retry_cloud_idempotent_operation(perform) if abort_response.status_code not in (200, 201): raise ValueError(abort_response) @@ -1265,7 +1285,7 @@ def perform(): timeout=self._config.multipart_upload_single_chunk_upload_timeout_seconds, ) - abort_response = self._retry_idempotent_operation(perform) + abort_response = self._retry_cloud_idempotent_operation(perform) if abort_response.status_code not in (200, 201): raise ValueError(abort_response) @@ -1283,23 +1303,31 @@ def _create_cloud_provider_session(self): session.mount("http://", http_adapter) return session - def _retry_idempotent_operation( + def _retry_cloud_idempotent_operation( self, operation: Callable[[], requests.Response], before_retry: Callable = None ) -> requests.Response: - """Perform given idempotent operation with necessary retries. Since operation is idempotent it's - safe to retry it for response codes where server state might have changed. + """Perform given idempotent operation with necessary retries for requests to non Databricks APIs. + For cloud APIs, we will retry on network errors and on server response codes. + Since operation is idempotent it's safe to retry it for response codes where server state might have changed. """ - def delegate(): + def delegate() -> requests.Response: response = operation() if response.status_code in self._RETRYABLE_STATUS_CODES: - attrs = {} - # this will assign "retry_after_secs" to the attrs, essentially making exception look retryable - _RetryAfterCustomizer().customize_error(response, attrs) - raise _error_mapper(response, attrs) + raise _RetryableException.make_error(response) else: return response + def extended_is_retryable(e: BaseException) -> Optional[str]: + retry_reason_from_base = _BaseClient._is_retryable(e) + if retry_reason_from_base is not None: + return retry_reason_from_base + + if isinstance(e, _RetryableException): + # this is a retriable exception, but not a network error + return f"retryable exception (status_code:{e.http_status_code})" + return None + # following _BaseClient timeout retry_timeout_seconds = self._config.retry_timeout_seconds or 300 @@ -1307,7 +1335,7 @@ def delegate(): timeout=timedelta(seconds=retry_timeout_seconds), # also retry on network errors (connection error, connection timeout) # where we believe request didn't reach the server - is_retryable=_BaseClient._is_retryable, + is_retryable=extended_is_retryable, before_retry=before_retry, )(delegate)() diff --git a/tests/test_files.py b/tests/test_files.py index e25035523..d795c4649 100644 --- a/tests/test_files.py +++ b/tests/test_files.py @@ -57,10 +57,10 @@ def __init__( self.expected_requested_offsets = expected_requested_offsets @staticmethod - def to_string(test_case): + def to_string(test_case: "DownloadTestCase") -> str: return test_case.name - def run(self, config: Config): + def run(self, config: Config) -> None: config = config.copy() config.enable_experimental_files_api_client = self.enable_new_client config.files_api_client_download_max_total_recovers = self.max_recovers_total @@ -99,8 +99,8 @@ def __init__(self, test_case: DownloadTestCase): # following the signature of Session.request() def request( self, - method, - url, + method: str, + url: 
str, params=None, data=None, headers=None, @@ -108,14 +108,14 @@ def request( files=None, auth=None, timeout=None, - allow_redirects=True, + allow_redirects: bool = True, proxies=None, hooks=None, - stream=None, + stream: bool = None, verify=None, cert=None, json=None, - ): + ) -> "MockResponse": assert method == "GET" assert stream == True @@ -167,7 +167,7 @@ def __init__(self, session: MockSession, offset: int, request: MockRequest): self.ok = True self.url = request.url - def iter_content(self, chunk_size: int, decode_unicode: bool): + def iter_content(self, chunk_size: int, decode_unicode: bool) -> "MockIterator": assert decode_unicode == False return MockIterator(self, chunk_size) @@ -179,7 +179,7 @@ def __init__(self, response: MockResponse, chunk_size: int): self.chunk_size = chunk_size self.offset = 0 - def __next__(self): + def __next__(self) -> bytes: start_offset = self.response.offset + self.offset if start_offset == len(self.response.session.content): raise StopIteration @@ -198,7 +198,7 @@ def __next__(self): self.offset += len(result) return result - def close(self): + def close(self) -> None: pass @@ -270,7 +270,7 @@ class _Constants: expected_requested_offsets=[0], ), DownloadTestCase( - name="New client: no failures, file of 10 chunks", + name="New client: no failures, file of 10 parts", enable_new_client=True, file_size=10 * _Constants.underlying_chunk_size, failure_at_absolute_offset=[], @@ -404,7 +404,7 @@ class _Constants: ], ids=DownloadTestCase.to_string, ) -def test_download_recover(config: Config, test_case: DownloadTestCase): +def test_download_recover(config: Config, test_case: DownloadTestCase) -> None: test_case.run(config) @@ -415,96 +415,102 @@ def __init__(self, length: int, checksum: str): self.checksum = checksum @classmethod - def from_bytes(cls, data: bytes): + def from_bytes(cls, data: bytes) -> "FileContent": sha256 = hashlib.sha256() sha256.update(data) return FileContent(len(data), sha256.hexdigest()) - def __repr__(self): + def __repr__(self) -> str: return f"Length: {self._length}, checksum: {self.checksum}" - def __eq__(self, other): + def __eq__(self, other: object) -> bool: if not isinstance(other, FileContent): return NotImplemented return self._length == other._length and self.checksum == other.checksum class MultipartUploadServerState: - upload_chunk_url_prefix = "https://cloud_provider.com/upload-chunk/" + """This server state is updated on multipart upload (AWS, Azure)""" + + upload_part_url_prefix = "https://cloud_provider.com/upload-part/" abort_upload_url_prefix = "https://cloud_provider.com/abort-upload/" def __init__(self): self.issued_multipart_urls = {} # part_number -> expiration_time - self.uploaded_chunks = {} # part_number -> [chunk file path, etag] + self.uploaded_parts = {} # part_number -> [part file path, etag] self.session_token = "token-" + MultipartUploadServerState.randomstr() self.file_content = None self.issued_abort_url_expire_time = None self.aborted = False - def create_upload_chunk_url(self, path: str, part_number: int, expire_time: datetime) -> str: + def create_upload_part_url(self, path: str, part_number: int, expire_time: datetime) -> str: assert not self.aborted # client may have requested a URL for the same part if retrying on network error self.issued_multipart_urls[part_number] = expire_time - return f"{self.upload_chunk_url_prefix}{path}/{part_number}" + return f"{self.upload_part_url_prefix}{path}/{part_number}" def create_abort_url(self, path: str, expire_time: datetime) -> str: assert not self.aborted 
self.issued_abort_url_expire_time = expire_time return f"{self.abort_upload_url_prefix}{path}" - def save_part(self, part_number: int, part_content: bytes, etag: str): + def save_part(self, part_number: int, part_content: bytes, etag: str) -> None: assert not self.aborted assert len(part_content) > 0 logger.info(f"Saving part {part_number} of size {len(part_content)}") - # chunk might already have been uploaded - existing_chunk = self.uploaded_chunks.get(part_number) - if existing_chunk: - chunk_file = existing_chunk[0] - with open(chunk_file, "wb") as f: + # part might already have been uploaded + existing_part = self.uploaded_parts.get(part_number) + if existing_part: + part_file = existing_part[0] + with open(part_file, "wb") as f: # overwrite f.write(part_content) else: - fd, chunk_file = mkstemp() + fd, part_file = mkstemp() with open(fd, "wb") as f: f.write(part_content) - self.uploaded_chunks[part_number] = [chunk_file, etag] + self.uploaded_parts[part_number] = [part_file, etag] - def cleanup(self): - for [file, _] in self.uploaded_chunks.values(): + def cleanup(self) -> None: + for [file, _] in self.uploaded_parts.values(): os.remove(file) - def get_file_content(self) -> FileContent: - assert not self.aborted + def get_file_content(self) -> Optional[FileContent]: + if self.aborted: + assert not self.file_content + + # content may be None even for a non-aborted upload, + # in case single-shot upload was used due to small stream size. return self.file_content - def upload_complete(self, etags: dict): + def upload_complete(self, etags: dict) -> None: assert not self.aborted # validate etags expected_etags = {} - for part_number in self.uploaded_chunks.keys(): - expected_etags[part_number] = self.uploaded_chunks[part_number][1] + for part_number in self.uploaded_parts.keys(): + expected_etags[part_number] = self.uploaded_parts[part_number][1] assert etags == expected_etags size = 0 sha256 = hashlib.sha256() - sorted_chunks = sorted(self.uploaded_chunks.keys()) - for part_number in sorted_chunks: - [chunk_path, _] = self.uploaded_chunks[part_number] - size += os.path.getsize(chunk_path) - with open(chunk_path, "rb") as f: - chunk_content = f.read() - sha256.update(chunk_content) + sorted_parts = sorted(self.uploaded_parts.keys()) + for part_number in sorted_parts: + [part_path, _] = self.uploaded_parts[part_number] + size += os.path.getsize(part_path) + with open(part_path, "rb") as f: + part_content = f.read() + sha256.update(part_content) self.file_content = FileContent(size, sha256.hexdigest()) - def abort_upload(self): + def abort_upload(self) -> None: self.aborted = True @staticmethod - def randomstr(): + def randomstr() -> str: return f"{random.randrange(10000)}-{int(time.time())}" @@ -521,7 +527,7 @@ def __init__( # If False, default response is always returned. 
# If True, response is defined by the current invocation count # with respect to first_invocation / last_invocation / only_invocation - enabled=True, + enabled: bool = True, # Custom code to return code: Optional[int] = 200, # Custom body to return @@ -555,7 +561,7 @@ def __init__( self.invocation_count = 0 - def invocation_matches(self): + def invocation_matches(self) -> bool: if not self.enabled: return False @@ -570,7 +576,7 @@ def invocation_matches(self): return False return True - def generate_response(self, request: requests.Request, processor: Callable[[], list]): + def generate_response(self, request: requests.Request, processor: Callable[[], list]) -> requests.Response: activate_for_current_invocation = self.invocation_matches() if activate_for_current_invocation and self.exception and self.exception_happened_before_processing: @@ -608,13 +614,149 @@ def generate_response(self, request: requests.Request, processor: Callable[[], l return resp -class MultipartUploadTestCase: +class SingleShotUploadServerState: + """This server state is updated on single-shot upload""" + + def __init__(self): + self.file_content: Optional[FileContent] = None + + def cleanup(self) -> None: + pass + + def upload(self, content: bytes) -> None: + self.file_content = FileContent.from_bytes(content) + + def get_file_content(self) -> Optional[FileContent]: + return self.file_content + + +class UploadTestCase: + """Base class for upload test cases""" + + def __init__( + self, + name: str, + stream_size: int, + overwrite: bool, + multipart_upload_min_stream_size: int, + multipart_upload_chunk_size: Optional[int], + sdk_retry_timeout_seconds: Optional[int], + multipart_upload_max_retries: Optional[int], + custom_response_on_single_shot_upload: CustomResponse, + # exception which is expected to be thrown (so upload is expected to have failed) + expected_exception_type: Optional[Type[BaseException]], + # if abort is expected to be called for multipart/resumable upload + expected_multipart_upload_aborted: bool, + expected_single_shot_upload: bool, + ): + self.name = name + self.stream_size = stream_size + self.overwrite = overwrite + self.multipart_upload_min_stream_size = multipart_upload_min_stream_size + self.multipart_upload_chunk_size = multipart_upload_chunk_size + self.sdk_retry_timeout_seconds = sdk_retry_timeout_seconds + self.multipart_upload_max_retries = multipart_upload_max_retries + self.custom_response_on_single_shot_upload = custom_response_on_single_shot_upload + self.expected_exception_type = expected_exception_type + self.expected_multipart_upload_aborted: bool = expected_multipart_upload_aborted + self.expected_single_shot_upload = expected_single_shot_upload + + self.path = "/test.txt" + + def customize_config(self, config: Config) -> None: + pass + + def create_multipart_upload_server_state(self) -> Union[MultipartUploadServerState, "ResumableUploadServerState"]: + raise NotImplementedError + + def match_request_to_response( + self, request: requests.Request, server_state: Union[MultipartUploadServerState, "ResumableUploadServerState"] + ) -> Optional[requests.Response]: + raise NotImplementedError + + def run(self, config: Config) -> None: + config = config.copy() + config.enable_experimental_files_api_client = True + + if self.sdk_retry_timeout_seconds: + config.retry_timeout_seconds = self.sdk_retry_timeout_seconds + if self.multipart_upload_chunk_size: + config.multipart_upload_chunk_size = self.multipart_upload_chunk_size + if self.multipart_upload_max_retries: + 
config.multipart_upload_max_retries = self.multipart_upload_max_retries + + config.multipart_upload_min_stream_size = self.multipart_upload_min_stream_size + + pat_token = "some_pat_token" + config._header_factory = lambda: {"Authorization": f"Bearer {pat_token}"} + + self.customize_config(config) + + multipart_server_state = self.create_multipart_upload_server_state() + single_shot_server_state = SingleShotUploadServerState() + + file_content = os.urandom(self.stream_size) + w = WorkspaceClient(config=config) + + try: + with requests_mock.Mocker() as session_mock: + + def custom_matcher(request: requests.Request) -> Optional[requests.Response]: + # first, try to match single-shot upload + parsed_url = urlparse(request.url) + if ( + parsed_url.hostname == "localhost" + and parsed_url.path == f"/api/2.0/fs/files{self.path}" + and request.method == "PUT" + and not parsed_url.params + ): + + def processor() -> list: + body = request.body.read() + single_shot_server_state.upload(body) + return [200, "", {}] + + return self.custom_response_on_single_shot_upload.generate_response(request, processor) + + # otherwise fall back to specific matcher from the test case + return self.match_request_to_response(request, multipart_server_state) + + session_mock.add_matcher(matcher=custom_matcher) + + def upload() -> None: + w.files.upload(self.path, io.BytesIO(file_content), overwrite=self.overwrite) + + if self.expected_exception_type is not None: + with pytest.raises(self.expected_exception_type): + upload() + assert not single_shot_server_state.get_file_content() + assert not multipart_server_state.get_file_content() + else: + upload() + if self.expected_single_shot_upload: + assert single_shot_server_state.get_file_content() == FileContent.from_bytes(file_content) + assert not multipart_server_state.get_file_content() + else: + assert multipart_server_state.get_file_content() == FileContent.from_bytes(file_content) + assert not single_shot_server_state.get_file_content() + + assert multipart_server_state.aborted == self.expected_multipart_upload_aborted + + finally: + multipart_server_state.cleanup() + + @staticmethod + def is_auth_header_present(r: requests.Request) -> bool: + return r.headers.get("Authorization") is not None + + +class MultipartUploadTestCase(UploadTestCase): """Test case for multipart upload of a file. Multipart uploads are used on AWS and Azure. Multipart upload via presigned URLs involves multiple HTTP requests: - initiating upload (call to Databricks Files API) - requesting upload part URLs (calls to Databricks Files API) - - uploading data in chunks (calls to cloud storage provider or Databricks storage proxy) + - uploading data in parts (calls to cloud storage provider or Databricks storage proxy) - completing the upload (call to Databricks Files API) - requesting abort upload URL (call to Databricks Files API) - aborting the upload (call to cloud storage provider or Databricks storage proxy) @@ -625,9 +767,7 @@ class MultipartUploadTestCase: Response of each call can be modified by parameterising a respective `CustomResponse` object. """ - path = "/test.txt" - - expired_url_aws_response = ( + expired_url_aws_response: str = ( '' "AuthenticationFailedServer failed to authenticate " "the request. 
Make sure the value of Authorization header is formed " @@ -639,7 +779,7 @@ class MultipartUploadTestCase: "GMT]" ) - expired_url_azure_response = ( + expired_url_azure_response: str = ( '\nAccessDenied' "Request has expired" "142025-01-01T17:47:13Z" @@ -648,32 +788,43 @@ class MultipartUploadTestCase: "" ) - # TODO test for overwrite = false - def __init__( self, name: str, stream_size: int, # size of uploaded file or, technically, stream + overwrite: bool = True, # TODO test for overwrite = false + multipart_upload_min_stream_size: int = 0, # disable single-shot uploads by default multipart_upload_chunk_size: Optional[int] = None, sdk_retry_timeout_seconds: Optional[int] = None, multipart_upload_max_retries: Optional[int] = None, multipart_upload_batch_url_count: Optional[int] = None, - custom_response_on_initiate=CustomResponse(enabled=False), - custom_response_on_create_multipart_url=CustomResponse(enabled=False), - custom_response_on_upload=CustomResponse(enabled=False), - custom_response_on_complete=CustomResponse(enabled=False), - custom_response_on_create_abort_url=CustomResponse(enabled=False), - custom_response_on_abort=CustomResponse(enabled=False), + custom_response_on_single_shot_upload: CustomResponse = CustomResponse(enabled=False), + custom_response_on_initiate: CustomResponse = CustomResponse(enabled=False), + custom_response_on_create_multipart_url: CustomResponse = CustomResponse(enabled=False), + custom_response_on_upload: CustomResponse = CustomResponse(enabled=False), + custom_response_on_complete: CustomResponse = CustomResponse(enabled=False), + custom_response_on_create_abort_url: CustomResponse = CustomResponse(enabled=False), + custom_response_on_abort: CustomResponse = CustomResponse(enabled=False), # exception which is expected to be thrown (so upload is expected to have failed) expected_exception_type: Optional[Type[BaseException]] = None, # if abort is expected to be called - expected_aborted: bool = False, + expected_multipart_upload_aborted: bool = False, + expected_single_shot_upload: bool = False, ): - self.name = name - self.stream_size = stream_size - self.multipart_upload_chunk_size = multipart_upload_chunk_size - self.sdk_retry_timeout_seconds = sdk_retry_timeout_seconds - self.multipart_upload_max_retries = multipart_upload_max_retries + super().__init__( + name, + stream_size, + overwrite, + multipart_upload_min_stream_size, + multipart_upload_chunk_size, + sdk_retry_timeout_seconds, + multipart_upload_max_retries, + custom_response_on_single_shot_upload, + expected_exception_type, + expected_multipart_upload_aborted, + expected_single_shot_upload, + ) + self.multipart_upload_batch_url_count = multipart_upload_batch_url_count self.custom_response_on_initiate = copy.deepcopy(custom_response_on_initiate) self.custom_response_on_create_multipart_url = copy.deepcopy(custom_response_on_create_multipart_url) @@ -681,154 +832,145 @@ def __init__( self.custom_response_on_complete = copy.deepcopy(custom_response_on_complete) self.custom_response_on_create_abort_url = copy.deepcopy(custom_response_on_create_abort_url) self.custom_response_on_abort = copy.deepcopy(custom_response_on_abort) - self.expected_exception_type = expected_exception_type - self.expected_aborted: bool = expected_aborted - - def setup_session_mock(self, session_mock: requests_mock.Mocker, server_state: MultipartUploadServerState): - - def custom_matcher(request): - request_url = urlparse(request.url) - request_query = parse_qs(request_url.query) - - # initial request - if ( - 
request_url.hostname == "localhost" - and request_url.path == f"/api/2.0/fs/files{MultipartUploadTestCase.path}" - and request_query.get("action") == ["initiate-upload"] - and request.method == "POST" - ): - assert MultipartUploadTestCase.is_auth_header_present(request) - assert request.text is None - - def processor(): - response_json = {"multipart_upload": {"session_token": server_state.session_token}} - return [200, json.dumps(response_json), {}] - - return self.custom_response_on_initiate.generate_response(request, processor) - - # multipart upload, create upload part URLs - elif ( - request_url.hostname == "localhost" - and request_url.path == "/api/2.0/fs/create-upload-part-urls" - and request.method == "POST" - ): - - assert MultipartUploadTestCase.is_auth_header_present(request) - - request_json = request.json() - assert request_json.keys() == {"count", "expire_time", "path", "session_token", "start_part_number"} - assert request_json["path"] == self.path - assert request_json["session_token"] == server_state.session_token - - start_part_number = int(request_json["start_part_number"]) - count = int(request_json["count"]) - assert count >= 1 + def customize_config(self, config: Config) -> None: + if self.multipart_upload_batch_url_count: + config.multipart_upload_batch_url_count = self.multipart_upload_batch_url_count - expire_time = MultipartUploadTestCase.parse_and_validate_expire_time(request_json["expire_time"]) + def create_multipart_upload_server_state(self) -> MultipartUploadServerState: + return MultipartUploadServerState() + + def match_request_to_response( + self, request: requests.Request, server_state: MultipartUploadServerState + ) -> Optional[requests.Response]: + request_url = urlparse(request.url) + request_query = parse_qs(request_url.query) + + # initial request + if ( + request_url.hostname == "localhost" + and request_url.path == f"/api/2.0/fs/files{self.path}" + and request_query.get("action") == ["initiate-upload"] + and request.method == "POST" + ): + + assert UploadTestCase.is_auth_header_present(request) + assert request.text is None + + def processor() -> list: + response_json = {"multipart_upload": {"session_token": server_state.session_token}} + return [200, json.dumps(response_json), {}] + + return self.custom_response_on_initiate.generate_response(request, processor) + + # multipart upload, create upload part URLs + elif ( + request_url.hostname == "localhost" + and request_url.path == "/api/2.0/fs/create-upload-part-urls" + and request.method == "POST" + ): + + assert UploadTestCase.is_auth_header_present(request) + + request_json = request.json() + assert request_json.keys() == {"count", "expire_time", "path", "session_token", "start_part_number"} + assert request_json["path"] == self.path + assert request_json["session_token"] == server_state.session_token + + start_part_number = int(request_json["start_part_number"]) + count = int(request_json["count"]) + assert count >= 1 + + expire_time = MultipartUploadTestCase.parse_and_validate_expire_time(request_json["expire_time"]) + + def processor() -> list: + response_nodes = [] + for part_number in range(start_part_number, start_part_number + count): + upload_part_url = server_state.create_upload_part_url(self.path, part_number, expire_time) + response_nodes.append( + { + "part_number": part_number, + "url": upload_part_url, + "headers": [{"name": "name1", "value": "value1"}], + } + ) - def processor(): - response_nodes = [] - for part_number in range(start_part_number, start_part_number + count): - 
upload_part_url = server_state.create_upload_chunk_url(self.path, part_number, expire_time) - response_nodes.append( - { - "part_number": part_number, - "url": upload_part_url, - "headers": [{"name": "name1", "value": "value1"}], - } - ) + response_json = {"upload_part_urls": response_nodes} + return [200, json.dumps(response_json), {}] - response_json = {"upload_part_urls": response_nodes} - return [200, json.dumps(response_json), {}] + return self.custom_response_on_create_multipart_url.generate_response(request, processor) - return self.custom_response_on_create_multipart_url.generate_response(request, processor) + # multipart upload, uploading part + elif request.url.startswith(MultipartUploadServerState.upload_part_url_prefix) and request.method == "PUT": - # multipart upload, uploading part - elif request.url.startswith(MultipartUploadServerState.upload_chunk_url_prefix) and request.method == "PUT": + assert not UploadTestCase.is_auth_header_present(request) - assert not MultipartUploadTestCase.is_auth_header_present(request) + url_path = request.url[len(MultipartUploadServerState.upload_part_url_prefix) :] + part_num = url_path.split("/")[-1] + assert url_path[: -len(part_num) - 1] == self.path - url_path = request.url[len(MultipartUploadServerState.abort_upload_url_prefix) :] - part_num = url_path.split("/")[-1] - assert url_path[: -len(part_num) - 1] == self.path + def processor() -> list: + body = request.body.read() + etag = "etag-" + MultipartUploadServerState.randomstr() + server_state.save_part(int(part_num), body, etag) + return [200, "", {"ETag": etag}] - def processor(): - body = request.body.read() - etag = "etag-" + MultipartUploadServerState.randomstr() - server_state.save_part(int(part_num), body, etag) - return [200, "", {"ETag": etag}] + return self.custom_response_on_upload.generate_response(request, processor) - return self.custom_response_on_upload.generate_response(request, processor) + # multipart upload, completion + elif ( + request_url.hostname == "localhost" + and request_url.path == f"/api/2.0/fs/files{self.path}" + and request_query.get("action") == ["complete-upload"] + and request_query.get("upload_type") == ["multipart"] + and request.method == "POST" + ): - # multipart upload, completion - elif ( - request_url.hostname == "localhost" - and request_url.path == f"/api/2.0/fs/files{MultipartUploadTestCase.path}" - and request_query.get("action") == ["complete-upload"] - and request_query.get("upload_type") == ["multipart"] - and request.method == "POST" - ): + assert UploadTestCase.is_auth_header_present(request) + assert [server_state.session_token] == request_query.get("session_token") - assert MultipartUploadTestCase.is_auth_header_present(request) - assert [server_state.session_token] == request_query.get("session_token") + def processor() -> list: + request_json = request.json() + etags = {} - def processor(): - request_json = request.json() - etags = {} + for part in request_json["parts"]: + etags[part["part_number"]] = part["etag"] - for part in request_json["parts"]: - etags[part["part_number"]] = part["etag"] + server_state.upload_complete(etags) + return [200, "", {}] - server_state.upload_complete(etags) - return [200, "", {}] + return self.custom_response_on_complete.generate_response(request, processor) - return self.custom_response_on_complete.generate_response(request, processor) + # create abort URL + elif request.url == "http://localhost/api/2.0/fs/create-abort-upload-url" and request.method == "POST": + assert 
UploadTestCase.is_auth_header_present(request) + request_json = request.json() + assert request_json["path"] == self.path + expire_time = MultipartUploadTestCase.parse_and_validate_expire_time(request_json["expire_time"]) - # create abort URL - elif request.url == "http://localhost/api/2.0/fs/create-abort-upload-url" and request.method == "POST": - assert MultipartUploadTestCase.is_auth_header_present(request) - request_json = request.json() - assert request_json["path"] == self.path - expire_time = MultipartUploadTestCase.parse_and_validate_expire_time(request_json["expire_time"]) - - def processor(): - response_json = { - "abort_upload_url": { - "url": server_state.create_abort_url(self.path, expire_time), - "headers": [{"name": "header1", "value": "headervalue1"}], - } + def processor() -> list: + response_json = { + "abort_upload_url": { + "url": server_state.create_abort_url(self.path, expire_time), + "headers": [{"name": "header1", "value": "headervalue1"}], } - return [200, json.dumps(response_json), {}] - - return self.custom_response_on_create_abort_url.generate_response(request, processor) - - # abort upload - elif ( - request.url.startswith(MultipartUploadServerState.abort_upload_url_prefix) - and request.method == "DELETE" - ): - assert not MultipartUploadTestCase.is_auth_header_present(request) - assert request.url[len(MultipartUploadServerState.abort_upload_url_prefix) :] == self.path + } + return [200, json.dumps(response_json), {}] - def processor(): - server_state.abort_upload() - return [200, "", {}] - - return self.custom_response_on_abort.generate_response(request, processor) + return self.custom_response_on_create_abort_url.generate_response(request, processor) - return None + # abort upload + elif request.url.startswith(MultipartUploadServerState.abort_upload_url_prefix) and request.method == "DELETE": + assert not UploadTestCase.is_auth_header_present(request) + assert request.url[len(MultipartUploadServerState.abort_upload_url_prefix) :] == self.path - session_mock.add_matcher(matcher=custom_matcher) + def processor() -> list: + server_state.abort_upload() + return [200, "", {}] - @staticmethod - def setup_token_auth(config: Config): - pat_token = "some_pat_token" - config._header_factory = lambda: {"Authorization": f"Bearer {pat_token}"} + return self.custom_response_on_abort.generate_response(request, processor) - @staticmethod - def is_auth_header_present(r: requests.Request): - return r.headers.get("Authorization") is not None + return None @staticmethod def parse_and_validate_expire_time(s: str) -> datetime: @@ -839,52 +981,11 @@ def parse_and_validate_expire_time(s: str) -> datetime: assert now < expire_time < max_expiration return expire_time - def run(self, config: Config): - config = config.copy() - - MultipartUploadTestCase.setup_token_auth(config) - - if self.sdk_retry_timeout_seconds: - config.retry_timeout_seconds = self.sdk_retry_timeout_seconds - if self.multipart_upload_chunk_size: - config.multipart_upload_chunk_size = self.multipart_upload_chunk_size - if self.multipart_upload_max_retries: - config.multipart_upload_max_retries = self.multipart_upload_max_retries - if self.multipart_upload_batch_url_count: - config.multipart_upload_batch_url_count = self.multipart_upload_batch_url_count - config.enable_experimental_files_api_client = True - config.multipart_upload_min_stream_size = 0 # disable single-shot uploads - - file_content = os.urandom(self.stream_size) - - upload_state = MultipartUploadServerState() - - try: - w = 
WorkspaceClient(config=config) - with requests_mock.Mocker() as session_mock: - self.setup_session_mock(session_mock, upload_state) - - def upload(): - w.files.upload("/test.txt", io.BytesIO(file_content), overwrite=True) - - if self.expected_exception_type is not None: - with pytest.raises(self.expected_exception_type): - upload() - else: - upload() - actual_content = upload_state.get_file_content() - assert actual_content == FileContent.from_bytes(file_content) - - assert upload_state.aborted == self.expected_aborted - - finally: - upload_state.cleanup() - - def __str__(self): + def __str__(self) -> str: return self.name @staticmethod - def to_string(test_case): + def to_string(test_case: "MultipartUploadTestCase") -> str: return str(test_case) @@ -895,37 +996,38 @@ def to_string(test_case): MultipartUploadTestCase( "Initiate: 400 response is not retried", stream_size=1024 * 1024, + multipart_upload_min_stream_size=1024 * 1024, # still multipart upload is used custom_response_on_initiate=CustomResponse(code=400, only_invocation=1), expected_exception_type=BadRequest, - expected_aborted=False, # upload didn't start + expected_multipart_upload_aborted=False, # upload didn't start ), MultipartUploadTestCase( "Initiate: 403 response is not retried", stream_size=1024 * 1024, custom_response_on_initiate=CustomResponse(code=403, only_invocation=1), expected_exception_type=PermissionDenied, - expected_aborted=False, # upload didn't start + expected_multipart_upload_aborted=False, # upload didn't start ), MultipartUploadTestCase( "Initiate: 500 response is not retried", stream_size=1024 * 1024, custom_response_on_initiate=CustomResponse(code=500, only_invocation=1), expected_exception_type=InternalError, - expected_aborted=False, # upload didn't start + expected_multipart_upload_aborted=False, # upload didn't start ), MultipartUploadTestCase( "Initiate: non-JSON response is not retried", stream_size=1024 * 1024, custom_response_on_initiate=CustomResponse(body="this is not a JSON", only_invocation=1), expected_exception_type=requests.exceptions.JSONDecodeError, - expected_aborted=False, # upload didn't start + expected_multipart_upload_aborted=False, # upload didn't start ), MultipartUploadTestCase( "Initiate: meaningless JSON response is not retried", stream_size=1024 * 1024, custom_response_on_initiate=CustomResponse(body='{"foo": 123}', only_invocation=1), expected_exception_type=ValueError, - expected_aborted=False, # upload didn't start + expected_multipart_upload_aborted=False, # upload didn't start ), MultipartUploadTestCase( "Initiate: no session token in response is not retried", @@ -934,7 +1036,7 @@ def to_string(test_case): body='{"multipart_upload":{"session_token1": "token123"}}', only_invocation=1 ), expected_exception_type=ValueError, - expected_aborted=False, # upload didn't start + expected_multipart_upload_aborted=False, # upload didn't start ), MultipartUploadTestCase( "Initiate: permanent retryable exception", @@ -942,7 +1044,7 @@ def to_string(test_case): custom_response_on_initiate=CustomResponse(exception=requests.ConnectionError), sdk_retry_timeout_seconds=30, # let's not wait 5 min (SDK default timeout) expected_exception_type=TimeoutError, # SDK throws this if retries are taking too long - expected_aborted=False, # upload didn't start + expected_multipart_upload_aborted=False, # upload didn't start ), MultipartUploadTestCase( "Initiate: intermittent retryable exception", @@ -953,7 +1055,7 @@ def to_string(test_case): first_invocation=1, last_invocation=3, ), - 
expected_aborted=False, + expected_multipart_upload_aborted=False, ), MultipartUploadTestCase( "Initiate: intermittent retryable status code", @@ -964,11 +1066,11 @@ def to_string(test_case): first_invocation=1, last_invocation=3, ), - expected_aborted=False, + expected_multipart_upload_aborted=False, ), # -------------------------- failures on "create upload URL" -------------------------- MultipartUploadTestCase( - "Create upload URL: 400 response is not retied", + "Create upload URL: 400 response is not retried", stream_size=1024 * 1024, custom_response_on_create_multipart_url=CustomResponse( code=400, @@ -976,35 +1078,35 @@ def to_string(test_case): only_invocation=1, ), expected_exception_type=BadRequest, - expected_aborted=True, + expected_multipart_upload_aborted=True, ), MultipartUploadTestCase( - "Create upload URL: 500 error is not retied", + "Create upload URL: 403 response is not retried", stream_size=1024 * 1024, custom_response_on_create_multipart_url=CustomResponse(code=500, only_invocation=1), expected_exception_type=InternalError, - expected_aborted=True, + expected_multipart_upload_aborted=True, ), MultipartUploadTestCase( "Create upload URL: non-JSON response is not retried", stream_size=1024 * 1024, custom_response_on_create_multipart_url=CustomResponse(body="this is not a JSON", only_invocation=1), expected_exception_type=requests.exceptions.JSONDecodeError, - expected_aborted=True, + expected_multipart_upload_aborted=True, ), MultipartUploadTestCase( "Create upload URL: meaningless JSON response is not retried", stream_size=1024 * 1024, custom_response_on_create_multipart_url=CustomResponse(body='{"foo":123}', only_invocation=1), expected_exception_type=ValueError, - expected_aborted=True, + expected_multipart_upload_aborted=True, ), MultipartUploadTestCase( "Create upload URL: meaningless JSON response is not retried 2", stream_size=1024 * 1024, custom_response_on_create_multipart_url=CustomResponse(body='{"upload_part_urls":[]}', only_invocation=1), expected_exception_type=ValueError, - expected_aborted=True, + expected_multipart_upload_aborted=True, ), MultipartUploadTestCase( "Create upload URL: meaningless JSON response is not retried 3", @@ -1013,7 +1115,7 @@ def to_string(test_case): body='{"upload_part_urls":[{"url":""}]}', only_invocation=1 ), expected_exception_type=KeyError, # TODO we might want to make JSON parsing more reliable - expected_aborted=True, + expected_multipart_upload_aborted=True, ), MultipartUploadTestCase( "Create upload URL: permanent retryable exception", @@ -1021,7 +1123,7 @@ def to_string(test_case): custom_response_on_create_multipart_url=CustomResponse(exception=requests.ConnectionError), sdk_retry_timeout_seconds=30, # don't wait for 5 min (SDK default timeout) expected_exception_type=TimeoutError, - expected_aborted=True, + expected_multipart_upload_aborted=True, ), MultipartUploadTestCase( "Create upload URL: intermittent retryable exception", @@ -1031,11 +1133,11 @@ def to_string(test_case): # happens only once, retry succeeds only_invocation=1, ), - expected_aborted=False, + expected_multipart_upload_aborted=False, ), MultipartUploadTestCase( "Create upload URL: intermittent retryable exception 2", - stream_size=100 * 1024 * 1024, # 10 chunks + stream_size=100 * 1024 * 1024, # 10 parts multipart_upload_chunk_size=10 * 1024 * 1024, custom_response_on_create_multipart_url=CustomResponse( exception=requests.Timeout, @@ -1043,12 +1145,23 @@ def to_string(test_case): first_invocation=4, last_invocation=6, ), - expected_aborted=False, 
+ expected_multipart_upload_aborted=False, ), - # -------------------------- failures on chunk upload -------------------------- MultipartUploadTestCase( - "Upload chunk: 403 response is not retried", - stream_size=100 * 1024 * 1024, # 10 chunks + "Create upload URL: intermittent retryable exception 3", + stream_size=1024 * 1024, + multipart_upload_chunk_size=10 * 1024 * 1024, + custom_response_on_create_multipart_url=CustomResponse( + code=500, + first_invocation=4, + last_invocation=6, + ), + expected_multipart_upload_aborted=False, + ), + # -------------------------- failures on part upload -------------------------- + MultipartUploadTestCase( + "Upload part: 403 response is not retried", + stream_size=100 * 1024 * 1024, # 10 parts multipart_upload_chunk_size=10 * 1024 * 1024, custom_response_on_upload=CustomResponse( code=403, @@ -1056,41 +1169,33 @@ def to_string(test_case): only_invocation=1, ), expected_exception_type=PermissionDenied, - expected_aborted=True, + expected_multipart_upload_aborted=True, ), MultipartUploadTestCase( - "Upload chunk: 400 response is not retried", - stream_size=100 * 1024 * 1024, # 10 chunks + "Upload part: 400 response is not retried", + stream_size=100 * 1024 * 1024, # 10 parts multipart_upload_chunk_size=10 * 1024 * 1024, custom_response_on_upload=CustomResponse( code=400, - # fail once, but not on the first chunk + # fail once, but not on the first part only_invocation=3, ), expected_exception_type=BadRequest, - expected_aborted=True, + expected_multipart_upload_aborted=True, ), MultipartUploadTestCase( - "Upload chunk: 500 response is not retried", - stream_size=100 * 1024 * 1024, # 10 chunks - multipart_upload_chunk_size=10 * 1024 * 1024, - custom_response_on_upload=CustomResponse(code=500, only_invocation=5), - expected_exception_type=InternalError, - expected_aborted=True, - ), - MultipartUploadTestCase( - "Upload chunk: expired URL is retried on AWS", - stream_size=100 * 1024 * 1024, # 10 chunks + "Upload part: expired URL is retried on AWS", + stream_size=100 * 1024 * 1024, # 10 parts multipart_upload_chunk_size=10 * 1024 * 1024, custom_response_on_upload=CustomResponse( code=403, body=MultipartUploadTestCase.expired_url_aws_response, only_invocation=2 ), - expected_aborted=False, + expected_multipart_upload_aborted=False, ), MultipartUploadTestCase( - "Upload chunk: expired URL is retried on Azure", + "Upload part: expired URL is retried on Azure", multipart_upload_max_retries=3, - stream_size=100 * 1024 * 1024, # 10 chunks + stream_size=100 * 1024 * 1024, # 10 parts multipart_upload_chunk_size=10 * 1024 * 1024, custom_response_on_upload=CustomResponse( code=403, @@ -1099,10 +1204,10 @@ def to_string(test_case): first_invocation=2, last_invocation=4, ), - expected_aborted=False, + expected_multipart_upload_aborted=False, ), MultipartUploadTestCase( - "Upload chunk: expired URL is retried on Azure, requesting urls by 6", + "Upload part: expired URL is retried on Azure, requesting urls by 6", multipart_upload_max_retries=3, multipart_upload_batch_url_count=6, stream_size=100 * 1024 * 1024, # 100 chunks @@ -1114,12 +1219,12 @@ def to_string(test_case): first_invocation=2, last_invocation=4, ), - expected_aborted=False, + expected_multipart_upload_aborted=False, ), MultipartUploadTestCase( - "Upload chunk: expired URL retry is exhausted", + "Upload part: expired URL retry is exhausted", multipart_upload_max_retries=3, - stream_size=100 * 1024 * 1024, # 10 chunks + stream_size=100 * 1024 * 1024, # 10 parts multipart_upload_chunk_size=10 * 1024 * 
1024, custom_response_on_upload=CustomResponse( code=403, @@ -1129,76 +1234,75 @@ def to_string(test_case): last_invocation=5, ), expected_exception_type=ValueError, - expected_aborted=True, + expected_multipart_upload_aborted=True, ), MultipartUploadTestCase( - "Upload chunk: permanent retryable error", - stream_size=100 * 1024 * 1024, # 10 chunks + "Upload part: permanent retryable error", + stream_size=100 * 1024 * 1024, # 10 parts multipart_upload_chunk_size=10 * 1024 * 1024, sdk_retry_timeout_seconds=30, # don't wait for 5 min (SDK default timeout) custom_response_on_upload=CustomResponse(exception=requests.ConnectionError, first_invocation=8), expected_exception_type=TimeoutError, - expected_aborted=True, + expected_multipart_upload_aborted=True, ), MultipartUploadTestCase( - "Upload chunk: permanent retryable status code", - stream_size=100 * 1024 * 1024, # 10 chunks + "Upload part: permanent retryable status code", + stream_size=100 * 1024 * 1024, # 10 parts multipart_upload_chunk_size=10 * 1024 * 1024, sdk_retry_timeout_seconds=30, # don't wait for 5 min (SDK default timeout) custom_response_on_upload=CustomResponse(code=429, first_invocation=8), expected_exception_type=TimeoutError, - expected_aborted=True, + expected_multipart_upload_aborted=True, ), MultipartUploadTestCase( - "Upload chunk: intermittent retryable error", - stream_size=100 * 1024 * 1024, # 10 chunks + "Upload part: intermittent retryable error", + stream_size=100 * 1024 * 1024, # 10 parts multipart_upload_chunk_size=10 * 1024 * 1024, custom_response_on_upload=CustomResponse( exception=requests.ConnectionError, first_invocation=2, last_invocation=5 ), - expected_aborted=False, + expected_multipart_upload_aborted=False, ), MultipartUploadTestCase( - "Upload chunk: intermittent retryable status code", - stream_size=100 * 1024 * 1024, # 10 chunks + "Upload part: intermittent retryable status code 429", + stream_size=100 * 1024 * 1024, # 10 parts multipart_upload_chunk_size=10 * 1024 * 1024, custom_response_on_upload=CustomResponse(code=429, first_invocation=2, last_invocation=4), - expected_aborted=False, + expected_multipart_upload_aborted=False, ), - # -------------------------- failures on abort -------------------------- MultipartUploadTestCase( - "Abort URL: 500 response", - stream_size=1024 * 1024, - custom_response_on_create_multipart_url=CustomResponse(code=500, only_invocation=1), - custom_response_on_create_abort_url=CustomResponse(code=400), - expected_exception_type=InternalError, # original error - expected_aborted=False, # server state didn't change to record abort + "Upload chunk: intermittent retryable status code 500", + stream_size=100 * 1024 * 1024, # 10 chunks + multipart_upload_chunk_size=10 * 1024 * 1024, + custom_response_on_upload=CustomResponse(code=500, first_invocation=2, last_invocation=4), + expected_multipart_upload_aborted=False, ), + # -------------------------- failures on abort -------------------------- MultipartUploadTestCase( "Abort URL: 403 response", stream_size=1024 * 1024, - custom_response_on_upload=CustomResponse(code=500, only_invocation=1), + custom_response_on_upload=CustomResponse(code=403, only_invocation=1), custom_response_on_create_abort_url=CustomResponse(code=403), - expected_exception_type=InternalError, # original error - expected_aborted=False, # server state didn't change to record abort + expected_exception_type=PermissionDenied, # original error + expected_multipart_upload_aborted=False, # server state didn't change to record abort ), 
MultipartUploadTestCase( "Abort URL: intermittent retryable error", stream_size=1024 * 1024, - custom_response_on_create_multipart_url=CustomResponse(code=500, only_invocation=1), + custom_response_on_create_multipart_url=CustomResponse(code=403, only_invocation=1), custom_response_on_create_abort_url=CustomResponse(code=429, first_invocation=1, last_invocation=3), - expected_exception_type=InternalError, # original error - expected_aborted=True, # abort successfully called after abort URL creation is retried + expected_exception_type=PermissionDenied, # original error + expected_multipart_upload_aborted=True, # abort successfully called after abort URL creation is retried ), MultipartUploadTestCase( "Abort URL: intermittent retryable error 2", stream_size=1024 * 1024, - custom_response_on_create_multipart_url=CustomResponse(code=500, only_invocation=1), + custom_response_on_create_multipart_url=CustomResponse(code=403, only_invocation=1), custom_response_on_create_abort_url=CustomResponse( exception=requests.Timeout, first_invocation=1, last_invocation=3 ), - expected_exception_type=InternalError, # original error - expected_aborted=True, # abort successfully called after abort URL creation is retried + expected_exception_type=PermissionDenied, # original error + expected_multipart_upload_aborted=True, # abort successfully called after abort URL creation is retried ), MultipartUploadTestCase( "Abort: exception", @@ -1212,157 +1316,53 @@ def to_string(test_case): exception_happened_before_processing=False, ), expected_exception_type=PermissionDenied, # original error is reported - expected_aborted=True, + expected_multipart_upload_aborted=True, ), # -------------------------- happy cases -------------------------- MultipartUploadTestCase( - "Multipart upload successful: single chunk", - stream_size=1024 * 1024, # less than chunk size + "Multipart upload successful: single part", + stream_size=1024 * 1024, # less than part size multipart_upload_chunk_size=10 * 1024 * 1024, ), MultipartUploadTestCase( - "Multipart upload successful: multiple chunks (aligned)", - stream_size=100 * 1024 * 1024, # 10 chunks + "Multipart upload successful: multiple parts (aligned)", + stream_size=100 * 1024 * 1024, # 10 parts multipart_upload_chunk_size=10 * 1024 * 1024, ), MultipartUploadTestCase( - "Multipart upload successful: multiple chunks (aligned), upload urls by 3", + "Multipart upload successful: multiple parts (aligned), upload urls by 3", multipart_upload_batch_url_count=3, - stream_size=100 * 1024 * 1024, # 10 chunks + stream_size=100 * 1024 * 1024, # 10 parts multipart_upload_chunk_size=10 * 1024 * 1024, ), MultipartUploadTestCase( - "Multipart upload successful: multiple chunks (not aligned), upload urls by 1", - stream_size=100 * 1024 * 1024 + 1566, # 14 full chunks + remainder + "Multipart upload successful: multiple parts (not aligned), upload urls by 1", + stream_size=100 * 1024 * 1024 + 1566, # 14 full parts + remainder multipart_upload_chunk_size=7 * 1024 * 1024 - 17, ), MultipartUploadTestCase( - "Multipart upload successful: multiple chunks (not aligned), upload urls by 5", + "Multipart upload successful: multiple parts (not aligned), upload urls by 5", multipart_upload_batch_url_count=5, - stream_size=100 * 1024 * 1024 + 1566, # 14 full chunks + remainder + stream_size=100 * 1024 * 1024 + 1566, # 14 full parts + remainder multipart_upload_chunk_size=7 * 1024 * 1024 - 17, ), - ], - ids=MultipartUploadTestCase.to_string, -) -def test_multipart_upload(config: Config, test_case: 
MultipartUploadTestCase): - test_case.run(config) - - -class SingleShotUploadState: - - def __init__(self): - self.single_shot_file_content = None - - -class SingleShotUploadTestCase: - - def __init__(self, name: str, stream_size: int, multipart_upload_min_stream_size: int, expected_single_shot: bool): - self.name = name - self.stream_size = stream_size - self.multipart_upload_min_stream_size = multipart_upload_min_stream_size - self.expected_single_shot = expected_single_shot - - def __str__(self): - return self.name - - @staticmethod - def to_string(test_case): - return str(test_case) - - def run(self, config: Config): - config = config.copy() - config.enable_experimental_files_api_client = True - config.multipart_upload_min_stream_size = self.multipart_upload_min_stream_size - - file_content = os.urandom(self.stream_size) - - session = requests.Session() - with requests_mock.Mocker(session=session) as session_mock: - session_mock.get(f"http://localhost/api/2.0/fs/files{MultipartUploadTestCase.path}", status_code=200) - - upload_state = SingleShotUploadState() - - def custom_matcher(request): - request_url = urlparse(request.url) - request_query = parse_qs(request_url.query) - - if self.expected_single_shot: - if ( - request_url.hostname == "localhost" - and request_url.path == f"/api/2.0/fs/files{MultipartUploadTestCase.path}" - and request.method == "PUT" - ): - body = request.body.read() - upload_state.single_shot_file_content = FileContent.from_bytes(body) - - resp = requests.Response() - resp.status_code = 204 - resp.request = request - resp._content = b"" - return resp - else: - if ( - request_url.hostname == "localhost" - and request_url.path == f"/api/2.0/fs/files{MultipartUploadTestCase.path}" - and request_query.get("action") == ["initiate-upload"] - and request.method == "POST" - ): - - resp = requests.Response() - resp.status_code = 403 # this will throw, that's fine - resp.request = request - resp._content = b"" - return resp - - return None - - session_mock.add_matcher(matcher=custom_matcher) - - w = WorkspaceClient(config=config) - w.files._api._api_client._session = session - - def upload(): - w.files.upload("/test.txt", io.BytesIO(file_content), overwrite=True) - - if self.expected_single_shot: - upload() - actual_content = upload_state.single_shot_file_content - assert actual_content == FileContent.from_bytes(file_content) - else: - with pytest.raises(PermissionDenied): - upload() - - -@pytest.mark.parametrize( - "test_case", - [ - SingleShotUploadTestCase( - "Single-shot upload", + MultipartUploadTestCase( + "Small stream, single-shot upload used", stream_size=1024 * 1024, multipart_upload_min_stream_size=1024 * 1024 + 1, - expected_single_shot=True, - ), - SingleShotUploadTestCase( - "Multipart upload 1", - stream_size=1024 * 1024, - multipart_upload_min_stream_size=1024 * 1024, - expected_single_shot=False, - ), - SingleShotUploadTestCase( - "Multipart upload 2", - stream_size=1024 * 1024, - multipart_upload_min_stream_size=0, - expected_single_shot=False, + expected_multipart_upload_aborted=False, + expected_single_shot_upload=True, ), ], - ids=SingleShotUploadTestCase.to_string, + ids=MultipartUploadTestCase.to_string, ) -def test_single_shot_upload(config: Config, test_case: SingleShotUploadTestCase): +def test_multipart_upload(config: Config, test_case: MultipartUploadTestCase) -> None: test_case.run(config) class ResumableUploadServerState: + """This server state is updated on resumable upload (GCP)""" + resumable_upload_url_prefix = 
"https://cloud_provider.com/resumable-upload/" abort_upload_url_prefix = "https://cloud_provider.com/abort-upload/" @@ -1371,10 +1371,10 @@ def __init__(self, unconfirmed_delta: Union[int, list]): self.confirmed_last_byte: Optional[int] = None # inclusive self.uploaded_parts = [] self.session_token = "token-" + MultipartUploadServerState.randomstr() - self.file_content = None + self.file_content: Optional[FileContent] = None self.aborted = False - def save_part(self, start_offset: int, end_offset_incl: int, part_content: bytes, file_size_s: str): + def save_part(self, start_offset: int, end_offset_incl: int, part_content: bytes, file_size_s: str) -> None: assert not self.aborted assert len(part_content) > 0 @@ -1389,7 +1389,7 @@ def save_part(self, start_offset: int, end_offset_incl: int, part_content: bytes if is_last_part: assert int(file_size_s) == end_offset_incl + 1 else: - assert not self.file_content # last chunk should not have been uploaded yet + assert not self.file_content # last part should not have been uploaded yet if isinstance(self.unconfirmed_delta, int): unconfirmed_delta = self.unconfirmed_delta @@ -1408,20 +1408,20 @@ def save_part(self, start_offset: int, end_offset_incl: int, part_content: bytes if unconfirmed_delta > 0: part_content = part_content[:-unconfirmed_delta] - fd, chunk_file = mkstemp() + fd, part_file = mkstemp() with open(fd, "wb") as f: f.write(part_content) - self.uploaded_parts.append(chunk_file) + self.uploaded_parts.append(part_file) if is_last_part and unconfirmed_delta == 0: size = 0 sha256 = hashlib.sha256() - for chunk_path in self.uploaded_parts: - size += os.path.getsize(chunk_path) - with open(chunk_path, "rb") as f: - chunk_content = f.read() - sha256.update(chunk_content) + for part_path in self.uploaded_parts: + size += os.path.getsize(part_path) + with open(part_path, "rb") as f: + part_content = f.read() + sha256.update(part_content) assert size == end_offset_incl + 1 self.file_content = FileContent(size, sha256.hexdigest()) @@ -1433,25 +1433,29 @@ def create_abort_url(self, path: str, expire_time: datetime) -> str: self.issued_abort_url_expire_time = expire_time return f"{self.abort_upload_url_prefix}{path}" - def cleanup(self): + def cleanup(self) -> None: for file in self.uploaded_parts: os.remove(file) - def get_file_content(self) -> FileContent: - assert not self.aborted + def get_file_content(self) -> Optional[FileContent]: + if self.aborted: + assert not self.file_content + + # content may be None even for a non-aborted upload, + # in case single-shot upload was used due to small stream size. return self.file_content - def abort_upload(self): + def abort_upload(self) -> None: self.aborted = True -class ResumableUploadTestCase: +class ResumableUploadTestCase(UploadTestCase): """Test case for resumable upload of a file. Resumable uploads are used on GCP. Resumable upload involves multiple HTTP requests: - initiating upload (call to Databricks Files API) - requesting resumable upload URL (call to Databricks Files API) - - uploading chunks of data (calls to cloud storage provider or Databricks storage proxy) + - uploading data in parts (calls to cloud storage provider or Databricks storage proxy) - aborting the upload (call to cloud storage provider or Databricks storage proxy) Test case uses requests-mock library to mock all these requests. Within a test, mocks use @@ -1460,200 +1464,167 @@ class ResumableUploadTestCase: Response of each call can be modified by parameterising a respective `CustomResponse` object. 
""" - path = "/test.txt" - def __init__( self, name: str, stream_size: int, overwrite: bool = True, + multipart_upload_min_stream_size: int = 0, # disable single-shot uploads by default multipart_upload_chunk_size: Optional[int] = None, sdk_retry_timeout_seconds: Optional[int] = None, multipart_upload_max_retries: Optional[int] = None, - # In resumable upload, when replying to chunk upload request, server returns + # In resumable upload, when replying to part upload request, server returns # (confirms) last accepted byte offset for the client to resume upload after. # - # `unconfirmed_delta` defines offset from the end of the chunk that remains + # `unconfirmed_delta` defines offset from the end of the part that remains # "unconfirmed", i.e. the last accepted offset would be (range_end - unconfirmed_delta). - # Can be int (same for all chunks) or list (individual for each chunk). + # Can be int (same for all parts) or list (individual for each part). unconfirmed_delta: Union[int, list] = 0, - custom_response_on_create_resumable_url=CustomResponse(enabled=False), - custom_response_on_upload=CustomResponse(enabled=False), - custom_response_on_status_check=CustomResponse(enabled=False), - custom_response_on_abort=CustomResponse(enabled=False), + custom_response_on_single_shot_upload: CustomResponse = CustomResponse(enabled=False), + custom_response_on_create_resumable_url: CustomResponse = CustomResponse(enabled=False), + custom_response_on_upload: CustomResponse = CustomResponse(enabled=False), + custom_response_on_status_check: CustomResponse = CustomResponse(enabled=False), + custom_response_on_abort: CustomResponse = CustomResponse(enabled=False), # exception which is expected to be thrown (so upload is expected to have failed) expected_exception_type: Optional[Type[BaseException]] = None, # if abort is expected to be called - expected_aborted: bool = False, + expected_multipart_upload_aborted: bool = False, + expected_single_shot_upload: bool = False, ): - self.name = name - self.stream_size = stream_size - self.overwrite = overwrite - self.multipart_upload_chunk_size = multipart_upload_chunk_size - self.sdk_retry_timeout_seconds = sdk_retry_timeout_seconds - self.multipart_upload_max_retries = multipart_upload_max_retries + super().__init__( + name, + stream_size, + overwrite, + multipart_upload_min_stream_size, + multipart_upload_chunk_size, + sdk_retry_timeout_seconds, + multipart_upload_max_retries, + custom_response_on_single_shot_upload, + expected_exception_type, + expected_multipart_upload_aborted, + expected_single_shot_upload, + ) + self.unconfirmed_delta = unconfirmed_delta self.custom_response_on_create_resumable_url = copy.deepcopy(custom_response_on_create_resumable_url) self.custom_response_on_upload = copy.deepcopy(custom_response_on_upload) self.custom_response_on_status_check = copy.deepcopy(custom_response_on_status_check) self.custom_response_on_abort = copy.deepcopy(custom_response_on_abort) - self.expected_exception_type = expected_exception_type - self.expected_aborted: bool = expected_aborted - - def setup_session_mock(self, session_mock: requests_mock.Mocker, server_state: ResumableUploadServerState): - - def custom_matcher(request): - request_url = urlparse(request.url) - request_query = parse_qs(request_url.query) - - # initial request - if ( - request_url.hostname == "localhost" - and request_url.path == f"/api/2.0/fs/files{MultipartUploadTestCase.path}" - and request_query.get("action") == ["initiate-upload"] - and request.method == "POST" - ): - - assert 
MultipartUploadTestCase.is_auth_header_present(request) - assert request.text is None - - def processor(): - response_json = {"resumable_upload": {"session_token": server_state.session_token}} - return [200, json.dumps(response_json), {}] - - # Different initiate error responses have been verified by test_multipart_upload(), - # so we're always generating a "success" response. - return CustomResponse(enabled=False).generate_response(request, processor) - - elif ( - request_url.hostname == "localhost" - and request_url.path == "/api/2.0/fs/create-resumable-upload-url" - and request.method == "POST" - ): - - assert MultipartUploadTestCase.is_auth_header_present(request) - - request_json = request.json() - assert request_json.keys() == {"path", "session_token"} - assert request_json["path"] == self.path - assert request_json["session_token"] == server_state.session_token - - def processor(): - resumable_upload_url = f"{ResumableUploadServerState.resumable_upload_url_prefix}{self.path}" - response_json = { - "resumable_upload_url": { - "url": resumable_upload_url, - "headers": [{"name": "name1", "value": "value1"}], - } + def create_multipart_upload_server_state(self) -> ResumableUploadServerState: + return ResumableUploadServerState(self.unconfirmed_delta) + + def match_request_to_response( + self, request: requests.Request, server_state: ResumableUploadServerState + ) -> Optional[requests.Response]: + request_url = urlparse(request.url) + request_query = parse_qs(request_url.query) + + # initial request + if ( + request_url.hostname == "localhost" + and request_url.path == f"/api/2.0/fs/files{self.path}" + and request_query.get("action") == ["initiate-upload"] + and request.method == "POST" + ): + + assert UploadTestCase.is_auth_header_present(request) + assert request.text is None + + def processor() -> list: + response_json = {"resumable_upload": {"session_token": server_state.session_token}} + return [200, json.dumps(response_json), {}] + + # Different initiate error responses have been verified by test_multipart_upload(), + # so we're always generating a "success" response. 
+ return CustomResponse(enabled=False).generate_response(request, processor) + + elif ( + request_url.hostname == "localhost" + and request_url.path == "/api/2.0/fs/create-resumable-upload-url" + and request.method == "POST" + ): + + assert UploadTestCase.is_auth_header_present(request) + + request_json = request.json() + assert request_json.keys() == {"path", "session_token"} + assert request_json["path"] == self.path + assert request_json["session_token"] == server_state.session_token + + def processor() -> list: + resumable_upload_url = f"{ResumableUploadServerState.resumable_upload_url_prefix}{self.path}" + + response_json = { + "resumable_upload_url": { + "url": resumable_upload_url, + "headers": [{"name": "name1", "value": "value1"}], } - return [200, json.dumps(response_json), {}] + } + return [200, json.dumps(response_json), {}] - return self.custom_response_on_create_resumable_url.generate_response(request, processor) + return self.custom_response_on_create_resumable_url.generate_response(request, processor) - # resumable upload, uploading part - elif ( - request.url.startswith(ResumableUploadServerState.resumable_upload_url_prefix) - and request.method == "PUT" - ): + # resumable upload, uploading part + elif request.url.startswith(ResumableUploadServerState.resumable_upload_url_prefix) and request.method == "PUT": - assert not MultipartUploadTestCase.is_auth_header_present(request) - url_path = request.url[len(ResumableUploadServerState.resumable_upload_url_prefix) :] - assert url_path == self.path + assert not UploadTestCase.is_auth_header_present(request) + url_path = request.url[len(ResumableUploadServerState.resumable_upload_url_prefix) :] + assert url_path == self.path - content_range_header = request.headers["Content-range"] - is_status_check_request = re.match("bytes \\*/\\*", content_range_header) - if is_status_check_request: - assert not request.body - response_customizer = self.custom_response_on_status_check - else: - response_customizer = self.custom_response_on_upload + content_range_header = request.headers["Content-range"] + is_status_check_request = re.match("bytes \\*/\\*", content_range_header) + if is_status_check_request: + assert not request.body + response_customizer = self.custom_response_on_status_check + else: + response_customizer = self.custom_response_on_upload - def processor(): - if not is_status_check_request: - body = request.body.read() + def processor() -> list: + if not is_status_check_request: + body = request.body.read() - match = re.match("bytes (\\d+)-(\\d+)/(.+)", content_range_header) - [range_start_s, range_end_s, file_size_s] = match.groups() + match = re.match("bytes (\\d+)-(\\d+)/(.+)", content_range_header) + [range_start_s, range_end_s, file_size_s] = match.groups() - server_state.save_part(int(range_start_s), int(range_end_s), body, file_size_s) + server_state.save_part(int(range_start_s), int(range_end_s), body, file_size_s) - if server_state.file_content: - # upload complete - return [200, "", {}] - else: - # more data expected - if server_state.confirmed_last_byte: - headers = {"Range": f"bytes=0-{server_state.confirmed_last_byte}"} - else: - headers = {} - return [308, "", headers] - - return response_customizer.generate_response(request, processor) - - # abort upload - elif ( - request.url.startswith(ResumableUploadServerState.resumable_upload_url_prefix) - and request.method == "DELETE" - ): - - assert not MultipartUploadTestCase.is_auth_header_present(request) - url_path = 
request.url[len(ResumableUploadServerState.resumable_upload_url_prefix) :] - assert url_path == self.path - - def processor(): - server_state.abort_upload() + if server_state.file_content: + # upload complete return [200, "", {}] + else: + # more data expected + if server_state.confirmed_last_byte: + headers = {"Range": f"bytes=0-{server_state.confirmed_last_byte}"} + else: + headers = {} + return [308, "", headers] - return self.custom_response_on_abort.generate_response(request, processor) - - return None - - session_mock.add_matcher(matcher=custom_matcher) - - def run(self, config: Config): - config = config.copy() - if self.sdk_retry_timeout_seconds: - config.retry_timeout_seconds = self.sdk_retry_timeout_seconds - if self.multipart_upload_chunk_size: - config.multipart_upload_chunk_size = self.multipart_upload_chunk_size - if self.multipart_upload_max_retries: - config.multipart_upload_max_retries = self.multipart_upload_max_retries - config.enable_experimental_files_api_client = True - config.multipart_upload_min_stream_size = 0 # disable single-shot uploads - - MultipartUploadTestCase.setup_token_auth(config) - - file_content = os.urandom(self.stream_size) - - upload_state = ResumableUploadServerState(self.unconfirmed_delta) + return response_customizer.generate_response(request, processor) - try: - with requests_mock.Mocker() as session_mock: - self.setup_session_mock(session_mock, upload_state) - w = WorkspaceClient(config=config) + # abort upload + elif ( + request.url.startswith(ResumableUploadServerState.resumable_upload_url_prefix) + and request.method == "DELETE" + ): - def upload(): - w.files.upload("/test.txt", io.BytesIO(file_content), overwrite=self.overwrite) + assert not UploadTestCase.is_auth_header_present(request) + url_path = request.url[len(ResumableUploadServerState.resumable_upload_url_prefix) :] + assert url_path == self.path - if self.expected_exception_type is not None: - with pytest.raises(self.expected_exception_type): - upload() - else: - upload() - actual_content = upload_state.get_file_content() - assert actual_content == FileContent.from_bytes(file_content) + def processor() -> list: + server_state.abort_upload() + return [200, "", {}] - assert upload_state.aborted == self.expected_aborted + return self.custom_response_on_abort.generate_response(request, processor) - finally: - upload_state.cleanup() + return None - def __str__(self): + def __str__(self) -> str: return self.name @staticmethod - def to_string(test_case): + def to_string(test_case: "ResumableUploadTestCase") -> str: return str(test_case) @@ -1670,28 +1641,28 @@ def to_string(test_case): only_invocation=1, ), expected_exception_type=BadRequest, - expected_aborted=False, # upload didn't start + expected_multipart_upload_aborted=False, # upload didn't start ), ResumableUploadTestCase( "Create resumable URL: 403 response is not retried", stream_size=1024 * 1024, custom_response_on_create_resumable_url=CustomResponse(code=403, only_invocation=1), expected_exception_type=PermissionDenied, - expected_aborted=False, # upload didn't start + expected_multipart_upload_aborted=False, # upload didn't start ), ResumableUploadTestCase( "Create resumable URL: 500 response is not retried", stream_size=1024 * 1024, custom_response_on_create_resumable_url=CustomResponse(code=500, only_invocation=1), expected_exception_type=InternalError, - expected_aborted=False, # upload didn't start + expected_multipart_upload_aborted=False, # upload didn't start ), ResumableUploadTestCase( "Create resumable URL: non-JSON 
response is not retried", stream_size=1024 * 1024, custom_response_on_create_resumable_url=CustomResponse(body="Foo bar", only_invocation=1), expected_exception_type=requests.exceptions.JSONDecodeError, - expected_aborted=False, # upload didn't start + expected_multipart_upload_aborted=False, # upload didn't start ), ResumableUploadTestCase( "Create resumable URL: meaningless JSON response is not retried", @@ -1700,7 +1671,7 @@ def to_string(test_case): body='{"upload_part_urls":[{"url":""}]}', only_invocation=1 ), expected_exception_type=ValueError, - expected_aborted=False, # upload didn't start + expected_multipart_upload_aborted=False, # upload didn't start ), ResumableUploadTestCase( "Create resumable URL: permanent retryable status code", @@ -1708,7 +1679,7 @@ def to_string(test_case): custom_response_on_create_resumable_url=CustomResponse(code=429), sdk_retry_timeout_seconds=30, # don't wait for 5 min (SDK default timeout) expected_exception_type=TimeoutError, - expected_aborted=False, # upload didn't start + expected_multipart_upload_aborted=False, # upload didn't start ), ResumableUploadTestCase( "Create resumable URL: intermittent retryable exception is retried", @@ -1719,7 +1690,7 @@ def to_string(test_case): first_invocation=1, last_invocation=3, ), - expected_aborted=False, # upload succeeds + expected_multipart_upload_aborted=False, # upload succeeds ), # ------------------ failures during upload ------------------ ResumableUploadTestCase( @@ -1732,7 +1703,7 @@ def to_string(test_case): ), # Despite the returned error, file has been uploaded. We'll discover that # on the next status check and consider upload completed. - expected_aborted=False, + expected_multipart_upload_aborted=False, ), ResumableUploadTestCase( "Upload: retryable exception before file is uploaded, not enough retries", @@ -1740,7 +1711,7 @@ def to_string(test_case): multipart_upload_max_retries=3, custom_response_on_upload=CustomResponse( exception=requests.ConnectionError, - # prevent server from saving this chunk + # prevent server from saving this part exception_happened_before_processing=True, # fail 4 times, exceeding max_retries first_invocation=1, @@ -1748,7 +1719,7 @@ def to_string(test_case): ), # File was never uploaded and we gave up retrying expected_exception_type=requests.ConnectionError, - expected_aborted=True, + expected_multipart_upload_aborted=True, ), ResumableUploadTestCase( "Upload: retryable exception before file is uploaded, enough retries", @@ -1756,14 +1727,14 @@ def to_string(test_case): multipart_upload_max_retries=4, custom_response_on_upload=CustomResponse( exception=requests.ConnectionError, - # prevent server from saving this chunk + # prevent server from saving this part exception_happened_before_processing=True, # fail 4 times, not exceeding max_retries first_invocation=1, last_invocation=4, ), # File was uploaded after retries - expected_aborted=False, + expected_multipart_upload_aborted=False, ), ResumableUploadTestCase( "Upload: intermittent 429 response: retried", @@ -1776,7 +1747,7 @@ def to_string(test_case): first_invocation=2, last_invocation=4, ), - expected_aborted=False, # upload succeeded + expected_multipart_upload_aborted=False, # upload succeeded ), ResumableUploadTestCase( "Upload: intermittent 429 response: retry exhausted", @@ -1790,19 +1761,19 @@ def to_string(test_case): last_invocation=5, ), expected_exception_type=TooManyRequests, - expected_aborted=True, + expected_multipart_upload_aborted=True, ), # -------------- abort failures -------------- 
ResumableUploadTestCase( "Abort: client error", stream_size=1024 * 1024, - # prevent chunk from being uploaded + # prevent part from being uploaded custom_response_on_upload=CustomResponse(code=403), # internal server error does not prevent server state change custom_response_on_abort=CustomResponse(code=500), expected_exception_type=PermissionDenied, # abort returned error but was actually processed - expected_aborted=True, + expected_multipart_upload_aborted=True, ), # -------------- file already exists -------------- ResumableUploadTestCase( @@ -1811,47 +1782,54 @@ def to_string(test_case): overwrite=False, custom_response_on_upload=CustomResponse(code=412, only_invocation=1), expected_exception_type=AlreadyExists, - expected_aborted=True, + expected_multipart_upload_aborted=True, ), # -------------- success cases -------------- ResumableUploadTestCase( - "Multiple chunks, zero unconfirmed delta", + "Multiple parts, zero unconfirmed delta", stream_size=100 * 1024 * 1024, multipart_upload_chunk_size=7 * 1024 * 1024 + 566, - # server accepts all the chunks in full + # server accepts all the parts in full unconfirmed_delta=0, - expected_aborted=False, + expected_multipart_upload_aborted=False, ), ResumableUploadTestCase( - "Multiple small chunks, zero unconfirmed delta", + "Multiple small parts, zero unconfirmed delta", stream_size=100 * 1024 * 1024, multipart_upload_chunk_size=100 * 1024, - # server accepts all the chunks in full + # server accepts all the parts in full unconfirmed_delta=0, - expected_aborted=False, + expected_multipart_upload_aborted=False, ), ResumableUploadTestCase( - "Multiple chunks, non-zero unconfirmed delta", + "Multiple parts, non-zero unconfirmed delta", stream_size=100 * 1024 * 1024, multipart_upload_chunk_size=7 * 1024 * 1024 + 566, - # for every chunk, server accepts all except last 239 bytes + # for every part, server accepts all except last 239 bytes unconfirmed_delta=239, - expected_aborted=False, + expected_multipart_upload_aborted=False, ), ResumableUploadTestCase( - "Multiple chunks, variable unconfirmed delta", + "Multiple parts, variable unconfirmed delta", stream_size=100 * 1024 * 1024, multipart_upload_chunk_size=7 * 1024 * 1024 + 566, - # for the first chunk, server accepts all except last 15Kib - # for the second chunk, server accepts it all - # for the 3rd chunk, server accepts all except last 25000 bytes - # for the 4th chunk, server accepts all except last 7 Mb - # for the 5th chunk onwards server accepts all except last 5 bytes + # for the first part, server accepts all except last 15Kib + # for the second part, server accepts it all + # for the 3rd part, server accepts all except last 25000 bytes + # for the 4th part, server accepts all except last 7 Mb + # for the 5th part onwards server accepts all except last 5 bytes unconfirmed_delta=[15 * 1024, 0, 25000, 7 * 1024 * 1024, 5], - expected_aborted=False, + expected_multipart_upload_aborted=False, + ), + ResumableUploadTestCase( + "Small stream, single-shot upload used", + stream_size=1024 * 1024, + multipart_upload_min_stream_size=1024 * 1024 + 1, + expected_multipart_upload_aborted=False, + expected_single_shot_upload=True, ), ], ids=ResumableUploadTestCase.to_string, ) -def test_resumable_upload(config: Config, test_case: ResumableUploadTestCase): +def test_resumable_upload(config: Config, test_case: ResumableUploadTestCase) -> None: test_case.run(config)