
Commit 1d90d78

✨Storage can return a S3 link instead of a presigned link on demand (ITISFoundation#3004)
1 parent f1ab2d7 commit 1d90d78

File tree

5 files changed: +103 -28 lines

api/specs/storage/openapi.yaml
services/storage/src/simcore_service_storage/api/v0/openapi.yaml
services/storage/src/simcore_service_storage/dsm.py
services/storage/src/simcore_service_storage/handlers.py
services/storage/tests/unit/test_dsm.py


api/specs/storage/openapi.yaml

Lines changed: 19 additions & 0 deletions
@@ -292,6 +292,16 @@ paths:
           required: true
           schema:
             type: string
+        - name: link_type
+          in: query
+          required: false
+          schema:
+            type: string
+            default: "presigned"
+            enum:
+              - presigned
+              - s3
+
       responses:
         "200":
           description: "Returns presigned link"
@@ -330,6 +340,15 @@ paths:
           required: false
           schema:
             type: string
+        - name: link_type
+          in: query
+          required: false
+          schema:
+            type: string
+            default: "presigned"
+            enum:
+              - presigned
+              - s3
       responses:
         "200":
           description: "Returns presigned link"

services/storage/src/simcore_service_storage/api/v0/openapi.yaml

Lines changed: 18 additions & 0 deletions
@@ -285,6 +285,15 @@ paths:
           required: true
           schema:
             type: string
+        - name: link_type
+          in: query
+          required: false
+          schema:
+            type: string
+            default: presigned
+            enum:
+              - presigned
+              - s3
       responses:
         '200':
           description: Returns presigned link
@@ -323,6 +332,15 @@ paths:
           required: false
           schema:
             type: string
+        - name: link_type
+          in: query
+          required: false
+          schema:
+            type: string
+            default: presigned
+            enum:
+              - presigned
+              - s3
       responses:
         '200':
          description: Returns presigned link
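
The service's bundled spec mirrors the source spec above: link_type is optional, constrained to two values, and defaults to presigned. As a purely illustrative sketch of that contract (the LinkType enum and helper below are not part of this commit):

# Illustrative sketch of the link_type contract defined in the specs above;
# the enum and parse helper are assumptions, not code from this commit.
from enum import Enum
from typing import Optional


class LinkType(str, Enum):
    PRESIGNED = "presigned"
    S3 = "s3"


def parse_link_type(raw: Optional[str]) -> LinkType:
    if raw is None:
        return LinkType.PRESIGNED  # documented default
    return LinkType(raw)  # raises ValueError for values outside the enum


assert parse_link_type(None) is LinkType.PRESIGNED
assert parse_link_type("s3") is LinkType.S3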

services/storage/src/simcore_service_storage/dsm.py

Lines changed: 35 additions & 12 deletions
@@ -8,6 +8,7 @@
 import os
 import re
 import tempfile
+import urllib.parse
 from collections import deque
 from concurrent.futures import ThreadPoolExecutor
 from dataclasses import dataclass, field
@@ -16,12 +17,14 @@
 
 import attr
 import botocore
+import botocore.exceptions
 import sqlalchemy as sa
 from aiobotocore.client import AioBaseClient
 from aiobotocore.session import AioSession, ClientCreatorContext, get_session
 from aiohttp import web
 from aiopg.sa import Engine
 from aiopg.sa.result import ResultProxy, RowProxy
+from pydantic import AnyUrl, parse_obj_as
 from servicelib.aiohttp.aiopg_utils import DBAPIError, PostgresRetryPolicyUponOperation
 from servicelib.aiohttp.client_session import get_client_session
 from servicelib.utils import fire_and_forget_task
@@ -493,7 +496,7 @@ async def auto_update_database_from_storage_task(
             file_uuid, bucket_name, object_name, silence_exception=True
         )
 
-    async def upload_link(self, user_id: str, file_uuid: str):
+    async def upload_link(self, user_id: str, file_uuid: str, as_presigned_link: bool):
         """
         Creates pre-signed upload link and updates metadata table when
         link is used and upload is successfuly completed
@@ -545,9 +548,16 @@ async def _init_metadata() -> Tuple[int, str]:
                 object_name=object_name,
             )
         )
-        return self.s3_client.create_presigned_put_url(bucket_name, object_name)
+        link = parse_obj_as(
+            AnyUrl, f"s3://{bucket_name}/{urllib.parse.quote( object_name)}"
+        )
+        if as_presigned_link:
+            link = self.s3_client.create_presigned_put_url(bucket_name, object_name)
+        return f"{link}"
 
-    async def download_link_s3(self, file_uuid: str, user_id: int) -> str:
+    async def download_link_s3(
+        self, file_uuid: str, user_id: int, as_presigned_link: bool
+    ) -> str:
 
         # access layer
         async with self.engine.acquire() as conn:
@@ -577,9 +587,12 @@ async def download_link_s3(self, file_uuid: str, user_id: int) -> str:
             raise web.HTTPNotFound(
                 reason=f"File '{file_uuid}' does not exists in storage."
            )
-
-        link = self.s3_client.create_presigned_get_url(bucket_name, object_name)
-        return link
+        link = parse_obj_as(
+            AnyUrl, f"s3://{bucket_name}/{urllib.parse.quote( object_name)}"
+        )
+        if as_presigned_link:
+            link = self.s3_client.create_presigned_get_url(bucket_name, object_name)
+        return f"{link}"
 
     async def download_link_datcore(self, user_id: str, file_id: str) -> URL:
         api_token, api_secret = self._get_datcore_tokens(user_id)
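
Both link builders now follow the same pattern: always construct the s3:// URI first, then replace it with a presigned URL only when the caller asked for one. A standalone sketch of that pattern follows, assuming pydantic v1 as imported in the diff; the bucket/object names are made up and the presigned URL is only a placeholder for the real create_presigned_*_url calls.

# Standalone sketch of the link-building pattern used in upload_link /
# download_link_s3 above; bucket/object names are made up and the presigned
# URL string is a placeholder, not a real signature.
import urllib.parse

from pydantic import AnyUrl, parse_obj_as


def build_link(bucket_name: str, object_name: str, as_presigned_link: bool) -> str:
    # quote() escapes spaces and other unsafe characters but keeps "/" by default
    link = parse_obj_as(AnyUrl, f"s3://{bucket_name}/{urllib.parse.quote(object_name)}")
    if as_presigned_link:
        link = f"https://{bucket_name}.s3.amazonaws.com/{object_name}?X-Amz-Signature=..."
    return f"{link}"


print(build_link("simcore", "project-id/node-id/output data.dat", as_presigned_link=False))
# -> s3://simcore/project-id/node-id/output%20data.dat
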
@@ -592,17 +605,23 @@ async def download_link_datcore(self, user_id: str, file_id: str) -> URL:
 
     # COPY -----------------------------
 
-    async def copy_file_s3_s3(self, user_id: str, dest_uuid: str, source_uuid: str):
+    async def copy_file_s3_s3(
+        self, user_id: str, dest_uuid: str, source_uuid: str
+    ) -> None:
         # FIXME: operation MUST be atomic
 
         # source is s3, location is s3
         to_bucket_name = self.simcore_bucket_name
         to_object_name = dest_uuid
         from_bucket = self.simcore_bucket_name
         from_object_name = source_uuid
-        # FIXME: This is not async!
-        self.s3_client.copy_object(
-            to_bucket_name, to_object_name, from_bucket, from_object_name
+        await asyncio.get_event_loop().run_in_executor(
+            None,
+            self.s3_client.copy_object,
+            to_bucket_name,
+            to_object_name,
+            from_bucket,
+            from_object_name,
         )
 
         # update db
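
copy_object on the underlying S3 client is a blocking call, so it is now pushed onto the default thread-pool executor instead of stalling the event loop. The same technique in isolation, with a stand-in blocking function in place of the real s3_client.copy_object:

# The run_in_executor technique from copy_file_s3_s3 in isolation;
# slow_copy is a stand-in for the blocking s3_client.copy_object call.
import asyncio
import time


def slow_copy(to_bucket: str, to_obj: str, from_bucket: str, from_obj: str) -> None:
    time.sleep(1)  # simulates the blocking server-side S3 copy
    print(f"copied {from_bucket}/{from_obj} -> {to_bucket}/{to_obj}")


async def main() -> None:
    loop = asyncio.get_event_loop()
    # the copy runs in a worker thread, so other coroutines keep making progress
    await asyncio.gather(
        loop.run_in_executor(None, slow_copy, "simcore", "dst-uuid", "simcore", "src-uuid"),
        asyncio.sleep(0.1),
    )


asyncio.run(main())
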
@@ -616,6 +635,7 @@ async def copy_file_s3_s3(self, user_id: str, dest_uuid: str, source_uuid: str):
     async def copy_file_s3_datcore(
         self, user_id: str, dest_uuid: str, source_uuid: str
     ):
+        assert self.app  # nosec
         session = get_client_session(self.app)
 
         # source is s3, get link and copy to datcore
@@ -648,6 +668,7 @@ async def copy_file_datcore_s3(
         source_uuid: str,
         filename_missing: bool = False,
     ):
+        assert self.app  # nosec
         session = get_client_session(self.app)
 
         # 2 steps: Get download link for local copy, the upload link to s3
@@ -658,7 +679,9 @@ async def copy_file_datcore_s3(
         if filename_missing:
             dest_uuid = str(Path(dest_uuid) / filename)
 
-        s3_upload_link = await self.upload_link(user_id, dest_uuid)
+        s3_upload_link = await self.upload_link(
+            user_id, dest_uuid, as_presigned_link=True
+        )
 
         with tempfile.TemporaryDirectory() as tmpdir:
             # FIXME: connect download and upload streams
@@ -690,7 +713,7 @@ async def copy_file(
         dest_uuid: str,
         source_location: str,
         source_uuid: str,
-    ):
+    ) -> None:
         if source_location == SIMCORE_S3_STR:
             if dest_location == DATCORE_STR:
                 await self.copy_file_s3_datcore(user_id, dest_uuid, source_uuid)

services/storage/src/simcore_service_storage/handlers.py

Lines changed: 12 additions & 3 deletions
@@ -299,6 +299,7 @@ async def download_file(request: web.Request):
     assert params["location_id"]  # nosec
     assert params["fileId"]  # nosec
     assert query["user_id"]  # nosec
+    link_type = query.get("link_type", "presigned")
 
     with handle_storage_errors():
         location_id = params["location_id"]
@@ -308,7 +309,9 @@
         dsm = await _prepare_storage_manager(params, query, request)
         location = dsm.location_from_id(location_id)
         if location == SIMCORE_S3_STR:
-            link = await dsm.download_link_s3(file_uuid, user_id)
+            link = await dsm.download_link_s3(
+                file_uuid, user_id, as_presigned_link=bool(link_type == "presigned")
+            )
         else:
             link = await dsm.download_link_datcore(user_id, file_uuid)
 
@@ -318,10 +321,11 @@
 @routes.put(f"/{api_vtag}/locations/{{location_id}}/files/{{fileId}}")  # type: ignore
 async def upload_file(request: web.Request):
     params, query, body = await extract_and_validate(request)
-
+    log.debug("received call to upload_file with %s", f"{params=}, {query=}, {body=}")
     assert params, "params %s" % params  # nosec
     assert query, "query %s" % query  # nosec
     assert not body, "body %s" % body  # nosec
+    link_type = query.get("link_type", "presigned")
 
     with handle_storage_errors():
         location_id = params["location_id"]
@@ -335,6 +339,7 @@ async def upload_file(request: web.Request):
             source_uuid = query["extra_source"]
             source_id = query["extra_location"]
             source_location = dsm.location_from_id(source_id)
+            # FIXME: this does not even return a link... nobody is using this??
             link = await dsm.copy_file(
                 user_id=user_id,
                 dest_location=location,
@@ -343,7 +348,11 @@
                 source_uuid=source_uuid,
             )
         else:
-            link = await dsm.upload_link(user_id=user_id, file_uuid=file_uuid)
+            link = await dsm.upload_link(
+                user_id=user_id,
+                file_uuid=file_uuid,
+                as_presigned_link=bool(link_type == "presigned"),
+            )
 
     return {"error": None, "data": {"link": link}}
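
One practical consequence worth noting: a presigned link can be consumed with a plain HTTP PUT/GET (as the tests below do), whereas an s3:// link requires the caller to hold S3 credentials and use an S3 client. A hedged sketch of what a consumer of link_type=s3 might do; how credentials and the endpoint are resolved is an assumption here.

# Sketch of consuming an s3:// upload link obtained with link_type=s3.
# Unlike a presigned URL this is not an HTTP endpoint: the caller needs its
# own S3 credentials (resolved from the environment below, an assumption).
from urllib.parse import unquote, urlparse

import boto3


def upload_via_s3_link(s3_link: str, local_path: str) -> None:
    parsed = urlparse(s3_link)              # s3://<bucket>/<quoted object name>
    bucket = parsed.netloc
    key = unquote(parsed.path.lstrip("/"))
    s3 = boto3.client("s3")                 # credentials/region from the environment
    s3.upload_file(local_path, bucket, key)


# upload_via_s3_link("s3://simcore/project-id/node-id/data.zip", "data.zip")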

services/storage/tests/unit/test_dsm.py

Lines changed: 19 additions & 13 deletions
@@ -143,7 +143,9 @@ def _create_file_meta_for_s3(
 async def _upload_file(
     dsm: DataStorageManager, file_metadata: FileMetaData, file_path: Path
 ) -> FileMetaData:
-    up_url = await dsm.upload_link(file_metadata.user_id, file_metadata.file_uuid)
+    up_url = await dsm.upload_link(
+        file_metadata.user_id, file_metadata.file_uuid, as_presigned_link=True
+    )
     assert file_path.exists()
     with file_path.open("rb") as fp:
         d = fp.read()
@@ -254,7 +256,9 @@ async def test_links_s3(
 
     tmp_file2 = f"{tmp_file}.rec"
     user_id = 0
-    down_url = await dsm.download_link_s3(fmd.file_uuid, user_id)
+    down_url = await dsm.download_link_s3(
+        fmd.file_uuid, user_id, as_presigned_link=True
+    )
 
     urllib.request.urlretrieve(down_url, tmp_file2)
 
@@ -276,7 +280,7 @@ async def test_copy_s3_s3(
     assert len(data) == 0
 
     # upload the file
-    up_url = await dsm.upload_link(fmd.user_id, fmd.file_uuid)
+    up_url = await dsm.upload_link(fmd.user_id, fmd.file_uuid, as_presigned_link=True)
     with tmp_file.open("rb") as fp:
         d = fp.read()
         req = urllib.request.Request(up_url, data=d, method="PUT")
@@ -350,7 +354,7 @@ async def test_dsm_s3_to_datcore(
 
     dsm = dsm_fixture
 
-    up_url = await dsm.upload_link(fmd.user_id, fmd.file_uuid)
+    up_url = await dsm.upload_link(fmd.user_id, fmd.file_uuid, as_presigned_link=True)
     with tmp_file.open("rb") as fp:
         d = fp.read()
         req = urllib.request.Request(up_url, data=d, method="PUT")
@@ -360,20 +364,22 @@ async def test_dsm_s3_to_datcore(
     # given the fmd, upload to datcore
     tmp_file2 = f"{tmp_file}.fordatcore"
     user_id = USER_ID
-    down_url = await dsm.download_link_s3(fmd.file_uuid)
+    down_url = await dsm.download_link_s3(
+        fmd.file_uuid, user_id, as_presigned_link=True
+    )
     urllib.request.urlretrieve(down_url, tmp_file2)
     assert filecmp.cmp(tmp_file2, tmp_file)
     # now we have the file locally, upload the file
     await dsm.upload_file_to_datcore(
-        user_id=user_id,
-        local_file_path=tmp_file2,
-        destination_id=datcore_structured_testbucket["dataset_id"],
+        user_id,
+        tmp_file2,
+        datcore_structured_testbucket["dataset_id"],
     )
     # and into a deeper strucutre
     await dsm.upload_file_to_datcore(
-        user_id=user_id,
-        local_file_path=tmp_file2,
-        destination_id=datcore_structured_testbucket["coll2_id"],
+        user_id,
+        tmp_file2,
+        datcore_structured_testbucket["coll2_id"],
     )
 
     # FIXME: upload takes some time
@@ -460,7 +466,7 @@ async def test_dsm_datcore_to_S3(
 
     # and the one on s3
     tmp_file2 = f"{tmp_file}.fromS3"
-    down_url_s3 = await dsm.download_link_s3(dest_uuid)
+    down_url_s3 = await dsm.download_link_s3(dest_uuid, user_id, as_presigned_link=True)
     urllib.request.urlretrieve(down_url_s3, tmp_file2)
 
     assert filecmp.cmp(tmp_file1, tmp_file2)
@@ -486,7 +492,7 @@ async def test_copy_datcore(
     tmp_file = mock_files_factory(1)[0]
     fmd = _create_file_meta_for_s3(postgres_service_url, s3_client, tmp_file)
 
-    up_url = await dsm.upload_link(fmd.user_id, fmd.file_uuid)
+    up_url = await dsm.upload_link(fmd.user_id, fmd.file_uuid, as_presigned_link=True)
     with tmp_file.open("rb") as fp:
         d = fp.read()
         req = urllib.request.Request(up_url, data=d, method="PUT")
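
All the updated tests keep exercising the presigned path (as_presigned_link=True). A possible follow-up test for the new raw-link path could look like the sketch below; it reuses fixture and helper names visible in the hunks above, and the expected s3:// shape is an assumption inferred from the dsm.py changes in this commit.

# Possible follow-up test for the raw-S3-link path (not part of this commit);
# fixture/helper names are reused from test_dsm.py above, and the expected
# s3:// shape is inferred from the dsm.py changes.
async def test_links_s3_raw(
    postgres_service_url, s3_client, mock_files_factory, dsm_fixture
):
    tmp_file = mock_files_factory(1)[0]
    fmd = _create_file_meta_for_s3(postgres_service_url, s3_client, tmp_file)

    dsm = dsm_fixture
    fmd = await _upload_file(dsm, fmd, tmp_file)

    up_url = await dsm.upload_link(fmd.user_id, fmd.file_uuid, as_presigned_link=False)
    down_url = await dsm.download_link_s3(
        fmd.file_uuid, fmd.user_id, as_presigned_link=False
    )

    # raw links point straight at the object in the simcore bucket
    assert up_url.startswith("s3://")
    assert down_url.startswith("s3://")
    assert up_url == down_url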
