
Commit d6ca255

Commit message: renamed
Author: Andrei Neagu (committed)
1 parent b29bff2 commit d6ca255

File tree: 7 files changed, +37 −37 lines

packages/aws-library/src/aws_library/s3/_client.py

Lines changed: 4 additions & 4 deletions

@@ -17,7 +17,7 @@
 from models_library.basic_types import SHA256Str
 from models_library.bytes_iters import BytesIter, DataSize
 from pydantic import AnyUrl, ByteSize, TypeAdapter
-from servicelib.bytes_iters import DEFAULT_READ_CHUNK_SIZE, StreamData
+from servicelib.bytes_iters import DEFAULT_READ_CHUNK_SIZE, BytesStreamer
 from servicelib.logging_utils import log_catch, log_context
 from servicelib.s3_utils import FileLikeBytesIterReader
 from servicelib.utils import limited_gather

@@ -473,13 +473,13 @@ async def copy_objects_recursively(
             limit=_MAX_CONCURRENT_COPY,
         )

-    async def get_object_stream_data(
+    async def get_bytes_streamer_from_object(
         self,
         bucket_name: S3BucketName,
         object_key: S3ObjectKey,
         *,
         chunk_size: int = DEFAULT_READ_CHUNK_SIZE,
-    ) -> StreamData:
+    ) -> BytesStreamer:
         """stream read an object from S3 chunk by chunk"""

         # NOTE `download_fileobj` cannot be used to implement this because

@@ -512,7 +512,7 @@ async def _() -> BytesIter:

                position += chunk_size

-        return StreamData(data_size, _)
+        return BytesStreamer(data_size, _)

     @s3_exception_handler(_logger)
     async def upload_object_from_bytes_iter(  # TODO: this needs to be based on file interface -> use protocol to expose read
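
For context, a minimal consumer-side sketch of the renamed method, streaming an S3 object to a local file. This is not part of the commit: `s3_api` stands in for an initialized client exposing get_bytes_streamer_from_object, and the bucket, key, and chunk size are placeholders.

# Sketch only, not part of this commit: `s3_api`, `bucket_name`, `object_key`,
# and the 1 MiB chunk size are illustrative placeholders.
from pathlib import Path

import aiofiles


async def download_object(s3_api, bucket_name, object_key, destination: Path) -> None:
    bytes_streamer = await s3_api.get_bytes_streamer_from_object(
        bucket_name, object_key, chunk_size=1024 * 1024
    )
    async with aiofiles.open(destination, "wb") as f:
        # bytes_iter_callable() yields the object's content chunk by chunk
        async for chunk in bytes_streamer.bytes_iter_callable():
            await f.write(chunk)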

packages/aws-library/tests/test_s3_client.py

Lines changed: 14 additions & 14 deletions

@@ -1405,14 +1405,14 @@ async def test_read_object_file_stream(
     tmp_file_name: Path,
 ):
     async with aiofiles.open(tmp_file_name, "wb") as f:
-        stream_data = await simcore_s3_api.get_object_stream_data(
+        bytes_streamer = await simcore_s3_api.get_bytes_streamer_from_object(
             with_s3_bucket, with_uploaded_file_on_s3.s3_key, chunk_size=1024
         )
-        assert isinstance(stream_data.data_size, DataSize)
-        async for chunk in stream_data.with_progress_bytes_iter(AsyncMock()):
+        assert isinstance(bytes_streamer.data_size, DataSize)
+        async for chunk in bytes_streamer.with_progress_bytes_iter(AsyncMock()):
             await f.write(chunk)

-    assert stream_data.data_size == tmp_file_name.stat().st_size
+    assert bytes_streamer.data_size == tmp_file_name.stat().st_size

     await assert_same_file_content(with_uploaded_file_on_s3.local_path, tmp_file_name)

@@ -1424,13 +1424,13 @@ async def test_upload_object_from_file_stream(
     with_s3_bucket: S3BucketName,
 ):
     object_key = "read_from_s3_write_to_s3"
-    stream_data = await simcore_s3_api.get_object_stream_data(
+    bytes_streamer = await simcore_s3_api.get_bytes_streamer_from_object(
        with_s3_bucket, with_uploaded_file_on_s3.s3_key
     )
-    assert isinstance(stream_data.data_size, DataSize)
+    assert isinstance(bytes_streamer.data_size, DataSize)

     await simcore_s3_api.upload_object_from_bytes_iter(
-        with_s3_bucket, object_key, stream_data.with_progress_bytes_iter(AsyncMock())
+        with_s3_bucket, object_key, bytes_streamer.with_progress_bytes_iter(AsyncMock())
     )

     await simcore_s3_api.delete_object(bucket=with_s3_bucket, object_key=object_key)

@@ -1555,32 +1555,32 @@ async def test_workflow_compress_s3_objects_and_local_files_in_a_single_archive_

     # 1. assemble and upload zip archive

-    archive_file_entries: ArchiveEntries = []
+    archive_entries: ArchiveEntries = []

     local_files = get_files_info_from_path(path_local_files_for_archive)
     for file_name, file_path in local_files.items():
-        archive_file_entries.append(
+        archive_entries.append(
             (
                 file_name,
-                DiskStreamReader(file_path).get_stream_data(),
+                DiskStreamReader(file_path).get_bytes_streamer(),
             )
         )

     s3_files = get_files_info_from_path(path_s3_files_for_archive)

     for s3_object_key in s3_files:
-        archive_file_entries.append(
+        archive_entries.append(
             (
                 s3_object_key,
-                await simcore_s3_api.get_object_stream_data(
+                await simcore_s3_api.get_bytes_streamer_from_object(
                     with_s3_bucket, s3_object_key
                 ),
             )
         )

     # shuffle order of files in archive.
     # some will be read from S3 and some from the disk
-    random.shuffle(archive_file_entries)
+    random.shuffle(archive_entries)

     started = time.time()

@@ -1593,7 +1593,7 @@ async def test_workflow_compress_s3_objects_and_local_files_in_a_single_archive_
         with_s3_bucket,
         archive_s3_object_key,
         get_zip_bytes_iter(
-            archive_file_entries,
+            archive_entries,
             progress_bar=progress_bar,
             chunk_size=MULTIPART_COPY_THRESHOLD,
         ),
packages/service-library/src/servicelib/bytes_iters/__init__.py

Lines changed: 2 additions & 2 deletions

@@ -1,15 +1,15 @@
 from ._constants import DEFAULT_READ_CHUNK_SIZE
 from ._input import DiskStreamReader
-from ._models import StreamData
+from ._models import BytesStreamer
 from ._output import DiskStreamWriter
 from ._stream_zip import ArchiveEntries, ArchiveFileEntry, get_zip_bytes_iter

 __all__: tuple[str, ...] = (
     "ArchiveEntries",
     "ArchiveFileEntry",
+    "BytesStreamer",
     "DEFAULT_READ_CHUNK_SIZE",
     "DiskStreamReader",
     "DiskStreamWriter",
     "get_zip_bytes_iter",
-    "StreamData",
 )
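
With the re-export updated, downstream code imports the renamed class from the package root:

# the package root now exposes the renamed class alongside the existing helpers
from servicelib.bytes_iters import BytesStreamer, DiskStreamReader, get_zip_bytes_iter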

packages/service-library/src/servicelib/bytes_iters/_input.py

Lines changed: 3 additions & 3 deletions

@@ -4,15 +4,15 @@
 from models_library.bytes_iters import BytesIter, DataSize

 from ._constants import DEFAULT_READ_CHUNK_SIZE
-from ._models import StreamData
+from ._models import BytesStreamer


 class DiskStreamReader:
     def __init__(self, file_path: Path, *, chunk_size=DEFAULT_READ_CHUNK_SIZE):
         self.file_path = file_path
         self.chunk_size = chunk_size

-    def get_stream_data(self) -> StreamData:
+    def get_bytes_streamer(self) -> BytesStreamer:
         async def _() -> BytesIter:
             async with aiofiles.open(self.file_path, "rb") as f:
                 while True:

@@ -22,4 +22,4 @@ async def _() -> BytesIter:

                     yield chunk

-        return StreamData(DataSize(self.file_path.stat().st_size), _)
+        return BytesStreamer(DataSize(self.file_path.stat().st_size), _)
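
A small usage sketch of the renamed reader, assuming only what this file shows (the get_bytes_streamer factory and the data_size / bytes_iter_callable fields); the helper name below is hypothetical.

# Sketch: stream a local file through DiskStreamReader and verify the byte count.
from pathlib import Path

from servicelib.bytes_iters import DiskStreamReader


async def count_streamed_bytes(file_path: Path) -> int:
    bytes_streamer = DiskStreamReader(file_path).get_bytes_streamer()
    total = 0
    async for chunk in bytes_streamer.bytes_iter_callable():
        total += len(chunk)
    # data_size is taken from the file's size on disk, so the two should match
    assert total == bytes_streamer.data_size
    return total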

packages/service-library/src/servicelib/bytes_iters/_models.py

Lines changed: 1 addition & 1 deletion

@@ -6,7 +6,7 @@


 @dataclass(frozen=True)
-class StreamData:
+class BytesStreamer:
     data_size: DataSize
     bytes_iter_callable: BytesIterCallable
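
The renamed dataclass only carries the payload size and the iterator factory; with_progress_bytes_iter, referenced by other files in this commit, sits outside the shown hunk. A sketch of constructing one by hand, using nothing beyond the two fields above (the helper name is hypothetical):

# Sketch: build a BytesStreamer for an in-memory payload.
from models_library.bytes_iters import BytesIter, DataSize

from servicelib.bytes_iters import BytesStreamer


def bytes_streamer_from_payload(payload: bytes) -> BytesStreamer:
    async def _() -> BytesIter:
        yield payload

    return BytesStreamer(DataSize(len(payload)), _)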

packages/service-library/src/servicelib/bytes_iters/_stream_zip.py

Lines changed: 9 additions & 11 deletions

@@ -7,28 +7,28 @@
 from stream_zip import ZIP_32, AsyncMemberFile, async_stream_zip

 from ..progress_bar import ProgressBarData
-from ._models import StreamData
+from ._models import BytesStreamer

 FileNameInArchive: TypeAlias = str
-ArchiveFileEntry: TypeAlias = tuple[FileNameInArchive, StreamData]
+ArchiveFileEntry: TypeAlias = tuple[FileNameInArchive, BytesStreamer]
 ArchiveEntries: TypeAlias = list[ArchiveFileEntry]


 async def _member_files_iter(
-    file_streams: ArchiveEntries, progress_bar: ProgressBarData
+    archive_entries: ArchiveEntries, progress_bar: ProgressBarData
 ) -> AsyncIterable[AsyncMemberFile]:
-    for file_name, stream_info in file_streams:
+    for file_name, byte_streamer in archive_entries:
         yield (
             file_name,
             datetime.now(UTC),
             S_IFREG | 0o600,
             ZIP_32,
-            stream_info.with_progress_bytes_iter(progress_bar=progress_bar),
+            byte_streamer.with_progress_bytes_iter(progress_bar=progress_bar),
         )


 async def get_zip_bytes_iter(
-    archive_files: ArchiveEntries,
+    archive_entries: ArchiveEntries,
     *,
     progress_bar: ProgressBarData | None = None,
     chunk_size: int,

@@ -39,18 +39,16 @@ async def get_zip_bytes_iter(
         progress_bar = ProgressBarData(num_steps=1, description="zip archive stream")

     total_stream_lenth = DataSize(
-        sum(stream_info.data_size for _, stream_info in archive_files)
-    )
-    description = (
-        f"files: count={len(archive_files)}, size={total_stream_lenth.human_readable()}"
+        sum(bytes_streamer.data_size for _, bytes_streamer in archive_entries)
     )
+    description = f"files: count={len(archive_entries)}, size={total_stream_lenth.human_readable()}"

     async with progress_bar.sub_progress(
         steps=total_stream_lenth, description=description, progress_unit="Byte"
     ) as sub_progress:
         # NOTE: do not disable compression or the streams will be
         # loaded fully in memory before yielding their content
         async for chunk in async_stream_zip(
-            _member_files_iter(archive_files, sub_progress), chunk_size=chunk_size
+            _member_files_iter(archive_entries, sub_progress), chunk_size=chunk_size
         ):
             yield chunk
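
A minimal end-to-end sketch of the renamed entry point: gather local files into ArchiveEntries and write the resulting zip stream to disk. The directory layout, the 1 MiB chunk size, and the helper name are illustrative; progress_bar is left to its default as the signature above allows.

# Sketch: zip a directory of local files into an archive on disk.
from pathlib import Path

import aiofiles

from servicelib.bytes_iters import ArchiveEntries, DiskStreamReader, get_zip_bytes_iter


async def zip_directory(src_dir: Path, archive_path: Path) -> None:
    archive_entries: ArchiveEntries = [
        (str(path.relative_to(src_dir)), DiskStreamReader(path).get_bytes_streamer())
        for path in src_dir.rglob("*")
        if path.is_file()
    ]
    async with aiofiles.open(archive_path, "wb") as f:
        async for chunk in get_zip_bytes_iter(archive_entries, chunk_size=1024 * 1024):
            await f.write(chunk)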

packages/service-library/tests/test_bytes_iters.py

Lines changed: 4 additions & 2 deletions

@@ -94,7 +94,7 @@ def _progress_cb(*args, **kwargs) -> None:


 @pytest.mark.parametrize("use_file_like", [True, False])
-async def test_get_zip_data_stream(
+async def test_get_zip_bytes_iter(
     mocked_progress_bar_cb: Mock,
     prepare_content: None,
     local_files_dir: Path,

@@ -107,7 +107,9 @@ async def test_get_zip_data_stream(
     for file in (x for x in local_files_dir.rglob("*") if x.is_file()):
         archive_name = get_relative_to(local_files_dir, file)

-        archive_files.append((archive_name, DiskStreamReader(file).get_stream_data()))
+        archive_files.append(
+            (archive_name, DiskStreamReader(file).get_bytes_streamer())
+        )

     writer = DiskStreamWriter(local_archive_path)
