Skip to content

Commit fc44133

Browse files
adding tags to rut
1 parent 2af7f21 commit fc44133

File tree

20 files changed

+291
-5
lines changed

20 files changed

+291
-5
lines changed

packages/models-library/src/models_library/rabbitmq_messages.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,23 @@ def routing_key(self) -> str | None:
194194
return None
195195

196196

197+
class RabbitResourceTrackingProjectSyncMessage(RabbitMessageBase):
198+
channel_name: Literal["io.simcore.service.tracking-project-sync"] = Field(
199+
default="io.simcore.service.tracking-project-sync", const=True
200+
)
201+
202+
project_id: ProjectID
203+
project_name: str | None = None
204+
project_tags_names: list[str] | None = None
205+
created_at: datetime.datetime = Field(
206+
default_factory=lambda: arrow.utcnow().datetime,
207+
description="message creation datetime",
208+
)
209+
210+
def routing_key(self) -> str | None:
211+
return None
212+
213+
197214
class DynamicServiceRunningMessage(RabbitMessageBase):
198215
channel_name: Literal["io.simcore.service.dynamic-service-running"] = Field(
199216
default="io.simcore.service.dynamic-service-running", const=True
@@ -232,6 +249,7 @@ class RabbitResourceTrackingStartedMessage(RabbitResourceTrackingBaseMessage):
232249

233250
project_id: ProjectID
234251
project_name: str
252+
project_tags: list[tuple[int, str]]
235253

236254
node_id: NodeID
237255
node_name: str

packages/models-library/src/models_library/services_creation.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class CreateServiceMetricsAdditionalParams(BaseModel):
1717
simcore_user_agent: str
1818
user_email: str
1919
project_name: str
20+
project_tags: list[tuple[int, str]]
2021
node_name: str
2122
service_key: ServiceKey
2223
service_version: ServiceVersion
@@ -35,6 +36,7 @@ class Config:
3536
"simcore_user_agent": "undefined",
3637
"user_email": "[email protected]",
3738
"project_name": "_!New Study",
39+
"project_tags": [],
3840
"node_name": "the service of a lifetime _ *!",
3941
"service_key": ServiceKey("simcore/services/dynamic/test"),
4042
"service_version": ServiceVersion("0.0.1"),

packages/postgres-database/src/simcore_postgres_database/models/resource_tracker_service_runs.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,3 +236,30 @@ class ResourceTrackerServiceRunStatus(str, enum.Enum):
236236
== ResourceTrackerServiceRunStatus.RUNNING
237237
),
238238
)
239+
240+
241+
resource_tracker_project_metadata = sa.Table(
242+
"resource_tracker_project_metadata",
243+
metadata,
244+
sa.Column(
245+
"project_id", # UUID
246+
sa.String,
247+
nullable=False,
248+
doc="We want to store the project id for tracking/billing purposes and be sure it stays there even when the project is deleted (that's also reason why we do not introduce foreign key)",
249+
primary_key=True,
250+
),
251+
# sa.Column(
252+
# "project_name",
253+
# sa.String,
254+
# nullable=False,
255+
# doc="we want to store the project name for tracking/billing purposes and be sure it stays there even when the project is deleted (that's also reason why we do not introduce foreign key)",
256+
# ),
257+
sa.Column(
258+
"project_tags_names",
259+
JSONB,
260+
nullable=False,
261+
server_default=sa.text("'{}'::jsonb"),
262+
doc="we want to store the project name for tracking/billing purposes and be sure it stays there even when the project is deleted (that's also reason why we do not introduce foreign key)",
263+
),
264+
column_modified_datetime(timezone=True),
265+
)

packages/postgres-database/src/simcore_postgres_database/utils_tags.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
"""
33

44
from typing import TypedDict
5+
from uuid import UUID
56

67
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine
78

@@ -11,6 +12,7 @@
1112
create_tag_stmt,
1213
delete_tag_stmt,
1314
get_tag_stmt,
15+
list_tag_ids_and_names_by_project_uuid_stmt,
1416
list_tags_stmt,
1517
set_tag_access_rights_stmt,
1618
update_tag_stmt,
@@ -175,6 +177,16 @@ async def get(
175177
delete=row.delete,
176178
)
177179

180+
async def list_tag_ids_and_names_by_project_uuid(
181+
self, connection: AsyncConnection | None = None, *, project_uuid: UUID
182+
) -> list[tuple[int, str]]:
183+
stmt_list = list_tag_ids_and_names_by_project_uuid_stmt(
184+
project_uuid=project_uuid
185+
)
186+
async with pass_or_acquire_connection(self.engine, connection) as conn:
187+
result = await conn.stream(stmt_list)
188+
return [(row.id, row.name) async for row in result]
189+
178190
async def update(
179191
self,
180192
connection: AsyncConnection | None = None,

packages/postgres-database/src/simcore_postgres_database/utils_tags_sql.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import functools
2+
from uuid import UUID
23

34
import sqlalchemy as sa
45
from simcore_postgres_database.models.groups import user_to_groups
6+
from simcore_postgres_database.models.projects import projects
57
from simcore_postgres_database.models.projects_tags import projects_tags
68
from simcore_postgres_database.models.services_tags import services_tags
79
from simcore_postgres_database.models.tags import tags
@@ -60,7 +62,7 @@ def get_tag_stmt(
6062
# aggregation ensures MOST PERMISSIVE policy of access-rights
6163
sa.func.bool_or(tags_access_rights.c.read).label("read"),
6264
sa.func.bool_or(tags_access_rights.c.write).label("write"),
63-
sa.func.bool_or(tags_access_rights.c.delete).label("delete")
65+
sa.func.bool_or(tags_access_rights.c.delete).label("delete"),
6466
)
6567
.select_from(
6668
_join_user_to_given_tag(
@@ -73,14 +75,29 @@ def get_tag_stmt(
7375
)
7476

7577

78+
def list_tag_ids_and_names_by_project_uuid_stmt(
79+
project_uuid: UUID,
80+
):
81+
return (
82+
sa.select(tags.c.id, tags.c.name)
83+
.select_from(
84+
projects_tags.join(tags, tags.c.id == projects_tags.c.tag_id).join(
85+
projects, projects_tags.c.project_id == projects.c.id
86+
)
87+
)
88+
.where(projects.c.uuid == f"{project_uuid}")
89+
.group_by(tags.c.id, tags.c.name)
90+
)
91+
92+
7693
def list_tags_stmt(*, user_id: int):
7794
return (
7895
sa.select(
7996
*_TAG_COLUMNS,
8097
# aggregation ensures MOST PERMISSIVE policy of access-rights
8198
sa.func.bool_or(tags_access_rights.c.read).label("read"),
8299
sa.func.bool_or(tags_access_rights.c.write).label("write"),
83-
sa.func.bool_or(tags_access_rights.c.delete).label("delete")
100+
sa.func.bool_or(tags_access_rights.c.delete).label("delete"),
84101
)
85102
.select_from(
86103
_join_user_to_tags(
@@ -104,7 +121,7 @@ def count_groups_with_given_access_rights_stmt(
104121
tag_id: int,
105122
read: bool | None,
106123
write: bool | None,
107-
delete: bool | None
124+
delete: bool | None,
108125
):
109126
"""
110127
How many groups (from this user_id) are given EXACTLY these access permissions

services/director-v2/src/simcore_service_director_v2/api/routes/computations.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
from servicelib.logging_utils import log_decorator
4141
from servicelib.rabbitmq import RabbitMQRPCClient
4242
from simcore_postgres_database.utils_projects_metadata import DBProjectNotFoundError
43+
from simcore_postgres_database.utils_tags import TagsRepo
44+
from sqlalchemy.ext.asyncio import AsyncEngine
4345
from starlette import status
4446
from starlette.requests import Request
4547
from tenacity import retry
@@ -64,6 +66,7 @@
6466
from ...models.comp_tasks import CompTaskAtDB
6567
from ...modules.catalog import CatalogClient
6668
from ...modules.comp_scheduler import BaseCompScheduler
69+
from ...modules.db._asyncpg import get_asyncpg_engine
6770
from ...modules.db.repositories.clusters import ClustersRepository
6871
from ...modules.db.repositories.comp_pipelines import CompPipelinesRepository
6972
from ...modules.db.repositories.comp_runs import CompRunsRepository
@@ -219,6 +222,7 @@ async def _try_start_pipeline(
219222
project: ProjectAtDB,
220223
users_repo: UsersRepository,
221224
projects_metadata_repo: ProjectsMetadataRepository,
225+
asyncpg_engine: AsyncEngine,
222226
) -> None:
223227
if not minimal_dag.nodes():
224228
# 2 options here: either we have cycles in the graph or it's really done
@@ -240,6 +244,12 @@ async def _try_start_pipeline(
240244
wallet_id = computation.wallet_info.wallet_id
241245
wallet_name = computation.wallet_info.wallet_name
242246

247+
# Get project tags
248+
repo = TagsRepo(asyncpg_engine)
249+
project_tags = await repo.list_tag_ids_and_names_by_project_uuid(
250+
project_uuid=project.uuid
251+
)
252+
243253
await scheduler.run_new_pipeline(
244254
computation.user_id,
245255
computation.project_id,
@@ -251,6 +261,7 @@ async def _try_start_pipeline(
251261
},
252262
product_name=computation.product_name,
253263
project_name=project.name,
264+
project_tags=project_tags,
254265
simcore_user_agent=computation.simcore_user_agent,
255266
user_email=await users_repo.get_user_email(computation.user_id),
256267
wallet_id=wallet_id,
@@ -376,6 +387,7 @@ async def create_computation( # noqa: PLR0913 # pylint: disable=too-many-positi
376387
project=project,
377388
users_repo=users_repo,
378389
projects_metadata_repo=projects_metadata_repo,
390+
asyncpg_engine=get_asyncpg_engine(request.app),
379391
)
380392

381393
# filter the tasks by the effective pipeline

services/director-v2/src/simcore_service_director_v2/models/comp_runs.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ class ProjectMetadataDict(TypedDict, total=False):
2727
class RunMetadataDict(TypedDict, total=False):
2828
node_id_names_map: dict[NodeID, str]
2929
project_name: str
30+
project_tags: list[tuple[int, str]]
3031
product_name: str
3132
simcore_user_agent: str
3233
user_email: str

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_base_scheduler.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,7 @@ async def _process_started_tasks(
452452
project_name=run_metadata.get("project_metadata", {}).get( # type: ignore[arg-type]
453453
"project_name", UNDEFINED_STR_METADATA
454454
),
455+
project_tags=run_metadata.get("project_tags", []),
455456
node_id=t.node_id,
456457
node_name=run_metadata.get("node_id_names_map", {}).get(
457458
t.node_id, UNDEFINED_STR_METADATA
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,27 @@
11
from fastapi import FastAPI
22
from settings_library.postgres import PostgresSettings
33

4+
from ._asyncpg import (
5+
asyncpg_close_db_connection,
6+
asyncpg_connect_to_db,
7+
get_asyncpg_engine,
8+
)
49
from .events import close_db_connection, connect_to_db
510

611

712
def setup(app: FastAPI, settings: PostgresSettings) -> None:
813
async def on_startup() -> None:
914
await connect_to_db(app, settings)
15+
await asyncpg_connect_to_db(app, settings)
1016

1117
async def on_shutdown() -> None:
18+
await asyncpg_close_db_connection(app)
1219
await close_db_connection(app)
1320

21+
# Add async asyncpg
22+
1423
app.add_event_handler("startup", on_startup)
1524
app.add_event_handler("shutdown", on_shutdown)
25+
26+
27+
__all__: tuple[str, ...] = ("get_asyncpg_engine",)
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import logging
2+
3+
from fastapi import FastAPI
4+
from servicelib.db_asyncpg_utils import create_async_engine_and_pg_database_ready
5+
from servicelib.logging_utils import log_context
6+
from settings_library.postgres import PostgresSettings
7+
from simcore_postgres_database.utils_aiosqlalchemy import get_pg_engine_stateinfo
8+
9+
_logger = logging.getLogger(__name__)
10+
11+
12+
async def asyncpg_connect_to_db(app: FastAPI, settings: PostgresSettings) -> None:
13+
with log_context(
14+
_logger,
15+
logging.DEBUG,
16+
f"Connecting and migraging {settings.dsn_with_async_sqlalchemy}",
17+
):
18+
engine = await create_async_engine_and_pg_database_ready(settings)
19+
20+
app.state.asyncpg_engine = engine
21+
_logger.debug(
22+
"Setup engine: %s",
23+
await get_pg_engine_stateinfo(engine),
24+
)
25+
26+
27+
async def asyncpg_close_db_connection(app: FastAPI) -> None:
28+
with log_context(
29+
_logger, logging.DEBUG, f"db disconnect of {app.state.asyncpg_engine}"
30+
):
31+
if engine := app.state.asyncpg_engine:
32+
await engine.dispose()
33+
34+
35+
def get_asyncpg_engine(app: FastAPI):
36+
return app.state.asyncpg_engine

0 commit comments

Comments
 (0)