Skip to content

Commit 54865fb

Browse files
author
Andrei Neagu
committed
added new project_networks
1 parent bcde82a commit 54865fb

File tree

4 files changed

+89
-0
lines changed

4 files changed

+89
-0
lines changed

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/events.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,17 @@
66
create_remote_docker_client_input_state,
77
remote_docker_client_lifespan,
88
)
9+
from servicelib.fastapi.postgres_lifespan import (
10+
create_postgres_database_input_state,
11+
)
912
from servicelib.fastapi.prometheus_instrumentation import (
1013
create_prometheus_instrumentationmain_input_state,
1114
prometheus_instrumentation_lifespan,
1215
)
1316

1417
from .._meta import APP_FINISHED_BANNER_MSG, APP_STARTED_BANNER_MSG
1518
from ..api.rpc.routes import rpc_api_routes_lifespan
19+
from ..repository.events import repository_lifespan_manager
1620
from ..services.catalog import catalog_lifespan
1721
from ..services.deferred_manager import deferred_manager_lifespan
1822
from ..services.director_v0 import director_v0_lifespan
@@ -36,6 +40,7 @@ async def _settings_lifespan(app: FastAPI) -> AsyncIterator[State]:
3640
settings: ApplicationSettings = app.state.settings
3741

3842
yield {
43+
**create_postgres_database_input_state(settings.DYNAMIC_SCHEDULER_POSTGRES),
3944
**create_prometheus_instrumentationmain_input_state(
4045
enabled=settings.DYNAMIC_SCHEDULER_PROMETHEUS_INSTRUMENTATION_ENABLED
4146
),
@@ -49,6 +54,7 @@ def create_app_lifespan() -> LifespanManager:
4954
app_lifespan = LifespanManager()
5055
app_lifespan.add(_settings_lifespan)
5156

57+
app_lifespan.include(repository_lifespan_manager)
5258
app_lifespan.add(director_v2_lifespan)
5359
app_lifespan.add(director_v0_lifespan)
5460
app_lifespan.add(catalog_lifespan)

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/repository/__init__.py

Whitespace-only changes.
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import logging
2+
from collections.abc import AsyncIterator
3+
4+
from fastapi import FastAPI
5+
from fastapi_lifespan_manager import LifespanManager, State
6+
from servicelib.fastapi.postgres_lifespan import (
7+
PostgresLifespanState,
8+
postgres_database_lifespan,
9+
)
10+
11+
_logger = logging.getLogger(__name__)
12+
13+
14+
async def _database_lifespan(app: FastAPI, state: State) -> AsyncIterator[State]:
15+
app.state.engine = state[PostgresLifespanState.POSTGRES_ASYNC_ENGINE]
16+
17+
# TODO initialize all the repos here?
18+
19+
# app.state.default_product_name = await repo.get_default_product_name()
20+
21+
yield {}
22+
23+
24+
repository_lifespan_manager = LifespanManager()
25+
repository_lifespan_manager.add(postgres_database_lifespan)
26+
repository_lifespan_manager.add(_database_lifespan)
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import sqlalchemy as sa
2+
from common_library.errors_classes import OsparcErrorMixin
3+
from models_library.projects import ProjectID
4+
from models_library.projects_networks import NetworksWithAliases, ProjectsNetworks
5+
from simcore_postgres_database.models.projects_networks import projects_networks
6+
from simcore_postgres_database.utils_repos import (
7+
pass_or_acquire_connection,
8+
transaction_context,
9+
)
10+
from sqlalchemy.dialects.postgresql import insert as pg_insert
11+
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine
12+
13+
14+
class BaseProjectNetwroksError(OsparcErrorMixin, RuntimeError):
15+
msg_template: str = "project networks unexpected error"
16+
17+
18+
class ProjectNetworkNotFoundError(BaseProjectNetwroksError):
19+
msg_template: str = "no networks found for project {project_id}"
20+
21+
22+
class ProjectNetworksRepo:
23+
def __init__(self, engine: AsyncEngine):
24+
self.engine = engine
25+
26+
async def get_projects_networks(
27+
self, connection: AsyncConnection | None = None, *, project_id: ProjectID
28+
) -> ProjectsNetworks:
29+
async with pass_or_acquire_connection(self.engine, connection) as conn:
30+
result = await conn.execute(
31+
sa.select(projects_networks).where(
32+
projects_networks.c.project_uuid == f"{project_id}"
33+
)
34+
)
35+
row = result.first()
36+
if not row:
37+
raise ProjectNetworkNotFoundError(project_id=project_id)
38+
return ProjectsNetworks.model_validate(row)
39+
40+
async def upsert_projects_networks(
41+
self,
42+
connection: AsyncConnection | None = None,
43+
*,
44+
project_id: ProjectID,
45+
networks_with_aliases: NetworksWithAliases,
46+
) -> None:
47+
projects_networks_to_insert = ProjectsNetworks.model_validate(
48+
{"project_uuid": project_id, "networks_with_aliases": networks_with_aliases}
49+
)
50+
51+
async with transaction_context(self.engine, connection) as conn:
52+
row_data = projects_networks_to_insert.model_dump(mode="json")
53+
insert_stmt = pg_insert(projects_networks).values(**row_data)
54+
upsert_snapshot = insert_stmt.on_conflict_do_update(
55+
index_elements=[projects_networks.c.project_uuid], set_=row_data
56+
)
57+
await conn.execute(upsert_snapshot)

0 commit comments

Comments
 (0)