Skip to content

Commit dc56a2a

Browse files
Mitigate hanging requests from api-server to storage (#7918)
1 parent ce92bee commit dc56a2a

File tree

7 files changed

+319
-138
lines changed

7 files changed

+319
-138
lines changed

services/api-server/src/simcore_service_api_server/api/routes/files.py

Lines changed: 14 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,18 @@
1111
from fastapi.exceptions import HTTPException
1212
from fastapi_pagination.api import create_page
1313
from models_library.api_schemas_storage.storage_schemas import (
14-
ETag,
1514
FileUploadCompletionBody,
16-
LinkType,
1715
)
1816
from models_library.basic_types import SHA256Str
1917
from models_library.projects_nodes_io import NodeID
20-
from pydantic import AnyUrl, ByteSize, PositiveInt, TypeAdapter, ValidationError
18+
from pydantic import PositiveInt, ValidationError
2119
from servicelib.fastapi.requests_decorators import cancel_on_disconnect
20+
from servicelib.logging_utils import log_context
2221
from simcore_sdk.node_ports_common.constants import SIMCORE_LOCATION
2322
from simcore_sdk.node_ports_common.file_io_utils import UploadableFileObject
2423
from simcore_sdk.node_ports_common.filemanager import (
2524
UploadedFile,
2625
UploadedFolder,
27-
abort_upload,
28-
complete_file_upload,
29-
get_upload_links_from_s3,
3026
)
3127
from simcore_sdk.node_ports_common.filemanager import upload_path as storage_upload_path
3228
from starlette.datastructures import URL
@@ -297,23 +293,21 @@ async def get_upload_links(
297293
client_file: UserFileToProgramJob | UserFile,
298294
user_id: Annotated[PositiveInt, Depends(get_current_user_id)],
299295
webserver_api: Annotated[AuthSession, Depends(get_webserver_session)],
296+
storage_client: Annotated[StorageApi, Depends(get_api_client(StorageApi))],
300297
):
301298
"""Get upload links for uploading a file to storage"""
302299
assert request # nosec
303300
file_meta = await _create_domain_file(
304301
webserver_api=webserver_api, file_id=None, client_file=client_file
305302
)
306-
_, upload_links = await get_upload_links_from_s3(
307-
user_id=user_id,
308-
store_name=None,
309-
store_id=SIMCORE_LOCATION,
310-
s3_object=file_meta.storage_file_id,
311-
client_session=None,
312-
link_type=LinkType.PRESIGNED,
313-
file_size=ByteSize(client_file.filesize),
314-
is_directory=False,
315-
sha256_checksum=file_meta.sha256_checksum,
316-
)
303+
304+
with log_context(
305+
logger=_logger, level=logging.DEBUG, msg=f"Getting upload links for {file_meta}"
306+
):
307+
upload_links = await storage_client.get_file_upload_links(
308+
user_id=user_id, file=file_meta, client_file=client_file
309+
)
310+
317311
completion_url: URL = request.url_for(
318312
"complete_multipart_upload", file_id=file_meta.id
319313
)
@@ -420,12 +414,7 @@ async def abort_multipart_upload(
420414
file = await _create_domain_file(
421415
webserver_api=webserver_api, file_id=file_id, client_file=client_file
422416
)
423-
abort_link: URL = await storage_client.create_abort_upload_link(
424-
file=file, query={"user_id": str(user_id)}
425-
)
426-
await abort_upload(
427-
abort_upload_link=TypeAdapter(AnyUrl).validate_python(str(abort_link))
428-
)
417+
await storage_client.abort_file_upload(user_id=user_id, file=file)
429418

430419

431420
@router.post(
@@ -449,13 +438,8 @@ async def complete_multipart_upload(
449438
file = await _create_domain_file(
450439
webserver_api=webserver_api, file_id=file_id, client_file=client_file
451440
)
452-
complete_link: URL = await storage_client.create_complete_upload_link(
453-
file=file, query={"user_id": str(user_id)}
454-
)
455-
456-
e_tag: ETag | None = await complete_file_upload(
457-
uploaded_parts=uploaded_parts.parts,
458-
upload_completion_link=TypeAdapter(AnyUrl).validate_python(f"{complete_link}"),
441+
e_tag = await storage_client.complete_file_upload(
442+
user_id=user_id, file=file, uploaded_parts=uploaded_parts.parts
459443
)
460444
assert e_tag is not None # nosec
461445

services/api-server/src/simcore_service_api_server/exceptions/backend_errors.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ def named_fields(cls) -> set[str]:
1717
)
1818

1919

20+
class BackendTimeoutError(BaseBackEndError):
21+
msg_template = "Backend request timed out"
22+
status_code = status.HTTP_504_GATEWAY_TIMEOUT
23+
24+
2025
class InvalidInputError(BaseBackEndError):
2126
msg_template = "Invalid input"
2227
status_code = status.HTTP_422_UNPROCESSABLE_ENTITY

services/api-server/src/simcore_service_api_server/services_http/storage.py

Lines changed: 86 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,57 @@
11
import logging
22
import re
33
import urllib.parse
4+
from datetime import timedelta
45
from functools import partial
56
from mimetypes import guess_type
6-
from typing import Literal
7+
from typing import Final, Literal
78
from uuid import UUID
89

910
from fastapi import FastAPI
1011
from fastapi.encoders import jsonable_encoder
12+
from httpx import QueryParams
1113
from models_library.api_schemas_storage.storage_schemas import (
14+
ETag,
1215
FileMetaDataArray,
1316
)
1417
from models_library.api_schemas_storage.storage_schemas import (
1518
FileMetaDataGet as StorageFileMetaData,
1619
)
1720
from models_library.api_schemas_storage.storage_schemas import (
21+
FileUploadCompleteFutureResponse,
22+
FileUploadCompleteResponse,
23+
FileUploadCompleteState,
24+
FileUploadCompletionBody,
1825
FileUploadSchema,
26+
LinkType,
1927
PresignedLink,
28+
UploadedPart,
2029
)
2130
from models_library.basic_types import SHA256Str
2231
from models_library.generics import Envelope
2332
from models_library.rest_pagination import PageLimitInt, PageOffsetInt
2433
from pydantic import AnyUrl
2534
from settings_library.tracing import TracingSettings
26-
from starlette.datastructures import URL
35+
from simcore_service_api_server.exceptions.backend_errors import BackendTimeoutError
36+
from simcore_service_api_server.models.schemas.files import UserFile
37+
from simcore_service_api_server.models.schemas.jobs import UserFileToProgramJob
38+
from tenacity import (
39+
AsyncRetrying,
40+
TryAgain,
41+
before_sleep_log,
42+
retry_if_exception_type,
43+
stop_after_delay,
44+
wait_fixed,
45+
)
2746

2847
from ..core.settings import StorageSettings
2948
from ..exceptions.service_errors_utils import service_exception_mapper
3049
from ..models.domain.files import File
3150
from ..utils.client_base import BaseServiceClientApi, setup_client_instance
3251

52+
_POLL_TIMEOUT: Final[timedelta] = timedelta(minutes=10)
53+
54+
3355
_logger = logging.getLogger(__name__)
3456

3557
_exception_mapper = partial(service_exception_mapper, service_name="Storage")
@@ -157,41 +179,81 @@ async def delete_file(self, *, user_id: int, quoted_storage_file_id: str) -> Non
157179
response.raise_for_status()
158180

159181
@_exception_mapper(http_status_map={})
160-
async def get_upload_links(
161-
self, *, user_id: int, file_id: UUID, file_name: str
182+
async def get_file_upload_links(
183+
self, *, user_id: int, file: File, client_file: UserFileToProgramJob | UserFile
162184
) -> FileUploadSchema:
163-
object_path = urllib.parse.quote_plus(f"api/{file_id}/{file_name}")
185+
186+
query_params = QueryParams(
187+
user_id=f"{user_id}",
188+
link_type=LinkType.PRESIGNED.value,
189+
file_size=int(client_file.filesize),
190+
is_directory="false",
191+
sha256_checksum=f"{client_file.sha256_checksum}",
192+
)
164193

165194
# complete_upload_file
166195
response = await self.client.put(
167-
f"/locations/{self.SIMCORE_S3_ID}/files/{object_path}",
168-
params={"user_id": user_id, "file_size": 0},
196+
f"/locations/{self.SIMCORE_S3_ID}/files/{file.quoted_storage_file_id}",
197+
params=query_params,
169198
)
170199
response.raise_for_status()
171200

172201
enveloped_data = Envelope[FileUploadSchema].model_validate_json(response.text)
173202
assert enveloped_data.data # nosec
174203
return enveloped_data.data
175204

176-
async def create_complete_upload_link(
177-
self, *, file: File, query: dict[str, str] | None = None
178-
) -> URL:
179-
url = URL(
180-
f"{self.client.base_url}locations/{self.SIMCORE_S3_ID}/files/{file.quoted_storage_file_id}:complete"
205+
@_exception_mapper(http_status_map={})
206+
async def complete_file_upload(
207+
self, *, user_id: int, file: File, uploaded_parts: list[UploadedPart]
208+
) -> ETag:
209+
210+
response = await self.client.post(
211+
f"/locations/{self.SIMCORE_S3_ID}/files/{file.storage_file_id}:complete",
212+
params={"user_id": f"{user_id}"},
213+
json=jsonable_encoder(FileUploadCompletionBody(parts=uploaded_parts)),
181214
)
182-
if query is not None:
183-
url = url.include_query_params(**query)
184-
return url
185-
186-
async def create_abort_upload_link(
187-
self, *, file: File, query: dict[str, str] | None = None
188-
) -> URL:
189-
url = URL(
190-
f"{self.client.base_url}locations/{self.SIMCORE_S3_ID}/files/{file.quoted_storage_file_id}:abort"
215+
response.raise_for_status()
216+
file_upload_complete_response = Envelope[
217+
FileUploadCompleteResponse
218+
].model_validate_json(response.text)
219+
assert file_upload_complete_response.data # nosec
220+
state_url = f"{file_upload_complete_response.data.links.state}"
221+
try:
222+
async for attempt in AsyncRetrying(
223+
reraise=True,
224+
wait=wait_fixed(1),
225+
stop=stop_after_delay(_POLL_TIMEOUT),
226+
retry=retry_if_exception_type(TryAgain),
227+
before_sleep=before_sleep_log(_logger, logging.DEBUG),
228+
):
229+
with attempt:
230+
resp = await self.client.post(state_url)
231+
resp.raise_for_status()
232+
future_enveloped = Envelope[
233+
FileUploadCompleteFutureResponse
234+
].model_validate_json(resp.text)
235+
assert future_enveloped.data # nosec
236+
if future_enveloped.data.state == FileUploadCompleteState.NOK:
237+
raise TryAgain()
238+
239+
assert future_enveloped.data.e_tag # nosec
240+
_logger.debug(
241+
"multipart upload completed in %s, received %s",
242+
attempt.retry_state.retry_object.statistics,
243+
f"{future_enveloped.data.e_tag=}",
244+
)
245+
return future_enveloped.data.e_tag
246+
except TryAgain as exc:
247+
raise BackendTimeoutError() from exc
248+
raise BackendTimeoutError()
249+
250+
@_exception_mapper(http_status_map={})
251+
async def abort_file_upload(self, *, user_id: int, file: File) -> None:
252+
response = await self.client.post(
253+
f"/locations/{self.SIMCORE_S3_ID}/files/{file.quoted_storage_file_id}:abort",
254+
params={"user_id": f"{user_id}"},
191255
)
192-
if query is not None:
193-
url = url.include_query_params(**query)
194-
return url
256+
response.raise_for_status()
195257

196258
@_exception_mapper(http_status_map={})
197259
async def create_soft_link(

services/api-server/tests/unit/conftest.py

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@
55
# pylint: disable=broad-exception-caught
66

77
import json
8+
import re
89
import subprocess
910
from collections.abc import AsyncIterator, Callable, Iterator
1011
from copy import deepcopy
1112
from pathlib import Path
1213
from typing import Any
1314
from unittest.mock import MagicMock
15+
from urllib.parse import urlparse, urlunparse
1416

1517
import aiohttp.test_utils
1618
import httpx
@@ -21,13 +23,19 @@
2123
from faker import Faker
2224
from fastapi import FastAPI, status
2325
from fastapi.encoders import jsonable_encoder
24-
from httpx import ASGITransport
26+
from httpx import ASGITransport, Request, Response
2527
from models_library.api_schemas_long_running_tasks.tasks import (
2628
TaskGet,
2729
TaskProgress,
2830
TaskStatus,
2931
)
30-
from models_library.api_schemas_storage.storage_schemas import HealthCheck
32+
from models_library.api_schemas_storage.storage_schemas import (
33+
FileUploadCompleteFutureResponse,
34+
FileUploadCompleteResponse,
35+
FileUploadCompleteState,
36+
FileUploadSchema,
37+
HealthCheck,
38+
)
3139
from models_library.api_schemas_webserver.projects import ProjectGet
3240
from models_library.app_diagnostics import AppStatusCheck
3341
from models_library.generics import Envelope
@@ -422,6 +430,77 @@ def mocked_storage_rest_api_base(
422430
).model_dump(mode="json"),
423431
)
424432

433+
assert (
434+
openapi["paths"]["/v0/locations/{location_id}/files/{file_id}"]["put"][
435+
"operationId"
436+
]
437+
== "upload_file_v0_locations__location_id__files__file_id__put"
438+
)
439+
respx_mock.put(
440+
re.compile(r"^http://[a-z\-_]*storage:[0-9]+/v0/locations/[0-9]+/files.+$"),
441+
name="upload_file_v0_locations__location_id__files__file_id__put",
442+
).respond(
443+
status.HTTP_200_OK,
444+
json=Envelope[FileUploadSchema](
445+
data=FileUploadSchema.model_json_schema()["examples"][0]
446+
).model_dump(mode="json"),
447+
)
448+
449+
# Add mocks for completion and abort endpoints
450+
def generate_future_link(request: Request, **kwargs):
451+
parsed_url = urlparse(f"{request.url}")
452+
stripped_url = urlunparse(
453+
(parsed_url.scheme, parsed_url.netloc, parsed_url.path, "", "", "")
454+
)
455+
456+
payload = FileUploadCompleteResponse.model_validate(
457+
{
458+
"links": {
459+
"state": stripped_url
460+
+ ":complete/futures/"
461+
+ str(faker.uuid4())
462+
},
463+
},
464+
)
465+
return Response(
466+
status_code=status.HTTP_200_OK,
467+
json=jsonable_encoder(
468+
Envelope[FileUploadCompleteResponse](data=payload)
469+
),
470+
)
471+
472+
respx_mock.post(
473+
re.compile(
474+
r"^http://[a-z\-_]*storage:[0-9]+/v0/locations/[0-9]+/files/.+complete(?:\?.*)?$"
475+
),
476+
name="complete_upload_file_v0_locations__location_id__files__file_id__complete_post",
477+
).side_effect = generate_future_link
478+
479+
respx_mock.post(
480+
re.compile(
481+
r"^http://[a-z\-_]*storage:[0-9]+/v0/locations/[0-9]+/files/.+complete/futures/.+"
482+
)
483+
).respond(
484+
status_code=status.HTTP_200_OK,
485+
json=jsonable_encoder(
486+
Envelope[FileUploadCompleteFutureResponse](
487+
data=FileUploadCompleteFutureResponse(
488+
state=FileUploadCompleteState.OK,
489+
e_tag="07d1c1a4-b073-4be7-b022-f405d90e99aa",
490+
)
491+
)
492+
),
493+
)
494+
495+
respx_mock.post(
496+
re.compile(
497+
r"^http://[a-z\-_]*storage:[0-9]+/v0/locations/[0-9]+/files/.+:abort(?:\?.*)?$"
498+
),
499+
name="abort_upload_file_v0_locations__location_id__files__file_id__abort_post",
500+
).respond(
501+
status.HTTP_204_NO_CONTENT,
502+
)
503+
425504
# SEE https://github.com/pcrespov/sandbox-python/blob/f650aad57aced304aac9d0ad56c00723d2274ad0/respx-lib/test_disable_mock.py
426505
if not services_mocks_enabled:
427506
respx_mock.stop()

0 commit comments

Comments
 (0)