Commit 3ffc782

Bugfix/storage lazy update of files (ITISFoundation#2636)
1 parent 0e00ee8 commit 3ffc782

File tree: 15 files changed, +264 -208 lines changed
packages/simcore-sdk/src/simcore_sdk/node_ports_v2/ports_mapping.py

Lines changed: 1 addition & 1 deletion
@@ -18,7 +18,7 @@ def __getitem__(self, key: Union[int, PortKey]) -> Port:
             key = list(self.__root__.keys())[key]
         if not key in self.__root__:
             raise UnboundPortError(key)
-        assert isinstance(key, str)  # no sec
+        assert isinstance(key, str)  # nosec
         return self.__root__[key]
 
     def __iter__(self) -> Iterator[PortKey]:
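Note on the "# no sec" to "# nosec" changes repeated across this commit: the marker presumably targets bandit, which only recognizes the exact spelling "# nosec" on the flagged line, so the extra space left the assert findings (B101, assert used) unsuppressed. A minimal sketch of the intended usage, not part of the diff and with an illustrative function name:

from pathlib import Path


def ensure_archive(path: Path) -> Path:
    # bandit flags bare asserts (B101); the exact "# nosec" marker suppresses the
    # finding, whereas "# no sec" is not recognized and the warning remains
    assert path.exists()  # nosec
    return path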

scripts/maintenance/check_consistency_data.py

Lines changed: 58 additions & 15 deletions
@@ -1,15 +1,6 @@
 #! /usr/bin/env python3
 
-"""Script to check consistency of the file storage backend in oSparc.
 
-
-
-1. From an osparc database, go over all projects, get the project IDs and Node IDs
-2. From the same database, now get all the files listed like projectID/nodeID from 1.
-3. We get a list of files that are needed for the current projects
-4. connect to the S3 backend, check that these files exist
-5. generate a report with: project uuid, owner, files missing in S3
-"""
 import asyncio
 import csv
 import logging
@@ -25,7 +16,10 @@
 import aiopg
 import typer
 from dateutil import parser
-from tenacity import after_log, retry, stop_after_attempt, wait_random
+from tenacity import retry
+from tenacity.after import after_log
+from tenacity.stop import stop_after_attempt
+from tenacity.wait import wait_random
 
 log = logging.getLogger(__name__)
 
@@ -38,8 +32,8 @@ async def managed_docker_compose(
     compose_file = Path.cwd() / "consistency" / "docker-compose.yml"
     try:
         subprocess.run(
-            f"docker-compose --file {compose_file} up --detach",
-            shell=True,
+            ["docker-compose", "--file", compose_file, "up", "--detach"],
+            shell=False,
             check=True,
             cwd=compose_file.parent,
             env={**os.environ, **{"POSTGRES_DATA_VOLUME": postgres_volume_name}},
@@ -65,8 +59,8 @@ async def postgres_responsive():
         yield
     finally:
         subprocess.run(
-            f"docker-compose --file {compose_file} down",
-            shell=True,
+            ["docker-compose", "--file", compose_file, "down"],
+            shell=False,
             check=True,
             cwd=compose_file.parent,
         )
@@ -122,6 +116,23 @@ async def _get_files_from_project_nodes(
     }
 
 
+async def _get_all_invalid_files_from_file_meta_data(
+    pool,
+) -> Set[Tuple[str, int, datetime]]:
+    async with pool.acquire() as conn:
+        async with conn.cursor() as cursor:
+            await cursor.execute(
+                'SELECT file_uuid, file_size, last_modified FROM "file_meta_data" '
+                "WHERE file_meta_data.file_size < 1 OR file_meta_data.entity_tag IS NULL"
+            )
+            # here we get all the invalid file entries in file_meta_data
+            file_rows = await cursor.fetchall()
+            return {
+                (file_uuid, file_size, parser.parse(last_modified or "2000-01-01"))
+                for file_uuid, file_size, last_modified in file_rows
+            }
+
+
 POWER_LABELS = {0: "B", 1: "KiB", 2: "MiB", 3: "GiB"}
 LABELS_POWER = {v: k for k, v in POWER_LABELS.items()}
 
@@ -214,7 +225,7 @@ async def main_async(
     s3_bucket: str,
 ):
 
-    # ---------------------- GET FILE ENTRIES FROM DB ---------------------------------------------------------------------
+    # ---------------------- GET FILE ENTRIES FROM DB PROJECT TABLE -------------------------------------------------------------
     async with managed_docker_compose(
         postgres_volume_name, postgres_username, postgres_password
     ):
@@ -231,6 +242,9 @@
                 for project_uuid, prj_data in project_nodes.items()
             ]
         )
+        all_invalid_files_in_file_meta_data = (
+            await _get_all_invalid_files_from_file_meta_data(pool)
+        )
         db_file_entries: Set[Tuple[str, int, datetime]] = set().union(
            *all_sets_of_file_entries
        )
@@ -243,6 +257,20 @@
        fg=typer.colors.YELLOW,
    )
 
+    if all_invalid_files_in_file_meta_data:
+        db_file_meta_data_invalid_entries_path = (
+            Path.cwd() / f"{s3_endpoint}_db_file_meta_data_invalid_entries.csv"
+        )
+        write_file(
+            db_file_meta_data_invalid_entries_path,
+            all_invalid_files_in_file_meta_data,
+            ["file_uuid", "size", "last modified"],
+        )
+        typer.secho(
+            f"processed {len(all_invalid_files_in_file_meta_data)} INVALID file entries, saved in {db_file_meta_data_invalid_entries_path}",
+            fg=typer.colors.YELLOW,
+        )
+
    # ---------------------- GET FILE ENTRIES FROM S3 ---------------------------------------------------------------------
    # let's proceed with S3 backend: files are saved in BUCKET_NAME/projectID/nodeID/fileA.ext
    # Rationale: Similarly we list here all the files in each of the projects. And it goes faster to list them recursively.
@@ -336,6 +364,21 @@ def main(
     s3_secret: str,
     s3_bucket: str,
 ):
+    """Script to check consistency of the file storage backend in oSparc.
+
+    requirements:
+    - local docker volume containing a database from a deployment (see make import-db-from-docker-volume in /packages/postgres-database)
+
+    1. From an osparc database, go over all projects, get the project IDs and Node IDs
+
+    2. From the same database, now get all the files listed like projectID/nodeID from 1.
+
+    3. We get a list of files that are needed for the current projects
+
+    4. connect to the S3 backend, check that these files exist
+
+    5. generate a report with: project uuid, owner, files missing in S3"""
+
     asyncio.run(
         main_async(
             postgres_volume_name,
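Beyond the new report, the script now imports the tenacity helpers from their submodules, runs docker-compose with list arguments and shell=False (so the compose-file path is never parsed by a shell), and moves the module docstring into main(), where typer can surface it as the command's --help text. The report itself goes through the script's existing write_file helper, whose body is not part of this diff; a hedged sketch of what such a CSV writer plausibly looks like, with the name and signature taken from the call sites above and the body assumed:

import csv
from pathlib import Path
from typing import Iterable, Sequence, Tuple


def write_file(path: Path, rows: Iterable[Tuple], headers: Sequence[str]) -> None:
    # one header row, then one row per (file_uuid, size, last_modified) tuple
    with path.open("w", newline="") as fp:
        writer = csv.writer(fp)
        writer.writerow(headers)
        writer.writerows(rows)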

services/dask-sidecar/src/simcore_service_dask_sidecar/file_utils.py

Lines changed: 1 addition & 1 deletion
@@ -118,7 +118,7 @@ async def push_file_to_remote(
                 None, zfp.write, src_path, src_path.name
             )
         logger.debug("%s created.", archive_file_path)
-        assert archive_file_path.exists()  # no sec
+        assert archive_file_path.exists()  # nosec
         file_to_upload = archive_file_path
         await log_publishing_cb(
             f"Compression of '{src_path.name}' to '{archive_file_path.name}' complete."

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py

Lines changed: 2 additions & 2 deletions
@@ -89,7 +89,7 @@ async def _on_task_completed(self, event: TaskStateEvent) -> None:
             event.job_id
         )
 
-        assert event.state in COMPLETED_STATES  # no sec
+        assert event.state in COMPLETED_STATES  # nosec
 
         logger.info(
             "task %s completed with state: %s",
@@ -98,7 +98,7 @@ async def _on_task_completed(self, event: TaskStateEvent) -> None:
         )
         if event.state == RunningState.SUCCESS:
             # we need to parse the results
-            assert event.msg  # no sec
+            assert event.msg  # nosec
             await parse_output_data(
                 self.db_engine,
                 event.job_id,

services/director-v2/src/simcore_service_director_v2/utils/dask.py

Lines changed: 1 addition & 1 deletion
@@ -225,7 +225,7 @@ def done_dask_callback(
     # remove the future from the dict to remove any handle to the future, so the worker can free the memory
     task_to_future_map.pop(job_id)
     logger.debug("dispatching callback to finish task '%s'", job_id)
-    assert event_data  # no sec
+    assert event_data  # nosec
     try:
         asyncio.run_coroutine_threadsafe(user_callback(event_data), main_loop)
     except Exception:  # pylint: disable=broad-except

services/storage/src/simcore_service_storage/dsm.py

Lines changed: 50 additions & 33 deletions
@@ -10,8 +10,9 @@
 import tempfile
 from collections import deque
 from concurrent.futures import ThreadPoolExecutor
+from dataclasses import dataclass, field
 from pathlib import Path
-from typing import Any, Dict, List, Optional, Tuple, Union
+from typing import Any, Dict, Final, List, Optional, Tuple, Union
 
 import aiobotocore
 import attr
@@ -61,7 +62,11 @@
 )
 from .s3wrapper.s3_client import MinioClientWrapper
 from .settings import Settings
-from .utils import download_to_file_or_raise
+from .utils import download_to_file_or_raise, is_file_entry_valid, to_meta_data_extended
+
+_MINUTE: Final[int] = 60
+_HOUR: Final[int] = 60 * _MINUTE
+
 
 logger = logging.getLogger(__name__)
 
@@ -96,17 +101,7 @@ async def _cleanup_context(app: web.Application):
     app.cleanup_ctx.append(_cleanup_context)
 
 
-def to_meta_data_extended(row: RowProxy) -> FileMetaDataEx:
-    assert row
-    meta = FileMetaData(**dict(row))  # type: ignore
-    meta_extended = FileMetaDataEx(
-        fmd=meta,
-        parent_id=str(Path(meta.object_name).parent),
-    )  # type: ignore
-    return meta_extended
-
-
-@attr.s(auto_attribs=True)
+@dataclass
 class DatCoreApiToken:
     api_token: Optional[str] = None
     api_secret: Optional[str] = None
@@ -115,7 +110,7 @@ def to_tuple(self):
         return (self.api_token, self.api_secret)
 
 
-@attr.s(auto_attribs=True)
+@dataclass
 class DataStorageManager:  # pylint: disable=too-many-public-methods
     """Data storage manager
 
@@ -154,12 +149,12 @@ class DataStorageManager:  # pylint: disable=too-many-public-methods
     pool: ThreadPoolExecutor
     simcore_bucket_name: str
     has_project_db: bool
-    session: AioSession = attr.Factory(aiobotocore.get_session)
-    datcore_tokens: Dict[str, DatCoreApiToken] = attr.Factory(dict)
+    session: AioSession = field(default_factory=aiobotocore.get_session)
+    datcore_tokens: Dict[str, DatCoreApiToken] = field(default_factory=dict)
     app: Optional[web.Application] = None
 
     def _create_aiobotocore_client_context(self) -> ClientCreatorContext:
-        assert hasattr(self.session, "create_client")
+        assert hasattr(self.session, "create_client")  # nosec
         # pylint: disable=no-member
 
         # SEE API in https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html
@@ -217,22 +212,28 @@ async def list_files(
                 accesible_projects_ids = await get_readable_project_ids(
                     conn, int(user_id)
                 )
-                has_read_access = (
+                where_statement = (
                     file_meta_data.c.user_id == user_id
                 ) | file_meta_data.c.project_id.in_(accesible_projects_ids)
-
-                query = sa.select([file_meta_data]).where(has_read_access)
+                if uuid_filter:
+                    where_statement &= file_meta_data.c.file_uuid.ilike(
+                        f"%{uuid_filter}%"
+                    )
+                query = sa.select([file_meta_data]).where(where_statement)
 
                 async for row in conn.execute(query):
                     dex = to_meta_data_extended(row)
-                    if dex.fmd.entity_tag is None:
+                    if not is_file_entry_valid(dex.fmd):
+                        # NOTE: the file is not updated with the information from S3 backend.
+                        # 1. Either the file exists, but was never updated in the database
+                        # 2. Or the file does not exist or was never completed, and the file_meta_data entry is old and faulty
                         # we need to update from S3 here since the database is not up-to-date
-                        dex = await self.update_database_from_storage(
+                        dex = await self.try_update_database_from_storage(
                             dex.fmd.file_uuid,
                             dex.fmd.bucket_name,
                             dex.fmd.object_name,
                         )
-                    if dex and dex.fmd.entity_tag:
+                    if dex:
                         data.append(dex)
 
         if self.has_project_db:
@@ -288,6 +289,9 @@ async def list_files(
 
         elif location == DATCORE_STR:
             api_token, api_secret = self._get_datcore_tokens(user_id)
+            assert self.app  # nosec
+            assert api_secret  # nosec
+            assert api_token  # nosec
             return await datcore_adapter.list_all_datasets_files_metadatas(
                 self.app, api_token, api_secret
             )
@@ -330,6 +334,9 @@ async def list_files_dataset(
         elif location == DATCORE_STR:
             api_token, api_secret = self._get_datcore_tokens(user_id)
             # lists all the files inside the dataset
+            assert self.app  # nosec
+            assert api_secret  # nosec
+            assert api_token  # nosec
             return await datcore_adapter.list_all_files_metadatas_in_dataset(
                 self.app, api_token, api_secret, dataset_id
             )
@@ -368,6 +375,9 @@ async def list_datasets(self, user_id: str, location: str) -> List[DatasetMetaDa
 
         elif location == DATCORE_STR:
             api_token, api_secret = self._get_datcore_tokens(user_id)
+            assert self.app  # nosec
+            assert api_secret  # nosec
+            assert api_token  # nosec
             return await datcore_adapter.list_datasets(self.app, api_token, api_secret)
 
         return data
@@ -391,13 +401,14 @@ async def list_file(
                 if not row:
                     return None
                 file_metadata = to_meta_data_extended(row)
-                if file_metadata.fmd.entity_tag is None:
-                    # we need to update from S3 here since the database is not up-to-date
-                    file_metadata = await self.update_database_from_storage(
-                        file_metadata.fmd.file_uuid,
-                        file_metadata.fmd.bucket_name,
-                        file_metadata.fmd.object_name,
-                    )
+                if is_file_entry_valid(file_metadata.fmd):
+                    return file_metadata
+                # we need to update from S3 here since the database is not up-to-date
+                file_metadata = await self.try_update_database_from_storage(
+                    file_metadata.fmd.file_uuid,
+                    file_metadata.fmd.bucket_name,
+                    file_metadata.fmd.object_name,
+                )
                 return file_metadata
             # FIXME: returns None in both cases: file does not exist or user has no access
             logger.debug("User %s cannot read file %s", user_id, file_uuid)
@@ -423,7 +434,7 @@ async def upload_file_to_datcore(
         # api_token, api_secret = self._get_datcore_tokens(user_id)
         # await dcw.upload_file_to_id(destination_id, local_file_path)
 
-    async def update_database_from_storage(
+    async def try_update_database_from_storage(
        self,
        file_uuid: str,
        bucket_name: str,
@@ -469,7 +480,7 @@ async def update_database_from_storage(
             return None
 
     @retry(
-        stop=stop_after_delay(3600),
+        stop=stop_after_delay(1 * _HOUR),
         wait=wait_exponential(multiplier=0.1, exp_base=1.2, max=30),
         retry=(
             retry_if_exception_type() | retry_if_result(lambda result: result is None)
@@ -479,7 +490,7 @@ async def update_database_from_storage(
     async def auto_update_database_from_storage_task(
         self, file_uuid: str, bucket_name: str, object_name: str
     ):
-        return await self.update_database_from_storage(
+        return await self.try_update_database_from_storage(
             file_uuid, bucket_name, object_name, silence_exception=True
         )
 
@@ -573,6 +584,9 @@ async def download_link_s3(self, file_uuid: str, user_id: int) -> str:
 
     async def download_link_datcore(self, user_id: str, file_id: str) -> URL:
         api_token, api_secret = self._get_datcore_tokens(user_id)
+        assert self.app  # nosec
+        assert api_secret  # nosec
+        assert api_token  # nosec
         return await datcore_adapter.get_file_download_presigned_link(
             self.app, api_token, api_secret, file_id
         )
@@ -928,6 +942,9 @@ async def delete_file(self, user_id: str, location: str, file_uuid: str):
         elif location == DATCORE_STR:
             # FIXME: review return inconsistencies
             api_token, api_secret = self._get_datcore_tokens(user_id)
+            assert self.app  # nosec
+            assert api_secret  # nosec
+            assert api_token  # nosec
             await datcore_adapter.delete_file(
                 self.app, api_token, api_secret, file_uuid
             )
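The core of the lazy-update fix is is_file_entry_valid, imported from .utils together with the relocated to_meta_data_extended; its body is not part of this diff. Judging from the NOTE in list_files and from the SQL filter the maintenance script uses (file_size < 1 OR entity_tag IS NULL), a plausible sketch is the following, where the reduced FileMetaData stand-in is for illustration only:

from dataclasses import dataclass
from typing import Optional


@dataclass
class FileMetaData:  # reduced stand-in for the real model, illustration only
    file_uuid: str
    file_size: int = -1
    entity_tag: Optional[str] = None


def is_file_entry_valid(fmd: FileMetaData) -> bool:
    # an entry counts as up to date only once the S3 information was written back:
    # a positive file size and a non-empty entity tag (the S3 ETag)
    return fmd.entity_tag is not None and fmd.file_size > 0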
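try_update_database_from_storage (the renamed update_database_from_storage) returns None while the S3 information is not available yet, and auto_update_database_from_storage_task relies on tenacity to keep polling in that case. A self-contained illustration of the retry-on-None pattern with the same stop/wait/retry arguments as the decorator above; the probe callable and function name are made-up stand-ins:

from typing import Callable, Optional

from tenacity import (
    retry,
    retry_if_exception_type,
    retry_if_result,
    stop_after_delay,
    wait_exponential,
)

_MINUTE = 60
_HOUR = 60 * _MINUTE


@retry(
    stop=stop_after_delay(1 * _HOUR),
    wait=wait_exponential(multiplier=0.1, exp_base=1.2, max=30),
    retry=(retry_if_exception_type() | retry_if_result(lambda result: result is None)),
)
def poll_metadata(probe: Callable[[], Optional[dict]]) -> Optional[dict]:
    # retried while the probe raises or returns None; gives up after one hour
    return probe()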
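DatCoreApiToken and DataStorageManager also drop attr.s(auto_attribs=True) in favour of the standard-library dataclasses, with attr.Factory defaults becoming field(default_factory=...). A minimal sketch of that pattern; the class names here are illustrative, not from the codebase:

from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class ApiToken:
    token: Optional[str] = None
    secret: Optional[str] = None


@dataclass
class Registry:
    # equivalent of attr.Factory(dict): each instance gets its own fresh dict
    tokens: Dict[str, ApiToken] = field(default_factory=dict)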
