Skip to content

Commit 87b49ab

Browse files
committed
convert S3 progress to progress bar data
1 parent 1c76f4e commit 87b49ab

File tree

2 files changed

+102
-90
lines changed

2 files changed

+102
-90
lines changed

services/storage/src/simcore_service_storage/simcore_s3_dsm.py

Lines changed: 68 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@
3535
)
3636
from models_library.users import UserID
3737
from pydantic import AnyUrl, ByteSize, NonNegativeInt, TypeAdapter, ValidationError
38-
from servicelib.aiohttp.long_running_tasks.server import TaskProgress
3938
from servicelib.fastapi.client_session import get_client_session
4039
from servicelib.logging_utils import log_context
40+
from servicelib.progress_bar import ProgressBarData
4141
from servicelib.utils import ensure_ends_with, limited_gather
4242
from simcore_postgres_database.utils_repos import transaction_context
4343
from sqlalchemy.ext.asyncio import AsyncEngine
@@ -77,7 +77,7 @@
7777
from .modules.db.projects import ProjectRepository
7878
from .modules.db.tokens import TokenRepository
7979
from .modules.s3 import get_s3_client
80-
from .utils.s3_utils import S3TransferDataCB, update_task_progress
80+
from .utils.s3_utils import S3TransferDataCB
8181
from .utils.simcore_s3_dsm_utils import (
8282
compute_file_id_prefix,
8383
expand_directory,
@@ -751,7 +751,7 @@ async def deep_copy_project_simcore_s3(
751751
src_project: dict[str, Any],
752752
dst_project: dict[str, Any],
753753
node_mapping: dict[NodeID, NodeID],
754-
task_progress: TaskProgress | None = None,
754+
task_progress: ProgressBarData,
755755
) -> None:
756756
src_project_uuid: ProjectID = ProjectID(src_project["uuid"])
757757
dst_project_uuid: ProjectID = ProjectID(dst_project["uuid"])
@@ -761,7 +761,7 @@ async def deep_copy_project_simcore_s3(
761761
msg=f"{src_project_uuid} -> {dst_project_uuid}: "
762762
"Step 1: check access rights (read of src and write of dst)",
763763
):
764-
update_task_progress(task_progress, "Checking study access rights...")
764+
task_progress.description = "Checking study access rights..."
765765

766766
for prj_uuid in [src_project_uuid, dst_project_uuid]:
767767
if not await ProjectRepository.instance(
@@ -789,8 +789,8 @@ async def deep_copy_project_simcore_s3(
789789
msg=f"{src_project_uuid} -> {dst_project_uuid}:"
790790
" Step 2: collect what to copy",
791791
):
792-
update_task_progress(
793-
task_progress, f"Collecting files of '{src_project['name']}'..."
792+
task_progress.description = (
793+
f"Collecting files of '{src_project['name']}'..."
794794
)
795795

796796
src_project_files = await FileMetaDataRepository.instance(
@@ -814,68 +814,70 @@ async def deep_copy_project_simcore_s3(
814814
src_project_total_data_size: ByteSize = TypeAdapter(
815815
ByteSize
816816
).validate_python(sum(n for n, _ in sizes_and_num_files))
817-
with log_context(
818-
_logger,
819-
logging.INFO,
820-
msg=f"{src_project_uuid} -> {dst_project_uuid}:"
821-
" Step 3.1: prepare copy tasks for files referenced from simcore",
822-
):
823-
copy_tasks = []
824-
s3_transfered_data_cb = S3TransferDataCB(
825-
task_progress,
826-
src_project_total_data_size,
827-
task_progress_message_prefix=f"Copying {total_num_of_files} files to '{dst_project['name']}'",
828-
)
829-
for src_fmd in src_project_files:
830-
if not src_fmd.node_id or (src_fmd.location_id != self.location_id):
831-
msg = (
832-
"This is not foreseen, stem from old decisions, and needs to "
833-
f"be implemented if needed. Faulty metadata: {src_fmd=}"
834-
)
835-
raise NotImplementedError(msg)
836-
837-
if new_node_id := node_mapping.get(src_fmd.node_id):
838-
copy_tasks.append(
839-
self._copy_path_s3_s3(
840-
user_id,
841-
src_fmd=src_fmd,
842-
dst_file_id=TypeAdapter(SimcoreS3FileID).validate_python(
843-
f"{dst_project_uuid}/{new_node_id}/{src_fmd.object_name.split('/', maxsplit=2)[-1]}"
844-
),
845-
bytes_transfered_cb=s3_transfered_data_cb.copy_transfer_cb,
817+
818+
async with S3TransferDataCB(
819+
task_progress,
820+
src_project_total_data_size,
821+
task_progress_message_prefix=f"Copying {total_num_of_files} files to '{dst_project['name']}'",
822+
) as s3_transfered_data_cb:
823+
with log_context(
824+
_logger,
825+
logging.INFO,
826+
msg=f"{src_project_uuid} -> {dst_project_uuid}:"
827+
" Step 3.1: prepare copy tasks for files referenced from simcore",
828+
):
829+
copy_tasks = []
830+
for src_fmd in src_project_files:
831+
if not src_fmd.node_id or (src_fmd.location_id != self.location_id):
832+
msg = (
833+
"This is not foreseen, stem from old decisions, and needs to "
834+
f"be implemented if needed. Faulty metadata: {src_fmd=}"
846835
)
847-
)
848-
with log_context(
849-
_logger,
850-
logging.INFO,
851-
msg=f"{src_project_uuid} -> {dst_project_uuid}:"
852-
" Step 3.1: prepare copy tasks for files referenced from DAT-CORE",
853-
):
854-
for node_id, node in dst_project.get("workbench", {}).items():
855-
copy_tasks.extend(
856-
[
857-
self._copy_file_datcore_s3(
858-
user_id=user_id,
859-
source_uuid=output["path"],
860-
dest_project_id=dst_project_uuid,
861-
dest_node_id=NodeID(node_id),
862-
file_storage_link=output,
863-
bytes_transfered_cb=s3_transfered_data_cb.upload_transfer_cb,
836+
raise NotImplementedError(msg)
837+
838+
if new_node_id := node_mapping.get(src_fmd.node_id):
839+
copy_tasks.append(
840+
self._copy_path_s3_s3(
841+
user_id,
842+
src_fmd=src_fmd,
843+
dst_file_id=TypeAdapter(
844+
SimcoreS3FileID
845+
).validate_python(
846+
f"{dst_project_uuid}/{new_node_id}/{src_fmd.object_name.split('/', maxsplit=2)[-1]}"
847+
),
848+
bytes_transfered_cb=s3_transfered_data_cb.copy_transfer_cb,
849+
)
864850
)
865-
for output in node.get("outputs", {}).values()
866-
if isinstance(output, dict)
867-
and (int(output.get("store", self.location_id)) == DATCORE_ID)
868-
]
869-
)
870-
with log_context(
871-
_logger,
872-
logging.INFO,
873-
msg=f"{src_project_uuid} -> {dst_project_uuid}: Step 3.3: effective copying {len(copy_tasks)} files",
874-
):
875-
await limited_gather(*copy_tasks, limit=MAX_CONCURRENT_S3_TASKS)
876-
877-
# ensure the full size is reported
878-
s3_transfered_data_cb.finalize_transfer()
851+
with log_context(
852+
_logger,
853+
logging.INFO,
854+
msg=f"{src_project_uuid} -> {dst_project_uuid}:"
855+
" Step 3.1: prepare copy tasks for files referenced from DAT-CORE",
856+
):
857+
for node_id, node in dst_project.get("workbench", {}).items():
858+
copy_tasks.extend(
859+
[
860+
self._copy_file_datcore_s3(
861+
user_id=user_id,
862+
source_uuid=output["path"],
863+
dest_project_id=dst_project_uuid,
864+
dest_node_id=NodeID(node_id),
865+
file_storage_link=output,
866+
bytes_transfered_cb=s3_transfered_data_cb.upload_transfer_cb,
867+
)
868+
for output in node.get("outputs", {}).values()
869+
if isinstance(output, dict)
870+
and (
871+
int(output.get("store", self.location_id)) == DATCORE_ID
872+
)
873+
]
874+
)
875+
with log_context(
876+
_logger,
877+
logging.INFO,
878+
msg=f"{src_project_uuid} -> {dst_project_uuid}: Step 3.3: effective copying {len(copy_tasks)} files",
879+
):
880+
await limited_gather(*copy_tasks, limit=MAX_CONCURRENT_S3_TASKS)
879881

880882
async def _get_size_and_num_files(
881883
self, fmd: FileMetaDataAtDB

services/storage/src/simcore_service_storage/utils/s3_utils.py

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,61 @@
1+
import asyncio
2+
import datetime
13
import logging
24
from collections import defaultdict
35
from dataclasses import dataclass, field
46

57
from pydantic import ByteSize, TypeAdapter
6-
from servicelib.aiohttp.long_running_tasks.server import (
7-
ProgressMessage,
8-
ProgressPercent,
9-
TaskProgress,
10-
)
8+
from servicelib.async_utils import cancel_wait_task
9+
from servicelib.background_task import create_periodic_task
10+
from servicelib.progress_bar import ProgressBarData
1111

1212
_logger = logging.getLogger(__name__)
1313

1414

15-
def update_task_progress(
16-
task_progress: TaskProgress | None,
17-
message: ProgressMessage | None = None,
18-
progress: ProgressPercent | None = None,
19-
) -> None:
20-
_logger.debug("%s [%s]", message or "", progress or "n/a")
21-
if task_progress:
22-
task_progress.update(message=message, percent=progress)
23-
24-
2515
@dataclass
2616
class S3TransferDataCB:
27-
task_progress: TaskProgress | None
17+
task_progress: ProgressBarData
2818
total_bytes_to_transfer: ByteSize
2919
task_progress_message_prefix: str = ""
3020
_total_bytes_copied: int = 0
3121
_file_total_bytes_copied: dict[str, int] = field(
3222
default_factory=lambda: defaultdict(int)
3323
)
24+
_update_task_event: asyncio.Event = field(default_factory=asyncio.Event)
25+
_async_update_periodic_task: asyncio.Task | None = None
3426

3527
def __post_init__(self) -> None:
28+
self._async_update_periodic_task = create_periodic_task(
29+
self._async_update,
30+
interval=datetime.timedelta(seconds=1),
31+
task_name="s3_transfer_cb_update",
32+
)
3633
self._update()
3734

38-
def _update(self) -> None:
39-
update_task_progress(
40-
self.task_progress,
35+
async def __aenter__(self) -> "S3TransferDataCB":
36+
return self
37+
38+
async def __aexit__(self, exc_type, exc_value, traceback) -> None:
39+
self.finalize_transfer()
40+
await asyncio.sleep(0)
41+
assert self._async_update_periodic_task # nosec
42+
await cancel_wait_task(self._async_update_periodic_task)
43+
44+
async def _async_update(self) -> None:
45+
await self._update_task_event.wait()
46+
self._update_task_event.clear()
47+
self.task_progress.description = (
4148
f"{self.task_progress_message_prefix} - "
42-
f"{self.total_bytes_to_transfer.human_readable()}",
43-
ProgressPercent(
44-
min(self._total_bytes_copied, self.total_bytes_to_transfer)
45-
/ (self.total_bytes_to_transfer or 1)
46-
),
49+
f"{self.total_bytes_to_transfer.human_readable()}"
50+
)
51+
await self.task_progress.update(
52+
min(self._total_bytes_copied, self.total_bytes_to_transfer)
53+
/ (self.total_bytes_to_transfer or 1)
4754
)
4855

56+
def _update(self) -> None:
57+
self._update_task_event.set()
58+
4959
def finalize_transfer(self) -> None:
5060
self._total_bytes_copied = (
5161
self.total_bytes_to_transfer - self._total_bytes_copied

0 commit comments

Comments
 (0)