|
| 1 | +from collections import deque |
| 2 | +from contextlib import contextmanager |
| 3 | +from typing import Any, Deque, Dict, Iterator, Optional, Tuple |
| 4 | +from uuid import UUID |
| 5 | + |
| 6 | +import typer |
| 7 | +from models import DBConfig |
| 8 | +from simcore_postgres_database.models.file_meta_data import file_meta_data |
| 9 | +from simcore_postgres_database.models.projects import projects |
| 10 | +from sqlalchemy import and_, create_engine, select |
| 11 | +from sqlalchemy.dialects.postgresql import insert |
| 12 | +from sqlalchemy.engine.base import Connection |
| 13 | +from sqlalchemy.engine.cursor import ResultProxy |
| 14 | + |
| 15 | + |
| 16 | +@contextmanager |
| 17 | +def db_connection(db_config: DBConfig) -> Iterator[Connection]: |
| 18 | + engine = create_engine( |
| 19 | + f"postgresql://{db_config.user}:{db_config.password}@{db_config.address}/{db_config.database}", |
| 20 | + echo=True, |
| 21 | + ) |
| 22 | + with engine.connect() as con: |
| 23 | + yield con |
| 24 | + |
| 25 | + |
| 26 | +def _project_uuid_exists_in_destination( |
| 27 | + connection: Connection, project_id: str |
| 28 | +) -> bool: |
| 29 | + query = select([projects.c.id]).where(projects.c.uuid == f"{project_id}") |
| 30 | + exists = len(list(connection.execute(query))) > 0 |
| 31 | + return exists |
| 32 | + |
| 33 | + |
| 34 | +def _meta_data_exists_in_destination(connection: Connection, file_uuid: str) -> bool: |
| 35 | + query = select([file_meta_data.c.file_uuid]).where( |
| 36 | + file_meta_data.c.file_uuid == f"{file_uuid}" |
| 37 | + ) |
| 38 | + exists = len(list(connection.execute(query))) > 0 |
| 39 | + return exists |
| 40 | + |
| 41 | + |
| 42 | +def _get_project(connection: Connection, project_uuid: UUID) -> ResultProxy: |
| 43 | + return connection.execute( |
| 44 | + select([projects]).where(projects.c.uuid == f"{project_uuid}") |
| 45 | + ) |
| 46 | + |
| 47 | + |
| 48 | +def _get_hidden_project(connection: Connection, prj_owner: int) -> ResultProxy: |
| 49 | + return connection.execute( |
| 50 | + select([projects]).where( |
| 51 | + and_(projects.c.prj_owner == prj_owner, projects.c.hidden == True) |
| 52 | + ) |
| 53 | + ) |
| 54 | + |
| 55 | + |
| 56 | +def _get_file_meta_data_without_soft_links( |
| 57 | + connection: Connection, user_id: int, project_id: UUID |
| 58 | +) -> ResultProxy: |
| 59 | + return connection.execute( |
| 60 | + select([file_meta_data]).where( |
| 61 | + and_( |
| 62 | + file_meta_data.c.user_id == f"{user_id}", |
| 63 | + file_meta_data.c.project_id == f"{project_id}", |
| 64 | + file_meta_data.c.is_soft_link != True, |
| 65 | + ) |
| 66 | + ) |
| 67 | + ) |
| 68 | + |
| 69 | + |
| 70 | +def _format_message(message: str, color: str, bold: bool = False) -> None: |
| 71 | + formatted_message = typer.style(message, fg=color, bold=bold) |
| 72 | + typer.echo(formatted_message) |
| 73 | + |
| 74 | + |
| 75 | +def _red_message(message: str) -> None: |
| 76 | + _format_message(message, typer.colors.RED, bold=True) |
| 77 | + |
| 78 | + |
| 79 | +def _green_message(message: str) -> None: |
| 80 | + _format_message(message, typer.colors.GREEN) |
| 81 | + |
| 82 | + |
| 83 | +def _project_summary(project: Dict) -> str: |
| 84 | + return f"PROJECT: {project['uuid']} {project['name']}" |
| 85 | + |
| 86 | + |
| 87 | +def _file_summary(file_meta_data: Dict) -> str: |
| 88 | + return f"FILE: {file_meta_data['file_uuid']}" |
| 89 | + |
| 90 | + |
| 91 | +def get_project_and_files_to_migrate( |
| 92 | + project_uuid: UUID, |
| 93 | + hidden_projects_for_user: Optional[int], |
| 94 | + src_conn: Connection, |
| 95 | + dst_conn: Connection, |
| 96 | +) -> Tuple[Deque, Deque]: |
| 97 | + skipped_projects = deque() |
| 98 | + skipped_files_meta_data = deque() |
| 99 | + |
| 100 | + projects_to_migrate = deque() |
| 101 | + files_meta_data_to_migrate = deque() |
| 102 | + |
| 103 | + user_project_selection = list(_get_project(src_conn, project_uuid)) |
| 104 | + assert len(user_project_selection) == 1 |
| 105 | + user_project = user_project_selection[0] |
| 106 | + |
| 107 | + project = dict(user_project.items()) |
| 108 | + project_id = project["uuid"] |
| 109 | + |
| 110 | + if _project_uuid_exists_in_destination(dst_conn, project_id): |
| 111 | + error_message = f"main project {project['uuid']} already exists at destination!" |
| 112 | + _red_message(error_message) |
| 113 | + raise Exception(error_message) |
| 114 | + |
| 115 | + projects_to_migrate.append(project) |
| 116 | + |
| 117 | + if hidden_projects_for_user: |
| 118 | + # extract all hidden projects and check if they require syncing |
| 119 | + hidden_projects_cursor = _get_hidden_project(src_conn, hidden_projects_for_user) |
| 120 | + for hidden_result in hidden_projects_cursor: |
| 121 | + hidden_project = dict(hidden_result.items()) |
| 122 | + if _project_uuid_exists_in_destination(dst_conn, hidden_project["uuid"]): |
| 123 | + _red_message(f"SKIPPING, sync for {_project_summary(project)}") |
| 124 | + skipped_projects.append(project) |
| 125 | + continue |
| 126 | + |
| 127 | + projects_to_migrate.append(hidden_project) |
| 128 | + |
| 129 | + # check file_meta_data in the projects to migrate |
| 130 | + for project in projects_to_migrate: |
| 131 | + user_id = project["prj_owner"] |
| 132 | + project_id = project["uuid"] |
| 133 | + |
| 134 | + files_metadata_cursor = _get_file_meta_data_without_soft_links( |
| 135 | + connection=src_conn, user_id=user_id, project_id=project_id |
| 136 | + ) |
| 137 | + for result in files_metadata_cursor: |
| 138 | + file_meta_data = dict(result.items()) |
| 139 | + file_uuid = file_meta_data["file_uuid"] |
| 140 | + |
| 141 | + if _meta_data_exists_in_destination(dst_conn, file_uuid): |
| 142 | + _red_message(f"SKIPPING, sync for {_file_summary(file_meta_data)}") |
| 143 | + skipped_files_meta_data.append(file_meta_data) |
| 144 | + continue |
| 145 | + |
| 146 | + files_meta_data_to_migrate.append(file_meta_data) |
| 147 | + |
| 148 | + if len(skipped_projects) > 0: |
| 149 | + _red_message("SKIPPED projects count %s" % len(skipped_projects)) |
| 150 | + if len(skipped_files_meta_data) > 0: |
| 151 | + _red_message("SKIPPED files count %s" % len(skipped_files_meta_data)) |
| 152 | + |
| 153 | + _green_message("Projects to move %s" % len(projects_to_migrate)) |
| 154 | + _green_message("Files to move %s" % len(files_meta_data_to_migrate)) |
| 155 | + |
| 156 | + # if files and projects already exist |
| 157 | + if len(skipped_files_meta_data) > 0 or len(skipped_projects) > 0: |
| 158 | + _red_message( |
| 159 | + "Projects skipped uuid(primary keys) listing: %s" |
| 160 | + % [x["uuid"] for x in skipped_projects], |
| 161 | + ) |
| 162 | + _red_message( |
| 163 | + "File meta data skipped file_uuid(primary keys) listing: %s" |
| 164 | + % [x["file_uuid"] for x in skipped_files_meta_data], |
| 165 | + ) |
| 166 | + raise Exception( |
| 167 | + "Could not continue migration, some projects or files already exist." |
| 168 | + ) |
| 169 | + |
| 170 | + return projects_to_migrate, files_meta_data_to_migrate |
| 171 | + |
| 172 | + |
| 173 | +def insert_file_meta_data(connection: Connection, data: Dict[str, Any]) -> None: |
| 174 | + connection.execute(insert(file_meta_data).values(**data)) |
| 175 | + |
| 176 | + |
| 177 | +def insert_projects(connection: Connection, data: Dict[str, Any]) -> None: |
| 178 | + connection.execute(insert(projects).values(**data)) |
0 commit comments