
Commit e32f265

Author: Andrei Neagu (committed)

refactor to use size instead of items count as progress

1 parent c129672 · commit e32f265

File tree

8 files changed: +122 -86 lines changed


packages/aws-library/src/aws_library/s3/_client.py

Lines changed: 36 additions & 24 deletions
@@ -3,7 +3,7 @@
 import functools
 import logging
 import urllib.parse
-from collections.abc import AsyncGenerator, Callable, Sequence
+from collections.abc import AsyncGenerator, Sequence
 from dataclasses import dataclass, field
 from pathlib import Path
 from typing import Any, Final, Protocol, cast
@@ -18,7 +18,12 @@
 from pydantic import AnyUrl, ByteSize, TypeAdapter
 from servicelib.logging_utils import log_catch, log_context
 from servicelib.utils import limited_gather
-from servicelib.zip_stream import DEFAULT_CHUNK_SIZE, FileStream
+from servicelib.zip_stream import (
+    DEFAULT_READ_CHUNK_SIZE,
+    FileSize,
+    FileStream,
+    FileStreamCallable,
+)
 from settings_library.s3 import S3Settings
 from types_aiobotocore_s3 import S3Client
 from types_aiobotocore_s3.literals import BucketLocationConstraintType
@@ -484,36 +489,43 @@ async def get_object_file_stream(
         bucket_name: S3BucketName,
         object_key: S3ObjectKey,
         *,
-        chunk_size: int = DEFAULT_CHUNK_SIZE,
-    ) -> FileStream:
-        response = await self._client.head_object(Bucket=bucket_name, Key=object_key)
-        file_size = response["ContentLength"]
-
-        # Download the file in chunks
-        position = 0
-        while position < file_size:
-            # Calculate the range for this chunk
-            end = min(position + chunk_size - 1, file_size - 1)
-            range_header = f"bytes={position}-{end}"
-
-            # Download the chunk
-            response = await self._client.get_object(
-                Bucket=bucket_name, Key=object_key, Range=range_header
-            )
+        chunk_size: int = DEFAULT_READ_CHUNK_SIZE,
+    ) -> tuple[FileSize, FileStreamCallable]:
+
+        # below is a quick call
+        head_response = await self._client.head_object(
+            Bucket=bucket_name, Key=object_key
+        )
+        file_size = FileSize(head_response["ContentLength"])
+
+        async def _() -> FileStream:
+            # Download the file in chunks
+            position = 0
+            while position < file_size:
+                # Calculate the range for this chunk
+                end = min(position + chunk_size - 1, file_size - 1)
+                range_header = f"bytes={position}-{end}"
+
+                # Download the chunk
+                response = await self._client.get_object(
+                    Bucket=bucket_name, Key=object_key, Range=range_header
+                )
+
+                chunk = await response["Body"].read()

-            chunk = await response["Body"].read()
+                # Yield the chunk for processing
+                yield chunk

-            # Yield the chunk for processing
-            yield chunk
+                position += chunk_size

-            position += chunk_size
+        return file_size, _

     @s3_exception_handler(_logger)
     async def upload_object_from_file_stream(
         self,
         bucket_name: S3BucketName,
         object_key: S3ObjectKey,
-        file_stream: Callable[[], FileStream],
+        file_stream: FileStream,
     ) -> None:
         # Create a multipart upload
         multipart_response = await self._client.create_multipart_upload(
@@ -525,7 +537,7 @@ async def upload_object_from_file_stream(
         parts = []
         part_number = 1

-        async for chunk in file_stream():
+        async for chunk in file_stream:
             part_response = await self._client.upload_part(
                 Bucket=bucket_name,
                 Key=object_key,
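
Taken together, the two refactored methods change the calling convention: get_object_file_stream is awaited and now returns the object's size plus a stream factory, while upload_object_from_file_stream consumes the stream itself rather than a zero-argument callable. A minimal consumer sketch (not part of the commit; simcore_s3_api is an already-configured client as in the test diff below, and the bucket/key arguments are placeholders):

async def copy_object_via_stream(simcore_s3_api, bucket, src_key, dst_key) -> None:
    # returns (FileSize, FileStreamCallable): the size is known up front,
    # the stream itself is only opened when the callable is invoked
    file_size, stream_callable = await simcore_s3_api.get_object_file_stream(
        bucket, src_key
    )
    print(f"streaming {file_size} bytes from {src_key!r} to {dst_key!r}")

    # the upload now takes the FileStream directly (an async iterable of bytes)
    await simcore_s3_api.upload_object_from_file_stream(
        bucket, dst_key, stream_callable()
    )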

packages/aws-library/tests/test_s3_client.py

Lines changed: 24 additions & 20 deletions
@@ -1502,8 +1502,13 @@ async def extracted_archive_path(tmp_path: Path, faker: Faker) -> AsyncIterator[
     assert not path.is_dir()


-def _get_s3_object_keys(files: set[Path]) -> set[S3ObjectKey]:
-    return {f.name for f in files}
+@pytest.fixture
+async def archive_s3_object_key(
+    with_s3_bucket: S3BucketName, simcore_s3_api: SimcoreS3API
+) -> AsyncIterator[S3ObjectKey]:
+    s3_object_key = "read_from_s3_write_to_s3"
+    yield s3_object_key
+    await simcore_s3_api.delete_object(bucket=with_s3_bucket, object_key=s3_object_key)


 @pytest.fixture
@@ -1514,13 +1519,8 @@ def _progress_cb(*args, **kwargs) -> None:
     return mocker.Mock(side_effect=_progress_cb)


-@pytest.fixture
-async def archive_s3_object_key(
-    with_s3_bucket: S3BucketName, simcore_s3_api: SimcoreS3API
-) -> AsyncIterator[S3ObjectKey]:
-    s3_object_key = "read_from_s3_write_to_s3"
-    yield s3_object_key
-    await simcore_s3_api.delete_object(bucket=with_s3_bucket, object_key=s3_object_key)
+def _get_s3_object_keys(files: set[Path]) -> set[S3ObjectKey]:
+    return {f.name for f in files}


 @pytest.mark.parametrize(
@@ -1540,8 +1540,8 @@ async def test_workflow_compress_s3_objects_and_local_files_in_a_single_archive_
     simcore_s3_api: SimcoreS3API,
     with_s3_bucket: S3BucketName,
     s3_client: S3Client,
-    mocked_progress_bar_cb: Mock,
     archive_s3_object_key: S3ObjectKey,
+    mocked_progress_bar_cb: Mock,
 ):
     # In this test:
     # - files are read form disk and S3
@@ -1556,15 +1556,18 @@ async def test_workflow_compress_s3_objects_and_local_files_in_a_single_archive_
         archive_file_entries.append(
             (
                 file.name,
-                DiskStreamReader(file).get_stream,
+                DiskStreamReader(file).get_stream_data(),
             )
         )

-    for s3_object_key in _get_s3_object_keys(files_stored_in_s3):
+    s3_object_keys = _get_s3_object_keys(files_stored_in_s3)
+    assert len(s3_object_keys) == len(files_stored_in_s3)
+
+    for s3_object_key in s3_object_keys:
         archive_file_entries.append(
             (
                 s3_object_key,
-                lambda: simcore_s3_api.get_object_file_stream(
+                await simcore_s3_api.get_object_file_stream(
                     with_s3_bucket, s3_object_key
                 ),
             )
@@ -1574,23 +1577,24 @@ async def test_workflow_compress_s3_objects_and_local_files_in_a_single_archive_
     # some will be read from S3 and some from the disk
     random.shuffle(archive_file_entries)

+    started = time.time()
+
     async with ProgressBarData(
         num_steps=1,
         progress_report_cb=mocked_progress_bar_cb,
         description="root_bar",
-    ) as root:
-        started = time.time()
+    ) as progress_bar:
         await simcore_s3_api.upload_object_from_file_stream(
             with_s3_bucket,
             archive_s3_object_key,
-            lambda: get_zip_archive_stream(
+            get_zip_archive_stream(
                 archive_file_entries,
-                progress_bar=root,
+                progress_bar=progress_bar,
                 chunk_size=MIN_MULTIPART_UPLOAD_CHUNK_SIZE,
             ),
         )
-        duration = time.time() - started
-        print(f"Zip created on S3 in {duration:.2f} seconds")
+    duration = time.time() - started
+    print(f"Zip created on S3 in {duration:.2f} seconds")

     # 2. download zip archive form S3
     print(f"downloading {archive_download_path}")
@@ -1606,7 +1610,7 @@ async def test_workflow_compress_s3_objects_and_local_files_in_a_single_archive_
     all_files_in_zip = get_files_info_from_itrable(
         files_stored_locally
     ) | get_files_info_from_itrable(files_stored_in_s3)
-    assert len(all_files_in_zip) == 20
+
     await assert_same_contents(
         all_files_in_zip, get_files_info_from_path(extracted_archive_path)
     )

packages/service-library/src/servicelib/zip_stream/__init__.py

Lines changed: 11 additions & 3 deletions

@@ -1,15 +1,23 @@
-from ._constants import DEFAULT_CHUNK_SIZE
+from ._constants import DEFAULT_READ_CHUNK_SIZE
 from ._input import DiskStreamReader
 from ._output import DiskStreamWriter
-from ._types import ArchiveEntries, ArchiveFileEntry, FileStream
+from ._types import (
+    ArchiveEntries,
+    ArchiveFileEntry,
+    FileSize,
+    FileStream,
+    FileStreamCallable,
+)
 from ._zipper import get_zip_archive_stream

 __all__: tuple[str, ...] = (
     "ArchiveEntries",
     "ArchiveFileEntry",
-    "DEFAULT_CHUNK_SIZE",
+    "DEFAULT_READ_CHUNK_SIZE",
     "DiskStreamReader",
     "DiskStreamWriter",
+    "FileSize",
     "FileStream",
+    "FileStreamCallable",
     "get_zip_archive_stream",
 )

packages/service-library/src/servicelib/zip_stream/_constants.py

Lines changed: 1 addition & 1 deletion
@@ -2,4 +2,4 @@

 from pydantic import ByteSize, TypeAdapter

-DEFAULT_CHUNK_SIZE: Final[int] = TypeAdapter(ByteSize).validate_python("1MiB")
+DEFAULT_READ_CHUNK_SIZE: Final[int] = TypeAdapter(ByteSize).validate_python("1MiB")

packages/service-library/src/servicelib/zip_stream/_input.py

Lines changed: 13 additions & 10 deletions
@@ -2,19 +2,22 @@

 import aiofiles

-from ._constants import DEFAULT_CHUNK_SIZE
-from ._types import FileStream
+from ._constants import DEFAULT_READ_CHUNK_SIZE
+from ._types import FileSize, FileStream, StreamData


 class DiskStreamReader:
-    def __init__(self, file_path: Path, *, chunk_size=DEFAULT_CHUNK_SIZE):
+    def __init__(self, file_path: Path, *, chunk_size=DEFAULT_READ_CHUNK_SIZE):
         self.file_path = file_path
         self.chunk_size = chunk_size

-    async def get_stream(self) -> FileStream:
-        async with aiofiles.open(self.file_path, "rb") as f:
-            while True:
-                chunk = await f.read(self.chunk_size)
-                if not chunk:
-                    break
-                yield chunk
+    def get_stream_data(self) -> StreamData:
+        async def _() -> FileStream:
+            async with aiofiles.open(self.file_path, "rb") as f:
+                while True:
+                    chunk = await f.read(self.chunk_size)
+                    if not chunk:
+                        break
+                    yield chunk
+
+        return FileSize(self.file_path.stat().st_size), _

packages/service-library/src/servicelib/zip_stream/_types.py

Lines changed: 8 additions & 1 deletion

@@ -1,8 +1,15 @@
 from collections.abc import AsyncIterable, Callable
 from typing import TypeAlias

+from pydantic import ByteSize
+
 FileNameInArchive: TypeAlias = str
 FileStream: TypeAlias = AsyncIterable[bytes]

-ArchiveFileEntry: TypeAlias = tuple[FileNameInArchive, Callable[[], FileStream]]
+FileStreamCallable: TypeAlias = Callable[[], FileStream]
+FileSize: TypeAlias = ByteSize
+
+StreamData: TypeAlias = tuple[FileSize, FileStreamCallable]
+
+ArchiveFileEntry: TypeAlias = tuple[FileNameInArchive, StreamData]
 ArchiveEntries: TypeAlias = list[ArchiveFileEntry]
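
The new StreamData alias is what ties size-based progress together: every archive entry now carries its byte size next to its stream factory. A small sketch of building ArchiveEntries with the servicelib.zip_stream exports from this commit (some_dir is a placeholder path, not from the commit):

from pathlib import Path

from servicelib.zip_stream import ArchiveEntries, DiskStreamReader

some_dir = Path("some_dir")  # placeholder: any directory containing regular files

# get_stream_data() returns a StreamData, i.e. (FileSize, FileStreamCallable),
# so each ArchiveFileEntry is (name_in_archive, (size, stream_factory))
archive_entries: ArchiveEntries = [
    (file.name, DiskStreamReader(file).get_stream_data())
    for file in some_dir.rglob("*")
    if file.is_file()
]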

packages/service-library/src/servicelib/zip_stream/_zipper.py

Lines changed: 27 additions & 25 deletions
@@ -5,41 +5,43 @@
 from stream_zip import ZIP_32, AsyncMemberFile, async_stream_zip

 from ..progress_bar import ProgressBarData
-from ._constants import DEFAULT_CHUNK_SIZE
+from ._constants import DEFAULT_READ_CHUNK_SIZE
 from ._types import ArchiveEntries, FileStream


-async def _iter_files(
-    file_streams: ArchiveEntries, progress_bar: ProgressBarData
+async def _member_files_iter(
+    file_streams: ArchiveEntries, progress: ProgressBarData
 ) -> AsyncIterable[AsyncMemberFile]:
-    async with progress_bar.sub_progress(
-        steps=len(file_streams), description="..."
-    ) as sub_progress:
-        for file_name, file_stream_handler in file_streams:
-            yield (
-                file_name,
-                datetime.now(UTC),
-                S_IFREG | 0o600,
-                ZIP_32,
-                file_stream_handler(),
-            )
-            await sub_progress.update(1)
+    for file_name, (stream_size, file_stream_handler) in file_streams:
+        yield (
+            file_name,
+            datetime.now(UTC),
+            S_IFREG | 0o600,
+            ZIP_32,
+            file_stream_handler(),
+        )
+        await progress.update(stream_size)


 async def get_zip_archive_stream(
     archive_files: ArchiveEntries,
     *,
     progress_bar: ProgressBarData | None = None,
-    chunk_size: int = DEFAULT_CHUNK_SIZE
+    chunk_size: int = DEFAULT_READ_CHUNK_SIZE
 ) -> FileStream:
     # NOTE: this is CPU bound task, even though the loop is not blocked,
-    # the CPU is still used for compressing the content
+    # the CPU is still used for compressing the content.
     if progress_bar is None:
-        progress_bar = ProgressBarData(num_steps=1, description="stream archiver")
-
-    # NOTE: do not disable compression or the streams will be
-    # loaded fully in memory before yielding their content
-    async for chunk in async_stream_zip(
-        _iter_files(archive_files, progress_bar), chunk_size=chunk_size
-    ):
-        yield chunk
+        progress_bar = ProgressBarData(num_steps=1, description="zip archive stream")
+
+    total_stream_lenth = sum(file_size for _, (file_size, _) in archive_files)
+
+    async with progress_bar.sub_progress(
+        steps=total_stream_lenth, description="streams_reader", progress_unit="Byte"
+    ) as sub_progress:
+        # NOTE: do not disable compression or the streams will be
+        # loaded fully in memory before yielding their content
+        async for chunk in async_stream_zip(
+            _member_files_iter(archive_files, sub_progress), chunk_size=chunk_size
+        ):
+            yield chunk
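
With sizes attached to every entry, get_zip_archive_stream can size its internal sub progress bar in bytes rather than in file count, which is the point of this commit. A wiring sketch mirroring the test below (archive_entries, archive_path and report_cb are assumed to exist; not part of the commit):

from servicelib.progress_bar import ProgressBarData
from servicelib.zip_stream import DiskStreamWriter, get_zip_archive_stream


async def write_zip_to_disk(archive_entries, archive_path, report_cb) -> None:
    writer = DiskStreamWriter(archive_path)
    async with ProgressBarData(
        num_steps=1, progress_report_cb=report_cb, description="root_bar"
    ) as progress_bar:
        # get_zip_archive_stream opens a sub progress bar sized to the sum of all
        # entry sizes and advances it by each member's byte count while zipping
        await writer.write_stream(
            get_zip_archive_stream(archive_entries, progress_bar=progress_bar)
        )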

packages/service-library/tests/test_zip_stream.py

Lines changed: 2 additions & 2 deletions
@@ -104,7 +104,7 @@ async def test_get_zip_archive_stream(
     for file in (x for x in local_files_dir.rglob("*") if x.is_file()):
         archive_name = get_relative_to(local_files_dir, file)

-        archive_files.append((archive_name, DiskStreamReader(file).get_stream))
+        archive_files.append((archive_name, DiskStreamReader(file).get_stream_data()))

     writer = DiskStreamWriter(local_archive_path)

@@ -114,7 +114,7 @@ async def test_get_zip_archive_stream(
         description="root_bar",
     ) as root:
         await writer.write_stream(
-            get_zip_archive_stream(archive_files, progress_bar=root)
+            get_zip_archive_stream(archive_files, progress_bar=root, chunk_size=1024)
         )

     # 2. extract archive using exiting tools
