@@ -426,3 +426,127 @@ async def download_link(self, user_id: str, location: str, file_uuid: str)->str:
         destination, filename = _parse_datcore(file_uuid)
         link = await dcw.download_link(destination, filename)
         return link
+
+    async def deep_copy_project_simcore_s3(self, user_id: str, source_project, destination_project, node_mapping):
431+ """ Parses a given source project and copies all related files to the destination project
432+
433+ Since all files are organized as
434+
435+ project_id/node_id/filename or links to datcore
436+
437+ this function creates a new folder structure
438+
439+ project_id/node_id/filename
440+
441+ and copies all files to the corresponding places.
442+
443+ Additionally, all external files from datcore are being copied and the paths in the destination
444+ project are adapted accordingly
445+
446+ Lastly, the meta data db is kept in sync
447+ """
+        source_folder = source_project["uuid"]
+        dest_folder = destination_project["uuid"]
+
+        # build up a naming map based on the node labels
+        uuid_name_dict = {}
+        uuid_name_dict[dest_folder] = destination_project["name"]
+        for src_node_id, src_node in source_project["workbench"].items():
+            new_node_id = node_mapping.get(src_node_id)
+            if new_node_id is not None:
+                uuid_name_dict[new_node_id] = src_node["label"]
+
+        # Step 1: list all objects of the source project, derive the destination object
+        # names and copy the objects over
+        _loop = asyncio.get_event_loop()
+        session = aiobotocore.get_session(loop=_loop)
+        async with session.create_client("s3", endpoint_url=self.s3_client.endpoint_url,
+                                         aws_access_key_id=self.s3_client.access_key,
+                                         aws_secret_access_key=self.s3_client.secret_key) as client:
+            response = await client.list_objects_v2(Bucket=self.simcore_bucket_name, Prefix=source_folder)
+
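+            # NOTE: list_objects_v2 returns at most 1000 keys per call; projects with more
+            # objects would need a paginator (e.g. client.get_paginator("list_objects_v2"))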
466+ if "Contents" in response :
467+ for f in response ['Contents' ]:
468+ source_object_name = f ['Key' ]
469+ source_object_parts = Path (source_object_name ).parts
470+
471+ if len (source_object_parts ) == 3 :
472+ old_node_id = source_object_parts [1 ]
473+ new_node_id = node_mapping .get (old_node_id )
474+ if new_node_id is not None :
475+ old_filename = source_object_parts [2 ]
476+ dest_object_name = str (Path (dest_folder ) / new_node_id / old_filename )
477+ copy_source = {'Bucket' : self .simcore_bucket_name , 'Key' : source_object_name }
478+ response = await client .copy_object (CopySource = copy_source , Bucket = self .simcore_bucket_name , Key = dest_object_name )
479+ else :
480+ # This may happen once we have shared/home folders
481+ logger .info ("len(object.parts != 3" )
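+            # NOTE: copy_object handles objects up to 5 GB only; larger objects would need a
+            # multipart copy (e.g. boto3's managed copy() helper or UploadPartCopy)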
+
+        # Step 2: list all references in the outputs that point to datcore and copy them over
+        for node_id, node in destination_project["workbench"].items():
+            outputs = node.get("outputs")
+            if outputs is not None:
+                for _output_key, output in outputs.items():
+                    if "store" in output and output["store"] == DATCORE_ID:
+                        src = output["path"]
+                        dest = str(Path(dest_folder) / Path(node_id) / Path(src).name)
+                        logger.info("Copying %s to %s", src, dest)
+                        await self.copy_file(user_id, SIMCORE_S3_STR, dest, DATCORE_STR, src)
+                        # and adapt the destination project accordingly
+                        output["store"] = 0  # 0 == simcore.s3 store id
+                        output["path"] = dest
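+        # NOTE: copy_file is assumed to pull the datcore file into the simcore.s3 bucket
+        # under `dest`, so the outputs now live inside the destination project's own folder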
+
+        # Step 3: list the copied files first to create their file metadata (fmds)
+        session = aiobotocore.get_session(loop=_loop)
+        fmds = []
+        async with session.create_client("s3", endpoint_url=self.s3_client.endpoint_url,
+                                         aws_access_key_id=self.s3_client.access_key,
+                                         aws_secret_access_key=self.s3_client.secret_key) as client:
+            response = await client.list_objects_v2(Bucket=self.simcore_bucket_name, Prefix=dest_folder + "/")
+            if "Contents" in response:
+                for f in response["Contents"]:
+                    fmd = FileMetaData()
+                    fmd.simcore_from_uuid(f["Key"], self.simcore_bucket_name)
+                    fmd.project_name = uuid_name_dict.get(dest_folder, "Untitled")
+                    fmd.node_name = uuid_name_dict.get(fmd.node_id, "Untitled")
+                    fmd.raw_file_path = fmd.file_uuid
+                    fmd.display_file_path = str(Path(fmd.project_name) / fmd.node_name / fmd.file_name)
+                    fmd.user_id = user_id
+                    fmd.file_size = f["Size"]
+                    fmd.last_modified = str(f["LastModified"])
+                    fmds.append(fmd)
+
+        # Step 4: sync the metadata db
+        async with self.engine.acquire() as conn:
+            for fmd in fmds:
+                query = sa.select([file_meta_data]).where(file_meta_data.c.file_uuid == fmd.file_uuid)
+                # if the file already exists, delete the old entry before inserting the new one
+                rows = await conn.execute(query)
+                exists = await rows.scalar()
+                if exists:
+                    delete_me = file_meta_data.delete().where(file_meta_data.c.file_uuid == fmd.file_uuid)
+                    await conn.execute(delete_me)
+                ins = file_meta_data.insert().values(**vars(fmd))
+                await conn.execute(ins)
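+        # NOTE: the delete-then-insert above emulates an upsert; the two statements do not
+        # run in a single transaction, so a concurrent reader could briefly miss the row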
+
+    async def delete_project_simcore_s3(self, user_id: str, project_id):
+        """ Deletes all files of a given project in simcore.s3 and updates the db accordingly
+        """
+
+        async with self.engine.acquire() as conn:
+            delete_me = file_meta_data.delete().where(and_(file_meta_data.c.user_id == user_id,
+                                                           file_meta_data.c.project_id == project_id))
+            await conn.execute(delete_me)
+
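+        # NOTE: the db rows are removed before the S3 objects; if the S3 calls below fail,
+        # the objects are orphaned in the bucket rather than left dangling in the db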
+        _loop = asyncio.get_event_loop()
+        session = aiobotocore.get_session(loop=_loop)
+        async with session.create_client("s3", endpoint_url=self.s3_client.endpoint_url,
+                                         aws_access_key_id=self.s3_client.access_key,
+                                         aws_secret_access_key=self.s3_client.secret_key) as client:
+            response = await client.list_objects_v2(Bucket=self.simcore_bucket_name, Prefix=project_id + "/")
+            if "Contents" in response:
+                objects_to_delete = []
+                for f in response["Contents"]:
+                    objects_to_delete.append({"Key": f["Key"]})
+
+                if objects_to_delete:
+                    # delete_objects accepts at most 1000 keys per call, which matches the
+                    # unpaginated listing above
+                    response = await client.delete_objects(Bucket=self.simcore_bucket_name, Delete={"Objects": objects_to_delete})
+                    return response
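+    # A minimal usage sketch (hypothetical caller; assumes `dsm` is an instance of this
+    # data storage manager, with the projects and node_mapping provided by the caller):
+    #
+    #   await dsm.deep_copy_project_simcore_s3(user_id, src_project, dst_project, node_mapping)
+    #   await dsm.delete_project_simcore_s3(user_id, src_project["uuid"])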