Merged
Changes from 46 commits
Commits
74 commits
0054d6a
added stream-zip
Feb 7, 2025
ff96d46
added utils for stream zipping
Feb 7, 2025
fe6c34d
rename
Feb 7, 2025
bd1df7c
added minimal progress support
Feb 7, 2025
7b7aae0
rename
Feb 7, 2025
5f72a43
fixed types
Feb 7, 2025
aace76a
refactor
Feb 7, 2025
388b81a
refactor
Feb 7, 2025
357273a
added S3 streaming and integration test
Feb 7, 2025
973423e
refactor
Feb 7, 2025
b2a66f5
Merge remote-tracking branch 'upstream/master' into pr-osparc-stream-…
Feb 7, 2025
c129672
removed debug print
Feb 7, 2025
e32f265
refactor to use size instead of items count as progress
Feb 7, 2025
de490ad
using faster file hash checking
Feb 7, 2025
706934d
Merge remote-tracking branch 'upstream/master' into pr-osparc-stream-…
Feb 7, 2025
c672212
refactor progress on zip
Feb 7, 2025
cee5e9c
remove unused
Feb 7, 2025
6d3eb72
remove unused
Feb 7, 2025
3873976
remove outdated
Feb 7, 2025
706ee4b
reshuffled imports
Feb 7, 2025
a7e0867
fixed more broken imports
Feb 7, 2025
4775e55
reverted deleted import
Feb 7, 2025
e0a5407
remove unused error
Feb 7, 2025
23515e3
revert number
Feb 7, 2025
b546f10
fixed broken import
Feb 7, 2025
d26289d
typing
Feb 10, 2025
f2c5923
fixed tests
Feb 10, 2025
c2eea57
typing and imports
Feb 10, 2025
dc3b63a
fixed broken test
Feb 10, 2025
7eac64d
Merge remote-tracking branch 'upstream/master' into pr-osparc-stream-…
Feb 11, 2025
1fabe65
added missing
Feb 11, 2025
b245e63
rename module
Feb 11, 2025
19b5ef5
added FileLikeFileStreamReader
Feb 11, 2025
a5c9060
Merge remote-tracking branch 'upstream/master' into pr-osparc-stream-…
Feb 11, 2025
6fb389d
replaced with simpler implementation
Feb 11, 2025
c6cc5e0
rename
Feb 11, 2025
e7eee8b
refactor imports
Feb 11, 2025
2045c08
refactor
Feb 11, 2025
1298895
refactor
Feb 11, 2025
d17e450
added readme
Feb 11, 2025
d14d8fd
refactor fixture
Feb 11, 2025
0497f86
extended tests
Feb 11, 2025
6261c65
extended tests
Feb 12, 2025
cd9d443
renaming
Feb 12, 2025
0777442
Merge remote-tracking branch 'upstream/master' into pr-osparc-stream-…
Feb 12, 2025
e042a6c
fixed broken mocks
Feb 12, 2025
eea3fba
Merge remote-tracking branch 'upstream/master' into pr-osparc-stream-…
Feb 13, 2025
ed8280f
rename
Feb 13, 2025
f170791
rename
Feb 13, 2025
17c0ac5
rename module
Feb 13, 2025
c024827
refactor interface
Feb 13, 2025
a844a0c
refactor progress
Feb 13, 2025
a277e12
refactor placement of FileLikeFileStreamReader
Feb 13, 2025
5a1a8e7
rename
Feb 13, 2025
ec87ba0
update
Feb 13, 2025
b835ae0
rename and move around parts
Feb 13, 2025
70afc40
renamed modules
Feb 13, 2025
0addf04
renames
Feb 13, 2025
cb479f3
moved imports to more appropriate places
Feb 13, 2025
881d2a1
Merge remote-tracking branch 'upstream/master' into pr-osparc-stream-…
Feb 13, 2025
e37972c
refactor
Feb 13, 2025
0f11526
renamed
Feb 13, 2025
b23d1f1
renamed to bytes_iter
Feb 13, 2025
b29bff2
renaming paths
Feb 13, 2025
d6ca255
renamed
Feb 13, 2025
a169f73
refactor
Feb 13, 2025
2c2d2db
added missing type
Feb 14, 2025
526ed0a
Merge remote-tracking branch 'upstream/master' into pr-osparc-stream-…
Feb 14, 2025
0ec7e23
rename fixture
Feb 14, 2025
9d48624
Merge remote-tracking branch 'upstream/master' into pr-osparc-stream-…
Feb 14, 2025
3aa766b
Merge remote-tracking branch 'upstream/master' into pr-osparc-stream-…
Feb 17, 2025
9cf8f1b
rename
Feb 17, 2025
d2f928a
removed todo in test
Feb 17, 2025
25372c7
Merge branch 'master' into pr-osparc-stream-zipping-of-s3-content
GitHK Feb 17, 2025
4 changes: 4 additions & 0 deletions packages/aws-library/requirements/_base.txt
@@ -244,6 +244,8 @@ protobuf==5.29.3
# opentelemetry-proto
psutil==6.1.1
# via -r requirements/../../../packages/service-library/requirements/_base.in
pycryptodome==3.21.0
# via stream-zip
pydantic==2.10.6
# via
# -c requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt
@@ -368,6 +370,8 @@ six==1.17.0
# via python-dateutil
sniffio==1.3.1
# via anyio
stream-zip==0.0.83
# via -r requirements/../../../packages/service-library/requirements/_base.in
tenacity==9.0.0
# via -r requirements/../../../packages/service-library/requirements/_base.in
toolz==1.0.0
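
For context on the two new pins: stream-zip builds zip archives as a stream of byte chunks, and pycryptodome is pulled in as its dependency (stream-zip appears to need it for its optional encrypted-archive support). The snippet below is a minimal, hypothetical sketch of the stream-zip API itself, independent of this PR's code; the member file name, its content, and the output path are invented for illustration.

from datetime import datetime, timezone

from stream_zip import ZIP_64, stream_zip

# each member is (name, modified_at, mode, method, iterable of byte chunks)
member_files = (
    ("hello.txt", datetime.now(timezone.utc), 0o644, ZIP_64, (b"hello ", b"world")),
)

# stream_zip yields the archive as byte chunks, so the full zip never has to
# sit in memory or on disk
with open("example.zip", "wb") as f:
    for chunk in stream_zip(member_files):
        f.write(chunk)
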
57 changes: 57 additions & 0 deletions packages/aws-library/src/aws_library/s3/_client.py
@@ -17,7 +17,11 @@
from models_library.basic_types import SHA256Str
from pydantic import AnyUrl, ByteSize, TypeAdapter
from servicelib.logging_utils import log_catch, log_context
from servicelib.progress_bar import ProgressBarData
from servicelib.utils import limited_gather
from servicelib.zip_stream import DEFAULT_READ_CHUNK_SIZE, FileSize, FileStream
from servicelib.zip_stream._file_like import FileLikeFileStreamReader
from servicelib.zip_stream._types import StreamData
from settings_library.s3 import S3Settings
from types_aiobotocore_s3 import S3Client
from types_aiobotocore_s3.literals import BucketLocationConstraintType
@@ -470,6 +474,59 @@ async def copy_objects_recursively(
limit=_MAX_CONCURRENT_COPY,
)

async def get_object_file_stream(
self,
bucket_name: S3BucketName,
object_key: S3ObjectKey,
*,
chunk_size: int = DEFAULT_READ_CHUNK_SIZE,
) -> StreamData:
"""stream read an object in S3 chunk by chunk"""

# NOTE `download_fileobj` cannot be used to implement this because
# it will buffer the entire file in memory instead of reading it
# chunk by chunk

# head_object is a cheap call that returns the object's size without fetching its content
head_response = await self._client.head_object(
Bucket=bucket_name, Key=object_key
)
file_size = FileSize(head_response["ContentLength"])

async def _(progress_bar: ProgressBarData) -> FileStream:
# Download the file in chunks
position = 0
while position < file_size:
# Calculate the range for this chunk
end = min(position + chunk_size - 1, file_size - 1)
range_header = f"bytes={position}-{end}"

# Download the chunk
response = await self._client.get_object(
Bucket=bucket_name, Key=object_key, Range=range_header
)

chunk = await response["Body"].read()

# Yield the chunk for processing

await progress_bar.update(len(chunk))
yield chunk

position += chunk_size

return file_size, _

@s3_exception_handler(_logger)
async def upload_object_from_file_stream(
self,
bucket_name: S3BucketName,
object_key: S3ObjectKey,
file_stream: FileStream,
) -> None:
"""streams write an object in S3 from an AsyncIterable[bytes]"""
await self._client.upload_fileobj(FileLikeFileStreamReader(file_stream), bucket_name, object_key) # type: ignore[arg-type]

@staticmethod
def is_multipart(file_size: ByteSize) -> bool:
return file_size >= MULTIPART_UPLOADS_MIN_TOTAL_SIZE
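
To make the new interface above concrete, here is a rough, hypothetical usage sketch that chains the two added methods to copy an object through streams. It assumes an already-initialized SimcoreS3API instance; the bucket and key names are placeholders, and feeding the file size as num_steps is an assumption about how the progress bar is meant to be used (the stream calls progress_bar.update(len(chunk))).

from servicelib.progress_bar import ProgressBarData

async def copy_object_via_streams(simcore_s3_api, bucket, src_key, dst_key) -> None:
    # `simcore_s3_api` is assumed to be an initialized SimcoreS3API
    # get_object_file_stream returns (file_size, stream factory); the factory
    # takes a ProgressBarData and yields the object's bytes chunk by chunk
    file_size, stream = await simcore_s3_api.get_object_file_stream(bucket, src_key)

    # progress is reported in bytes by the stream, so the total step count is
    # assumed to be the file size
    async with ProgressBarData(num_steps=file_size, description="stream-copy") as progress_bar:
        # the upload consumes the AsyncIterable[bytes] without buffering the
        # whole object in memory
        await simcore_s3_api.upload_object_from_file_stream(
            bucket, dst_key, stream(progress_bar)
        )
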
260 changes: 252 additions & 8 deletions packages/aws-library/tests/test_s3_client.py
@@ -1,21 +1,27 @@
# pylint:disable=unused-variable
# pylint:disable=unused-argument
# pylint:disable=contextmanager-generator-missing-cleanup
# pylint:disable=no-name-in-module
# pylint:disable=protected-access
# pylint:disable=redefined-outer-name
# pylint:disable=too-many-arguments
# pylint:disable=protected-access
# pylint:disable=no-name-in-module
# pylint:disable=unused-argument
# pylint:disable=unused-variable


import asyncio
import filecmp
import json
import logging
import random
import time
from collections import defaultdict
from collections.abc import AsyncIterator, Awaitable, Callable
from collections.abc import AsyncIterator, Awaitable, Callable, Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock, Mock

import aiofiles
import botocore.exceptions
import pytest
from aiohttp import ClientSession
@@ -34,6 +40,12 @@
from moto.server import ThreadedMotoServer
from pydantic import AnyUrl, ByteSize, TypeAdapter
from pytest_benchmark.plugin import BenchmarkFixture
from pytest_mock import MockerFixture
from pytest_simcore.helpers.comparing import (
assert_same_contents,
assert_same_file_content,
get_files_info_from_path,
)
from pytest_simcore.helpers.logging_tools import log_context
from pytest_simcore.helpers.parametrizations import (
byte_size_ids,
Expand All @@ -44,7 +56,16 @@
upload_file_to_presigned_link,
)
from pytest_simcore.helpers.typing_env import EnvVarsDict
from servicelib.utils import limited_as_completed
from servicelib.archiving_utils import unarchive_dir
from servicelib.file_utils import remove_directory
from servicelib.progress_bar import ProgressBarData
from servicelib.utils import limited_as_completed, limited_gather
from servicelib.zip_stream import (
ArchiveEntries,
DiskStreamReader,
get_zip_archive_file_stream,
)
from servicelib.zip_stream._types import FileSize
from settings_library.s3 import S3Settings
from types_aiobotocore_s3 import S3Client
from types_aiobotocore_s3.literals import BucketLocationConstraintType
@@ -346,7 +367,7 @@ def set_log_levels_for_noisy_libraries() -> None:
@pytest.fixture
async def with_uploaded_folder_on_s3(
create_folder_of_size_with_multiple_files: Callable[
[ByteSize, ByteSize, ByteSize], Path
[ByteSize, ByteSize, ByteSize, Path | None], Path
],
upload_file: Callable[[Path, Path], Awaitable[UploadedFile]],
directory_size: ByteSize,
@@ -355,7 +376,7 @@ async def with_uploaded_folder_on_s3(
) -> list[UploadedFile]:
# create random files of random size and upload to S3
folder = create_folder_of_size_with_multiple_files(
ByteSize(directory_size), ByteSize(min_file_size), ByteSize(max_file_size)
ByteSize(directory_size), ByteSize(min_file_size), ByteSize(max_file_size), None
)
list_uploaded_files = []

@@ -1375,3 +1396,226 @@ def run_async_test(dst_folder: str) -> None:
)

benchmark.pedantic(run_async_test, setup=dst_folder_setup, rounds=4)


async def test_read_object_file_stream(
mocked_s3_server_envs: EnvVarsDict,
with_uploaded_file_on_s3: UploadedFile,
simcore_s3_api: SimcoreS3API,
with_s3_bucket: S3BucketName,
random_file_path: Path,
):
async with aiofiles.open(random_file_path, "wb") as f:
file_size, file_stream = await simcore_s3_api.get_object_file_stream(
with_s3_bucket, with_uploaded_file_on_s3.s3_key, chunk_size=1024
)
assert isinstance(file_size, FileSize)
async for chunk in file_stream(AsyncMock()):
await f.write(chunk)

assert file_size == random_file_path.stat().st_size

await assert_same_file_content(
with_uploaded_file_on_s3.local_path, random_file_path
)


async def test_upload_object_from_file_stream(
mocked_s3_server_envs: EnvVarsDict,
with_uploaded_file_on_s3: UploadedFile,
simcore_s3_api: SimcoreS3API,
with_s3_bucket: S3BucketName,
):
object_key = "read_from_s3_write_to_s3"
file_size, file_stream = await simcore_s3_api.get_object_file_stream(
with_s3_bucket, with_uploaded_file_on_s3.s3_key
)
assert isinstance(file_size, FileSize)

await simcore_s3_api.upload_object_from_file_stream(
with_s3_bucket, object_key, file_stream(AsyncMock())
)

await simcore_s3_api.delete_object(bucket=with_s3_bucket, object_key=object_key)


@contextmanager
def _folder_with_files(
create_folder_of_size_with_multiple_files: Callable[
[ByteSize, ByteSize, ByteSize, Path | None], Path
],
target_folder: Path,
) -> Iterator[dict[str, Path]]:
target_folder.mkdir(parents=True, exist_ok=True)
folder_path = create_folder_of_size_with_multiple_files(
TypeAdapter(ByteSize).validate_python("10MiB"),
TypeAdapter(ByteSize).validate_python("10KiB"),
TypeAdapter(ByteSize).validate_python("100KiB"),
target_folder,
)

relative_names_to_paths = get_files_info_from_path(folder_path)

yield relative_names_to_paths

for file in relative_names_to_paths.values():
file.unlink()


@pytest.fixture
def path_local_files_for_archive(
tmp_path: Path,
create_folder_of_size_with_multiple_files: Callable[
[ByteSize, ByteSize, ByteSize, Path | None], Path
],
) -> Iterator[Path]:
dir_path = tmp_path / "not_uploaded"
with _folder_with_files(create_folder_of_size_with_multiple_files, dir_path):
yield dir_path


@pytest.fixture
async def path_s3_files_for_archive(
tmp_path: Path,
create_folder_of_size_with_multiple_files: Callable[
[ByteSize, ByteSize, ByteSize, Path | None], Path
],
s3_client: S3Client,
with_s3_bucket: S3BucketName,
) -> AsyncIterator[Path]:
dir_path = tmp_path / "stored_in_s3"
with _folder_with_files(
create_folder_of_size_with_multiple_files, dir_path
) as relative_names_to_paths:
await limited_gather(
*(
s3_client.upload_file(
Filename=f"{file}", Bucket=with_s3_bucket, Key=s3_object_key
)
for s3_object_key, file in relative_names_to_paths.items()
),
limit=10,
)
yield dir_path

await delete_all_object_versions(
s3_client, with_s3_bucket, relative_names_to_paths.keys()
)


@pytest.fixture
def archive_download_path(tmp_path: Path, faker: Faker) -> Iterator[Path]:
path = tmp_path / f"downlaoded_ardhive_{faker.uuid4()}.zip"
yield path
if path.exists():
path.unlink()


@pytest.fixture
async def extracted_archive_path(tmp_path: Path, faker: Faker) -> AsyncIterator[Path]:
path = tmp_path / f"decomrepssed_archive{faker.uuid4()}"
path.mkdir(parents=True, exist_ok=True)
assert path.is_dir()
yield path
await remove_directory(path)
assert not path.is_dir()


@pytest.fixture
async def archive_s3_object_key(
with_s3_bucket: S3BucketName, simcore_s3_api: SimcoreS3API
) -> AsyncIterator[S3ObjectKey]:
s3_object_key = "read_from_s3_write_to_s3"
yield s3_object_key
await simcore_s3_api.delete_object(bucket=with_s3_bucket, object_key=s3_object_key)


@pytest.fixture
def mocked_progress_bar_cb(mocker: MockerFixture) -> Mock:
def _progress_cb(*args, **kwargs) -> None:
print(f"received progress: {args}, {kwargs}")

return mocker.Mock(side_effect=_progress_cb)


async def test_workflow_compress_s3_objects_and_local_files_in_a_single_archive_then_upload_to_s3(
mocked_s3_server_envs: EnvVarsDict,
path_local_files_for_archive: Path,
path_s3_files_for_archive: Path,
archive_download_path: Path,
extracted_archive_path: Path,
simcore_s3_api: SimcoreS3API,
with_s3_bucket: S3BucketName,
s3_client: S3Client,
archive_s3_object_key: S3ObjectKey,
mocked_progress_bar_cb: Mock,
):
# In this test:
# - files are read from disk and S3
# - a zip archive is created on the go
# - the zip archive is streamed to S3 as soon as chunks inside it are created
# Uses no disk and constant memory for the entire operation.

# 1. assemble and upload zip archive

archive_file_entries: ArchiveEntries = []

local_files = get_files_info_from_path(path_local_files_for_archive)
for file_name, file_path in local_files.items():
archive_file_entries.append(
(
file_name,
DiskStreamReader(file_path).get_stream_data(),
)
)

s3_files = get_files_info_from_path(path_s3_files_for_archive)

for s3_object_key in s3_files:
archive_file_entries.append(
(
s3_object_key,
await simcore_s3_api.get_object_file_stream(
with_s3_bucket, s3_object_key
),
)
)

# shuffle order of files in archive.
# some will be read from S3 and some from the disk
random.shuffle(archive_file_entries)

started = time.time()

async with ProgressBarData(
num_steps=1,
progress_report_cb=mocked_progress_bar_cb,
description="root_bar",
) as progress_bar:
await simcore_s3_api.upload_object_from_file_stream(
with_s3_bucket,
archive_s3_object_key,
get_zip_archive_file_stream(
archive_file_entries, progress_bar=progress_bar
),
)

duration = time.time() - started
print(f"Zip created on S3 in {duration:.2f} seconds")

# 2. download zip archive from S3
print(f"downloading {archive_download_path}")
await s3_client.download_file(
with_s3_bucket, archive_s3_object_key, f"{archive_download_path}"
)

# 3. extract archive
await unarchive_dir(archive_download_path, extracted_archive_path)

# 4. compare
print("comparing files")
all_files_in_zip = get_files_info_from_path(path_local_files_for_archive) | s3_files

await assert_same_contents(
all_files_in_zip, get_files_info_from_path(extracted_archive_path)
)