Skip to content

Commit 9d59b35

Browse files
Authored by: bisgaard-itis, sanderegg, GitHK
✨ API server endpoints for performing multipart upload directly to S3 (#4596)
Co-authored-by: Sylvain <[email protected]> Co-authored-by: Andrei Neagu <[email protected]>
1 parent 3a42b86 commit 9d59b35

File tree

11 files changed

+587
-130
lines changed

11 files changed

+587
-130
lines changed

packages/pytest-simcore/src/pytest_simcore/services_api_mocks_for_aiohttp_clients.py

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,18 @@
77
import re
88
from pathlib import Path
99
from typing import Any
10+
from urllib.parse import urlparse, urlunparse
1011

1112
import pytest
1213
from aiohttp import web
1314
from aioresponses import aioresponses as AioResponsesMock
1415
from aioresponses.core import CallbackResult
16+
from faker import Faker
1517
from models_library.api_schemas_storage import (
1618
FileMetaDataGet,
19+
FileUploadCompleteFutureResponse,
20+
FileUploadCompleteResponse,
21+
FileUploadCompleteState,
1722
FileUploadLinks,
1823
FileUploadSchema,
1924
LinkType,
@@ -31,7 +36,6 @@
3136
"pytest_simcore.aioresponses_mocker",
3237
]
3338

34-
3539
# The adjacency list is defined as a dictionary with the key to the node and its list of successors
3640
FULL_PROJECT_PIPELINE_ADJACENCY: dict[str, list[str]] = {
3741
"62bca361-8594-48c8-875e-b8577e868aec": [
@@ -391,7 +395,7 @@ def list_file_meta_data_cb(url: URL, **kwargs) -> CallbackResult:
391395

392396
@pytest.fixture
393397
async def storage_v0_service_mock(
394-
aioresponses_mocker: AioResponsesMock,
398+
aioresponses_mocker: AioResponsesMock, faker: Faker
395399
) -> AioResponsesMock:
396400
"""mocks responses of storage API"""
397401

@@ -413,6 +417,18 @@ async def storage_v0_service_mock(
413417
r"^http://[a-z\-_]*storage:[0-9]+/v0/locations/[0-9]+/files/metadata.+$"
414418
)
415419

420+
storage_complete_link = re.compile(
421+
r"^http://[a-z\-_]*storage:[0-9]+/v0/locations/[0-9]+/files/.+complete"
422+
)
423+
424+
storage_complete_link_futures = re.compile(
425+
r"^http://[a-z\-_]*storage:[0-9]+/v0/locations/[0-9]+/files/.+complete/futures/.+"
426+
)
427+
428+
storage_abort_link = re.compile(
429+
r"^http://[a-z\-_]*storage:[0-9]+/v0/locations/[0-9]+/files/.+abort"
430+
)
431+
416432
aioresponses_mocker.get(
417433
get_file_metadata_pattern,
418434
status=web.HTTPOk.status_code,
@@ -441,4 +457,46 @@ async def storage_v0_service_mock(
441457
repeat=True,
442458
)
443459

460+
    def generate_future_link(url, **kwargs):
        """aioresponses callback for the 'complete upload' POST: echoes back a
        FileUploadCompleteResponse whose state link points at a fake
        ':complete/futures/<uuid>' polling endpoint derived from the called URL.
        """
        # drop query/params/fragment so the futures link is built from the bare path
        parsed_url = urlparse(str(url))
        stripped_url = urlunparse(
            (parsed_url.scheme, parsed_url.netloc, parsed_url.path, "", "", "")
        )

        # `faker` is the fixture-level Faker instance captured by this closure;
        # a fresh uuid makes every mocked completion return a distinct future
        payload: FileUploadCompleteResponse = parse_obj_as(
            FileUploadCompleteResponse,
            {
                "links": {
                    "state": stripped_url + ":complete/futures/" + str(faker.uuid4())
                },
            },
        )
        return CallbackResult(
            status=web.HTTPOk.status_code,
            payload=jsonable_encoder(
                Envelope[FileUploadCompleteResponse](data=payload)
            ),
        )
481+
482+
aioresponses_mocker.post(storage_complete_link, callback=generate_future_link)
483+
484+
aioresponses_mocker.post(
485+
storage_complete_link_futures,
486+
status=web.HTTPOk.status_code,
487+
payload=jsonable_encoder(
488+
Envelope[FileUploadCompleteFutureResponse](
489+
data=FileUploadCompleteFutureResponse(
490+
state=FileUploadCompleteState.OK,
491+
e_tag="07d1c1a4-b073-4be7-b022-f405d90e99aa",
492+
)
493+
)
494+
),
495+
)
496+
497+
aioresponses_mocker.post(
498+
storage_abort_link,
499+
status=web.HTTPOk.status_code,
500+
)
501+
444502
return aioresponses_mocker
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
import logging
2+
3+
from aiohttp import ClientError, ClientSession
4+
from models_library.api_schemas_storage import (
5+
ETag,
6+
FileUploadCompleteFutureResponse,
7+
FileUploadCompleteResponse,
8+
FileUploadCompleteState,
9+
FileUploadCompletionBody,
10+
LocationID,
11+
LocationName,
12+
UploadedPart,
13+
)
14+
from models_library.generics import Envelope
15+
from models_library.users import UserID
16+
from models_library.utils.fastapi_encoders import jsonable_encoder
17+
from pydantic import AnyUrl, parse_obj_as
18+
from tenacity._asyncio import AsyncRetrying
19+
from tenacity.before_sleep import before_sleep_log
20+
from tenacity.retry import retry_if_exception_type
21+
from tenacity.stop import stop_after_delay
22+
from tenacity.wait import wait_fixed
23+
24+
from . import exceptions, storage_client
25+
from .settings import NodePortsSettings
26+
27+
_logger = logging.getLogger(__name__)
28+
29+
30+
async def _get_location_id_from_location_name(
    user_id: UserID,
    store: LocationName,
    session: ClientSession,
) -> LocationID:
    """Return the ID of the storage location whose name matches *store*.

    :raises exceptions.S3InvalidStore: when no known location carries that name
    """
    locations = await storage_client.get_storage_locations(
        session=session, user_id=user_id
    )
    match = next((loc.id for loc in locations if loc.name == store), None)
    if match is None:
        # no storage location advertises this name for this user
        raise exceptions.S3InvalidStore(store)
    return match
41+
42+
43+
async def _complete_upload(
    session: ClientSession,
    upload_completion_link: AnyUrl,
    parts: list[UploadedPart],
    *,
    is_directory: bool,
) -> ETag | None:
    """Finalizes a (potentially multipart) upload and polls storage until done.

    NOTE: AWS can take several minutes to complete a multipart upload, see
    https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html

    :param session: client session used for the completion POST and the polling
    :param upload_completion_link: storage endpoint that triggers completion
    :param parts: part numbers/e-tags of the already-uploaded parts
    :param is_directory: directory uploads carry no single e-tag
    :raises ValueError: re-raised (reraise=True) when the upload is still not
        ready after NODE_PORTS_MULTIPART_UPLOAD_COMPLETION_TIMEOUT_S seconds
    :raises exceptions.S3TransferError: if completion could not be confirmed.
        NOTE(review): with reraise=True the retry loop re-raises the final
        ValueError on timeout, so this raise appears unreachable — confirm
        whether S3TransferError was the intended timeout exception.
    :return: the new ETag for file uploads, None for directory uploads
    """
    # kick off the (multipart) completion on the storage side
    async with session.post(
        upload_completion_link,
        json=jsonable_encoder(FileUploadCompletionBody(parts=parts)),
    ) as resp:
        resp.raise_for_status()
        # now poll for state
        file_upload_complete_response = parse_obj_as(
            Envelope[FileUploadCompleteResponse], await resp.json()
        )
        assert file_upload_complete_response.data  # nosec
    state_url = file_upload_complete_response.data.links.state
    _logger.info(
        "completed upload of %s",
        f"{len(parts)} parts, received {file_upload_complete_response.json(indent=2)}",
    )

    # poll the returned state link once per second until the storage backend
    # reports the completion done (ValueError triggers another retry)
    async for attempt in AsyncRetrying(
        reraise=True,
        wait=wait_fixed(1),
        stop=stop_after_delay(
            NodePortsSettings.create_from_envs().NODE_PORTS_MULTIPART_UPLOAD_COMPLETION_TIMEOUT_S
        ),
        retry=retry_if_exception_type(ValueError),
        before_sleep=before_sleep_log(_logger, logging.DEBUG),
    ):
        with attempt:
            async with session.post(state_url) as resp:
                resp.raise_for_status()
                future_enveloped = parse_obj_as(
                    Envelope[FileUploadCompleteFutureResponse], await resp.json()
                )
            assert future_enveloped.data  # nosec
            if future_enveloped.data.state == FileUploadCompleteState.NOK:
                # not done yet: raising ValueError makes tenacity retry
                msg = "upload not ready yet"
                raise ValueError(msg)
            if is_directory:
                # directories aggregate many objects and have no single e-tag
                assert future_enveloped.data.e_tag is None  # nosec
                return None

            assert future_enveloped.data.e_tag  # nosec
            _logger.debug(
                "multipart upload completed in %s, received %s",
                attempt.retry_state.retry_object.statistics,
                f"{future_enveloped.data.e_tag=}",
            )
            return future_enveloped.data.e_tag
    msg = f"Could not complete the upload using the upload_completion_link={upload_completion_link!r}"
    raise exceptions.S3TransferError(msg)
105+
106+
107+
async def _resolve_location_id(
    client_session: ClientSession,
    user_id: UserID,
    store_name: LocationName | None,
    store_id: LocationID | None,
) -> LocationID:
    """Resolve the storage location ID, preferring *store_name* when provided.

    :raises exceptions.NodeportsException: when neither identifier is given
    :raises exceptions.S3InvalidStore: when *store_name* matches no location
    """
    # a name takes precedence and is resolved against the storage service
    if store_name is not None:
        return await _get_location_id_from_location_name(
            user_id, store_name, client_session
        )
    if store_id is not None:
        return store_id
    # neither a name nor an id was supplied: nothing to resolve
    msg = f"both {store_name=} and {store_id=} are None"
    raise exceptions.NodeportsException(msg)
123+
124+
125+
async def _abort_upload(
    session: ClientSession, abort_upload_link: AnyUrl, *, reraise_exceptions: bool
) -> None:
    """Tell storage to abort the upload so it can revert to the last version.

    Network/HTTP failures (aiohttp ClientError) are logged; they propagate to
    the caller only when *reraise_exceptions* is set.
    """
    try:
        async with session.post(abort_upload_link) as response:
            response.raise_for_status()
    except ClientError:
        _logger.warning("Error while aborting upload", exc_info=True)
        if reraise_exceptions:
            raise
    _logger.warning("Upload aborted")

0 commit comments

Comments
 (0)