Skip to content

Commit 19b5ef5

Browse files
author
Andrei Neagu
committed
added FileLikeFileStreamReader
1 parent b245e63 commit 19b5ef5

File tree

4 files changed

+49
-3
lines changed

4 files changed

+49
-3
lines changed

packages/service-library/src/servicelib/zip_stream/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from ._constants import DEFAULT_READ_CHUNK_SIZE, MIN_MULTIPART_UPLOAD_CHUNK_SIZE
2+
from ._file_like import FileLikeFileStreamReader
23
from ._input import DiskStreamReader
34
from ._output import DiskStreamWriter
45
from ._types import (
@@ -16,6 +17,7 @@
1617
"DEFAULT_READ_CHUNK_SIZE",
1718
"DiskStreamReader",
1819
"DiskStreamWriter",
20+
"FileLikeFileStreamReader",
1921
"FileSize",
2022
"FileStream",
2123
"FileStreamCallable",
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from ._types import FileStream
2+
3+
4+
class FileLikeFileStreamReader:
5+
def __init__(self, file_stream: FileStream):
6+
self.file_stream = file_stream
7+
self._buffer = bytearray()
8+
self._async_iterator = self._get_iterator()
9+
10+
async def _get_iterator(self):
11+
async for chunk in self.file_stream:
12+
yield chunk
13+
14+
async def read(self, size: int) -> bytes:
15+
while len(self._buffer) < size:
16+
try:
17+
chunk = await anext(self._async_iterator)
18+
self._buffer.extend(chunk)
19+
except StopAsyncIteration:
20+
break # End of file
21+
22+
result, self._buffer = self._buffer[:size], self._buffer[size:]
23+
return bytes(result)
Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from pathlib import Path
22

33
import aiofiles
4+
from servicelib.zip_stream._file_like import FileLikeFileStreamReader
45

56
from ._types import FileStream
67

@@ -9,8 +10,20 @@ class DiskStreamWriter:
910
def __init__(self, destination_path: Path):
1011
self.destination_path = destination_path
1112

12-
async def write_stream(self, stream: FileStream) -> None:
13+
async def write_from_stream(self, stream: FileStream) -> None:
1314
async with aiofiles.open(self.destination_path, "wb") as f:
1415
async for chunk in stream:
1516
await f.write(chunk)
1617
await f.flush()
18+
19+
async def write_from_file_like(
20+
self, file_like_reader: FileLikeFileStreamReader
21+
) -> None:
22+
async with aiofiles.open(self.destination_path, "wb") as f:
23+
while True:
24+
chunk = await file_like_reader.read(100)
25+
if not chunk:
26+
break
27+
28+
await f.write(chunk)
29+
await f.flush()

packages/service-library/tests/test_zip_stream.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
ArchiveEntries,
2222
DiskStreamReader,
2323
DiskStreamWriter,
24+
FileLikeFileStreamReader,
2425
get_zip_archive_stream,
2526
)
2627

@@ -92,12 +93,14 @@ def _progress_cb(*args, **kwargs) -> None:
9293
return mocker.Mock(side_effect=_progress_cb)
9394

9495

96+
@pytest.mark.parametrize("use_file_like", [True, False])
9597
async def test_get_zip_archive_stream(
9698
mocked_progress_bar_cb: Mock,
9799
prepare_content: None,
98100
local_files_dir: Path,
99101
local_archive_path: Path,
100102
local_unpacked_archive: Path,
103+
use_file_like: bool,
101104
):
102105
# 1. generate archive form soruces
103106
archive_files: ArchiveEntries = []
@@ -113,10 +116,15 @@ async def test_get_zip_archive_stream(
113116
progress_report_cb=mocked_progress_bar_cb,
114117
description="root_bar",
115118
) as root:
116-
await writer.write_stream(
117-
get_zip_archive_stream(archive_files, progress_bar=root, chunk_size=1024)
119+
file_stream = get_zip_archive_stream(
120+
archive_files, progress_bar=root, chunk_size=1024
118121
)
119122

123+
if use_file_like:
124+
await writer.write_from_file_like(FileLikeFileStreamReader(file_stream))
125+
else:
126+
await writer.write_from_stream(file_stream)
127+
120128
# 2. extract archive using exiting tools
121129
await unarchive_dir(local_archive_path, local_unpacked_archive)
122130

0 commit comments

Comments
 (0)