
Commit de490ad

Author: Andrei Neagu (committed)
using faster file hash checking
1 parent e32f265 commit de490ad

File tree

4 files changed: +43 additions, -40 deletions


packages/aws-library/tests/test_s3_client.py

Lines changed: 1 addition & 0 deletions
@@ -1526,6 +1526,7 @@ def _get_s3_object_keys(files: set[Path]) -> set[S3ObjectKey]:
 @pytest.mark.parametrize(
     "file_size, local_count, remote_count",
     [
+        pytest.param(TypeAdapter(ByteSize).validate_python("2Mib"), 10, 10, id="micro"),
         pytest.param(
             TypeAdapter(ByteSize).validate_python("10Mib"), 10, 10, id="small"
         ),

packages/pytest-simcore/src/pytest_simcore/helpers/comparing.py

Lines changed: 34 additions & 8 deletions
@@ -1,10 +1,12 @@
+import asyncio
+import hashlib
 from collections.abc import Iterable
+from concurrent.futures import ProcessPoolExecutor
 from pathlib import Path
 from typing import TypeAlias
 
 import aiofiles
 from servicelib.file_utils import create_sha256_checksum
-from servicelib.utils import limited_gather
 
 _FilesInfo: TypeAlias = dict[str, Path]
 
@@ -28,13 +30,37 @@ def get_files_info_from_itrable(items: Iterable[Path]) -> _FilesInfo:
     return {f.name: f for f in items if f.is_file()}
 
 
+def _compute_hash(file_path: Path) -> tuple[Path, str]:
+    with Path.open(file_path, "rb") as file_to_hash:
+        file_hash = hashlib.md5()  # noqa: S324
+        chunk = file_to_hash.read(8192)
+        while chunk:
+            file_hash.update(chunk)
+            chunk = file_to_hash.read(8192)
+
+    return file_path, file_hash.hexdigest()
+
+
+async def compute_hashes(file_paths: list[Path]) -> dict[Path, str]:
+    """given a list of files computes hashes for the files on a process pool"""
+
+    loop = asyncio.get_event_loop()
+
+    with ProcessPoolExecutor() as prcess_pool_executor:
+        tasks = [
+            loop.run_in_executor(prcess_pool_executor, _compute_hash, file_path)
+            for file_path in file_paths
+        ]
+        # pylint: disable=unnecessary-comprehension
+        # see return value of _compute_hash it is a tuple, mapping list[Tuple[Path,str]] to Dict[Path, str] here
+        return dict(await asyncio.gather(*tasks))
+
+
 async def assert_same_contents(file_info1: _FilesInfo, file_info2: _FilesInfo) -> None:
     assert set(file_info1.keys()) == set(file_info2.keys())
 
-    await limited_gather(
-        *(
-            assert_same_file_content(file_info1[file_name], file_info2[file_name])
-            for file_name in file_info1
-        ),
-        limit=10,
-    )
+    hashes_1 = await compute_hashes(list(file_info1.values()))
+    hashes_2 = await compute_hashes(list(file_info2.values()))
+
+    for key in file_info1:
+        assert hashes_1[file_info1[key]] == hashes_2[file_info2[key]]
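For orientation, a minimal usage sketch of the helpers added above. Only compute_hashes, get_files_info_from_itrable and assert_same_contents come from pytest_simcore.helpers.comparing; the directory layout and the _example wrapper are hypothetical:

import asyncio
from pathlib import Path

from pytest_simcore.helpers.comparing import (
    assert_same_contents,
    compute_hashes,
    get_files_info_from_itrable,
)


async def _example(dir_a: Path, dir_b: Path) -> None:
    # hash every file of one tree in parallel on a process pool
    hashes = await compute_hashes([p for p in dir_a.rglob("*") if p.is_file()])
    print(f"hashed {len(hashes)} files")

    # compare two trees by file name and MD5 hash, instead of reading both
    # files side by side as the removed limited_gather version did
    await assert_same_contents(
        get_files_info_from_itrable(dir_a.iterdir()),
        get_files_info_from_itrable(dir_b.iterdir()),
    )


if __name__ == "__main__":
    asyncio.run(_example(Path("dir_a"), Path("dir_b")))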

packages/service-library/src/servicelib/zip_stream/_zipper.py

Lines changed: 7 additions & 4 deletions
@@ -6,7 +6,7 @@
 
 from ..progress_bar import ProgressBarData
 from ._constants import DEFAULT_READ_CHUNK_SIZE
-from ._types import ArchiveEntries, FileStream
+from ._types import ArchiveEntries, FileSize, FileStream
 
 
 async def _member_files_iter(
@@ -27,17 +27,20 @@ async def get_zip_archive_stream(
     archive_files: ArchiveEntries,
     *,
     progress_bar: ProgressBarData | None = None,
-    chunk_size: int = DEFAULT_READ_CHUNK_SIZE
+    chunk_size: int = DEFAULT_READ_CHUNK_SIZE,
 ) -> FileStream:
     # NOTE: this is CPU bound task, even though the loop is not blocked,
     # the CPU is still used for compressing the content.
     if progress_bar is None:
         progress_bar = ProgressBarData(num_steps=1, description="zip archive stream")
 
-    total_stream_lenth = sum(file_size for _, (file_size, _) in archive_files)
+    total_stream_lenth = FileSize(sum(file_size for _, (file_size, _) in archive_files))
+    description = (
+        f"STATS: count={len(archive_files)}, size={total_stream_lenth.human_readable()}"
+    )
 
     async with progress_bar.sub_progress(
-        steps=total_stream_lenth, description="streams_reader", progress_unit="Byte"
+        steps=total_stream_lenth, description=description, progress_unit="Byte"
     ) as sub_progress:
         # NOTE: do not disable compression or the streams will be
         # loaded fully in memory before yielding their content
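For context, a rough consumer-side sketch of the function touched here. It assumes FileStream is an async iterable of bytes chunks and that each archive_files entry pairs a name with a (file_size, data_stream) tuple, which is only inferred from the sum(...) expression above; the write_zip_to_disk helper is made up for illustration and imports the private module directly:

from pathlib import Path

import aiofiles
from servicelib.zip_stream._zipper import get_zip_archive_stream


async def write_zip_to_disk(archive_files, destination: Path) -> None:
    # the sub progress bar now reports e.g. "STATS: count=10, size=100MiB"
    # instead of the fixed "streams_reader" label
    async with aiofiles.open(destination, "wb") as zip_file:
        async for chunk in get_zip_archive_stream(archive_files):
            await zip_file.write(chunk)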

packages/service-library/tests/archiving_utils/test_archiving_utils.py

Lines changed: 1 addition & 28 deletions
@@ -4,19 +4,18 @@
 # pylint:disable=no-name-in-module
 
 import asyncio
-import hashlib
 import os
 import secrets
 import string
 import tempfile
 from collections.abc import Callable, Iterable
-from concurrent.futures import ProcessPoolExecutor
 from pathlib import Path
 
 import pytest
 from faker import Faker
 from pydantic import ByteSize, TypeAdapter
 from pytest_benchmark.plugin import BenchmarkFixture
+from pytest_simcore.helpers.comparing import compute_hashes
 from servicelib.archiving_utils import archive_dir, unarchive_dir
 
 
@@ -92,32 +91,6 @@ def get_all_files_in_dir(dir_path: Path) -> set[Path]:
     }
 
 
-def _compute_hash(file_path: Path) -> tuple[Path, str]:
-    with Path.open(file_path, "rb") as file_to_hash:
-        file_hash = hashlib.md5()  # noqa: S324
-        chunk = file_to_hash.read(8192)
-        while chunk:
-            file_hash.update(chunk)
-            chunk = file_to_hash.read(8192)
-
-    return file_path, file_hash.hexdigest()
-
-
-async def compute_hashes(file_paths: list[Path]) -> dict[Path, str]:
-    """given a list of files computes hashes for the files on a process pool"""
-
-    loop = asyncio.get_event_loop()
-
-    with ProcessPoolExecutor() as prcess_pool_executor:
-        tasks = [
-            loop.run_in_executor(prcess_pool_executor, _compute_hash, file_path)
-            for file_path in file_paths
-        ]
-        # pylint: disable=unnecessary-comprehension
-        # see return value of _compute_hash it is a tuple, mapping list[Tuple[Path,str]] to Dict[Path, str] here
-        return dict(await asyncio.gather(*tasks))
-
-
 def full_file_path_from_dir_and_subdirs(dir_path: Path) -> list[Path]:
     return [x for x in dir_path.rglob("*") if x.is_file()]
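To make the "faster" claim in the commit message concrete, here is a rough, hypothetical timing sketch comparing sequential hashing with the shared process-pool helper. The file count and sizes are made up, and the absolute numbers will vary by machine:

import asyncio
import hashlib
import time
from pathlib import Path
from tempfile import TemporaryDirectory

from pytest_simcore.helpers.comparing import compute_hashes


def _hash_sequentially(paths: list[Path]) -> dict[Path, str]:
    # hash each file one after another on the main process (whole file read at once)
    return {p: hashlib.md5(p.read_bytes()).hexdigest() for p in paths}  # noqa: S324


async def main() -> None:
    with TemporaryDirectory() as tmp_dir:
        paths: list[Path] = []
        for i in range(20):
            path = Path(tmp_dir) / f"file_{i}.bin"
            path.write_bytes(b"x" * 10 * 1024 * 1024)  # 10 MiB per file
            paths.append(path)

        started = time.perf_counter()
        _hash_sequentially(paths)
        print(f"sequential:   {time.perf_counter() - started:.2f}s")

        started = time.perf_counter()
        await compute_hashes(paths)  # fans out over a ProcessPoolExecutor
        print(f"process pool: {time.perf_counter() - started:.2f}s")


if __name__ == "__main__":
    asyncio.run(main())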
