|
21 | 21 | from urllib import parse |
22 | 22 |
|
23 | 23 | import requests |
| 24 | +import requests.adapters |
24 | 25 | from requests import RequestException |
25 | 26 |
|
26 | 27 | from .._base_client import _BaseClient, _RawResponse, _StreamingResponse |
27 | 28 | from .._property import _cached_property |
28 | 29 | from ..clock import Clock, RealClock |
| 30 | +from ..config import Config |
29 | 31 | from ..errors import AlreadyExists, NotFound |
30 | 32 | from ..errors.mapper import _error_mapper |
31 | 33 | from ..retries import retried |
@@ -723,15 +725,14 @@ def download(self, file_path: str) -> DownloadResponse: |
723 | 725 | initial_response.contents._response = wrapped_response |
724 | 726 | return initial_response |
725 | 727 |
|
726 | | - |
727 | 728 | def upload(self, file_path: str, contents: BinaryIO, *, overwrite: Optional[bool] = None): |
728 | 729 | # Upload empty and small files with one-shot upload. |
729 | 730 | pre_read_buffer = contents.read(self._config.multipart_upload_min_stream_size) |
730 | 731 | if len(pre_read_buffer) < self._config.multipart_upload_min_stream_size: |
731 | 732 | _LOG.debug( |
732 | 733 | f"Using one-shot upload for input stream of size {len(pre_read_buffer)} below {self._config.multipart_upload_min_stream_size} bytes" |
733 | 734 | ) |
734 | | - return super().upload(file_path=file_path, contents=pre_read_buffer, overwrite=overwrite) |
| 735 | + return super().upload(file_path=file_path, contents=BytesIO(pre_read_buffer), overwrite=overwrite) |
735 | 736 |
|
736 | 737 | query = {"action": "initiate-upload"} |
737 | 738 | if overwrite is not None: |
@@ -829,6 +830,7 @@ def fill_buffer(): |
829 | 830 |
|
830 | 831 | headers = {"Content-Type": "application/json"} |
831 | 832 |
|
| 833 | + # Requesting URLs for the same set of parts is an idempotent operation, safe to retry. |
832 | 834 | # _api.do() does retry |
833 | 835 | upload_part_urls_response = self._api.do( |
834 | 836 | "POST", "/api/2.0/fs/create-upload-part-urls", headers=headers, body=body |
@@ -931,6 +933,7 @@ def perform(): |
931 | 933 |
|
932 | 934 | body["parts"] = parts |
933 | 935 |
|
| 936 | + # Completing upload is an idempotent operation, safe to retry. |
934 | 937 | # _api.do() does retry |
935 | 938 | self._api.do( |
936 | 939 | "POST", |
@@ -1109,7 +1112,7 @@ def _resumable_upload( |
1109 | 1112 | break |
1110 | 1113 |
|
1111 | 1114 | elif upload_response.status_code == 308: |
1112 | | - # chunk accepted, let's determine received offset to resume from there |
| 1115 | + # chunk accepted (or check-status succeeded), let's determine received offset to resume from there |
1113 | 1116 | range_string = upload_response.headers.get("Range") |
1114 | 1117 | confirmed_offset = self._extract_range_offset(range_string) |
1115 | 1118 | _LOG.debug(f"Received confirmed offset: {confirmed_offset}") |
|
0 commit comments