|
16 | 16 | import aiobotocore |
17 | 17 | import attr |
18 | 18 | import sqlalchemy as sa |
| 19 | +from aiobotocore.client import AioBaseClient |
19 | 20 | from aiobotocore.session import AioSession, ClientCreatorContext |
20 | 21 | from aiohttp import web |
21 | 22 | from aiopg.sa import Engine |
@@ -154,6 +155,8 @@ class DataStorageManager: |
154 | 155 | def _create_client_context(self) -> ClientCreatorContext: |
155 | 156 | assert hasattr(self.session, "create_client") |
156 | 157 | # pylint: disable=no-member |
| 158 | + |
| 159 | + # SEE API in https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html |
157 | 160 | return self.session.create_client( |
158 | 161 | "s3", |
159 | 162 | endpoint_url=self.s3_client.endpoint_url, |
@@ -898,7 +901,7 @@ async def delete_file(self, user_id: str, location: str, file_uuid: str): |
898 | 901 |
|
899 | 902 | async def delete_project_simcore_s3( |
900 | 903 | self, user_id: str, project_id: str, node_id: Optional[str] = None |
901 | | - ) -> web.Response: |
| 904 | + ) -> Optional[web.Response]: |
902 | 905 |
|
903 | 906 | """Deletes all files from a given node in a project in simcore.s3 and updated db accordingly. |
904 | 907 | If node_id is not given, then all the project files db entries are deleted. |
@@ -1013,38 +1016,55 @@ async def create_soft_link( |
1013 | 1016 | async def synchronise_meta_data_table( |
1014 | 1017 | self, location: str, dry_run: bool |
1015 | 1018 | ) -> Dict[str, Any]: |
1016 | | - sync_results = {"removed": []} |
| 1019 | + |
| 1020 | + to_remove = [] |
| 1021 | + |
| 1022 | + assert ( # nosec |
| 1023 | + location == SIMCORE_S3_STR |
| 1024 | + ), "Only with s3, no other sync implemented" # nosec |
| 1025 | + |
1017 | 1026 | if location == SIMCORE_S3_STR: |
1018 | | - # NOTE: only valid for Simcore, since datcore data is not in the database table |
| 1027 | + |
| 1028 | + # NOTE: only valid for simcore, since datcore data is not in the database table |
1019 | 1029 | # let's get all the files in the table |
1020 | 1030 | logger.warning( |
1021 | 1031 | "synchronisation of database/s3 storage started, this will take some time..." |
1022 | 1032 | ) |
| 1033 | + |
1023 | 1034 | async with self.engine.acquire() as conn, self._create_client_context() as s3_client: |
1024 | | - number_of_rows_in_db = await conn.scalar(file_meta_data.count()) |
| 1035 | + |
| 1036 | + number_of_rows_in_db = await conn.scalar(file_meta_data.count()) or 0 |
1025 | 1037 | logger.warning( |
1026 | | - "total number of entries to check %d", |
| 1038 | + "Total number of entries to check %d", |
1027 | 1039 | number_of_rows_in_db, |
1028 | 1040 | ) |
1029 | 1041 |
|
1030 | | - async for row in conn.execute(file_meta_data.select()): |
| 1042 | + assert isinstance(s3_client, AioBaseClient) # nosec |
| 1043 | + |
| 1044 | + async for row in conn.execute( |
| 1045 | + sa.select([file_meta_data.c.object_name]) |
| 1046 | + ): |
1031 | 1047 | s3_key = row.object_name # type: ignore |
1032 | 1048 |
|
1033 | 1049 | # now check if the file exists in S3 |
1034 | | - try: |
1035 | | - await s3_client.get_object( |
1036 | | - Bucket=self.simcore_bucket_name, |
1037 | | - Key=s3_key, |
1038 | | - ) |
1039 | | - except s3_client.exceptions.NoSuchKey: |
1040 | | - # this file does not exist |
1041 | | - sync_results["removed"].append(s3_key) |
| 1050 | + # SEE https://www.peterbe.com/plog/fastest-way-to-find-out-if-a-file-exists-in-s3 |
| 1051 | + response = await s3_client.list_objects_v2( |
| 1052 | + Bucket=self.simcore_bucket_name, Prefix=s3_key |
| 1053 | + ) |
| 1054 | + if response.get("KeyCount", 0) == 0: # this file does not exist |
| 1055 | + to_remove.append(s3_key) |
1042 | 1056 |
|
| 1057 | + logger.info( |
| 1058 | + "%s %d entries ", |
| 1059 | + "Would delete" if dry_run else "Deleting", |
| 1060 | + len(to_remove), |
| 1061 | + ) |
1043 | 1062 | if not dry_run: |
1044 | 1063 | await conn.execute( |
1045 | 1064 | file_meta_data.delete().where( |
1046 | | - file_meta_data.c.object_name.in_(sync_results["removed"]) |
| 1065 | + file_meta_data.c.object_name.in_(to_remove) |
1047 | 1066 | ) |
1048 | 1067 | ) |
| 1068 | + logger.info("Deleted %s orphan items", len(to_remove)) |
1049 | 1069 |
|
1050 | | - return sync_results |
| 1070 | + return {"removed": to_remove} |
0 commit comments