 from servicelib.aiopg_utils import DBAPIError, PostgresRetryPolicyUponOperation
 from servicelib.client_session import get_client_session
 from servicelib.utils import fire_and_forget_task
+from sqlalchemy.dialects.postgresql import insert as pg_insert
 from sqlalchemy.sql.expression import literal_column
 from tenacity import retry
 from yarl import URL
@@ -152,19 +153,20 @@ class DataStorageManager:
     datcore_tokens: Dict[str, DatCoreApiToken] = attr.Factory(dict)
     app: Optional[web.Application] = None
 
-    def _create_client_context(self) -> ClientCreatorContext:
+    def _create_aiobotocore_client_context(self) -> ClientCreatorContext:
         assert hasattr(self.session, "create_client")
         # pylint: disable=no-member
 
         # SEE API in https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html
+        # SEE https://aiobotocore.readthedocs.io/en/latest/index.html
         return self.session.create_client(
             "s3",
             endpoint_url=self.s3_client.endpoint_url,
             aws_access_key_id=self.s3_client.access_key,
             aws_secret_access_key=self.s3_client.secret_key,
         )
 
-    def _get_datcore_tokens(self, user_id: str) -> Tuple[str, str]:
+    def _get_datcore_tokens(self, user_id: str) -> Tuple[Optional[str], Optional[str]]:
        # pylint: disable=no-member
         token = self.datcore_tokens.get(user_id, DatCoreApiToken())
         return token.to_tuple()
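
For reference, the session/client pattern wrapped by `_create_aiobotocore_client_context` looks roughly as follows when used standalone. This is a minimal sketch, assuming aiobotocore's `get_session`; the endpoint and credentials are placeholder values, not project configuration:

    import asyncio

    from aiobotocore.session import get_session  # older aiobotocore: from aiobotocore import get_session

    async def main() -> None:
        session = get_session()
        # create_client returns an async context manager yielding a low-level S3 client
        async with session.create_client(
            "s3",
            endpoint_url="http://localhost:9000",  # placeholder, e.g. a local minio
            aws_access_key_id="ACCESS_KEY",  # placeholder
            aws_secret_access_key="SECRET_KEY",  # placeholder
        ) as client:
            response = await client.list_objects_v2(Bucket="my-bucket", Prefix="some/prefix/")
            for item in response.get("Contents", []):
                print(item["Key"])

    asyncio.run(main())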
@@ -175,11 +177,13 @@ async def locations(self, user_id: str):
         locs.append(simcore_s3)
 
         api_token, api_secret = self._get_datcore_tokens(user_id)
-        if await datcore_adapter.check_user_can_connect(
-            self.app, api_token, api_secret
-        ):
-            datcore = {"name": DATCORE_STR, "id": DATCORE_ID}
-            locs.append(datcore)
+
+        if api_token and api_secret and self.app:
+            if await datcore_adapter.check_user_can_connect(
+                self.app, api_token, api_secret
+            ):
+                datcore = {"name": DATCORE_STR, "id": DATCORE_ID}
+                locs.append(datcore)
 
         return locs
 
@@ -417,14 +421,14 @@ async def _metadata_file_updater(
417421 """
418422 current_iteraction = 0
419423
420- async with self ._create_client_context () as client :
424+ async with self ._create_aiobotocore_client_context () as aioboto_client :
421425 current_iteraction += 1
422426 continue_loop = True
423427 sleep_generator = expo ()
424428 update_succeeded = False
425429
426430 while continue_loop :
427- result = await client .list_objects_v2 (
431+ result = await aioboto_client .list_objects_v2 (
428432 Bucket = bucket_name , Prefix = object_name
429433 )
430434 sleep_amount = next (sleep_generator )
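
The retry loop above drives its sleep intervals from an `expo()` generator. Assuming this is the exponential generator from the `backoff` package (not confirmed by this excerpt), the pattern in isolation:

    # Assumption: expo is backoff.expo, which yields 1, 2, 4, 8, ... (seconds).
    from backoff import expo

    sleep_generator = expo()
    for attempt in range(5):
        sleep_amount = next(sleep_generator)
        print(f"attempt {attempt}: back off {sleep_amount}s before retrying")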
@@ -495,14 +499,15 @@ async def _init_metadata() -> Tuple[int, str]:
             fmd.simcore_from_uuid(file_uuid, self.simcore_bucket_name)
             fmd.user_id = user_id  # NOTE: takes ownership of uploaded data
 
-            query = sa.select([file_meta_data]).where(
-                file_meta_data.c.file_uuid == file_uuid
-            )
             # if file already exists, we might want to update a time-stamp
-            exists = await (await conn.execute(query)).scalar()
-            if exists is None:
-                ins = file_meta_data.insert().values(**vars(fmd))
-                await conn.execute(ins)
+
+            # upsert file_meta_data
+            insert_stmt = pg_insert(file_meta_data).values(**vars(fmd))
+            do_nothing_stmt = insert_stmt.on_conflict_do_nothing(
+                index_elements=["file_uuid"]
+            )
+            await conn.execute(do_nothing_stmt)
+
             return fmd.file_size, fmd.last_modified
 
         file_size, last_modified = await _init_metadata()
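
The replaced select-then-insert was racy: two concurrent uploads of the same `file_uuid` could both see no row and both insert. `pg_insert(...).on_conflict_do_nothing(...)` compiles to PostgreSQL's atomic `INSERT ... ON CONFLICT DO NOTHING`. If the "update a time-stamp" behaviour from the comment were wanted instead, `on_conflict_do_update` is the variant to reach for; a sketch against a hypothetical table:

    # Sketch of both conflict policies; the table and values are hypothetical.
    import sqlalchemy as sa
    from sqlalchemy.dialects.postgresql import insert as pg_insert

    metadata = sa.MetaData()
    files = sa.Table(
        "files",
        metadata,
        sa.Column("file_uuid", sa.String, primary_key=True),
        sa.Column("last_modified", sa.String),
    )

    values = {"file_uuid": "project/node/file.dat", "last_modified": "2020-01-01"}
    ins = pg_insert(files).values(**values)

    # Keep the existing row on conflict (what the diff does):
    do_nothing = ins.on_conflict_do_nothing(index_elements=["file_uuid"])

    # Or refresh the timestamp on conflict instead:
    do_update = ins.on_conflict_do_update(
        index_elements=["file_uuid"],
        set_={"last_modified": ins.excluded.last_modified},
    )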
@@ -547,7 +552,7 @@ async def download_link_s3(self, file_uuid: str, user_id: int) -> str:
             stmt = sa.select([file_meta_data.c.object_name]).where(
                 file_meta_data.c.file_uuid == file_uuid
             )
-            object_name: str = await conn.scalar(stmt)
+            object_name: Optional[str] = await conn.scalar(stmt)
 
             if object_name is None:
                 raise web.HTTPNotFound(
@@ -738,16 +743,30 @@ async def deep_copy_project_simcore_s3(
             if new_node_id is not None:
                 uuid_name_dict[new_node_id] = src_node["label"]
 
-        async with self._create_client_context() as client:
+        async with self._create_aiobotocore_client_context() as aioboto_client:
+
+            logger.debug(
+                "Listing all items under %s:%s/",
+                self.simcore_bucket_name,
+                source_folder,
+            )
 
             # Step 1: List all objects for this project replace them with the destination object name
             # and do a copy at the same time collect some names
             # Note: the / at the end of the Prefix is VERY important, makes the listing several order of magnitudes faster
-            response = await client.list_objects_v2(
+            response = await aioboto_client.list_objects_v2(
                 Bucket=self.simcore_bucket_name, Prefix=f"{source_folder}/"
             )
 
-            for item in response.get("Contents", []):
+            contents: List = response.get("Contents", [])
+            logger.debug(
+                "Listed %s items under %s:%s/",
+                len(contents),
+                self.simcore_bucket_name,
+                source_folder,
+            )
+
+            for item in contents:
                 source_object_name = item["Key"]
                 source_object_parts = Path(source_object_name).parts
 
@@ -768,14 +787,19 @@ async def deep_copy_project_simcore_s3(
                         Path(dest_folder) / new_node_id / old_filename
                     )
 
-                await client.copy_object(
+                copy_kwargs = dict(
                     CopySource={
                         "Bucket": self.simcore_bucket_name,
                         "Key": source_object_name,
                     },
                     Bucket=self.simcore_bucket_name,
                     Key=dest_object_name,
                 )
+                logger.debug("Copying %s ...", copy_kwargs)
+
+                # FIXME: if 5GB, it must use multipart upload Upload Part - Copy API
+                # SEE https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.copy_object
+                await aioboto_client.copy_object(**copy_kwargs)
 
         # Step 2: List all references in outputs that point to datcore and copy over
         for node_id, node in destination_project["workbench"].items():
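
On the FIXME above: S3's CopyObject rejects sources larger than 5 GB, so oversized files need the multipart "Upload Part - Copy" flow the comment names (CreateMultipartUpload, UploadPartCopy over byte ranges, CompleteMultipartUpload). A rough sketch with the same low-level client; the helper name and the 512 MiB part size are illustrative, not part of this PR:

    # Rough sketch of a server-side multipart copy for objects over 5GB.
    PART_SIZE = 512 * 1024 * 1024  # illustrative; parts must be >= 5 MiB (except the last)

    async def multipart_copy(client, bucket: str, src_key: str, dst_key: str, size: int):
        mpu = await client.create_multipart_upload(Bucket=bucket, Key=dst_key)
        upload_id = mpu["UploadId"]
        parts = []
        part_number = 1
        for offset in range(0, size, PART_SIZE):
            last_byte = min(offset + PART_SIZE, size) - 1
            part = await client.upload_part_copy(
                Bucket=bucket,
                Key=dst_key,
                PartNumber=part_number,
                UploadId=upload_id,
                CopySource={"Bucket": bucket, "Key": src_key},
                CopySourceRange=f"bytes={offset}-{last_byte}",  # inclusive range
            )
            parts.append({"ETag": part["CopyPartResult"]["ETag"], "PartNumber": part_number})
            part_number += 1
        await client.complete_multipart_upload(
            Bucket=bucket,
            Key=dst_key,
            UploadId=upload_id,
            MultipartUpload={"Parts": parts},
        )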
@@ -804,11 +828,11 @@ async def deep_copy_project_simcore_s3(
                 output["path"] = destination
 
         fmds = []
-        async with self._create_client_context() as client:
+        async with self._create_aiobotocore_client_context() as aioboto_client:
 
             # step 3: list files first to create fmds
             # Note: the / at the end of the Prefix is VERY important, makes the listing several order of magnitudes faster
-            response = await client.list_objects_v2(
+            response = await aioboto_client.list_objects_v2(
                 Bucket=self.simcore_bucket_name, Prefix=f"{dest_folder}/"
             )
 
@@ -931,9 +955,9 @@ async def delete_project_simcore_s3(
                 delete_me = delete_me.where(file_meta_data.c.node_id == node_id)
             await conn.execute(delete_me)
 
-        async with self._create_client_context() as client:
+        async with self._create_aiobotocore_client_context() as aioboto_client:
             # Note: the / at the end of the Prefix is VERY important, makes the listing several order of magnitudes faster
-            response = await client.list_objects_v2(
+            response = await aioboto_client.list_objects_v2(
                 Bucket=self.simcore_bucket_name,
                 Prefix=f"{project_id}/{node_id}/" if node_id else f"{project_id}/",
             )
@@ -943,7 +967,7 @@ async def delete_project_simcore_s3(
                     objects_to_delete.append({"Key": f["Key"]})
 
             if objects_to_delete:
-                response = await client.delete_objects(
+                response = await aioboto_client.delete_objects(
                     Bucket=self.simcore_bucket_name,
                     Delete={"Objects": objects_to_delete},
                 )
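
A caveat that applies to every `list_objects_v2` call in this diff: a single call returns at most 1000 keys (and `delete_objects` accepts at most 1000 keys per request), so very large projects would only be partially listed or deleted. A hedged sketch of the exhaustive variant, assuming aiobotocore's async paginator; the helper name is illustrative:

    # Sketch: exhaustive listing via the async paginator.
    async def iter_all_keys(client, bucket: str, prefix: str):
        paginator = client.get_paginator("list_objects_v2")
        async for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for item in page.get("Contents", []):
                yield item["Key"]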
@@ -1051,15 +1075,15 @@ async def _prune_db_table(conn):
10511075 "synchronisation of database/s3 storage started, this will take some time..."
10521076 )
10531077
1054- async with self .engine .acquire () as conn , self ._create_client_context () as s3_client :
1078+ async with self .engine .acquire () as conn , self ._create_aiobotocore_client_context () as aioboto_client :
10551079
10561080 number_of_rows_in_db = await conn .scalar (file_meta_data .count ()) or 0
10571081 logger .warning (
10581082 "Total number of entries to check %d" ,
10591083 number_of_rows_in_db ,
10601084 )
10611085
1062- assert isinstance (s3_client , AioBaseClient ) # nosec
1086+ assert isinstance (aioboto_client , AioBaseClient ) # nosec
10631087
10641088 async for row in conn .execute (
10651089 sa .select ([file_meta_data .c .object_name ])
@@ -1068,7 +1092,7 @@ async def _prune_db_table(conn):
 
                 # now check if the file exists in S3
                 # SEE https://www.peterbe.com/plog/fastest-way-to-find-out-if-a-file-exists-in-s3
-                response = await s3_client.list_objects_v2(
+                response = await aioboto_client.list_objects_v2(
                     Bucket=self.simcore_bucket_name, Prefix=s3_key
                 )
                 if response.get("KeyCount", 0) == 0:
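
The `KeyCount` test implements the existence check discussed in the linked post. Note that it is a prefix match rather than an exact-key match; `head_object` is the exact alternative. A small sketch of both for comparison, with illustrative helper names:

    # Sketch: two S3 existence checks.
    from botocore.exceptions import ClientError

    async def exists_via_list(client, bucket: str, key: str) -> bool:
        response = await client.list_objects_v2(Bucket=bucket, Prefix=key)
        return response.get("KeyCount", 0) > 0  # matches any key starting with `key`

    async def exists_via_head(client, bucket: str, key: str) -> bool:
        try:
            await client.head_object(Bucket=bucket, Key=key)  # exact key only
            return True
        except ClientError:
            return False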