Skip to content

Commit c65573f

Browse files
authored
Merge pull request #352 from Backblaze/max-streams
Expose max streams parameter
2 parents 7aa5ba2 + 944b752 commit c65573f

File tree

4 files changed

+96
-1
lines changed

4 files changed

+96
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
88

99
### Added
1010
* Logging performance summary of parallel download threads
11+
* Add `max_download_streams_per_file` parameter to B2Api class and underlying structures
1112

1213
### Fixed
1314
* Replace `ReplicationScanResult.source_has_sse_c_enabled` with `source_encryption_mode`

b2sdk/api.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ def __init__(
6464
max_download_workers: Optional[int] = None,
6565
save_to_buffer_size: Optional[int] = None,
6666
check_download_hash: bool = True,
67+
max_download_streams_per_file: Optional[int] = None,
6768
):
6869
"""
6970
Initialize Services object using given session.
@@ -74,6 +75,7 @@ def __init__(
7475
:param max_download_workers: maximum number of download threads
7576
:param save_to_buffer_size: buffer size to use when writing files using DownloadedFile.save_to
7677
:param check_download_hash: whether to check hash of downloaded files. Can be disabled for files with internal checksums, for example, or to forcefully retrieve objects with corrupted payload or hash value
78+
:param max_download_streams_per_file: maximum number of streams the parallel downloader may use for a single file; must be at least 1 when given
7779
"""
7880
self.api = api
7981
self.session = api.session
@@ -82,11 +84,13 @@ def __init__(
8284
services=self, max_workers=max_upload_workers
8385
)
8486
self.copy_manager = self.COPY_MANAGER_CLASS(services=self, max_workers=max_copy_workers)
87+
assert max_download_streams_per_file is None or max_download_streams_per_file >= 1
8588
self.download_manager = self.DOWNLOAD_MANAGER_CLASS(
8689
services=self,
8790
max_workers=max_download_workers,
8891
write_buffer_size=save_to_buffer_size,
8992
check_hash=check_download_hash,
93+
max_download_streams_per_file=max_download_streams_per_file,
9094
)
9195
self.emerger = Emerger(self)
9296

@@ -129,6 +133,7 @@ def __init__(
129133
max_download_workers: Optional[int] = None,
130134
save_to_buffer_size: Optional[int] = None,
131135
check_download_hash: bool = True,
136+
max_download_streams_per_file: Optional[int] = None,
132137
):
133138
"""
134139
Initialize the API using the given account info.
@@ -145,6 +150,7 @@ def __init__(
145150
:param max_download_workers: maximum number of download threads
146151
:param save_to_buffer_size: buffer size to use when writing files using DownloadedFile.save_to
147152
:param check_download_hash: whether to check hash of downloaded files. Can be disabled for files with internal checksums, for example, or to forcefully retrieve objects with corrupted payload or hash value
153+
:param max_download_streams_per_file: maximum number of streams the parallel downloader may use for a single file
148154
"""
149155
self.session = self.SESSION_CLASS(
150156
account_info=account_info, cache=cache, api_config=api_config
@@ -159,6 +165,7 @@ def __init__(
159165
max_download_workers=max_download_workers,
160166
save_to_buffer_size=save_to_buffer_size,
161167
check_download_hash=check_download_hash,
168+
max_download_streams_per_file=max_download_streams_per_file,
162169
)
163170

164171
@property

b2sdk/transfer/inbound/download_manager.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,13 @@ class DownloadManager(TransferManager, ThreadPoolMixin, metaclass=B2TraceMetaAbs
4444
PARALLEL_DOWNLOADER_CLASS = staticmethod(ParallelDownloader)
4545
SIMPLE_DOWNLOADER_CLASS = staticmethod(SimpleDownloader)
4646

47-
def __init__(self, write_buffer_size: Optional[int] = None, check_hash: bool = True, **kwargs):
47+
def __init__(
48+
self,
49+
write_buffer_size: Optional[int] = None,
50+
check_hash: bool = True,
51+
max_download_streams_per_file: Optional[int] = None,
52+
**kwargs
53+
):
4854
"""
4955
Initialize the DownloadManager using the given services object.
5056
"""
@@ -58,6 +64,7 @@ def __init__(self, write_buffer_size: Optional[int] = None, check_hash: bool = T
5864
align_factor=write_buffer_size,
5965
thread_pool=self._thread_pool,
6066
check_hash=check_hash,
67+
max_streams=max_download_streams_per_file,
6168
),
6269
self.SIMPLE_DOWNLOADER_CLASS(
6370
min_chunk_size=self.MIN_CHUNK_SIZE,

test/unit/v_all/test_api.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,86 @@
2222
from ..test_base import TestBase
2323

2424

25+
class DummyA:
26+
def __init__(self, *args, **kwargs):
27+
pass
28+
29+
30+
class DummyB:
31+
def __init__(self, *args, **kwargs):
32+
pass
33+
34+
35+
class TestServices:
36+
@pytest.mark.apiver(from_ver=2)
37+
@pytest.mark.parametrize(
38+
('kwargs', '_raw_api_class'),
39+
[
40+
[
41+
{
42+
'max_upload_workers': 1,
43+
'max_copy_workers': 2,
44+
'max_download_workers': 3,
45+
'save_to_buffer_size': 4,
46+
'check_download_hash': False,
47+
'max_download_streams_per_file': 5,
48+
},
49+
DummyA,
50+
],
51+
[
52+
{
53+
'max_upload_workers': 2,
54+
'max_copy_workers': 3,
55+
'max_download_workers': 4,
56+
'save_to_buffer_size': 5,
57+
'check_download_hash': True,
58+
'max_download_streams_per_file': 6,
59+
},
60+
DummyB,
61+
],
62+
],
63+
) # yapf: disable
64+
def test_api_initialization(self, kwargs, _raw_api_class):
65+
self.account_info = InMemoryAccountInfo()
66+
self.cache = InMemoryCache()
67+
68+
api_config = B2HttpApiConfig(_raw_api_class=_raw_api_class)
69+
70+
self.api = B2Api(
71+
self.account_info,
72+
self.cache,
73+
api_config=api_config,
74+
75+
**kwargs
76+
) # yapf: disable
77+
78+
assert self.api.account_info is self.account_info
79+
assert self.api.api_config is api_config
80+
assert self.api.cache is self.cache
81+
82+
assert self.api.session.account_info is self.account_info
83+
assert self.api.session.cache is self.cache
84+
assert isinstance(self.api.session.raw_api, _raw_api_class)
85+
86+
assert isinstance(self.api.file_version_factory, B2Api.FILE_VERSION_FACTORY_CLASS)
87+
assert isinstance(
88+
self.api.download_version_factory,
89+
B2Api.DOWNLOAD_VERSION_FACTORY_CLASS,
90+
)
91+
92+
services = self.api.services
93+
assert isinstance(services, B2Api.SERVICES_CLASS)
94+
95+
# max copy/upload/download workers can only be verified with mocking
96+
97+
download_manager = services.download_manager
98+
assert isinstance(download_manager, services.DOWNLOAD_MANAGER_CLASS)
99+
100+
assert download_manager.write_buffer_size == kwargs['save_to_buffer_size']
101+
assert download_manager.check_hash == kwargs['check_download_hash']
102+
assert download_manager.strategies[0].max_streams == kwargs['max_download_streams_per_file']
103+
104+
25105
class TestApi(TestBase):
26106
def setUp(self):
27107
self.account_info = InMemoryAccountInfo()

0 commit comments

Comments
 (0)