
Commit b23d1f1: "renamed to bytes_iter"
Author: Andrei Neagu
Parent: 0f11526

File tree: 10 files changed (+41 / -45 lines)

packages/aws-library/src/aws_library/s3/_client.py (6 additions, 6 deletions)

@@ -15,11 +15,11 @@
 from botocore.client import Config
 from models_library.api_schemas_storage import ETag, S3BucketName, UploadedPart
 from models_library.basic_types import SHA256Str
-from models_library.data_streams import DataSize, DataStream
+from models_library.data_streams import BytesIter, DataSize
 from pydantic import AnyUrl, ByteSize, TypeAdapter
 from servicelib.data_streams import DEFAULT_READ_CHUNK_SIZE, StreamData
 from servicelib.logging_utils import log_catch, log_context
-from servicelib.s3_utils import FileLikeDataStreamReader
+from servicelib.s3_utils import FileLikeBytesIterReader
 from servicelib.utils import limited_gather
 from settings_library.s3 import S3Settings
 from types_aiobotocore_s3 import S3Client
@@ -492,7 +492,7 @@ async def get_object_stream_data(
         )
         data_size = DataSize(head_response["ContentLength"])

-        async def _() -> DataStream:
+        async def _() -> BytesIter:
             # Download the file in chunks
             position = 0
             while position < data_size:
@@ -515,14 +515,14 @@ async def _() -> DataStream:
         return StreamData(data_size, _)

     @s3_exception_handler(_logger)
-    async def upload_object_from_file_stream(
+    async def upload_object_from_bytes_iter(  # TODO: this needs to be based on file interface -> use protocol to expose read
         self,
         bucket_name: S3BucketName,
         object_key: S3ObjectKey,
-        file_stream: DataStream,
+        bytes_iter: BytesIter,
     ) -> None:
         """streams write an object in S3 from an AsyncIterable[bytes]"""
-        await self._client.upload_fileobj(FileLikeDataStreamReader(file_stream), bucket_name, object_key)  # type: ignore[arg-type]
+        await self._client.upload_fileobj(FileLikeBytesIterReader(bytes_iter), bucket_name, object_key)  # type: ignore[arg-type]

     @staticmethod
     def is_multipart(file_size: ByteSize) -> bool:
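For context, a minimal usage sketch of the renamed upload call (not part of this commit); `s3_api`, `my_bucket` and `my_key` are placeholders for an already-initialized client instance and existing S3 names, and any async producer of bytes satisfies BytesIter:

from collections.abc import AsyncIterable


async def produce_chunks() -> AsyncIterable[bytes]:
    # any async generator of bytes satisfies the BytesIter alias
    for _ in range(3):
        yield b"some-chunk-of-data"


async def upload_example(s3_api, my_bucket, my_key) -> None:
    # internally the client wraps the iterable in FileLikeBytesIterReader
    await s3_api.upload_object_from_bytes_iter(my_bucket, my_key, produce_chunks())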

packages/aws-library/tests/test_s3_client.py (6 additions, 10 deletions)

@@ -60,11 +60,7 @@
 )
 from pytest_simcore.helpers.typing_env import EnvVarsDict
 from servicelib.archiving_utils import unarchive_dir
-from servicelib.data_streams import (
-    ArchiveEntries,
-    DiskStreamReader,
-    get_zip_data_stream,
-)
+from servicelib.data_streams import ArchiveEntries, DiskStreamReader, get_zip_bytes_iter
 from servicelib.data_streams._models import DataSize
 from servicelib.file_utils import remove_directory
 from servicelib.progress_bar import ProgressBarData
@@ -1413,7 +1409,7 @@ async def test_read_object_file_stream(
         with_s3_bucket, with_uploaded_file_on_s3.s3_key, chunk_size=1024
     )
     assert isinstance(stream_data.data_size, DataSize)
-    async for chunk in stream_data.with_progress_data_stream(AsyncMock()):
+    async for chunk in stream_data.with_progress_bytes_iter(AsyncMock()):
         await f.write(chunk)

     assert stream_data.data_size == tmp_file_name.stat().st_size
@@ -1433,8 +1429,8 @@ async def test_upload_object_from_file_stream(
     )
     assert isinstance(stream_data.data_size, DataSize)

-    await simcore_s3_api.upload_object_from_file_stream(
-        with_s3_bucket, object_key, stream_data.with_progress_data_stream(AsyncMock())
+    await simcore_s3_api.upload_object_from_bytes_iter(
+        with_s3_bucket, object_key, stream_data.with_progress_bytes_iter(AsyncMock())
     )

     await simcore_s3_api.delete_object(bucket=with_s3_bucket, object_key=object_key)
@@ -1593,10 +1589,10 @@ async def test_workflow_compress_s3_objects_and_local_files_in_a_single_archive_
             progress_report_cb=mocked_progress_bar_cb,
             description="root_bar",
         ) as progress_bar:
-            await simcore_s3_api.upload_object_from_file_stream(
+            await simcore_s3_api.upload_object_from_bytes_iter(
                 with_s3_bucket,
                 archive_s3_object_key,
-                get_zip_data_stream(
+                get_zip_bytes_iter(
                     archive_file_entries,
                     progress_bar=progress_bar,
                     chunk_size=MULTIPART_COPY_THRESHOLD,

packages/models-library/src/models_library/data_streams.py (2 additions, 2 deletions)

@@ -3,7 +3,7 @@

 from pydantic import ByteSize

-DataStream: TypeAlias = AsyncIterable[bytes]
+BytesIter: TypeAlias = AsyncIterable[bytes]

-DataStreamCallable: TypeAlias = Callable[[], DataStream]
+BytesIterCallable: TypeAlias = Callable[[], BytesIter]
 DataSize: TypeAlias = ByteSize
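Illustratively, any async generator of bytes already satisfies the renamed aliases (sketch only, not part of the diff):

from models_library.data_streams import BytesIter, BytesIterCallable, DataSize


async def three_chunks() -> BytesIter:
    # an async generator of bytes is a valid BytesIter
    for chunk in (b"a", b"b", b"c"):
        yield chunk


make_chunks: BytesIterCallable = three_chunks  # zero-argument factory
total: DataSize = DataSize(3)  # DataSize is just pydantic's ByteSize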

packages/service-library/src/servicelib/data_streams/__init__.py (2 additions, 2 deletions)

@@ -2,14 +2,14 @@
 from ._input import DiskStreamReader
 from ._models import StreamData
 from ._output import DiskStreamWriter
-from ._stream_zip import ArchiveEntries, ArchiveFileEntry, get_zip_data_stream
+from ._stream_zip import ArchiveEntries, ArchiveFileEntry, get_zip_bytes_iter

 __all__: tuple[str, ...] = (
     "ArchiveEntries",
     "ArchiveFileEntry",
     "DEFAULT_READ_CHUNK_SIZE",
     "DiskStreamReader",
     "DiskStreamWriter",
-    "get_zip_data_stream",
+    "get_zip_bytes_iter",
     "StreamData",
 )

packages/service-library/src/servicelib/data_streams/_input.py (2 additions, 2 deletions)

@@ -1,7 +1,7 @@
 from pathlib import Path

 import aiofiles
-from models_library.data_streams import DataSize, DataStream
+from models_library.data_streams import BytesIter, DataSize

 from ._constants import DEFAULT_READ_CHUNK_SIZE
 from ._models import StreamData
@@ -13,7 +13,7 @@ def __init__(self, file_path: Path, *, chunk_size=DEFAULT_READ_CHUNK_SIZE):
         self.chunk_size = chunk_size

     def get_stream_data(self) -> StreamData:
-        async def _() -> DataStream:
+        async def _() -> BytesIter:
             async with aiofiles.open(self.file_path, "rb") as f:
                 while True:
                     chunk = await f.read(self.chunk_size)
packages/service-library/src/servicelib/data_streams/_models.py (5 additions, 5 deletions)

@@ -1,18 +1,18 @@
 from dataclasses import dataclass

-from models_library.data_streams import DataSize, DataStream, DataStreamCallable
+from models_library.data_streams import BytesIter, BytesIterCallable, DataSize

 from ..progress_bar import ProgressBarData


 @dataclass(frozen=True)
 class StreamData:
     data_size: DataSize
-    data_stream_callable: DataStreamCallable
+    bytes_iter_callable: BytesIterCallable

-    async def with_progress_data_stream(
+    async def with_progress_bytes_iter(
         self, progress_bar: ProgressBarData
-    ) -> DataStream:
-        async for chunk in self.data_stream_callable():
+    ) -> BytesIter:
+        async for chunk in self.bytes_iter_callable():
             await progress_bar.update(len(chunk))
             yield chunk
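A rough consumption sketch (mirrors the tests, which substitute an AsyncMock for a real ProgressBarData; sizes and payloads are illustrative):

from unittest.mock import AsyncMock

from models_library.data_streams import BytesIter, DataSize
from servicelib.data_streams import StreamData


async def make_bytes() -> BytesIter:
    yield b"hello "
    yield b"world"


async def consume_all() -> bytes:
    stream_data = StreamData(DataSize(11), make_bytes)
    collected = b""
    async for chunk in stream_data.with_progress_bytes_iter(AsyncMock()):
        collected += chunk
    return collected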

packages/service-library/src/servicelib/data_streams/_output.py (4 additions, 4 deletions)

@@ -1,23 +1,23 @@
 from pathlib import Path

 import aiofiles
-from models_library.data_streams import DataStream
+from models_library.data_streams import BytesIter

-from ..s3_utils import FileLikeDataStreamReader
+from ..s3_utils import FileLikeBytesIterReader


 class DiskStreamWriter:
     def __init__(self, destination_path: Path):
         self.destination_path = destination_path

-    async def write_from_stream(self, stream: DataStream) -> None:
+    async def write_from_bytes_iter(self, stream: BytesIter) -> None:
         async with aiofiles.open(self.destination_path, "wb") as f:
             async for chunk in stream:
                 await f.write(chunk)
                 await f.flush()

     async def write_from_file_like(
-        self, file_like_reader: FileLikeDataStreamReader
+        self, file_like_reader: FileLikeBytesIterReader
     ) -> None:
         async with aiofiles.open(self.destination_path, "wb") as f:
             while True:
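A hypothetical end-to-end copy using the reader/writer pair touched here (paths are placeholders):

from pathlib import Path

from servicelib.data_streams import DiskStreamReader, DiskStreamWriter


async def copy_file(src: Path, dst: Path) -> None:
    # read the source lazily in chunks...
    stream_data = DiskStreamReader(src).get_stream_data()
    # ...and pipe it through the renamed write_from_bytes_iter
    await DiskStreamWriter(dst).write_from_bytes_iter(stream_data.bytes_iter_callable())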

packages/service-library/src/servicelib/data_streams/_stream_zip.py (4 additions, 4 deletions)

@@ -3,7 +3,7 @@
 from stat import S_IFREG
 from typing import TypeAlias

-from models_library.data_streams import DataSize, DataStream
+from models_library.data_streams import BytesIter, DataSize
 from stream_zip import ZIP_32, AsyncMemberFile, async_stream_zip

 from ..progress_bar import ProgressBarData
@@ -23,16 +23,16 @@ async def _member_files_iter(
             datetime.now(UTC),
             S_IFREG | 0o600,
             ZIP_32,
-            stream_info.with_progress_data_stream(progress_bar=progress_bar),
+            stream_info.with_progress_bytes_iter(progress_bar=progress_bar),
         )


-async def get_zip_data_stream(
+async def get_zip_bytes_iter(
     archive_files: ArchiveEntries,
     *,
     progress_bar: ProgressBarData | None = None,
     chunk_size: int,
-) -> DataStream:
+) -> BytesIter:
     # NOTE: this is CPU bound task, even though the loop is not blocked,
     # the CPU is still used for compressing the content.
     if progress_bar is None:
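Sketch of streaming an archive straight to disk with the renamed helper; `archive_entries` is assumed to be an already-built ArchiveEntries value, since its construction is not part of this diff:

from pathlib import Path

from servicelib.data_streams import ArchiveEntries, DiskStreamWriter, get_zip_bytes_iter


async def write_archive(archive_entries: ArchiveEntries, destination: Path) -> None:
    # calling the async generator returns a BytesIter without awaiting it
    bytes_iter = get_zip_bytes_iter(archive_entries, chunk_size=1024 * 1024)
    await DiskStreamWriter(destination).write_from_bytes_iter(bytes_iter)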

packages/service-library/src/servicelib/s3_utils.py (5 additions, 5 deletions)

@@ -1,14 +1,14 @@
-from models_library.data_streams import DataStream
+from models_library.data_streams import BytesIter


-class FileLikeDataStreamReader:
-    def __init__(self, data_stream: DataStream):
-        self._data_stream = data_stream
+class FileLikeBytesIterReader:
+    def __init__(self, bytes_iter: BytesIter):
+        self._bytes_iter = bytes_iter
         self._buffer = bytearray()
         self._async_iterator = self._get_iterator()

     async def _get_iterator(self):
-        async for chunk in self._bytes_iter:
+        async for chunk in self._bytes_iter:
             yield chunk

     async def read(self, size: int) -> bytes:
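Usage sketch of the renamed adapter: it turns an AsyncIterable[bytes] into an object exposing an async read(size), which is what the S3 upload helper needs. The end-of-stream behavior (empty bytes when exhausted) is an assumption here, mirroring file semantics, since the full read() body is not shown in this diff:

from servicelib.s3_utils import FileLikeBytesIterReader


async def chunks():
    yield b"first"
    yield b"second"


async def read_everything() -> bytes:
    reader = FileLikeBytesIterReader(chunks())
    data = b""
    while True:
        block = await reader.read(4)
        if not block:  # assumed EOF convention: empty bytes when exhausted
            break
        data += block
    return data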

packages/service-library/tests/test_data_streams.py (5 additions, 5 deletions)

@@ -19,11 +19,11 @@
     ArchiveEntries,
     DiskStreamReader,
     DiskStreamWriter,
-    get_zip_data_stream,
+    get_zip_bytes_iter,
 )
 from servicelib.file_utils import remove_directory
 from servicelib.progress_bar import ProgressBarData
-from servicelib.s3_utils import FileLikeDataStreamReader
+from servicelib.s3_utils import FileLikeBytesIterReader


 def _ensure_dir(path: Path) -> Path:
@@ -116,14 +116,14 @@ async def test_get_zip_data_stream(
         progress_report_cb=mocked_progress_bar_cb,
         description="root_bar",
     ) as root:
-        file_stream = get_zip_data_stream(
+        bytes_iter = get_zip_bytes_iter(
             archive_files, progress_bar=root, chunk_size=1024
         )

         if use_file_like:
-            await writer.write_from_file_like(FileLikeDataStreamReader(file_stream))
+            await writer.write_from_file_like(FileLikeBytesIterReader(bytes_iter))
         else:
-            await writer.write_from_stream(file_stream)
+            await writer.write_from_bytes_iter(bytes_iter)

     # 2. extract archive using exiting tools
     await unarchive_dir(local_archive_path, local_unpacked_archive)
