Skip to content

Commit 1e72019

Browse files
authored
Merge branch 'master' into max-streams
2 parents 6e1f7bb + 7aa5ba2 commit 1e72019

File tree

10 files changed

+174
-5
lines changed

10 files changed

+174
-5
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77
## [Unreleased]
88

99
### Added
10+
* Logging performance summary of parallel download threads
1011
* Add `max_download_streams_per_file` parameter to B2Api class and underlying structures
1112

1213
### Fixed
1314
* Replace `ReplicationScanResult.source_has_sse_c_enabled` with `source_encryption_mode`
15+
* Fix `B2Api.get_key()` and `RawSimulator.delete_key()`
16+
* Fix calling `CopySourceTooBig` exception
1417

1518
### Infrastructure
1619
* Fix nox's deprecated `session.install()` calls

b2sdk/api.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
######################################################################
1010

1111
from typing import Optional, Tuple, List, Generator
12+
from contextlib import suppress
1213

1314
from .account_info.abstract import AbstractAccountInfo
1415
from .api_config import B2HttpApiConfig, DEFAULT_HTTP_API_CONFIG
@@ -545,10 +546,13 @@ def get_key(self, key_id: str) -> Optional[ApplicationKey]:
545546
546547
Raises an exception if profile is not permitted to list keys.
547548
"""
548-
return next(
549-
self.list_keys(start_application_key_id=key_id),
550-
None,
551-
)
549+
with suppress(StopIteration):
550+
key = next(self.list_keys(start_application_key_id=key_id))
551+
552+
# list_keys() may return some other key if `key_id` does not exist;
553+
# thus manually check that we retrieved the right key
554+
if key.id_ == key_id:
555+
return key
552556

553557
# other
554558
def get_file_info(self, file_id: str) -> FileVersion:

b2sdk/exception.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,7 @@ def interpret_b2_error(
566566
matcher = COPY_SOURCE_TOO_BIG_ERROR_MESSAGE_RE.match(message)
567567
if matcher is not None:
568568
size = int(matcher.group('size'))
569-
return CopySourceTooBig(size)
569+
return CopySourceTooBig(message, code, size)
570570

571571
return BadRequest(message, code)
572572
elif status == 400:

b2sdk/raw_simulator.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1400,6 +1400,9 @@ def delete_key(self, api_url, account_auth_token, application_key_id):
14001400
'application key does not exist: %s' % (application_key_id,),
14011401
'bad_request',
14021402
)
1403+
self.all_application_keys = [
1404+
key for key in self.all_application_keys if key.application_key_id != application_key_id
1405+
]
14031406
return key_sim.as_key()
14041407

14051408
def finish_large_file(self, api_url, account_auth_token, file_id, part_sha1_array):

b2sdk/stream/progress.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ def _progress_update(self, delta):
3636
self.bytes_completed += delta
3737
self.progress_listener.bytes_completed(self.bytes_completed + self.offset)
3838

39+
def __str__(self):
40+
return str(self.stream)
41+
3942

4043
class ReadingStreamWithProgress(AbstractStreamWithProgress):
4144
"""

b2sdk/transfer/inbound/downloaded_file.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ def __exit__(self, exc_type, exc_val, exc_tb):
7979
self.file.close()
8080
set_file_mtime(self.path_, self.mod_time_to_set)
8181

82+
def __str__(self):
83+
return str(self.path_)
84+
8285

8386
class DownloadedFile:
8487
"""

b2sdk/transfer/inbound/downloader/parallel.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
from concurrent import futures
1212
from io import IOBase
13+
from time import perf_counter_ns
1314
from typing import Optional
1415
import logging
1516
import queue
@@ -18,6 +19,7 @@
1819
from requests.models import Response
1920

2021
from .abstract import AbstractDownloader
22+
from .stats_collector import StatsCollector
2123
from b2sdk.encryption.setting import EncryptionSetting
2224
from b2sdk.file_version import DownloadVersion
2325
from b2sdk.session import B2Session
@@ -118,7 +120,15 @@ def download(
118120
if self._check_hash:
119121
# we skip hashing if we would not check it - hasher object is actually a EmptyHasher instance
120122
# but we avoid here reading whole file (except for the first part) from disk again
123+
before_hash = perf_counter_ns()
121124
self._finish_hashing(first_part, file, hasher, download_version.content_length)
125+
after_hash = perf_counter_ns()
126+
logger.info(
127+
'download stats | %s | %s total: %.3f ms',
128+
file,
129+
'finish_hash',
130+
(after_hash - before_hash) / 1000000,
131+
)
122132

123133
return bytes_written, hasher.hexdigest()
124134

@@ -203,18 +213,33 @@ def __init__(self, file, max_queue_depth):
203213
self.file = file
204214
self.queue = queue.Queue(max_queue_depth)
205215
self.total = 0
216+
self.stats_collector = StatsCollector(str(self.file), 'writer', 'seek')
206217
super(WriterThread, self).__init__()
207218

208219
def run(self):
209220
file = self.file
210221
queue_get = self.queue.get
222+
stats_collector_read_append = self.stats_collector.read.append
223+
stats_collector_other_append = self.stats_collector.other.append
224+
stats_collector_write_append = self.stats_collector.write.append
225+
start = perf_counter_ns()
211226
while 1:
227+
228+
before_read = perf_counter_ns()
212229
shutdown, offset, data = queue_get()
230+
stats_collector_read_append(perf_counter_ns() - before_read)
231+
213232
if shutdown:
214233
break
234+
before_seek = perf_counter_ns()
215235
file.seek(offset)
236+
after_seek = perf_counter_ns()
216237
file.write(data)
238+
after_write = perf_counter_ns()
239+
stats_collector_other_append(after_seek - before_seek)
240+
stats_collector_write_append(after_write - after_seek)
217241
self.total += len(data)
242+
self.stats_collector.total = perf_counter_ns() - start
218243

219244
def __enter__(self):
220245
self.start()
@@ -223,6 +248,7 @@ def __enter__(self):
223248
def __exit__(self, exc_type, exc_val, exc_tb):
224249
self.queue.put((True, None, None))
225250
self.join()
251+
self.stats_collector.report()
226252

227253

228254
def download_first_part(
@@ -243,6 +269,19 @@ def download_first_part(
243269
:param chunk_size: size (in bytes) of read data chunks
244270
:param encryption: encryption mode, algorithm and key
245271
"""
272+
# This function contains a loop that has heavy impact on performance.
273+
# It has not been broken down to several small functions due to fear of
274+
# performance overhead of calling a python function. Advanced performance optimization
275+
# techniques are in use here, for example avoiding internal python getattr calls by
276+
# caching function signatures in local variables. Most of this code was written in
277+
# times where python 2.7 (or maybe even 2.6) had to be supported, so maybe some
278+
# of those optimizations could be removed without affecting performance.
279+
#
280+
# Due to reports of hard to debug performance issues, this code has also been riddled
281+
# with performance measurements. A known issue is GCP VMs which have more network speed
282+
# than storage speed, but end users have different issues with network and storage.
283+
# Basic tools to figure out where the time is being spent is a must for long-term
284+
# maintainability.
246285

247286
writer_queue_put = writer.queue.put
248287
hasher_update = hasher.update
@@ -253,14 +292,29 @@ def download_first_part(
253292

254293
bytes_read = 0
255294
stop = False
295+
296+
stats_collector = StatsCollector(response.url, f'{first_offset}:{last_offset}', 'hash')
297+
stats_collector_read_append = stats_collector.read.append
298+
stats_collector_other_append = stats_collector.other.append
299+
stats_collector_write_append = stats_collector.write.append
300+
start = before_read = perf_counter_ns()
256301
for data in response.iter_content(chunk_size=chunk_size):
302+
stats_collector_read_append(perf_counter_ns() - before_read)
257303
if first_offset + bytes_read + len(data) >= last_offset:
258304
to_write = data[:last_offset - bytes_read]
259305
stop = True
260306
else:
261307
to_write = data
308+
before_put = perf_counter_ns()
262309
writer_queue_put((False, first_offset + bytes_read, to_write))
310+
311+
before_hash = perf_counter_ns()
263312
hasher_update(to_write)
313+
after_hash = perf_counter_ns()
314+
315+
stats_collector_write_append(before_hash - before_put)
316+
stats_collector_other_append(after_hash - before_hash)
317+
264318
bytes_read += len(to_write)
265319
if stop:
266320
break
@@ -284,11 +338,24 @@ def download_first_part(
284338
cloud_range.as_tuple(),
285339
encryption=encryption,
286340
) as response:
341+
before_read = perf_counter_ns()
287342
for to_write in response.iter_content(chunk_size=chunk_size):
343+
stats_collector_read_append(perf_counter_ns() - before_read)
344+
345+
before_put = perf_counter_ns()
288346
writer_queue_put((False, first_offset + bytes_read, to_write))
347+
before_hash = perf_counter_ns()
289348
hasher_update(to_write)
349+
after_hash = perf_counter_ns()
350+
351+
stats_collector_write_append(before_hash - before_put)
352+
stats_collector_other_append(after_hash - before_hash)
353+
290354
bytes_read += len(to_write)
355+
before_read = perf_counter_ns()
291356
tries_left -= 1
357+
stats_collector.total = perf_counter_ns() - start
358+
stats_collector.report()
292359

293360

294361
def download_non_first_part(
@@ -321,15 +388,27 @@ def download_non_first_part(
321388
'download attempts remaining: %i, bytes read already: %i. Getting range %s now.',
322389
retries_left, bytes_read, cloud_range
323390
)
391+
stats_collector = StatsCollector(url, f'{cloud_range.start}:{cloud_range.end}', 'none')
392+
stats_collector_read_append = stats_collector.read.append
393+
stats_collector_write_append = stats_collector.write.append
394+
start = before_read = perf_counter_ns()
324395
with session.download_file_from_url(
325396
url,
326397
cloud_range.as_tuple(),
327398
encryption=encryption,
328399
) as response:
400+
before_read = perf_counter_ns()
329401
for to_write in response.iter_content(chunk_size=chunk_size):
402+
after_read = perf_counter_ns()
330403
writer_queue_put((False, start_range + bytes_read, to_write))
404+
after_write = perf_counter_ns()
405+
stats_collector_read_append(after_read - before_read)
406+
stats_collector_write_append(after_write - after_read)
331407
bytes_read += len(to_write)
408+
before_read = perf_counter_ns()
332409
retries_left -= 1
410+
stats_collector.total = perf_counter_ns() - start
411+
stats_collector.report()
333412

334413

335414
class PartToDownload:
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
######################################################################
2+
#
3+
# File: b2sdk/transfer/inbound/downloader/stats_collector.py
4+
#
5+
# Copyright 2020 Backblaze Inc. All Rights Reserved.
6+
#
7+
# License https://www.backblaze.com/using_b2_code.html
8+
#
9+
######################################################################
10+
11+
import logging
12+
from dataclasses import dataclass, field
13+
from typing import List # 3.7 doesn't understand `list` vs `List`
14+
from typing import Optional
15+
16+
logger = logging.getLogger(__name__)
17+
18+
19+
@dataclass
20+
class StatsCollector:
21+
name: str #: file name or object url
22+
detail: str #: description of the thread, ex. "10000000:20000000" or "writer"
23+
other_name: str #: other statistic, typically "seek" or "hash"
24+
total: Optional[int] = None
25+
other: List[int] = field(default_factory=list)
26+
write: List[int] = field(default_factory=list)
27+
read: List[int] = field(default_factory=list)
28+
29+
def report(self):
30+
if self.read:
31+
logger.info('download stats | %s | TTFB: %.3f ms', self, self.read[0] / 1000000)
32+
logger.info(
33+
'download stats | %s | read() without TTFB: %.3f ms', self,
34+
sum(self.read[1:]) / 1000000
35+
)
36+
if self.other:
37+
logger.info(
38+
'download stats | %s | %s total: %.3f ms', self, self.other_name,
39+
sum(self.other) / 1000000
40+
)
41+
if self.write:
42+
logger.info(
43+
'download stats | %s | write() total: %.3f ms', self,
44+
sum(self.write) / 1000000
45+
)
46+
if self.total is not None:
47+
overhead = self.total - sum(self.write) - sum(self.other) - sum(self.read)
48+
logger.info('download stats | %s | overhead: %.3f ms', self, overhead / 1000000)
49+
50+
def __str__(self):
51+
return f'{self.name}[{self.detail}]'

b2sdk/v1/api.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,3 +204,7 @@ def create_key(
204204

205205
def delete_key(self, application_key_id):
206206
return super().delete_key_by_id(application_key_id).as_dict()
207+
208+
def get_key(self, key_id: str) -> Optional[dict]:
209+
keys = self.list_keys(start_application_key_id=key_id)['keys']
210+
return next((key for key in keys if key['applicationKeyId'] == key_id), None)

test/unit/api/test_api.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,3 +480,22 @@ def test_list_keys_v2(self):
480480
'appKeyId9',
481481
]
482482
assert isinstance(keys[0], ApplicationKey)
483+
484+
def test_get_key(self):
485+
self._authorize_account()
486+
key = self.api.create_key(['readFiles'], 'testkey')
487+
488+
if apiver_deps.V <= 1:
489+
key_id = key['applicationKeyId']
490+
else:
491+
key_id = key.id_
492+
493+
assert self.api.get_key(key_id) is not None
494+
495+
if apiver_deps.V <= 1:
496+
self.api.delete_key(key_id)
497+
else:
498+
self.api.delete_key(key)
499+
500+
assert self.api.get_key(key_id) is None
501+
assert self.api.get_key('non-existent') is None

0 commit comments

Comments
 (0)