
Commit 357273a

Author: Andrei Neagu
added S3 streaming and integration test

1 parent 388b81a

File tree

6 files changed: +341 -14 lines


packages/aws-library/src/aws_library/s3/__init__.py

Lines changed: 6 additions & 1 deletion

@@ -3,7 +3,11 @@
     SimcoreS3API,
     UploadedBytesTransferredCallback,
 )
-from ._constants import PRESIGNED_LINK_MAX_SIZE, S3_MAX_FILE_SIZE
+from ._constants import (
+    MIN_MULTIPART_UPLOAD_CHUNK_SIZE,
+    PRESIGNED_LINK_MAX_SIZE,
+    S3_MAX_FILE_SIZE,
+)
 from ._errors import (
     S3AccessError,
     S3BucketInvalidError,
@@ -23,6 +27,7 @@
 
 __all__: tuple[str, ...] = (
     "CopiedBytesTransferredCallback",
+    "MIN_MULTIPART_UPLOAD_CHUNK_SIZE",
     "MultiPartUploadLinks",
     "PRESIGNED_LINK_MAX_SIZE",
     "S3_MAX_FILE_SIZE",

packages/aws-library/src/aws_library/s3/_client.py

Lines changed: 83 additions & 1 deletion

@@ -3,7 +3,7 @@
 import functools
 import logging
 import urllib.parse
-from collections.abc import AsyncGenerator, Sequence
+from collections.abc import AsyncGenerator, Callable, Sequence
 from dataclasses import dataclass, field
 from pathlib import Path
 from typing import Any, Final, Protocol, cast
@@ -18,6 +18,7 @@
 from pydantic import AnyUrl, ByteSize, TypeAdapter
 from servicelib.logging_utils import log_catch, log_context
 from servicelib.utils import limited_gather
+from servicelib.zip_stream import DEFAULT_CHUNK_SIZE, FileStream
 from settings_library.s3 import S3Settings
 from types_aiobotocore_s3 import S3Client
 from types_aiobotocore_s3.literals import BucketLocationConstraintType
@@ -57,6 +58,14 @@ def __call__(self, total_bytes_copied: int, *, file_name: str) -> None:
         ...
 
 
+class AsyncFileProtocol(Protocol):
+    async def read(self, chunk_size: int) -> bytes:
+        ...
+
+    async def write(self, data: bytes) -> None:
+        ...
+
+
 @dataclass(frozen=True)
 class SimcoreS3API:  # pylint: disable=too-many-public-methods
     _client: S3Client
@@ -470,6 +479,79 @@ async def copy_objects_recursively(
             limit=_MAX_CONCURRENT_COPY,
         )
 
+    async def get_object_file_stream(
+        self,
+        bucket_name: S3BucketName,
+        object_key: S3ObjectKey,
+        *,
+        chunk_size: int = DEFAULT_CHUNK_SIZE,
+    ) -> FileStream:
+        response = await self._client.head_object(Bucket=bucket_name, Key=object_key)
+        file_size = response["ContentLength"]
+
+        # Download the file in chunks
+        position = 0
+        while position < file_size:
+            # Calculate the range for this chunk
+            end = min(position + chunk_size - 1, file_size - 1)
+            range_header = f"bytes={position}-{end}"
+
+            # Download the chunk
+            response = await self._client.get_object(
+                Bucket=bucket_name, Key=object_key, Range=range_header
+            )
+
+            chunk = await response["Body"].read()
+
+            # Yield the chunk for processing
+            yield chunk
+
+            position += chunk_size
+
+    @s3_exception_handler(_logger)
+    async def upload_object_from_file_stream(
+        self,
+        bucket_name: S3BucketName,
+        object_key: S3ObjectKey,
+        file_stream: Callable[[], FileStream],
+    ) -> None:
+        # Create a multipart upload
+        multipart_response = await self._client.create_multipart_upload(
+            Bucket=bucket_name, Key=object_key
+        )
+        upload_id = multipart_response["UploadId"]
+
+        try:
+            parts = []
+            part_number = 1
+
+            async for chunk in file_stream():
+                _logger.debug("uploading part %s of size=%s", part_number, len(chunk))
+
+                part_response = await self._client.upload_part(
+                    Bucket=bucket_name,
+                    Key=object_key,
+                    PartNumber=part_number,
+                    UploadId=upload_id,
+                    Body=chunk,
+                )
+                parts.append({"ETag": part_response["ETag"], "PartNumber": part_number})
+                part_number += 1
+
+            # Complete the multipart upload
+            await self._client.complete_multipart_upload(
+                Bucket=bucket_name,
+                Key=object_key,
+                UploadId=upload_id,
+                MultipartUpload={"Parts": parts},
+            )
+        except Exception:
+            # Abort the multipart upload if something goes wrong
+            await self._client.abort_multipart_upload(
+                Bucket=bucket_name, Key=object_key, UploadId=upload_id
+            )
+            raise
+
     @staticmethod
     def is_multipart(file_size: ByteSize) -> bool:
         return file_size >= MULTIPART_UPLOADS_MIN_TOTAL_SIZE
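
A minimal usage sketch of the two new methods together, piping one object into another without buffering the whole file in memory (the bucket names, keys, and the pre-built s3_api instance are illustrative, not part of the commit):

    import functools

    from aws_library.s3 import MIN_MULTIPART_UPLOAD_CHUNK_SIZE, SimcoreS3API

    async def copy_via_stream(s3_api: SimcoreS3API) -> None:
        # the upload side expects a zero-argument callable returning a FileStream,
        # so bind the download side's arguments with functools.partial
        await s3_api.upload_object_from_file_stream(
            bucket_name="dst-bucket",
            object_key="copy-of-source",
            file_stream=functools.partial(
                s3_api.get_object_file_stream,
                "src-bucket",
                "source",
                chunk_size=MIN_MULTIPART_UPLOAD_CHUNK_SIZE,
            ),
        )

Note that every yielded chunk becomes one uploaded part, and S3 rejects any part except the last that is smaller than 5MiB; hence the chunk_size passed above and the new constant introduced in _constants.py below.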

packages/aws-library/src/aws_library/s3/_constants.py

Lines changed: 3 additions & 0 deletions

@@ -6,6 +6,9 @@
 MULTIPART_UPLOADS_MIN_TOTAL_SIZE: Final[ByteSize] = TypeAdapter(
     ByteSize
 ).validate_python("100MiB")
+MIN_MULTIPART_UPLOAD_CHUNK_SIZE: Final[int] = TypeAdapter(ByteSize).validate_python(
+    "5MiB"
+)
 MULTIPART_COPY_THRESHOLD: Final[ByteSize] = TypeAdapter(ByteSize).validate_python(
     "100MiB"
 )
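
The 5MiB figure matches AWS S3's documented minimum size for every part of a multipart upload except the last one. A quick check of what the pydantic adapter produces (binary units, so MiB = 1024 * 1024 bytes):

    from pydantic import ByteSize, TypeAdapter

    assert TypeAdapter(ByteSize).validate_python("5MiB") == 5 * 1024 * 1024  # 5242880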

packages/aws-library/src/aws_library/s3/_errors.py

Lines changed: 7 additions & 0 deletions

@@ -27,3 +27,10 @@ class S3UploadNotFoundError(S3AccessError):
 
 class S3DestinationNotEmptyError(S3AccessError):
     msg_template: str = "The destination {dst_prefix} is not empty"
+
+
+class S3MultipartUploadMinChunkSizeError(S3RuntimeError):
+    msg_template: str = (
+        "chunk_size='{current_chunk_size}' must be greater or equal "
+        "to min_chunk_size='{min_chunk_size}'"
+    )
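
A sketch of how the new error and constant could be wired together to validate a caller-supplied chunk size; the guard function below is illustrative (the commit does not show where the error is raised), and it assumes the keyword-based msg_template construction these error classes use:

    from aws_library.s3 import MIN_MULTIPART_UPLOAD_CHUNK_SIZE
    from aws_library.s3._errors import S3MultipartUploadMinChunkSizeError

    def check_chunk_size(chunk_size: int) -> None:
        # parts below the S3 minimum would make upload_part fail mid-stream
        if chunk_size < MIN_MULTIPART_UPLOAD_CHUNK_SIZE:
            raise S3MultipartUploadMinChunkSizeError(
                current_chunk_size=chunk_size,
                min_chunk_size=MIN_MULTIPART_UPLOAD_CHUNK_SIZE,
            )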
