Skip to content

Commit d1a9027

Browse files
authored
Merge pull request #313 from Backblaze/new-b2-api-params
New B2Api parameters: save_to_buffer_size, check_download_hash
2 parents 990f203 + 2d36111 commit d1a9027

File tree

9 files changed

+199
-23
lines changed

9 files changed

+199
-23
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2020
* Add parameters to set the min/max part size for large file upload/copy methods
2121
* Add CopySourceTooBig exception
2222
* Add an option to set a custom file version class to FileVersionFactory
23+
* Add an option for B2Api to turn off hash checking for downloaded files
24+
* Add an option for B2Api to set the write buffer size for the DownloadedFile.save_to method
2325

2426
### Fixed
2527
* Fix copying objects larger than 1TB

CONTRIBUTING.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ To run just integration tests:
8888
export B2_TEST_APPLICATION_KEY_ID=your_app_key_id
8989
nox -s integration-3.10
9090

91+
To run tests by keyword expressions:
92+
93+
nox -s unit-3.10 -- -k keyword
94+
9195
## Documentation
9296

9397
To build the documentation and watch for changes (including the source code):

b2sdk/api.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ def __init__(
6060
max_upload_workers: Optional[int] = None,
6161
max_copy_workers: Optional[int] = None,
6262
max_download_workers: Optional[int] = None,
63+
save_to_buffer_size: Optional[int] = None,
64+
check_download_hash: bool = True,
6365
):
6466
"""
6567
Initialize Services object using given session.
@@ -68,6 +70,8 @@ def __init__(
6870
:param max_upload_workers: a number of upload threads
6971
:param max_copy_workers: a number of copy threads
7072
:param max_download_workers: maximum number of download threads
73+
:param save_to_buffer_size: buffer size to use when writing files using DownloadedFile.save_to
74+
:param check_download_hash: whether to check hash of downloaded files. Can be disabled for files with internal checksums, for example, or to forcefully retrieve objects with corrupted payload or hash value
7175
"""
7276
self.api = api
7377
self.session = api.session
@@ -77,7 +81,10 @@ def __init__(
7781
)
7882
self.copy_manager = self.COPY_MANAGER_CLASS(services=self, max_workers=max_copy_workers)
7983
self.download_manager = self.DOWNLOAD_MANAGER_CLASS(
80-
services=self, max_workers=max_download_workers
84+
services=self,
85+
max_workers=max_download_workers,
86+
write_buffer_size=save_to_buffer_size,
87+
check_hash=check_download_hash,
8188
)
8289
self.emerger = Emerger(self)
8390

@@ -118,6 +125,8 @@ def __init__(
118125
max_copy_workers: Optional[int] = None,
119126
api_config: B2HttpApiConfig = DEFAULT_HTTP_API_CONFIG,
120127
max_download_workers: Optional[int] = None,
128+
save_to_buffer_size: Optional[int] = None,
129+
check_download_hash: bool = True,
121130
):
122131
"""
123132
Initialize the API using the given account info.
@@ -132,6 +141,8 @@ def __init__(
132141
:param max_copy_workers: a number of copy threads
133142
:param api_config:
134143
:param max_download_workers: maximum number of download threads
144+
:param save_to_buffer_size: buffer size to use when writing files using DownloadedFile.save_to
145+
:param check_download_hash: whether to check hash of downloaded files. Can be disabled for files with internal checksums, for example, or to forcefully retrieve objects with corrupted payload or hash value
135146
"""
136147
self.session = self.SESSION_CLASS(
137148
account_info=account_info, cache=cache, api_config=api_config
@@ -143,6 +154,8 @@ def __init__(
143154
max_upload_workers=max_upload_workers,
144155
max_copy_workers=max_copy_workers,
145156
max_download_workers=max_download_workers,
157+
save_to_buffer_size=save_to_buffer_size,
158+
check_download_hash=check_download_hash,
146159
)
147160

148161
@property

b2sdk/transfer/inbound/download_manager.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ class DownloadManager(TransferManager, ThreadPoolMixin, metaclass=B2TraceMetaAbs
4444
PARALLEL_DOWNLOADER_CLASS = staticmethod(ParallelDownloader)
4545
SIMPLE_DOWNLOADER_CLASS = staticmethod(SimpleDownloader)
4646

47-
def __init__(self, **kwargs):
47+
def __init__(self, write_buffer_size: Optional[int] = None, check_hash: bool = True, **kwargs):
4848
"""
4949
Initialize the DownloadManager using the given services object.
5050
"""
@@ -54,15 +54,21 @@ def __init__(self, **kwargs):
5454
self.PARALLEL_DOWNLOADER_CLASS(
5555
min_part_size=self.DEFAULT_MIN_PART_SIZE,
5656
min_chunk_size=self.MIN_CHUNK_SIZE,
57-
max_chunk_size=self.MAX_CHUNK_SIZE,
57+
max_chunk_size=max(self.MAX_CHUNK_SIZE, write_buffer_size or 0),
58+
align_factor=write_buffer_size,
5859
thread_pool=self._thread_pool,
60+
check_hash=check_hash,
5961
),
6062
self.SIMPLE_DOWNLOADER_CLASS(
6163
min_chunk_size=self.MIN_CHUNK_SIZE,
62-
max_chunk_size=self.MAX_CHUNK_SIZE,
64+
max_chunk_size=max(self.MAX_CHUNK_SIZE, write_buffer_size or 0),
65+
align_factor=write_buffer_size,
6366
thread_pool=self._thread_pool,
67+
check_hash=check_hash,
6468
),
6569
]
70+
self.write_buffer_size = write_buffer_size
71+
self.check_hash = check_hash
6672

6773
def download_file_from_url(
6874
self,
@@ -98,4 +104,6 @@ def download_file_from_url(
98104
response=response,
99105
encryption=encryption,
100106
progress_listener=progress_listener,
107+
write_buffer_size=self.write_buffer_size,
108+
check_hash=self.check_hash,
101109
)

b2sdk/transfer/inbound/downloaded_file.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,10 @@ class MtimeUpdatedFile(io.IOBase):
4444
# 'some_local_path' has the mod_time set according to metadata in B2
4545
"""
4646

47-
def __init__(self, path_, mod_time_millis: int, mode='wb+'):
47+
def __init__(self, path_, mod_time_millis: int, mode='wb+', buffering=None):
4848
self.path_ = path_
4949
self.mode = mode
50+
self.buffering = buffering if buffering is not None else -1
5051
self.mod_time_to_set = mod_time_millis
5152
self.file = None
5253

@@ -69,7 +70,7 @@ def tell(self):
6970
return self.file.tell()
7071

7172
def __enter__(self):
72-
self.file = open(self.path_, self.mode)
73+
self.file = open(self.path_, self.mode, buffering=self.buffering)
7374
self.write = self.file.write
7475
self.read = self.file.read
7576
return self
@@ -93,6 +94,8 @@ def __init__(
9394
response: Response,
9495
encryption: Optional[EncryptionSetting],
9596
progress_listener: AbstractProgressListener,
97+
write_buffer_size=None,
98+
check_hash=True,
9699
):
97100
self.download_version = download_version
98101
self.download_manager = download_manager
@@ -101,13 +104,18 @@ def __init__(
101104
self.encryption = encryption
102105
self.progress_listener = progress_listener
103106
self.download_strategy = None
107+
self.write_buffer_size = write_buffer_size
108+
self.check_hash = check_hash
104109

105110
def _validate_download(self, bytes_read, actual_sha1):
106111
if self.range_ is None:
107112
if bytes_read != self.download_version.content_length:
108113
raise TruncatedOutput(bytes_read, self.download_version.content_length)
109114

110-
if self.download_version.content_sha1 != 'none' and actual_sha1 != self.download_version.content_sha1:
115+
if (
116+
self.check_hash and self.download_version.content_sha1 != 'none' and
117+
actual_sha1 != self.download_version.content_sha1
118+
):
111119
raise ChecksumMismatch(
112120
checksum_type='sha1',
113121
expected=self.download_version.content_sha1,
@@ -158,6 +166,9 @@ def save_to(self, path_, mode='wb+', allow_seeking=True):
158166
(parallel strategies) will be discarded.
159167
"""
160168
with MtimeUpdatedFile(
161-
path_, mod_time_millis=self.download_version.mod_time_millis, mode=mode
169+
path_,
170+
mod_time_millis=self.download_version.mod_time_millis,
171+
mode=mode,
172+
buffering=self.write_buffer_size,
162173
) as file:
163174
self.save(file, allow_seeking=allow_seeking)

b2sdk/transfer/inbound/downloader/abstract.py

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
#
99
######################################################################
1010

11+
import hashlib
12+
1113
from abc import abstractmethod
1214
from concurrent.futures import ThreadPoolExecutor
1315
from io import IOBase
@@ -22,36 +24,64 @@
2224
from b2sdk.encryption.setting import EncryptionSetting
2325

2426

27+
class EmptyHasher:
28+
def __init__(self, *args, **kwargs):
29+
pass
30+
31+
def update(self, data):
32+
pass
33+
34+
def digest(self):
35+
return b''
36+
37+
def hexdigest(self):
38+
return ''
39+
40+
def copy(self):
41+
return self
42+
43+
2544
class AbstractDownloader(metaclass=B2TraceMetaAbstract):
2645

2746
REQUIRES_SEEKING = True
2847
DEFAULT_THREAD_POOL_CLASS = staticmethod(ThreadPoolExecutor)
48+
DEFAULT_ALIGN_FACTOR = 4096
2949

3050
def __init__(
3151
self,
3252
thread_pool: Optional[ThreadPoolExecutor] = None,
33-
force_chunk_size=None,
34-
min_chunk_size=None,
35-
max_chunk_size=None,
53+
force_chunk_size: Optional[int] = None,
54+
min_chunk_size: Optional[int] = None,
55+
max_chunk_size: Optional[int] = None,
56+
align_factor: Optional[int] = None,
57+
check_hash: bool = True,
3658
**kwargs
3759
):
60+
align_factor = align_factor or self.DEFAULT_ALIGN_FACTOR
3861
assert force_chunk_size is not None or (
3962
min_chunk_size is not None and max_chunk_size is not None and
40-
0 < min_chunk_size <= max_chunk_size
63+
0 < min_chunk_size <= max_chunk_size and max_chunk_size >= align_factor
4164
)
4265
self._min_chunk_size = min_chunk_size
4366
self._max_chunk_size = max_chunk_size
4467
self._forced_chunk_size = force_chunk_size
68+
self._align_factor = align_factor
69+
self._check_hash = check_hash
4570
self._thread_pool = thread_pool if thread_pool is not None \
4671
else self.DEFAULT_THREAD_POOL_CLASS()
4772
super().__init__(**kwargs)
4873

49-
def _get_chunk_size(self, content_length):
74+
def _get_hasher(self):
75+
if self._check_hash:
76+
return hashlib.sha1()
77+
return EmptyHasher()
78+
79+
def _get_chunk_size(self, content_length: Optional[int]):
5080
if self._forced_chunk_size is not None:
5181
return self._forced_chunk_size
52-
ideal = content_length // 1000
82+
ideal = max(content_length // 1000, self._align_factor)
5383
non_aligned = min(max(ideal, self._min_chunk_size), self._max_chunk_size)
54-
aligned = non_aligned // 4096 * 4096
84+
aligned = non_aligned // self._align_factor * self._align_factor
5585
return aligned
5686

5787
@classmethod

b2sdk/transfer/inbound/downloader/parallel.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
from io import IOBase
1313
from typing import Optional
1414
import logging
15-
import hashlib
1615
import queue
1716
import threading
1817

@@ -99,7 +98,7 @@ def download(
9998

10099
first_part = parts_to_download[0]
101100

102-
hasher = hashlib.sha1()
101+
hasher = self._get_hasher()
103102

104103
with WriterThread(file, max_queue_depth=len(parts_to_download) * 2) as writer:
105104
self._get_parts(
@@ -116,7 +115,10 @@ def download(
116115

117116
# At this point the hasher already consumed the data until the end of first stream.
118117
# Consume the rest of the file to complete the hashing process
119-
self._finish_hashing(first_part, file, hasher, download_version.content_length)
118+
if self._check_hash:
119+
# we skip hashing if we would not check it - the hasher object is actually an EmptyHasher instance
120+
# but here we avoid reading the whole file (except for the first part) from disk again
121+
self._finish_hashing(first_part, file, hasher, download_version.content_length)
120122

121123
return bytes_written, hasher.hexdigest()
122124

b2sdk/transfer/inbound/downloader/simple.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
from io import IOBase
1212
from typing import Optional
13-
import hashlib
1413
import logging
1514

1615
from requests.models import Response
@@ -38,7 +37,8 @@ def _download(
3837
actual_size = self._get_remote_range(response, download_version).size()
3938
chunk_size = self._get_chunk_size(actual_size)
4039

41-
digest = hashlib.sha1()
40+
digest = self._get_hasher()
41+
4242
bytes_read = 0
4343
for data in response.iter_content(chunk_size=chunk_size):
4444
file.write(data)

0 commit comments

Comments
 (0)