Skip to content

Commit 7aa5ba2

Browse files
authored
Merge pull request #344 from Backblaze/download-stats
Add stats for parallel downloader
2 parents ca98eb6 + 1e5c054 commit 7aa5ba2

File tree

6 files changed

+141
-1
lines changed

6 files changed

+141
-1
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
66

77
## [Unreleased]
88

9+
### Added
10+
* Logging performance summary of parallel download threads
11+
912
### Fixed
1013
* Replace `ReplicationScanResult.source_has_sse_c_enabled` with `source_encryption_mode`
1114
* Fix `B2Api.get_key()` and `RawSimulator.delete_key()`
15+
* Fix calling `CopySourceTooBig` exception
1216

1317
### Infrastructure
1418
* Fix nox's deprecated `session.install()` calls

b2sdk/exception.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,7 @@ def interpret_b2_error(
566566
matcher = COPY_SOURCE_TOO_BIG_ERROR_MESSAGE_RE.match(message)
567567
if matcher is not None:
568568
size = int(matcher.group('size'))
569-
return CopySourceTooBig(size)
569+
return CopySourceTooBig(message, code, size)
570570

571571
return BadRequest(message, code)
572572
elif status == 400:

b2sdk/stream/progress.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ def _progress_update(self, delta):
3636
self.bytes_completed += delta
3737
self.progress_listener.bytes_completed(self.bytes_completed + self.offset)
3838

39+
def __str__(self):
    """Return the wrapped stream's string form so log lines identify the data source."""
    return str(self.stream)
41+
3942

4043
class ReadingStreamWithProgress(AbstractStreamWithProgress):
4144
"""

b2sdk/transfer/inbound/downloaded_file.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ def __exit__(self, exc_type, exc_val, exc_tb):
7979
self.file.close()
8080
set_file_mtime(self.path_, self.mod_time_to_set)
8181

82+
def __str__(self):
    """Return the local target path as a string, for use in download stats log lines."""
    return str(self.path_)
84+
8285

8386
class DownloadedFile:
8487
"""

b2sdk/transfer/inbound/downloader/parallel.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
from concurrent import futures
1212
from io import IOBase
13+
from time import perf_counter_ns
1314
from typing import Optional
1415
import logging
1516
import queue
@@ -18,6 +19,7 @@
1819
from requests.models import Response
1920

2021
from .abstract import AbstractDownloader
22+
from .stats_collector import StatsCollector
2123
from b2sdk.encryption.setting import EncryptionSetting
2224
from b2sdk.file_version import DownloadVersion
2325
from b2sdk.session import B2Session
@@ -118,7 +120,15 @@ def download(
118120
if self._check_hash:
119121
# we skip hashing if we would not check it - hasher object is actually a EmptyHasher instance
120122
# but we avoid here reading whole file (except for the first part) from disk again
123+
before_hash = perf_counter_ns()
121124
self._finish_hashing(first_part, file, hasher, download_version.content_length)
125+
after_hash = perf_counter_ns()
126+
logger.info(
127+
'download stats | %s | %s total: %.3f ms',
128+
file,
129+
'finish_hash',
130+
(after_hash - before_hash) / 1000000,
131+
)
122132

123133
return bytes_written, hasher.hexdigest()
124134

@@ -203,18 +213,33 @@ def __init__(self, file, max_queue_depth):
203213
self.file = file
204214
self.queue = queue.Queue(max_queue_depth)
205215
self.total = 0
216+
self.stats_collector = StatsCollector(str(self.file), 'writer', 'seek')
206217
super(WriterThread, self).__init__()
207218

208219
def run(self):
    """Consume (shutdown, offset, data) tuples from the queue and write each chunk
    at its offset in the target file, until a shutdown tuple arrives.

    Per-chunk timings are accumulated in ``self.stats_collector``:
    queue waits in ``read``, seeks in ``other``, writes in ``write``;
    the whole loop duration goes to ``stats_collector.total``.
    """
    file = self.file
    # Hot loop: bind bound-methods to locals to avoid repeated attribute lookups.
    queue_get = self.queue.get
    stats_collector_read_append = self.stats_collector.read.append
    stats_collector_other_append = self.stats_collector.other.append
    stats_collector_write_append = self.stats_collector.write.append
    start = perf_counter_ns()
    while 1:
        before_read = perf_counter_ns()
        shutdown, offset, data = queue_get()
        stats_collector_read_append(perf_counter_ns() - before_read)
        if shutdown:
            break
        before_seek = perf_counter_ns()
        file.seek(offset)
        after_seek = perf_counter_ns()
        file.write(data)
        after_write = perf_counter_ns()
        # seek time is the "other" statistic for the writer thread (see __init__).
        stats_collector_other_append(after_seek - before_seek)
        stats_collector_write_append(after_write - after_seek)
        self.total += len(data)
    self.stats_collector.total = perf_counter_ns() - start
218243

219244
def __enter__(self):
220245
self.start()
@@ -223,6 +248,7 @@ def __enter__(self):
223248
def __exit__(self, exc_type, exc_val, exc_tb):
    """Signal shutdown to the writer loop, wait for it to drain, then log its stats."""
    # (True, None, None) is the sentinel `run()` breaks on.
    self.queue.put((True, None, None))
    self.join()
    self.stats_collector.report()
226252

227253

228254
def download_first_part(
@@ -243,6 +269,19 @@ def download_first_part(
243269
:param chunk_size: size (in bytes) of read data chunks
244270
:param encryption: encryption mode, algorithm and key
245271
"""
272+
# This function contains a loop that has heavy impact on performance.
273+
# It has not been broken down to several small functions due to fear of
274+
# performance overhead of calling a python function. Advanced performance optimization
275+
# techniques are in use here, for example avoiding internal python getattr calls by
276+
# caching function signatures in local variables. Most of this code was written in
277+
# times where python 2.7 (or maybe even 2.6) had to be supported, so maybe some
278+
# of those optimizations could be removed without affecting performance.
279+
#
280+
# Due to reports of hard to debug performance issues, this code has also been riddled
281+
# with performance measurements. A known issue is GCP VMs which have more network speed
282+
# than storage speed, but end users have different issues with network and storage.
283+
# Basic tools to figure out where the time is being spent is a must for long-term
284+
# maintainability.
246285

247286
writer_queue_put = writer.queue.put
248287
hasher_update = hasher.update
@@ -253,14 +292,29 @@ def download_first_part(
253292

254293
bytes_read = 0
255294
stop = False
295+
296+
stats_collector = StatsCollector(response.url, f'{first_offset}:{last_offset}', 'hash')
297+
stats_collector_read_append = stats_collector.read.append
298+
stats_collector_other_append = stats_collector.other.append
299+
stats_collector_write_append = stats_collector.write.append
300+
start = before_read = perf_counter_ns()
256301
for data in response.iter_content(chunk_size=chunk_size):
302+
stats_collector_read_append(perf_counter_ns() - before_read)
257303
if first_offset + bytes_read + len(data) >= last_offset:
258304
to_write = data[:last_offset - bytes_read]
259305
stop = True
260306
else:
261307
to_write = data
308+
before_put = perf_counter_ns()
262309
writer_queue_put((False, first_offset + bytes_read, to_write))
310+
311+
before_hash = perf_counter_ns()
263312
hasher_update(to_write)
313+
after_hash = perf_counter_ns()
314+
315+
stats_collector_write_append(before_hash - before_put)
316+
stats_collector_other_append(after_hash - before_hash)
317+
264318
bytes_read += len(to_write)
265319
if stop:
266320
break
@@ -284,11 +338,24 @@ def download_first_part(
284338
cloud_range.as_tuple(),
285339
encryption=encryption,
286340
) as response:
341+
before_read = perf_counter_ns()
287342
for to_write in response.iter_content(chunk_size=chunk_size):
343+
stats_collector_read_append(perf_counter_ns() - before_read)
344+
345+
before_put = perf_counter_ns()
288346
writer_queue_put((False, first_offset + bytes_read, to_write))
347+
before_hash = perf_counter_ns()
289348
hasher_update(to_write)
349+
after_hash = perf_counter_ns()
350+
351+
stats_collector_write_append(before_hash - before_put)
352+
stats_collector_other_append(after_hash - before_hash)
353+
290354
bytes_read += len(to_write)
355+
before_read = perf_counter_ns()
291356
tries_left -= 1
357+
stats_collector.total = perf_counter_ns() - start
358+
stats_collector.report()
292359

293360

294361
def download_non_first_part(
@@ -321,15 +388,27 @@ def download_non_first_part(
321388
'download attempts remaining: %i, bytes read already: %i. Getting range %s now.',
322389
retries_left, bytes_read, cloud_range
323390
)
391+
stats_collector = StatsCollector(url, f'{cloud_range.start}:{cloud_range.end}', 'none')
392+
stats_collector_read_append = stats_collector.read.append
393+
stats_collector_write_append = stats_collector.write.append
394+
start = before_read = perf_counter_ns()
324395
with session.download_file_from_url(
325396
url,
326397
cloud_range.as_tuple(),
327398
encryption=encryption,
328399
) as response:
400+
before_read = perf_counter_ns()
329401
for to_write in response.iter_content(chunk_size=chunk_size):
402+
after_read = perf_counter_ns()
330403
writer_queue_put((False, start_range + bytes_read, to_write))
404+
after_write = perf_counter_ns()
405+
stats_collector_read_append(after_read - before_read)
406+
stats_collector_write_append(after_write - after_read)
331407
bytes_read += len(to_write)
408+
before_read = perf_counter_ns()
332409
retries_left -= 1
410+
stats_collector.total = perf_counter_ns() - start
411+
stats_collector.report()
333412

334413

335414
class PartToDownload:
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
######################################################################
#
# File: b2sdk/transfer/inbound/downloader/stats_collector.py
#
# Copyright 2020 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################

import logging
from dataclasses import dataclass, field
from typing import List  # 3.7 doesn't understand `list` vs `List`
from typing import Optional

logger = logging.getLogger(__name__)


@dataclass
class StatsCollector:
    """Accumulator for per-thread download timing samples (all values in nanoseconds).

    Each sample list is appended to by the hot download/write loops and
    summarized once at the end via :meth:`report`.
    """

    name: str  # file name or object url
    detail: str  # description of the thread, ex. "10000000:20000000" or "writer"
    other_name: str  # label of the "other" statistic, typically "seek" or "hash"
    total: Optional[int] = None  # wall time of the whole loop; set by the caller
    other: List[int] = field(default_factory=list)  # samples of the "other" operation
    write: List[int] = field(default_factory=list)  # samples of write()/queue-put time
    read: List[int] = field(default_factory=list)  # samples of read/queue-get time

    def report(self):
        """Log a summary of the collected samples at INFO level.

        The first ``read`` sample is reported separately as TTFB (time to
        first byte).  ``overhead`` is whatever part of ``total`` is not
        accounted for by the read/other/write samples.
        """
        if self.read:
            logger.info('download stats | %s | TTFB: %.3f ms', self, self.read[0] / 1000000)
            logger.info(
                'download stats | %s | read() without TTFB: %.3f ms', self,
                sum(self.read[1:]) / 1000000
            )
        if self.other:
            logger.info(
                'download stats | %s | %s total: %.3f ms', self, self.other_name,
                sum(self.other) / 1000000
            )
        if self.write:
            logger.info(
                'download stats | %s | write() total: %.3f ms', self,
                sum(self.write) / 1000000
            )
        if self.total is not None:
            overhead = self.total - sum(self.write) - sum(self.other) - sum(self.read)
            logger.info('download stats | %s | overhead: %.3f ms', self, overhead / 1000000)

    def __str__(self):
        return f'{self.name}[{self.detail}]'

0 commit comments

Comments
 (0)