| 
 | 1 | +import asyncio  | 
 | 2 | +import logging  | 
 | 3 | +import sys  | 
 | 4 | +from contextlib import asynccontextmanager  | 
 | 5 | +from typing import AsyncIterator, Final  | 
 | 6 | +from uuid import UUID  | 
 | 7 | + | 
 | 8 | +import typer  | 
 | 9 | +from fastapi import FastAPI  | 
 | 10 | +from models_library.projects import NodeIDStr, ProjectID  | 
 | 11 | +from models_library.projects_nodes_io import NodeID  | 
 | 12 | +from pydantic import AnyHttpUrl, parse_obj_as  | 
 | 13 | +from settings_library.utils_cli import create_settings_command  | 
 | 14 | +from tenacity._asyncio import AsyncRetrying  | 
 | 15 | +from tenacity.stop import stop_after_attempt  | 
 | 16 | +from tenacity.wait import wait_random_exponential  | 
 | 17 | + | 
 | 18 | +from .core.application import create_base_app  | 
 | 19 | +from .core.settings import AppSettings  | 
 | 20 | +from .meta import PROJECT_NAME  | 
 | 21 | +from .models.schemas.dynamic_services import DynamicSidecarNames  | 
 | 22 | +from .modules import db, director_v0, dynamic_sidecar  | 
 | 23 | +from .modules.db.repositories.projects import ProjectsRepository  | 
 | 24 | +from .modules.director_v0 import DirectorV0Client  | 
 | 25 | +from .modules.dynamic_sidecar import api_client  | 
 | 26 | +from .modules.dynamic_sidecar.scheduler.events_utils import (  | 
 | 27 | +    fetch_repo_outside_of_request,  | 
 | 28 | +)  | 
 | 29 | +from .modules.projects_networks import requires_dynamic_sidecar  | 
 | 30 | + | 
 | 31 | +DEFAULT_NODE_SAVE_RETRY: Final[int] = 3  | 
 | 32 | + | 
 | 33 | +main = typer.Typer(name=PROJECT_NAME)  | 
 | 34 | + | 
 | 35 | +log = logging.getLogger(__name__)  | 
 | 36 | +main.command()(create_settings_command(settings_cls=AppSettings, logger=log))  | 
 | 37 | + | 
 | 38 | + | 
 | 39 | +@asynccontextmanager  | 
 | 40 | +async def _initialized_app() -> AsyncIterator[FastAPI]:  | 
 | 41 | +    app = create_base_app()  | 
 | 42 | +    settings: AppSettings = app.state.settings  | 
 | 43 | + | 
 | 44 | +    # Initialize minimal required components for the application  | 
 | 45 | +    db.setup(app, settings.POSTGRES)  | 
 | 46 | +    dynamic_sidecar.setup(app)  | 
 | 47 | +    director_v0.setup(app, settings.DIRECTOR_V0)  | 
 | 48 | + | 
 | 49 | +    await app.router.startup()  | 
 | 50 | +    yield app  | 
 | 51 | +    await app.router.shutdown()  | 
 | 52 | + | 
 | 53 | + | 
 | 54 | +def _get_dynamic_sidecar_endpoint(  | 
 | 55 | +    settings: AppSettings, node_id: NodeIDStr  | 
 | 56 | +) -> AnyHttpUrl:  | 
 | 57 | +    dynamic_sidecar_names = DynamicSidecarNames.make(UUID(node_id))  | 
 | 58 | +    hostname = dynamic_sidecar_names.service_name_dynamic_sidecar  | 
 | 59 | +    port = settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR.DYNAMIC_SIDECAR_PORT  | 
 | 60 | +    return parse_obj_as(AnyHttpUrl, f"http://{hostname}:{port}")  # NOSONAR  | 
 | 61 | + | 
 | 62 | + | 
 | 63 | +async def _save_node_state(  | 
 | 64 | +    app,  | 
 | 65 | +    dynamic_sidecar_client: api_client.DynamicSidecarClient,  | 
 | 66 | +    retry_save: int,  | 
 | 67 | +    node_uuid: NodeIDStr,  | 
 | 68 | +    label: str = "",  | 
 | 69 | +) -> None:  | 
 | 70 | +    typer.echo(f"Saving state for {node_uuid} {label}")  | 
 | 71 | +    async for attempt in AsyncRetrying(  | 
 | 72 | +        wait=wait_random_exponential(),  | 
 | 73 | +        stop=stop_after_attempt(retry_save),  | 
 | 74 | +        reraise=True,  | 
 | 75 | +    ):  | 
 | 76 | +        with attempt:  | 
 | 77 | +            typer.echo(f"Attempting to save {node_uuid} {label}")  | 
 | 78 | +            await dynamic_sidecar_client.save_service_state(  | 
 | 79 | +                _get_dynamic_sidecar_endpoint(app.state.settings, node_uuid)  | 
 | 80 | +            )  | 
 | 81 | + | 
 | 82 | + | 
 | 83 | +async def _async_project_save_state(project_id: ProjectID, retry_save: int) -> None:  | 
 | 84 | +    async with _initialized_app() as app:  | 
 | 85 | +        projects_repository: ProjectsRepository = fetch_repo_outside_of_request(  | 
 | 86 | +            app, ProjectsRepository  | 
 | 87 | +        )  | 
 | 88 | +        project_at_db = await projects_repository.get_project(project_id)  | 
 | 89 | + | 
 | 90 | +        typer.echo(f"Saving project '{project_at_db.uuid}' - '{project_at_db.name}'")  | 
 | 91 | + | 
 | 92 | +        dynamic_sidecar_client = api_client.get_dynamic_sidecar_client(app)  | 
 | 93 | +        nodes_failed_to_save: list[NodeIDStr] = []  | 
 | 94 | +        for node_uuid, node_content in project_at_db.workbench.items():  | 
 | 95 | +            # onl dynamic-sidecars are used  | 
 | 96 | +            if not await requires_dynamic_sidecar(  | 
 | 97 | +                service_key=node_content.key,  | 
 | 98 | +                service_version=node_content.version,  | 
 | 99 | +                director_v0_client=DirectorV0Client.instance(app),  | 
 | 100 | +            ):  | 
 | 101 | +                continue  | 
 | 102 | + | 
 | 103 | +            try:  | 
 | 104 | +                await _save_node_state(  | 
 | 105 | +                    app,  | 
 | 106 | +                    dynamic_sidecar_client,  | 
 | 107 | +                    retry_save,  | 
 | 108 | +                    node_uuid,  | 
 | 109 | +                    node_content.label,  | 
 | 110 | +                )  | 
 | 111 | +            except Exception:  # pylint: disable=broad-except  | 
 | 112 | +                nodes_failed_to_save.append(node_uuid)  | 
 | 113 | + | 
 | 114 | +    if nodes_failed_to_save:  | 
 | 115 | +        typer.echo(  | 
 | 116 | +            "The following nodes failed to save:"  | 
 | 117 | +            + "\n- "  | 
 | 118 | +            + "\n- ".join(nodes_failed_to_save)  | 
 | 119 | +            + "\nPlease try to save them individually!"  | 
 | 120 | +        )  | 
 | 121 | +        sys.exit(1)  | 
 | 122 | + | 
 | 123 | +    typer.echo(f"Save complete for project {project_id}")  | 
 | 124 | + | 
 | 125 | + | 
 | 126 | +@main.command()  | 
 | 127 | +def project_save_state(  | 
 | 128 | +    project_id: ProjectID, retry_save: int = DEFAULT_NODE_SAVE_RETRY  | 
 | 129 | +):  | 
 | 130 | +    """  | 
 | 131 | +    Saves the state of all dy-sidecars in a project.  | 
 | 132 | +    In case of error while saving the state of an individual node,  | 
 | 133 | +    it will retry to save.  | 
 | 134 | +    If errors persist it will produce a list of nodes which failed to save.  | 
 | 135 | +    """  | 
 | 136 | +    asyncio.run(_async_project_save_state(project_id, retry_save))  | 
 | 137 | + | 
 | 138 | + | 
 | 139 | +async def _async_node_save_state(node_id: NodeID, retry_save: int) -> None:  | 
 | 140 | +    async with _initialized_app() as app:  | 
 | 141 | +        dynamic_sidecar_client = api_client.get_dynamic_sidecar_client(app)  | 
 | 142 | +        await _save_node_state(  | 
 | 143 | +            app, dynamic_sidecar_client, retry_save, NodeIDStr(f"{node_id}")  | 
 | 144 | +        )  | 
 | 145 | + | 
 | 146 | +    typer.echo(f"Node {node_id} save completed")  | 
 | 147 | + | 
 | 148 | + | 
 | 149 | +@main.command()  | 
 | 150 | +def node_save_state(node_id: NodeID, retry_save: int = DEFAULT_NODE_SAVE_RETRY):  | 
 | 151 | +    """  | 
 | 152 | +    Saves the state of an individual node in the project.  | 
 | 153 | +    """  | 
 | 154 | +    asyncio.run(_async_node_save_state(node_id, retry_save))  | 
0 commit comments