Skip to content

Commit ad1641a

Browse files
authored
🐛Storage: Copying returns wrong size (#6272)
1 parent d04f303 commit ad1641a

File tree

5 files changed

+112
-42
lines changed

5 files changed

+112
-42
lines changed

packages/aws-library/src/aws_library/s3/__init__.py

Lines changed: 12 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,8 @@
1-
from ._client import SimcoreS3API
1+
from ._client import (
2+
CopiedBytesTransferredCallback,
3+
SimcoreS3API,
4+
UploadedBytesTransferredCallback,
5+
)
26
from ._constants import PRESIGNED_LINK_MAX_SIZE, S3_MAX_FILE_SIZE
37
from ._errors import (
48
S3AccessError,
@@ -18,20 +22,22 @@
1822
)
1923

2024
__all__: tuple[str, ...] = (
21-
"SimcoreS3API",
25+
"CopiedBytesTransferredCallback",
26+
"MultiPartUploadLinks",
2227
"PRESIGNED_LINK_MAX_SIZE",
2328
"S3_MAX_FILE_SIZE",
2429
"S3AccessError",
2530
"S3BucketInvalidError",
2631
"S3DestinationNotEmptyError",
32+
"S3DirectoryMetaData",
2733
"S3KeyNotFoundError",
34+
"S3MetaData",
2835
"S3NotConnectedError",
36+
"S3ObjectKey",
2937
"S3RuntimeError",
3038
"S3UploadNotFoundError",
31-
"S3DirectoryMetaData",
32-
"S3MetaData",
33-
"S3ObjectKey",
34-
"MultiPartUploadLinks",
39+
"SimcoreS3API",
40+
"UploadedBytesTransferredCallback",
3541
"UploadID",
3642
)
3743

packages/aws-library/src/aws_library/s3/_client.py

Lines changed: 26 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -1,11 +1,12 @@
11
import asyncio
22
import contextlib
3+
import functools
34
import logging
45
import urllib.parse
5-
from collections.abc import AsyncGenerator, Callable, Sequence
6+
from collections.abc import AsyncGenerator, Sequence
67
from dataclasses import dataclass, field
78
from pathlib import Path
8-
from typing import Any, Final, cast
9+
from typing import Any, Final, Protocol, cast
910

1011
import aioboto3
1112
from aiobotocore.session import ClientCreatorContext
@@ -43,6 +44,16 @@
4344
_AWS_MAX_ITEMS_PER_PAGE: Final[int] = 1000
4445

4546

47+
class UploadedBytesTransferredCallback(Protocol):
48+
def __call__(self, bytes_transferred: int, *, file_name: str) -> None:
49+
...
50+
51+
52+
class CopiedBytesTransferredCallback(Protocol):
53+
def __call__(self, total_bytes_copied: int, *, file_name: str) -> None:
54+
...
55+
56+
4657
@dataclass(frozen=True)
4758
class SimcoreS3API: # pylint: disable=too-many-public-methods
4859
_client: S3Client
@@ -372,7 +383,7 @@ async def upload_file(
372383
bucket: S3BucketName,
373384
file: Path,
374385
object_key: S3ObjectKey,
375-
bytes_transfered_cb: Callable[[int], None] | None,
386+
bytes_transfered_cb: UploadedBytesTransferredCallback | None,
376387
) -> None:
377388
"""upload a file using aioboto3 transfer manager (e.g. works >5Gb and creates multiple threads)"""
378389
upload_options: dict[str, Any] = {
@@ -381,7 +392,11 @@ async def upload_file(
381392
"Config": TransferConfig(max_concurrency=self.transfer_max_concurrency),
382393
}
383394
if bytes_transfered_cb:
384-
upload_options |= {"Callback": bytes_transfered_cb}
395+
upload_options |= {
396+
"Callback": functools.partial(
397+
bytes_transfered_cb, file_name=f"{object_key}"
398+
)
399+
}
385400
await self._client.upload_file(f"{file}", **upload_options)
386401

387402
@s3_exception_handler(_logger)
@@ -391,7 +406,7 @@ async def copy_object(
391406
bucket: S3BucketName,
392407
src_object_key: S3ObjectKey,
393408
dst_object_key: S3ObjectKey,
394-
bytes_transfered_cb: Callable[[int], None] | None,
409+
bytes_transfered_cb: CopiedBytesTransferredCallback | None,
395410
) -> None:
396411
"""copy a file in S3 using aioboto3 transfer manager (e.g. works >5Gb and creates multiple threads)"""
397412
copy_options: dict[str, Any] = {
@@ -404,7 +419,11 @@ async def copy_object(
404419
),
405420
}
406421
if bytes_transfered_cb:
407-
copy_options |= {"Callback": bytes_transfered_cb}
422+
copy_options |= {
423+
"Callback": functools.partial(
424+
bytes_transfered_cb, file_name=f"{dst_object_key}"
425+
)
426+
}
408427
await self._client.copy(**copy_options)
409428

410429
@s3_exception_handler(_logger)
@@ -414,7 +433,7 @@ async def copy_objects_recursively(
414433
bucket: S3BucketName,
415434
src_prefix: str,
416435
dst_prefix: str,
417-
bytes_transfered_cb: Callable[[int], None] | None,
436+
bytes_transfered_cb: CopiedBytesTransferredCallback | None,
418437
) -> None:
419438
"""copy from 1 location in S3 to another recreating the same structure"""
420439
dst_metadata = await self.get_directory_metadata(

packages/aws-library/tests/test_s3_client.py

Lines changed: 24 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -257,17 +257,34 @@ async def _uploader(
257257

258258

259259
@dataclass
260-
class _ProgressCallback:
260+
class _UploadProgressCallback:
261261
file_size: int
262262
action: str
263263
logger: logging.Logger
264264
_total_bytes_transfered: int = 0
265265

266-
def __call__(self, bytes_transferred: int) -> None:
266+
def __call__(self, bytes_transferred: int, *, file_name: str) -> None:
267267
self._total_bytes_transfered += bytes_transferred
268-
self.logger.debug(
268+
assert self._total_bytes_transfered <= self.file_size
269+
self.logger.info(
269270
"progress: %s",
270-
f"{self.action} {self._total_bytes_transfered} / {self.file_size} bytes",
271+
f"{self.action} {file_name=} {self._total_bytes_transfered} / {self.file_size} bytes",
272+
)
273+
274+
275+
@dataclass
276+
class _CopyProgressCallback:
277+
file_size: int
278+
action: str
279+
logger: logging.Logger
280+
_total_bytes_transfered: int = 0
281+
282+
def __call__(self, total_bytes_copied: int, *, file_name: str) -> None:
283+
self._total_bytes_transfered = total_bytes_copied
284+
assert self._total_bytes_transfered <= self.file_size
285+
self.logger.info(
286+
"progress: %s",
287+
f"{self.action} {file_name=} {self._total_bytes_transfered} / {self.file_size} bytes",
271288
)
272289

273290

@@ -287,7 +304,7 @@ async def _uploader(file: Path, base_path: Path | None = None) -> UploadedFile:
287304
with log_context(
288305
logging.INFO, msg=f"uploading {file} to {with_s3_bucket}/{object_key}"
289306
) as ctx:
290-
progress_cb = _ProgressCallback(
307+
progress_cb = _UploadProgressCallback(
291308
file_size=file.stat().st_size, action="uploaded", logger=ctx.logger
292309
)
293310
response = await simcore_s3_api.upload_file(
@@ -365,7 +382,7 @@ async def _copier(src_key: S3ObjectKey, dst_key: S3ObjectKey) -> S3ObjectKey:
365382
bucket=with_s3_bucket, object_key=src_key
366383
)
367384
with log_context(logging.INFO, msg=f"copying {src_key} to {dst_key}") as ctx:
368-
progress_cb = _ProgressCallback(
385+
progress_cb = _CopyProgressCallback(
369386
file_size=file_metadata.size, action="copied", logger=ctx.logger
370387
)
371388
await simcore_s3_api.copy_object(
@@ -397,7 +414,7 @@ async def _copier(src_prefix: str, dst_prefix: str) -> str:
397414
logging.INFO,
398415
msg=f"copying {src_prefix} [{ByteSize(src_directory_metadata.size).human_readable()}] to {dst_prefix}",
399416
) as ctx:
400-
progress_cb = _ProgressCallback(
417+
progress_cb = _CopyProgressCallback(
401418
file_size=src_directory_metadata.size,
402419
action="copied",
403420
logger=ctx.logger,

services/storage/src/simcore_service_storage/s3_utils.py

Lines changed: 39 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,6 @@
11
import logging
2-
from dataclasses import dataclass
2+
from collections import defaultdict
3+
from dataclasses import dataclass, field
34

45
from pydantic import ByteSize, parse_obj_as
56
from servicelib.aiohttp.long_running_tasks.server import (
@@ -27,23 +28,44 @@ class S3TransferDataCB:
2728
total_bytes_to_transfer: ByteSize
2829
task_progress_message_prefix: str = ""
2930
_total_bytes_copied: int = 0
31+
_file_total_bytes_copied: dict[str, int] = field(
32+
default_factory=lambda: defaultdict(int)
33+
)
3034

31-
def __post_init__(self):
32-
self.copy_transfer_cb(0)
35+
def __post_init__(self) -> None:
36+
self._update()
3337

34-
def finalize_transfer(self):
35-
self.copy_transfer_cb(self.total_bytes_to_transfer - self._total_bytes_copied)
38+
def _update(self) -> None:
39+
update_task_progress(
40+
self.task_progress,
41+
f"{self.task_progress_message_prefix} - "
42+
f"{parse_obj_as(ByteSize,self._total_bytes_copied).human_readable()}"
43+
f"/{self.total_bytes_to_transfer.human_readable()}]",
44+
ProgressPercent(
45+
min(self._total_bytes_copied, self.total_bytes_to_transfer)
46+
/ (self.total_bytes_to_transfer or 1)
47+
),
48+
)
3649

37-
def copy_transfer_cb(self, copied_bytes: int):
38-
self._total_bytes_copied += copied_bytes
50+
def finalize_transfer(self) -> None:
51+
self._total_bytes_copied = (
52+
self.total_bytes_to_transfer - self._total_bytes_copied
53+
)
54+
self._update()
55+
56+
def copy_transfer_cb(self, total_bytes_copied: int, *, file_name: str) -> None:
57+
logger.debug(
58+
"Copied %s of %s",
59+
parse_obj_as(ByteSize, total_bytes_copied).human_readable(),
60+
file_name,
61+
)
62+
self._file_total_bytes_copied[file_name] = total_bytes_copied
63+
self._total_bytes_copied = sum(self._file_total_bytes_copied.values())
64+
if self.total_bytes_to_transfer != 0:
65+
self._update()
66+
67+
def upload_transfer_cb(self, bytes_transferred: int, *, file_name: str) -> None:
68+
self._file_total_bytes_copied[file_name] += bytes_transferred
69+
self._total_bytes_copied = sum(self._file_total_bytes_copied.values())
3970
if self.total_bytes_to_transfer != 0:
40-
update_task_progress(
41-
self.task_progress,
42-
f"{self.task_progress_message_prefix} - "
43-
f"{parse_obj_as(ByteSize,self._total_bytes_copied).human_readable()}"
44-
f"/{self.total_bytes_to_transfer.human_readable()}]",
45-
ProgressPercent(
46-
max(self._total_bytes_copied, self.total_bytes_to_transfer)
47-
/ self.total_bytes_to_transfer
48-
),
49-
)
71+
self._update()

services/storage/src/simcore_service_storage/simcore_s3_dsm.py

Lines changed: 11 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@
33
import logging
44
import tempfile
55
import urllib.parse
6-
from collections.abc import Callable, Coroutine
6+
from collections.abc import Coroutine
77
from contextlib import suppress
88
from dataclasses import dataclass
99
from pathlib import Path
@@ -13,7 +13,13 @@
1313
from aiohttp import web
1414
from aiopg.sa import Engine
1515
from aiopg.sa.connection import SAConnection
16-
from aws_library.s3 import S3DirectoryMetaData, S3KeyNotFoundError, S3MetaData
16+
from aws_library.s3 import (
17+
CopiedBytesTransferredCallback,
18+
S3DirectoryMetaData,
19+
S3KeyNotFoundError,
20+
S3MetaData,
21+
UploadedBytesTransferredCallback,
22+
)
1723
from models_library.api_schemas_storage import LinkType, S3BucketName, UploadedPart
1824
from models_library.basic_types import SHA256Str
1925
from models_library.projects import ProjectID
@@ -668,7 +674,7 @@ async def deep_copy_project_simcore_s3(
668674
dest_project_id=dst_project_uuid,
669675
dest_node_id=NodeID(node_id),
670676
file_storage_link=output,
671-
bytes_transfered_cb=s3_transfered_data_cb.copy_transfer_cb,
677+
bytes_transfered_cb=s3_transfered_data_cb.upload_transfer_cb,
672678
)
673679
for output in node.get("outputs", {}).values()
674680
if isinstance(output, dict)
@@ -953,7 +959,7 @@ async def _copy_file_datcore_s3(
953959
dest_project_id: ProjectID,
954960
dest_node_id: NodeID,
955961
file_storage_link: dict[str, Any],
956-
bytes_transfered_cb: Callable[[int], None],
962+
bytes_transfered_cb: UploadedBytesTransferredCallback,
957963
) -> FileMetaData:
958964
session = get_client_session(self.app)
959965
# 2 steps: Get download link for local copy, then upload to S3
@@ -1004,7 +1010,7 @@ async def _copy_path_s3_s3(
10041010
*,
10051011
src_fmd: FileMetaDataAtDB,
10061012
dst_file_id: SimcoreS3FileID,
1007-
bytes_transfered_cb: Callable[[int], None],
1013+
bytes_transfered_cb: CopiedBytesTransferredCallback,
10081014
) -> FileMetaData:
10091015
with log_context(
10101016
_logger,

0 commit comments

Comments
 (0)