Commit 406a337
Stats collector using context managers to hold the data

- Replaced list append with an explicit summation.
- Provided a variable as the divisor for the time calculation.

1 parent 96aa9ea commit 406a337
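The change is easiest to see in miniature. Below is a sketch of the before/after pattern, with hypothetical names (`timings`, `Timer`, `NS_IN_MS`) standing in for the real ones in the diff: instead of stamping `perf_counter_ns()` around each operation and appending deltas to a list, the call site enters a context manager that keeps a running sum, and the nanosecond-to-millisecond divisor gets a name instead of a bare `1000000`.

```python
from time import perf_counter_ns

# before: manual stamps, a list of deltas, and a bare 1000000 at report time
timings = []
before = perf_counter_ns()
payload = b'x' * 1024  # some timed operation
timings.append(perf_counter_ns() - before)
print(f'{sum(timings) / 1000000:.3f} ms')

# after: a context manager holds the running sum, and the divisor has a name
NS_IN_MS = 1_000_000

class Timer:
    def __init__(self):
        self.sum_ns = 0  # explicit summation instead of a growing list

    def __enter__(self):
        self._start = perf_counter_ns()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.sum_ns += perf_counter_ns() - self._start

timer = Timer()
with timer:
    payload = b'x' * 1024  # the same timed operation
print(f'{timer.sum_ns / NS_IN_MS:.3f} ms')
```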

File tree

3 files changed: +166 -118 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions

```diff
@@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Infrastructure
 * Fix nox's deprecated `session.install()` calls
 * Re-enable changelog validation in CI
+* StatsCollector is now a context manager

 ## [1.17.3] - 2022-07-15
```

b2sdk/transfer/inbound/downloader/parallel.py

Lines changed: 108 additions & 99 deletions
```diff
@@ -8,22 +8,22 @@
 #
 ######################################################################

+import logging
+import queue
+import threading
 from concurrent import futures
 from io import IOBase
 from time import perf_counter_ns
 from typing import Optional
-import logging
-import queue
-import threading

 from requests.models import Response

-from .abstract import AbstractDownloader
-from .stats_collector import StatsCollector
 from b2sdk.encryption.setting import EncryptionSetting
 from b2sdk.file_version import DownloadVersion
 from b2sdk.session import B2Session
 from b2sdk.utils.range_ import Range
+from .abstract import AbstractDownloader
+from .stats_collector import StatsCollector

 logger = logging.getLogger(__name__)
```
```diff
@@ -219,27 +219,25 @@ def __init__(self, file, max_queue_depth):
     def run(self):
         file = self.file
         queue_get = self.queue.get
-        stats_collector_read_append = self.stats_collector.read.append
-        stats_collector_other_append = self.stats_collector.other.append
-        stats_collector_write_append = self.stats_collector.write.append
-        start = perf_counter_ns()
-        while 1:
+        stats_collector_read = self.stats_collector.read
+        stats_collector_other = self.stats_collector.other
+        stats_collector_write = self.stats_collector.write

-            before_read = perf_counter_ns()
-            shutdown, offset, data = queue_get()
-            stats_collector_read_append(perf_counter_ns() - before_read)
+        with self.stats_collector.total:
+            while 1:
+                with stats_collector_read:
+                    shutdown, offset, data = queue_get()

-            if shutdown:
-                break
-            before_seek = perf_counter_ns()
-            file.seek(offset)
-            after_seek = perf_counter_ns()
-            file.write(data)
-            after_write = perf_counter_ns()
-            stats_collector_other_append(after_seek - before_seek)
-            stats_collector_write_append(after_write - after_seek)
-            self.total += len(data)
-        self.stats_collector.total = perf_counter_ns() - start
+                if shutdown:
+                    break
+
+                with stats_collector_other:
+                    file.seek(offset)
+
+                with stats_collector_write:
+                    file.write(data)
+
+                self.total += len(data)

     def __enter__(self):
         self.start()
```
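The hunk above restructures the writer thread's `run()` loop so that each phase runs under its matching collector, with `total` wrapping the whole loop. A runnable approximation follows, with the queue fed inline rather than by downloader threads and an in-memory buffer standing in for the destination file (`example-object` and the payloads are invented):

```python
import io
import queue

from b2sdk.transfer.inbound.downloader.stats_collector import StatsCollector

# invented stand-ins: the real writer thread is fed by downloader threads
stats_collector = StatsCollector('example-object', 'writer', 'seek')
work_queue = queue.Queue()
work_queue.put((False, 0, b'first chunk'))
work_queue.put((False, 11, b'second chunk'))
work_queue.put((True, 0, b''))  # shutdown sentinel, same (shutdown, offset, data) shape
file = io.BytesIO()  # in-memory destination instead of a real file

with stats_collector.total:  # wall-clock time of the whole loop
    while 1:
        with stats_collector.read:  # time spent blocked on the queue
            shutdown, offset, data = work_queue.get()
        if shutdown:
            break
        with stats_collector.other:  # "other" is seek() for the writer
            file.seek(offset)
        with stats_collector.write:  # time spent writing
            file.write(data)

stats_collector.report()  # logs overhead as total minus read/other/write
```

The real method also binds `self.stats_collector.read` and friends to locals first; that is a lookup micro-optimization and does not change the pattern.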
```diff
@@ -294,67 +292,73 @@ def download_first_part(
     stop = False

     stats_collector = StatsCollector(response.url, f'{first_offset}:{last_offset}', 'hash')
-    stats_collector_read_append = stats_collector.read.append
-    stats_collector_other_append = stats_collector.other.append
-    stats_collector_write_append = stats_collector.write.append
-    start = before_read = perf_counter_ns()
-    for data in response.iter_content(chunk_size=chunk_size):
-        stats_collector_read_append(perf_counter_ns() - before_read)
-        if first_offset + bytes_read + len(data) >= last_offset:
-            to_write = data[:last_offset - bytes_read]
-            stop = True
-        else:
-            to_write = data
-        before_put = perf_counter_ns()
-        writer_queue_put((False, first_offset + bytes_read, to_write))
-
-        before_hash = perf_counter_ns()
-        hasher_update(to_write)
-        after_hash = perf_counter_ns()
-
-        stats_collector_write_append(before_hash - before_put)
-        stats_collector_other_append(after_hash - before_hash)
-
-        bytes_read += len(to_write)
-        if stop:
-            break
-
-    # since we got everything we need from original response, close the socket and free the buffer
-    # to avoid a timeout exception during hashing and other trouble
-    response.close()
-
-    url = response.request.url
-    tries_left = 5 - 1  # this is hardcoded because we are going to replace the entire retry interface soon, so we'll avoid deprecation here and keep it private
-    while tries_left and bytes_read < actual_part_size:
-        cloud_range = starting_cloud_range.subrange(
-            bytes_read, actual_part_size - 1
-        )  # first attempt was for the whole file, but retries are bound correctly
-        logger.debug(
-            'download attempts remaining: %i, bytes read already: %i. Getting range %s now.',
-            tries_left, bytes_read, cloud_range
-        )
-        with session.download_file_from_url(
-            url,
-            cloud_range.as_tuple(),
-            encryption=encryption,
-        ) as response:
-            before_read = perf_counter_ns()
-            for to_write in response.iter_content(chunk_size=chunk_size):
-                stats_collector_read_append(perf_counter_ns() - before_read)
+    stats_collector_read = stats_collector.read
+    stats_collector_other = stats_collector.other
+    stats_collector_write = stats_collector.write
+
+    with stats_collector.total:
+        response_iterator = response.iter_content(chunk_size=chunk_size)
+
+        while True:
+            with stats_collector_read:
+                try:
+                    data = next(response_iterator)
+                except StopIteration:
+                    break
+
+            if first_offset + bytes_read + len(data) >= last_offset:
+                to_write = data[:last_offset - bytes_read]
+                stop = True
+            else:
+                to_write = data

-                before_put = perf_counter_ns()
+            with stats_collector_write:
                 writer_queue_put((False, first_offset + bytes_read, to_write))
-                before_hash = perf_counter_ns()
+
+            with stats_collector_other:
                 hasher_update(to_write)
-                after_hash = perf_counter_ns()

-                stats_collector_write_append(before_hash - before_put)
-                stats_collector_other_append(after_hash - before_hash)
+            bytes_read += len(to_write)
+            if stop:
+                break
+
+        # since we got everything we need from original response, close the socket and free the buffer
+        # to avoid a timeout exception during hashing and other trouble
+        response.close()
+
+        url = response.request.url
+        tries_left = 5 - 1  # this is hardcoded because we are going to replace the entire retry interface soon, so we'll avoid deprecation here and keep it private
+        while tries_left and bytes_read < actual_part_size:
+            cloud_range = starting_cloud_range.subrange(
+                bytes_read, actual_part_size - 1
+            )  # first attempt was for the whole file, but retries are bound correctly
+            logger.debug(
+                'download attempts remaining: %i, bytes read already: %i. Getting range %s now.',
+                tries_left, bytes_read, cloud_range
+            )
+            with session.download_file_from_url(
+                url,
+                cloud_range.as_tuple(),
+                encryption=encryption,
+            ) as response:
+                response_iterator = response.iter_content(chunk_size=chunk_size)
+
+                while True:
+                    with stats_collector_read:
+                        try:
+                            to_write = next(response_iterator)
+                        except StopIteration:
+                            break
+
+                    with stats_collector_write:
+                        writer_queue_put((False, first_offset + bytes_read, to_write))
+
+                    with stats_collector_other:
+                        hasher_update(to_write)
+
+                    bytes_read += len(to_write)
+            tries_left -= 1

-                bytes_read += len(to_write)
-                before_read = perf_counter_ns()
-        tries_left -= 1
-    stats_collector.total = perf_counter_ns() - start
     stats_collector.report()
```
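The reason the plain `for data in response.iter_content(...)` loop became `while True` with an explicit `next()` is that a `with` block cannot wrap the implicit iteration step of a `for` statement; pulling the chunk with `next()` inside the collector times exactly the blocking read and nothing else. A self-contained illustration using the new `SingleStatsCollector` (the generator is an invented stand-in for `iter_content`):

```python
from b2sdk.transfer.inbound.downloader.stats_collector import SingleStatsCollector

def fake_iter_content():
    # stand-in for response.iter_content(chunk_size=...); each next() would block on the network
    yield from (b'a' * 4096, b'b' * 4096, b'c' * 1024)

read_timer = SingleStatsCollector()
chunks = fake_iter_content()
total_bytes = 0
while True:
    with read_timer:  # only the next() call is timed
        try:
            data = next(chunks)
        except StopIteration:
            break
    total_bytes += len(data)  # chunk processing stays outside the timed block
print(f'{total_bytes} bytes read in {read_timer.sum_ms:.3f} ms')
```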
```diff
@@ -389,25 +393,30 @@ def download_non_first_part(
         retries_left, bytes_read, cloud_range
     )
     stats_collector = StatsCollector(url, f'{cloud_range.start}:{cloud_range.end}', 'none')
-    stats_collector_read_append = stats_collector.read.append
-    stats_collector_write_append = stats_collector.write.append
-    start = before_read = perf_counter_ns()
-    with session.download_file_from_url(
-        url,
-        cloud_range.as_tuple(),
-        encryption=encryption,
-    ) as response:
-        before_read = perf_counter_ns()
-        for to_write in response.iter_content(chunk_size=chunk_size):
-            after_read = perf_counter_ns()
-            writer_queue_put((False, start_range + bytes_read, to_write))
-            after_write = perf_counter_ns()
-            stats_collector_read_append(after_read - before_read)
-            stats_collector_write_append(after_write - after_read)
-            bytes_read += len(to_write)
-            before_read = perf_counter_ns()
-    retries_left -= 1
-    stats_collector.total = perf_counter_ns() - start
+    stats_collector_read = stats_collector.read
+    stats_collector_write = stats_collector.write
+
+    with stats_collector.total:
+        with session.download_file_from_url(
+            url,
+            cloud_range.as_tuple(),
+            encryption=encryption,
+        ) as response:
+            response_iterator = response.iter_content(chunk_size=chunk_size)
+
+            while True:
+                with stats_collector_read:
+                    try:
+                        to_write = next(response_iterator)
+                    except StopIteration:
+                        break
+
+                with stats_collector_write:
+                    writer_queue_put((False, start_range + bytes_read, to_write))
+
+                bytes_read += len(to_write)
+        retries_left -= 1
+
     stats_collector.report()
```
b2sdk/transfer/inbound/downloader/stats_collector.py

Lines changed: 57 additions & 19 deletions
```diff
@@ -9,43 +9,81 @@
 ######################################################################

 import logging
-from dataclasses import dataclass, field
-from typing import List  # 3.7 doesn't understand `list` vs `List`
-from typing import Optional
+from dataclasses import (
+    dataclass,
+    field,
+)
+from time import perf_counter_ns
+from typing import (
+    Any,
+    Optional,
+    Type,
+)

 logger = logging.getLogger(__name__)


+class SingleStatsCollector:
+    TO_MS = 1_000_000
+
+    def __init__(self):
+        self.latest_entry: Optional[int] = None
+        self.sum_of_all_entries: int = 0
+        self.started_perf_timer: Optional[int] = None
+
+    def __enter__(self) -> None:
+        self.started_perf_timer = perf_counter_ns()
+
+    def __exit__(self, exc_type: Type, exc_val: Exception, exc_tb: Any) -> None:
+        time_diff = perf_counter_ns() - self.started_perf_timer
+        self.latest_entry = time_diff
+        self.sum_of_all_entries += time_diff
+        self.started_perf_timer = None
+
+    @property
+    def sum_ms(self) -> float:
+        return self.sum_of_all_entries / self.TO_MS
+
+    @property
+    def latest_ms(self) -> float:
+        return self.latest_entry / self.TO_MS
+
+    @property
+    def has_any_entry(self) -> bool:
+        return self.latest_entry is not None
+
+
 @dataclass
 class StatsCollector:
     name: str  #: file name or object url
     detail: str  #: description of the thread, ex. "10000000:20000000" or "writer"
     other_name: str  #: other statistic, typically "seek" or "hash"
-    total: Optional[int] = None
-    other: List[int] = field(default_factory=list)
-    write: List[int] = field(default_factory=list)
-    read: List[int] = field(default_factory=list)
+    total: SingleStatsCollector = field(default_factory=SingleStatsCollector)
+    other: SingleStatsCollector = field(default_factory=SingleStatsCollector)
+    write: SingleStatsCollector = field(default_factory=SingleStatsCollector)
+    read: SingleStatsCollector = field(default_factory=SingleStatsCollector)

     def report(self):
-        if self.read:
-            logger.info('download stats | %s | TTFB: %.3f ms', self, self.read[0] / 1000000)
+        if self.read.has_any_entry:
+            logger.info('download stats | %s | TTFB: %.3f ms', self, self.read.latest_ms)
             logger.info(
                 'download stats | %s | read() without TTFB: %.3f ms', self,
-                sum(self.read[1:]) / 1000000
+                (self.read.sum_of_all_entries - self.read.latest_entry) / self.read.TO_MS
             )
-        if self.other:
+        if self.other.has_any_entry:
             logger.info(
-                'download stats | %s | %s total: %.3f ms', self, self.other_name,
-                sum(self.other) / 1000000
+                'download stats | %s | %s total: %.3f ms', self, self.other_name, self.other.sum_ms
             )
-        if self.write:
+        if self.write.has_any_entry:
+            logger.info('download stats | %s | write() total: %.3f ms', self, self.write.sum_ms)
+        if self.total.has_any_entry:
+            basic_operation_time = self.write.sum_of_all_entries \
+                + self.other.sum_of_all_entries \
+                + self.read.sum_of_all_entries
+            overhead = self.total.sum_of_all_entries - basic_operation_time
             logger.info(
-                'download stats | %s | write() total: %.3f ms', self,
-                sum(self.write) / 1000000
+                'download stats | %s | overhead: %.3f ms', self, overhead / self.total.TO_MS
             )
-        if self.total is not None:
-            overhead = self.total - sum(self.write) - sum(self.other) - sum(self.read)
-            logger.info('download stats | %s | overhead: %.3f ms', self, overhead / 1000000)

     def __str__(self):
         return f'{self.name}[{self.detail}]'
```