
Commit dfbd080

Merge branch 'master' into object_lock
* master:
  Apply code review feedback
  Do not run the test with extra parameters on old apiver
  Add test of B2Api/Services initialization
  Rename parameter for consistency
  remove useless line
  More detailed download performance logs
  Add logging performance summary of parallel download threads
  Refactor stats collector to be usable for other threads
  Fix CopySizeTooBig exception
  Add download stats collector for parallel download writer thread
  Make streams loggable
  Add `max_download_streams_per_file` parameter to B2Api and underlying classes
2 parents 0396fa1 + c65573f commit dfbd080

File tree

8 files changed, +234 -1 lines changed


CHANGELOG.md

Lines changed: 3 additions & 0 deletions
@@ -7,11 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## [Unreleased]

 ### Added
+* Logging performance summary of parallel download threads
+* Add `max_download_streams_per_file` parameter to B2Api class and underlying structures
 * Add `is_file_lock_enabled` parameter to `Bucket.update()` and related methods

 ### Fixed
 * Replace `ReplicationScanResult.source_has_sse_c_enabled` with `source_encryption_mode`
 * Fix `B2Api.get_key()` and `RawSimulator.delete_key()`
+* Fix calling `CopySizeTooBig` exception

 ### Infrastructure
 * Fix nox's deprecated `session.install()` calls

b2sdk/api.py

Lines changed: 7 additions & 0 deletions
@@ -64,6 +64,7 @@ def __init__(
         max_download_workers: Optional[int] = None,
         save_to_buffer_size: Optional[int] = None,
         check_download_hash: bool = True,
+        max_download_streams_per_file: Optional[int] = None,
     ):
         """
         Initialize Services object using given session.
@@ -74,6 +75,7 @@ def __init__(
         :param max_download_workers: maximum number of download threads
         :param save_to_buffer_size: buffer size to use when writing files using DownloadedFile.save_to
         :param check_download_hash: whether to check hash of downloaded files. Can be disabled for files with internal checksums, for example, or to forcefully retrieve objects with corrupted payload or hash value
+        :param max_download_streams_per_file: how many streams to use for parallel downloader
         """
         self.api = api
         self.session = api.session
@@ -82,11 +84,13 @@ def __init__(
             services=self, max_workers=max_upload_workers
         )
         self.copy_manager = self.COPY_MANAGER_CLASS(services=self, max_workers=max_copy_workers)
+        assert max_download_streams_per_file is None or max_download_streams_per_file >= 1
         self.download_manager = self.DOWNLOAD_MANAGER_CLASS(
             services=self,
             max_workers=max_download_workers,
             write_buffer_size=save_to_buffer_size,
             check_hash=check_download_hash,
+            max_download_streams_per_file=max_download_streams_per_file,
         )
         self.emerger = Emerger(self)

@@ -129,6 +133,7 @@ def __init__(
         max_download_workers: Optional[int] = None,
         save_to_buffer_size: Optional[int] = None,
         check_download_hash: bool = True,
+        max_download_streams_per_file: Optional[int] = None,
     ):
         """
         Initialize the API using the given account info.
@@ -145,6 +150,7 @@ def __init__(
         :param max_download_workers: maximum number of download threads
         :param save_to_buffer_size: buffer size to use when writing files using DownloadedFile.save_to
         :param check_download_hash: whether to check hash of downloaded files. Can be disabled for files with internal checksums, for example, or to forcefully retrieve objects with corrupted payload or hash value
+        :param max_download_streams_per_file: number of streams for parallel download manager
         """
         self.session = self.SESSION_CLASS(
             account_info=account_info, cache=cache, api_config=api_config
@@ -159,6 +165,7 @@ def __init__(
             max_download_workers=max_download_workers,
             save_to_buffer_size=save_to_buffer_size,
             check_download_hash=check_download_hash,
+            max_download_streams_per_file=max_download_streams_per_file,
         )

     @property
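
For reference, a minimal usage sketch of the new parameter, assuming the `v2` apiver; the credentials, bucket, and file names below are placeholders, not values from this commit:

```python
from b2sdk.v2 import B2Api, InMemoryAccountInfo

info = InMemoryAccountInfo()
# Cap the parallel downloader at 4 streams per file; None keeps the library default.
# Values below 1 would trip the assert in Services.__init__ shown above.
api = B2Api(info, max_download_streams_per_file=4)
api.authorize_account('production', '<applicationKeyId>', '<applicationKey>')

bucket = api.get_bucket_by_name('my-bucket')  # placeholder bucket name
downloaded_file = bucket.download_file_by_name('large-object.bin')  # placeholder file
downloaded_file.save_to('/tmp/large-object.bin')
```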

b2sdk/stream/progress.py

Lines changed: 3 additions & 0 deletions
@@ -36,6 +36,9 @@ def _progress_update(self, delta):
         self.bytes_completed += delta
         self.progress_listener.bytes_completed(self.bytes_completed + self.offset)

+    def __str__(self):
+        return str(self.stream)
+

 class ReadingStreamWithProgress(AbstractStreamWithProgress):
     """

b2sdk/transfer/inbound/download_manager.py

Lines changed: 8 additions & 1 deletion
@@ -44,7 +44,13 @@ class DownloadManager(TransferManager, ThreadPoolMixin, metaclass=B2TraceMetaAbs
     PARALLEL_DOWNLOADER_CLASS = staticmethod(ParallelDownloader)
     SIMPLE_DOWNLOADER_CLASS = staticmethod(SimpleDownloader)

-    def __init__(self, write_buffer_size: Optional[int] = None, check_hash: bool = True, **kwargs):
+    def __init__(
+        self,
+        write_buffer_size: Optional[int] = None,
+        check_hash: bool = True,
+        max_download_streams_per_file: Optional[int] = None,
+        **kwargs
+    ):
         """
         Initialize the DownloadManager using the given services object.
         """
@@ -58,6 +64,7 @@ def __init__(self, write_buffer_size: Optional[int] = None, check_hash: bool = T
                 align_factor=write_buffer_size,
                 thread_pool=self._thread_pool,
                 check_hash=check_hash,
+                max_streams=max_download_streams_per_file,
             ),
             self.SIMPLE_DOWNLOADER_CLASS(
                 min_chunk_size=self.MIN_CHUNK_SIZE,

b2sdk/transfer/inbound/downloaded_file.py

Lines changed: 3 additions & 0 deletions
@@ -79,6 +79,9 @@ def __exit__(self, exc_type, exc_val, exc_tb):
         self.file.close()
         set_file_mtime(self.path_, self.mod_time_to_set)

+    def __str__(self):
+        return str(self.path_)
+

 class DownloadedFile:
     """

b2sdk/transfer/inbound/downloader/parallel.py

Lines changed: 79 additions & 0 deletions
@@ -10,6 +10,7 @@

 from concurrent import futures
 from io import IOBase
+from time import perf_counter_ns
 from typing import Optional
 import logging
 import queue
@@ -18,6 +19,7 @@
 from requests.models import Response

 from .abstract import AbstractDownloader
+from .stats_collector import StatsCollector
 from b2sdk.encryption.setting import EncryptionSetting
 from b2sdk.file_version import DownloadVersion
 from b2sdk.session import B2Session
@@ -118,7 +120,15 @@ def download(
         if self._check_hash:
             # we skip hashing if we would not check it - hasher object is actually a EmptyHasher instance
             # but we avoid here reading whole file (except for the first part) from disk again
+            before_hash = perf_counter_ns()
             self._finish_hashing(first_part, file, hasher, download_version.content_length)
+            after_hash = perf_counter_ns()
+            logger.info(
+                'download stats | %s | %s total: %.3f ms',
+                file,
+                'finish_hash',
+                (after_hash - before_hash) / 1000000,
+            )

         return bytes_written, hasher.hexdigest()

@@ -203,18 +213,33 @@ def __init__(self, file, max_queue_depth):
         self.file = file
         self.queue = queue.Queue(max_queue_depth)
         self.total = 0
+        self.stats_collector = StatsCollector(str(self.file), 'writer', 'seek')
         super(WriterThread, self).__init__()

     def run(self):
         file = self.file
         queue_get = self.queue.get
+        stats_collector_read_append = self.stats_collector.read.append
+        stats_collector_other_append = self.stats_collector.other.append
+        stats_collector_write_append = self.stats_collector.write.append
+        start = perf_counter_ns()
         while 1:
+
+            before_read = perf_counter_ns()
             shutdown, offset, data = queue_get()
+            stats_collector_read_append(perf_counter_ns() - before_read)
+
             if shutdown:
                 break
+            before_seek = perf_counter_ns()
             file.seek(offset)
+            after_seek = perf_counter_ns()
             file.write(data)
+            after_write = perf_counter_ns()
+            stats_collector_other_append(after_seek - before_seek)
+            stats_collector_write_append(after_write - after_seek)
             self.total += len(data)
+        self.stats_collector.total = perf_counter_ns() - start

     def __enter__(self):
         self.start()
@@ -223,6 +248,7 @@ def __enter__(self):
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.queue.put((True, None, None))
         self.join()
+        self.stats_collector.report()


 def download_first_part(
@@ -243,6 +269,19 @@
     :param chunk_size: size (in bytes) of read data chunks
     :param encryption: encryption mode, algorithm and key
     """
+    # This function contains a loop that has heavy impact on performance.
+    # It has not been broken down to several small functions due to fear of
+    # performance overhead of calling a python function. Advanced performance optimization
+    # techniques are in use here, for example avoiding internal python getattr calls by
+    # caching function signatures in local variables. Most of this code was written in
+    # times where python 2.7 (or maybe even 2.6) had to be supported, so maybe some
+    # of those optimizations could be removed without affecting performance.
+    #
+    # Due to reports of hard to debug performance issues, this code has also been riddled
+    # with performance measurements. A known issue is GCP VMs which have more network speed
+    # than storage speed, but end users have different issues with network and storage.
+    # Basic tools to figure out where the time is being spent is a must for long-term
+    # maintainability.

     writer_queue_put = writer.queue.put
     hasher_update = hasher.update
@@ -253,14 +292,29 @@

     bytes_read = 0
     stop = False
+
+    stats_collector = StatsCollector(response.url, f'{first_offset}:{last_offset}', 'hash')
+    stats_collector_read_append = stats_collector.read.append
+    stats_collector_other_append = stats_collector.other.append
+    stats_collector_write_append = stats_collector.write.append
+    start = before_read = perf_counter_ns()
     for data in response.iter_content(chunk_size=chunk_size):
+        stats_collector_read_append(perf_counter_ns() - before_read)
         if first_offset + bytes_read + len(data) >= last_offset:
             to_write = data[:last_offset - bytes_read]
             stop = True
         else:
             to_write = data
+        before_put = perf_counter_ns()
         writer_queue_put((False, first_offset + bytes_read, to_write))
+
+        before_hash = perf_counter_ns()
         hasher_update(to_write)
+        after_hash = perf_counter_ns()
+
+        stats_collector_write_append(before_hash - before_put)
+        stats_collector_other_append(after_hash - before_hash)
+
        bytes_read += len(to_write)
         if stop:
             break
@@ -284,11 +338,24 @@
             cloud_range.as_tuple(),
             encryption=encryption,
         ) as response:
+            before_read = perf_counter_ns()
             for to_write in response.iter_content(chunk_size=chunk_size):
+                stats_collector_read_append(perf_counter_ns() - before_read)
+
+                before_put = perf_counter_ns()
                 writer_queue_put((False, first_offset + bytes_read, to_write))
+                before_hash = perf_counter_ns()
                 hasher_update(to_write)
+                after_hash = perf_counter_ns()
+
+                stats_collector_write_append(before_hash - before_put)
+                stats_collector_other_append(after_hash - before_hash)
+
                 bytes_read += len(to_write)
+                before_read = perf_counter_ns()
         tries_left -= 1
+    stats_collector.total = perf_counter_ns() - start
+    stats_collector.report()


 def download_non_first_part(
@@ -321,15 +388,27 @@
             'download attempts remaining: %i, bytes read already: %i. Getting range %s now.',
             retries_left, bytes_read, cloud_range
         )
+        stats_collector = StatsCollector(url, f'{cloud_range.start}:{cloud_range.end}', 'none')
+        stats_collector_read_append = stats_collector.read.append
+        stats_collector_write_append = stats_collector.write.append
+        start = before_read = perf_counter_ns()
         with session.download_file_from_url(
             url,
             cloud_range.as_tuple(),
             encryption=encryption,
         ) as response:
+            before_read = perf_counter_ns()
             for to_write in response.iter_content(chunk_size=chunk_size):
+                after_read = perf_counter_ns()
                 writer_queue_put((False, start_range + bytes_read, to_write))
+                after_write = perf_counter_ns()
+                stats_collector_read_append(after_read - before_read)
+                stats_collector_write_append(after_write - after_read)
                 bytes_read += len(to_write)
+                before_read = perf_counter_ns()
         retries_left -= 1
+        stats_collector.total = perf_counter_ns() - start
+        stats_collector.report()


 class PartToDownload:
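
The comment block in `download_first_part` names two techniques used throughout this file: caching bound methods in locals so the hot loop skips repeated attribute lookups, and bracketing each phase with `perf_counter_ns()` so unaccounted time shows up as overhead. A self-contained sketch of both patterns, with an illustrative workload rather than the downloader's:

```python
from time import perf_counter_ns


def timed_copy(chunks):
    durations = []
    out = bytearray()
    # Cache bound methods once, outside the loop, the way the downloader
    # caches queue.put / hasher.update / list.append.
    durations_append = durations.append
    out_extend = out.extend
    start = perf_counter_ns()
    for chunk in chunks:
        before = perf_counter_ns()
        out_extend(chunk)  # the measured phase
        durations_append(perf_counter_ns() - before)
    total = perf_counter_ns() - start
    overhead = total - sum(durations)  # loop time not covered by the measured phase
    print(f'work: {sum(durations) / 1_000_000:.3f} ms, overhead: {overhead / 1_000_000:.3f} ms')


timed_copy([b'x' * 65536] * 100)
```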
b2sdk/transfer/inbound/downloader/stats_collector.py

Lines changed: 51 additions & 0 deletions
@@ -0,0 +1,51 @@
+######################################################################
+#
+# File: b2sdk/transfer/inbound/downloader/stats_collector.py
+#
+# Copyright 2020 Backblaze Inc. All Rights Reserved.
+#
+# License https://www.backblaze.com/using_b2_code.html
+#
+######################################################################
+
+import logging
+from dataclasses import dataclass, field
+from typing import List  # 3.7 doesn't understand `list` vs `List`
+from typing import Optional
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class StatsCollector:
+    name: str  #: file name or object url
+    detail: str  #: description of the thread, ex. "10000000:20000000" or "writer"
+    other_name: str  #: other statistic, typically "seek" or "hash"
+    total: Optional[int] = None
+    other: List[int] = field(default_factory=list)
+    write: List[int] = field(default_factory=list)
+    read: List[int] = field(default_factory=list)
+
+    def report(self):
+        if self.read:
+            logger.info('download stats | %s | TTFB: %.3f ms', self, self.read[0] / 1000000)
+            logger.info(
+                'download stats | %s | read() without TTFB: %.3f ms', self,
+                sum(self.read[1:]) / 1000000
+            )
+        if self.other:
+            logger.info(
+                'download stats | %s | %s total: %.3f ms', self, self.other_name,
+                sum(self.other) / 1000000
+            )
+        if self.write:
+            logger.info(
+                'download stats | %s | write() total: %.3f ms', self,
+                sum(self.write) / 1000000
+            )
+        if self.total is not None:
+            overhead = self.total - sum(self.write) - sum(self.other) - sum(self.read)
+            logger.info('download stats | %s | overhead: %.3f ms', self, overhead / 1000000)
+
+    def __str__(self):
+        return f'{self.name}[{self.detail}]'
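
A short sketch of driving `StatsCollector` by hand, the way `WriterThread` does; the phases below are toy stand-ins for queue reads, seeks, and writes, and INFO logging must be enabled for `report()` to print anything:

```python
import logging
from time import perf_counter_ns

from b2sdk.transfer.inbound.downloader.stats_collector import StatsCollector

logging.basicConfig(level=logging.INFO)

collector = StatsCollector('example.bin', 'writer', 'seek')
start = perf_counter_ns()
for chunk in (b'a' * 1024, b'b' * 1024):
    before = perf_counter_ns()
    data = bytes(chunk)  # stands in for reading from the queue
    collector.read.append(perf_counter_ns() - before)

    before = perf_counter_ns()
    _ = len(data)  # stands in for file.seek()
    collector.other.append(perf_counter_ns() - before)

    before = perf_counter_ns()
    _ = data[::-1]  # stands in for file.write()
    collector.write.append(perf_counter_ns() - before)
collector.total = perf_counter_ns() - start

# Logs TTFB, read/seek/write totals, and residual overhead, tagged 'example.bin[writer]'.
collector.report()
```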
