diff --git a/packages/models-library/src/models_library/rabbitmq_messages.py b/packages/models-library/src/models_library/rabbitmq_messages.py index 69812689baae..288731b31e4a 100644 --- a/packages/models-library/src/models_library/rabbitmq_messages.py +++ b/packages/models-library/src/models_library/rabbitmq_messages.py @@ -194,6 +194,25 @@ def routing_key(self) -> str | None: return None +class RabbitResourceTrackingProjectSyncMessage(RabbitMessageBase): + channel_name: Literal["io.simcore.service.tracking-project-sync"] = Field( + default="io.simcore.service.tracking-project-sync", const=True + ) + + project_id: ProjectID | None = None + project_name: str | None = None + project_tags: list[tuple[int, str]] | None = None + created_at: datetime.datetime = Field( + default_factory=lambda: arrow.utcnow().datetime, + description="message creation datetime", + ) + + def routing_key(self) -> str | None: + return None + + # MD: Add validation ... + + class DynamicServiceRunningMessage(RabbitMessageBase): channel_name: Literal["io.simcore.service.dynamic-service-running"] = Field( default="io.simcore.service.dynamic-service-running", const=True @@ -232,6 +251,7 @@ class RabbitResourceTrackingStartedMessage(RabbitResourceTrackingBaseMessage): project_id: ProjectID project_name: str + project_tags: list[tuple[int, str]] node_id: NodeID node_name: str diff --git a/packages/models-library/src/models_library/services_creation.py b/packages/models-library/src/models_library/services_creation.py index e2102efe0750..93c06b6a8ac6 100644 --- a/packages/models-library/src/models_library/services_creation.py +++ b/packages/models-library/src/models_library/services_creation.py @@ -17,6 +17,7 @@ class CreateServiceMetricsAdditionalParams(BaseModel): simcore_user_agent: str user_email: str project_name: str + project_tags: list[tuple[int, str]] node_name: str service_key: ServiceKey service_version: ServiceVersion @@ -35,6 +36,7 @@ class Config: "simcore_user_agent": "undefined", "user_email": "test@test.com", "project_name": "_!New Study", + "project_tags": [], "node_name": "the service of a lifetime _ *!", "service_key": ServiceKey("simcore/services/dynamic/test"), "service_version": ServiceVersion("0.0.1"), diff --git a/packages/postgres-database/src/simcore_postgres_database/migration/versions/da145a5fe27b_introducing_rut_project_metadata.py b/packages/postgres-database/src/simcore_postgres_database/migration/versions/da145a5fe27b_introducing_rut_project_metadata.py new file mode 100644 index 000000000000..a416340c7c5e --- /dev/null +++ b/packages/postgres-database/src/simcore_postgres_database/migration/versions/da145a5fe27b_introducing_rut_project_metadata.py @@ -0,0 +1,59 @@ +"""introducing rut project metadata + +Revision ID: da145a5fe27b +Revises: 8bfe65a5e294 +Create Date: 2024-11-12 16:03:41.595377+00:00 + +""" +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = "da145a5fe27b" +down_revision = "8bfe65a5e294" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + op.create_table( + "resource_tracker_project_metadata", + sa.Column("project_id", sa.String(), nullable=False), + sa.Column("project_name", sa.String(), nullable=False), + sa.Column( + "project_tags", + postgresql.JSONB(astext_type=sa.Text()), + server_default=sa.text("'{}'::jsonb"), + nullable=False, + ), + sa.Column( + "modified", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=False, + ), + sa.PrimaryKeyConstraint("project_id"), + ) + op.create_foreign_key( + "fk_resource_tracker_service_runs_project_metadata_project_id", + "resource_tracker_service_runs", + "resource_tracker_project_metadata", + ["project_id"], + ["project_id"], + onupdate="CASCADE", + ondelete="RESTRICT", + ) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_constraint( + "fk_resource_tracker_service_runs_project_metadata_project_id", + "resource_tracker_service_runs", + type_="foreignkey", + ) + op.drop_table("resource_tracker_project_metadata") + # ### end Alembic commands ### diff --git a/packages/postgres-database/src/simcore_postgres_database/models/resource_tracker_project_metadata.py b/packages/postgres-database/src/simcore_postgres_database/models/resource_tracker_project_metadata.py new file mode 100644 index 000000000000..606724867c42 --- /dev/null +++ b/packages/postgres-database/src/simcore_postgres_database/models/resource_tracker_project_metadata.py @@ -0,0 +1,33 @@ +""" resource_tracker_project_metadata table +""" + +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import JSONB + +from ._common import column_modified_datetime +from .base import metadata + +resource_tracker_project_metadata = sa.Table( + "resource_tracker_project_metadata", + metadata, + sa.Column( + "project_id", # UUID + sa.String, + nullable=False, + primary_key=True, + ), + sa.Column( + "project_name", + sa.String, + nullable=False, + doc="we want to store the project name for tracking/billing purposes and be sure it stays there even when the project is deleted (that's also the reason why we do not introduce a foreign key)", + ), + sa.Column( + "project_tags", + JSONB, + nullable=False, + server_default=sa.text("'{}'::jsonb"), + doc="we want to store the project tags for tracking/billing purposes and be sure they stay there even when the project is deleted (that's also the reason why we do not introduce a foreign key)", + ), + column_modified_datetime(timezone=True), +) diff --git a/packages/postgres-database/src/simcore_postgres_database/models/resource_tracker_service_runs.py b/packages/postgres-database/src/simcore_postgres_database/models/resource_tracker_service_runs.py index 33eddcb9fc77..701162c69fe0 100644 --- a/packages/postgres-database/src/simcore_postgres_database/models/resource_tracker_service_runs.py +++ b/packages/postgres-database/src/simcore_postgres_database/models/resource_tracker_service_runs.py @@ -98,6 +98,12 @@ class ResourceTrackerServiceRunStatus(str, enum.Enum): sa.Column( "project_id", # UUID sa.String, + sa.ForeignKey( + "resource_tracker_project_metadata.project_id", + name="fk_resource_tracker_service_runs_project_metadata_project_id", + onupdate="CASCADE", + ondelete="RESTRICT", + ), nullable=False, doc="We want to store the project id for tracking/billing purposes and be sure it stays there even when the project is deleted (that's also reason why we do not introduce foreign key)", ), diff --git a/packages/postgres-database/src/simcore_postgres_database/utils_tags.py
b/packages/postgres-database/src/simcore_postgres_database/utils_tags.py index 7421f25de0fc..474028062c4e 100644 --- a/packages/postgres-database/src/simcore_postgres_database/utils_tags.py +++ b/packages/postgres-database/src/simcore_postgres_database/utils_tags.py @@ -2,6 +2,7 @@ """ from typing import TypedDict +from uuid import UUID from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine @@ -11,6 +12,7 @@ create_tag_stmt, delete_tag_stmt, get_tag_stmt, + list_tag_ids_and_names_by_project_uuid_stmt, list_tags_stmt, set_tag_access_rights_stmt, update_tag_stmt, @@ -175,6 +177,16 @@ async def get( delete=row.delete, ) + async def list_tag_ids_and_names_by_project_uuid( + self, connection: AsyncConnection | None = None, *, project_uuid: UUID + ) -> list[tuple[int, str]]: + stmt_list = list_tag_ids_and_names_by_project_uuid_stmt( + project_uuid=project_uuid + ) + async with pass_or_acquire_connection(self.engine, connection) as conn: + result = await conn.stream(stmt_list) + return [(row.id, row.name) async for row in result] + async def update( self, connection: AsyncConnection | None = None, diff --git a/packages/postgres-database/src/simcore_postgres_database/utils_tags_sql.py b/packages/postgres-database/src/simcore_postgres_database/utils_tags_sql.py index bd727a0dcc32..86688277f486 100644 --- a/packages/postgres-database/src/simcore_postgres_database/utils_tags_sql.py +++ b/packages/postgres-database/src/simcore_postgres_database/utils_tags_sql.py @@ -1,7 +1,9 @@ import functools +from uuid import UUID import sqlalchemy as sa from simcore_postgres_database.models.groups import user_to_groups +from simcore_postgres_database.models.projects import projects from simcore_postgres_database.models.projects_tags import projects_tags from simcore_postgres_database.models.services_tags import services_tags from simcore_postgres_database.models.tags import tags @@ -60,7 +62,7 @@ def get_tag_stmt( # aggregation ensures MOST PERMISSIVE policy of access-rights sa.func.bool_or(tags_access_rights.c.read).label("read"), sa.func.bool_or(tags_access_rights.c.write).label("write"), - sa.func.bool_or(tags_access_rights.c.delete).label("delete") + sa.func.bool_or(tags_access_rights.c.delete).label("delete"), ) .select_from( _join_user_to_given_tag( @@ -73,6 +75,21 @@ def get_tag_stmt( ) +def list_tag_ids_and_names_by_project_uuid_stmt( + project_uuid: UUID, +): + return ( + sa.select(tags.c.id, tags.c.name) + .select_from( + projects_tags.join(tags, tags.c.id == projects_tags.c.tag_id).join( + projects, projects_tags.c.project_id == projects.c.id + ) + ) + .where(projects.c.uuid == f"{project_uuid}") + .group_by(tags.c.id, tags.c.name) + ) + + def list_tags_stmt(*, user_id: int): return ( sa.select( @@ -80,7 +97,7 @@ def list_tags_stmt(*, user_id: int): # aggregation ensures MOST PERMISSIVE policy of access-rights sa.func.bool_or(tags_access_rights.c.read).label("read"), sa.func.bool_or(tags_access_rights.c.write).label("write"), - sa.func.bool_or(tags_access_rights.c.delete).label("delete") + sa.func.bool_or(tags_access_rights.c.delete).label("delete"), ) .select_from( _join_user_to_tags( @@ -104,7 +121,7 @@ def count_groups_with_given_access_rights_stmt( tag_id: int, read: bool | None, write: bool | None, - delete: bool | None + delete: bool | None, ): """ How many groups (from this user_id) are given EXACTLY these access permissions diff --git a/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py 
b/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py index b7f47b186e71..1da0b3109096 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py +++ b/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py @@ -40,6 +40,8 @@ from servicelib.logging_utils import log_decorator from servicelib.rabbitmq import RabbitMQRPCClient from simcore_postgres_database.utils_projects_metadata import DBProjectNotFoundError +from simcore_postgres_database.utils_tags import TagsRepo +from sqlalchemy.ext.asyncio import AsyncEngine from starlette import status from starlette.requests import Request from tenacity import retry @@ -64,6 +66,7 @@ from ...models.comp_tasks import CompTaskAtDB from ...modules.catalog import CatalogClient from ...modules.comp_scheduler import BaseCompScheduler +from ...modules.db._asyncpg import get_asyncpg_engine from ...modules.db.repositories.clusters import ClustersRepository from ...modules.db.repositories.comp_pipelines import CompPipelinesRepository from ...modules.db.repositories.comp_runs import CompRunsRepository @@ -219,6 +222,7 @@ async def _try_start_pipeline( project: ProjectAtDB, users_repo: UsersRepository, projects_metadata_repo: ProjectsMetadataRepository, + asyncpg_engine: AsyncEngine, ) -> None: if not minimal_dag.nodes(): # 2 options here: either we have cycles in the graph or it's really done @@ -240,6 +244,12 @@ async def _try_start_pipeline( wallet_id = computation.wallet_info.wallet_id wallet_name = computation.wallet_info.wallet_name + # Get project tags + repo = TagsRepo(asyncpg_engine) + project_tags = await repo.list_tag_ids_and_names_by_project_uuid( + project_uuid=project.uuid + ) + await scheduler.run_new_pipeline( computation.user_id, computation.project_id, @@ -251,6 +261,7 @@ async def _try_start_pipeline( }, product_name=computation.product_name, project_name=project.name, + project_tags=project_tags, simcore_user_agent=computation.simcore_user_agent, user_email=await users_repo.get_user_email(computation.user_id), wallet_id=wallet_id, @@ -376,6 +387,7 @@ async def create_computation( # noqa: PLR0913 # pylint: disable=too-many-positi project=project, users_repo=users_repo, projects_metadata_repo=projects_metadata_repo, + asyncpg_engine=get_asyncpg_engine(request.app), ) # filter the tasks by the effective pipeline diff --git a/services/director-v2/src/simcore_service_director_v2/models/comp_runs.py b/services/director-v2/src/simcore_service_director_v2/models/comp_runs.py index 2af0646c3d33..a42abe2fc011 100644 --- a/services/director-v2/src/simcore_service_director_v2/models/comp_runs.py +++ b/services/director-v2/src/simcore_service_director_v2/models/comp_runs.py @@ -27,6 +27,7 @@ class ProjectMetadataDict(TypedDict, total=False): class RunMetadataDict(TypedDict, total=False): node_id_names_map: dict[NodeID, str] project_name: str + project_tags: list[tuple[int, str]] product_name: str simcore_user_agent: str user_email: str diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_base_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_base_scheduler.py index cae539596d41..326186140544 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_base_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_base_scheduler.py @@ -452,6 +452,7 @@ async def _process_started_tasks( 
project_name=run_metadata.get( # type: ignore[arg-type] "project_name", UNDEFINED_STR_METADATA ), + project_tags=run_metadata.get("project_tags", []), node_id=t.node_id, node_name=run_metadata.get("node_id_names_map", {}).get( t.node_id, UNDEFINED_STR_METADATA diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/__init__.py b/services/director-v2/src/simcore_service_director_v2/modules/db/__init__.py index 599cf89264cc..d924e86fb162 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/db/__init__.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/db/__init__.py @@ -1,15 +1,27 @@ from fastapi import FastAPI from settings_library.postgres import PostgresSettings +from ._asyncpg import ( + asyncpg_close_db_connection, + asyncpg_connect_to_db, + get_asyncpg_engine, +) from .events import close_db_connection, connect_to_db def setup(app: FastAPI, settings: PostgresSettings) -> None: async def on_startup() -> None: await connect_to_db(app, settings) + await asyncpg_connect_to_db(app, settings) async def on_shutdown() -> None: + await asyncpg_close_db_connection(app) await close_db_connection(app) + # NOTE: an asyncpg-based engine is set up in addition to the existing aiopg one + app.add_event_handler("startup", on_startup) app.add_event_handler("shutdown", on_shutdown) + + +__all__: tuple[str, ...] = ("get_asyncpg_engine",) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/_asyncpg.py b/services/director-v2/src/simcore_service_director_v2/modules/db/_asyncpg.py new file mode 100644 index 000000000000..188117d9c93d --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/modules/db/_asyncpg.py @@ -0,0 +1,36 @@ +import logging + +from fastapi import FastAPI +from servicelib.db_asyncpg_utils import create_async_engine_and_pg_database_ready +from servicelib.logging_utils import log_context +from settings_library.postgres import PostgresSettings +from simcore_postgres_database.utils_aiosqlalchemy import get_pg_engine_stateinfo + +_logger = logging.getLogger(__name__) + + +async def asyncpg_connect_to_db(app: FastAPI, settings: PostgresSettings) -> None: + with log_context( + _logger, + logging.DEBUG, + f"Connecting and migrating {settings.dsn_with_async_sqlalchemy}", + ): + engine = await create_async_engine_and_pg_database_ready(settings) + + app.state.asyncpg_engine = engine + _logger.debug( + "Setup engine: %s", + await get_pg_engine_stateinfo(engine), + ) + + +async def asyncpg_close_db_connection(app: FastAPI) -> None: + with log_context( + _logger, logging.DEBUG, f"db disconnect of {app.state.asyncpg_engine}" + ): + if engine := app.state.asyncpg_engine: + await engine.dispose() + + +def get_asyncpg_engine(app: FastAPI): + return app.state.asyncpg_engine diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_utils.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_utils.py index a33f689e9daf..2ae794df8be4 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_utils.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_utils.py @@ -450,6 +450,7 @@ async def generate_tasks_list_from_project( if pricing_info else None, hardware_info=hardware_info, + project_tags=project_tags, ) list_comp_tasks.append(task_db) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_user_services.py
b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_user_services.py index f708c1cb22c4..e782f2228fef 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_user_services.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_user_services.py @@ -9,6 +9,8 @@ from models_library.services_creation import CreateServiceMetricsAdditionalParams from pydantic import parse_obj_as from servicelib.fastapi.long_running_tasks.client import TaskId +from simcore_postgres_database.utils_tags import TagsRepo +from simcore_service_director_v2.modules.db._asyncpg import get_asyncpg_engine from tenacity import RetryError from tenacity.asyncio import AsyncRetrying from tenacity.before_sleep import before_sleep_log @@ -156,6 +158,12 @@ async def progress_create_containers( pricing_unit_id = scheduler_data.pricing_info.pricing_unit_id pricing_unit_cost_id = scheduler_data.pricing_info.pricing_unit_cost_id + # Get project tags + repo = TagsRepo(get_asyncpg_engine(app)) + project_tags = await repo.list_tag_ids_and_names_by_project_uuid( + project_uuid=project.uuid + ) + metrics_params = CreateServiceMetricsAdditionalParams( wallet_id=wallet_id, wallet_name=wallet_name, @@ -166,6 +174,7 @@ async def progress_create_containers( simcore_user_agent=scheduler_data.request_simcore_user_agent, user_email=user_email, project_name=project_name, + project_tags=project_tags, node_name=node_name, service_key=scheduler_data.key, service_version=parse_obj_as(ServiceVersion, scheduler_data.version), diff --git a/services/director-v2/src/simcore_service_director_v2/utils/rabbitmq.py b/services/director-v2/src/simcore_service_director_v2/utils/rabbitmq.py index 57d014a3c0f7..163bb879c10e 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/rabbitmq.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/rabbitmq.py @@ -83,6 +83,7 @@ async def publish_service_resource_tracking_started( # pylint: disable=too-many user_email: str, project_id: ProjectID, project_name: str, + project_tags: list[tuple[int, str]], node_id: NodeID, node_name: str, parent_project_id: ProjectID | None, @@ -109,6 +110,7 @@ async def publish_service_resource_tracking_started( # pylint: disable=too-many user_email=user_email, project_id=project_id, project_name=project_name, + project_tags=project_tags, node_id=node_id, node_name=node_name, parent_project_id=parent_project_id or project_id, diff --git a/services/director-v2/tests/unit/with_dbs/conftest.py b/services/director-v2/tests/unit/with_dbs/conftest.py index 516730d4e14b..574a22598ce4 100644 --- a/services/director-v2/tests/unit/with_dbs/conftest.py +++ b/services/director-v2/tests/unit/with_dbs/conftest.py @@ -173,6 +173,7 @@ def run_metadata( return RunMetadataDict( node_id_names_map={}, project_name=faker.name(), + project_tags=[], product_name=osparc_product_name, simcore_user_agent=simcore_user_agent, user_email=faker.email(), diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py index 6da3aa3f00ce..0f54b6cfb466 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py @@ -124,6 +124,7 @@ async def 
send_service_started( user_email=metrics_params.user_email, project_id=settings.DY_SIDECAR_PROJECT_ID, project_name=metrics_params.project_name, + project_tags=metrics_params.project_tags, node_id=settings.DY_SIDECAR_NODE_ID, node_name=metrics_params.node_name, parent_project_id=settings.DY_SIDECAR_PROJECT_ID, diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/modules/db/repositories/resource_tracker.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/modules/db/repositories/resource_tracker.py index 33a3e58d137e..02664c93e027 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/modules/db/repositories/resource_tracker.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/modules/db/repositories/resource_tracker.py @@ -9,6 +9,7 @@ ) from models_library.api_schemas_storage import S3BucketName from models_library.products import ProductName +from models_library.projects import ProjectID from models_library.resource_tracker import ( CreditClassification, CreditTransactionId, @@ -43,6 +44,9 @@ from simcore_postgres_database.models.resource_tracker_pricing_units import ( resource_tracker_pricing_units, ) +from simcore_postgres_database.models.resource_tracker_project_metadata import ( + resource_tracker_project_metadata, +) from simcore_postgres_database.models.resource_tracker_service_runs import ( resource_tracker_service_runs, ) @@ -199,6 +203,42 @@ async def update_service_run_stopped_at( return None return ServiceRunDB.from_orm(row) + async def insert_rut_project_metadata( + self, + project_id: ProjectID, + project_name: str, + project_tags_db: dict[str, dict[str, str]], + ) -> None: + async with self.db_engine.begin() as conn: + insert_stmt = resource_tracker_project_metadata.insert().values( + project_id=f"{project_id}", + project_name=project_name, + project_tags=project_tags_db, + modified=sa.func.now(), + ) + await conn.execute(insert_stmt) + + async def update_rut_project_metadata( + self, + project_id: ProjectID, + project_name: str | None = None, + project_tags: dict[str, dict[str, str]] | None = None, + ) -> None: + + _update_data = { + "project_name": project_name, + "project_tags": project_tags, + } + _update_data_clean = {k: v for k, v in _update_data.items() if v is not None} + + async with self.db_engine.begin() as conn: + update_stmt = ( + resource_tracker_project_metadata.update() + .values(modified=sa.func.now(), **_update_data_clean) + .where(resource_tracker_project_metadata.c.project_id == f"{project_id}") + ) + await conn.execute(update_stmt) + async def get_service_run_by_id( self, service_run_id: ServiceRunId ) -> ServiceRunDB | None: @@ -436,7 +476,9 @@ async def export_service_runs_table_to_s3( resource_tracker_service_runs.c.service_run_id, resource_tracker_service_runs.c.wallet_name, resource_tracker_service_runs.c.user_email, - resource_tracker_service_runs.c.project_name, + resource_tracker_service_runs.c.root_parent_project_name.label( + "project_name" + ), resource_tracker_service_runs.c.node_name, resource_tracker_service_runs.c.service_key, resource_tracker_service_runs.c.service_version, @@ -445,6 +487,7 @@ async def export_service_runs_table_to_s3( resource_tracker_service_runs.c.stopped_at, resource_tracker_credit_transactions.c.osparc_credits, resource_tracker_credit_transactions.c.transaction_status, + resource_tracker_project_metadata.c.project_tags, ) .select_from(
resource_tracker_service_runs.join( @@ -452,6 +495,11 @@ async def export_service_runs_table_to_s3( resource_tracker_service_runs.c.service_run_id == resource_tracker_credit_transactions.c.service_run_id, isouter=True, + ).join( + resource_tracker_project_metadata, + resource_tracker_service_runs.c.project_id + == resource_tracker_project_metadata.c.project_id, + isouter=True, ) ) .where(resource_tracker_service_runs.c.product_name == product_name) diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/process_message_project_sync.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/process_message_project_sync.py new file mode 100644 index 000000000000..609d552d39bc --- /dev/null +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/process_message_project_sync.py @@ -0,0 +1,37 @@ +import logging + +from fastapi import FastAPI +from models_library.rabbitmq_messages import RabbitResourceTrackingProjectSyncMessage +from pydantic import parse_raw_as + +from .modules.db.repositories.resource_tracker import ResourceTrackerRepository +from .utils import convert_project_tags_to_db + +_logger = logging.getLogger(__name__) + + +async def process_message(app: FastAPI, data: bytes) -> bool: + rabbit_message: RabbitResourceTrackingProjectSyncMessage = parse_raw_as( + RabbitResourceTrackingProjectSyncMessage, data # type: ignore[arg-type] + ) + _logger.info( + "Process project sync msg for project ID: %s", + rabbit_message.project_id, + ) + resource_tracker_repo: ResourceTrackerRepository = ResourceTrackerRepository( + db_engine=app.state.engine + ) + + # UPDATE + if rabbit_message.project_tags: + project_tags_db = await convert_project_tags_to_db(rabbit_message.project_tags) + else: + project_tags_db = None + + await resource_tracker_repo.update_rut_project_metadata( + rabbit_message.project_id, + project_name=rabbit_message.project_name, + project_tags=project_tags_db, + ) + + return True diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/process_message_project_sync_setup.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/process_message_project_sync_setup.py new file mode 100644 index 000000000000..63436a917dbe --- /dev/null +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/process_message_project_sync_setup.py @@ -0,0 +1,64 @@ +import functools +import logging +from collections.abc import Awaitable, Callable + +from fastapi import FastAPI +from models_library.rabbitmq_messages import RabbitResourceTrackingProjectSyncMessage +from servicelib.logging_utils import log_catch, log_context +from servicelib.rabbitmq import RabbitMQClient +from settings_library.rabbit import RabbitSettings + +from ..core.settings import ApplicationSettings +from .modules.rabbitmq import get_rabbitmq_client +from .process_message_project_sync import process_message + +_logger = logging.getLogger(__name__) + +_RUT_MESSAGE_TTL_IN_MS = 2 * 60 * 60 * 1000 # 2 hours + + +async def _subscribe_to_rabbitmq(app) -> str: + with log_context(_logger, logging.INFO, msg="Subscribing to rabbitmq channel"): + rabbit_client: RabbitMQClient = get_rabbitmq_client(app) + subscribed_queue: str = await rabbit_client.subscribe( + RabbitResourceTrackingProjectSyncMessage.get_channel_name(), + message_handler=functools.partial(process_message, app), + exclusive_queue=False, + message_ttl=_RUT_MESSAGE_TTL_IN_MS, + 
) + return subscribed_queue + + +def on_app_startup(app: FastAPI) -> Callable[[], Awaitable[None]]: + async def _startup() -> None: + with log_context( + _logger, logging.INFO, msg="setup resource tracker" + ), log_catch(_logger, reraise=False): + app_settings: ApplicationSettings = app.state.settings + app.state.resource_tracker_rabbitmq_consumer__project_sync = None + settings: RabbitSettings | None = ( + app_settings.RESOURCE_USAGE_TRACKER_RABBITMQ + ) + if not settings: + _logger.warning("RabbitMQ client is de-activated in the settings") + return + app.state.resource_tracker_rabbitmq_consumer__project_sync = ( + await _subscribe_to_rabbitmq(app) + ) + + return _startup + + +def on_app_shutdown( + _app: FastAPI, +) -> Callable[[], Awaitable[None]]: + async def _stop() -> None: + # NOTE: We want to have persistent queue, therefore we will not unsubscribe + assert _app # nosec + + return _stop + + +def setup(app: FastAPI) -> None: + app.add_event_handler("startup", on_app_startup(app)) + app.add_event_handler("shutdown", on_app_shutdown(app)) diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/process_message_running_service.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/process_message_running_service.py index 4352e327266b..8e6089145b9b 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/process_message_running_service.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/process_message_running_service.py @@ -36,6 +36,7 @@ from .modules.rabbitmq import RabbitMQClient, get_rabbitmq_client from .utils import ( compute_service_run_credit_costs, + convert_project_tags_to_db, make_negative, publish_to_rabbitmq_wallet_credits_limit_reached, sum_credit_transactions_and_publish_to_rabbitmq, @@ -95,6 +96,13 @@ async def _process_start_event( ) pricing_unit_cost = pricing_unit_cost_db.cost_per_unit + project_tags_db = await convert_project_tags_to_db(msg.project_tags) + await resource_tracker_repo.insert_rut_project_metadata( + project_id=msg.project_id, + project_name=msg.project_name, + project_tags_db=project_tags_db, + ) + create_service_run = ServiceRunCreate( product_name=msg.product_name, service_run_id=msg.service_run_id, diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/utils.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/utils.py index 4466fc5e7de7..2e41229aff3b 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/utils.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/utils.py @@ -3,6 +3,7 @@ from datetime import datetime, timezone from decimal import Decimal +from fastapi.encoders import jsonable_encoder from models_library.api_schemas_resource_usage_tracker.credit_transactions import ( WalletTotalCredits, ) @@ -129,3 +130,12 @@ async def compute_service_run_credit_costs( return round(Decimal(time_delta.total_seconds() / 3600) * cost_per_unit, 2) msg = f"Stop {stop} is smaller then {start} this should not happen. Investigate." 
raise ValueError(msg) + + +async def convert_project_tags_to_db( + project_tags: list[tuple[int, str]] +) -> dict[str, dict[str, str]]: + project_tags_db: dict[str, dict[str, str]] = {} + for tag in project_tags: + project_tags_db[f"{tag[0]}"] = {"name": tag[1]} + return jsonable_encoder(project_tags_db) diff --git a/services/resource-usage-tracker/tests/unit/with_dbs/conftest.py b/services/resource-usage-tracker/tests/unit/with_dbs/conftest.py index 581952e11004..4eb9c6a21c1a 100644 --- a/services/resource-usage-tracker/tests/unit/with_dbs/conftest.py +++ b/services/resource-usage-tracker/tests/unit/with_dbs/conftest.py @@ -14,6 +14,7 @@ from asgi_lifespan import LifespanManager from faker import Faker from fastapi import FastAPI +from models_library.projects import ProjectID from models_library.rabbitmq_messages import ( RabbitResourceTrackingHeartbeatMessage, RabbitResourceTrackingMessageType, @@ -26,6 +27,9 @@ from simcore_postgres_database.models.resource_tracker_credit_transactions import ( resource_tracker_credit_transactions, ) +from simcore_postgres_database.models.resource_tracker_project_metadata import ( + resource_tracker_project_metadata, +) from simcore_postgres_database.models.resource_tracker_service_runs import ( resource_tracker_service_runs, ) @@ -51,6 +55,7 @@ def mock_env(monkeypatch: pytest.MonkeyPatch) -> EnvVarsDict: "SC_BOOT_MODE": "production", "POSTGRES_CLIENT_NAME": "postgres_test_client", "RESOURCE_USAGE_TRACKER_MISSED_HEARTBEAT_CHECK_ENABLED": "0", + "RESOURCE_USAGE_TRACKER_TRACING": "null", } setenvs_from_dict(monkeypatch, env_vars) return env_vars @@ -181,6 +186,25 @@ async def assert_service_runs_db_row( raise ValueError +async def assert_project_metadata_db_row(postgres_db, project_id: ProjectID) -> None: + async for attempt in AsyncRetrying( + wait=wait_fixed(0.2), + stop=stop_after_delay(10), + retry=retry_if_exception_type(AssertionError), + reraise=True, + ): + with attempt, postgres_db.connect() as con: + result = con.execute( + sa.select(resource_tracker_project_metadata).where( + resource_tracker_project_metadata.c.project_id == f"{project_id}" + ) + ) + row = result.first() + assert row + return + raise ValueError + + async def assert_credit_transactions_db_row( postgres_db, service_run_id: str, modified_at: datetime | None = None ) -> CreditTransactionDB: @@ -239,6 +263,7 @@ def _creator(**kwargs: dict[str, Any]) -> RabbitResourceTrackingStartedMessage: "user_email": faker.email(), "project_id": faker.uuid4(), "project_name": faker.pystr(), + "project_tags": [(faker.pyint(), faker.pystr())], "node_id": faker.uuid4(), "node_name": faker.pystr(), "parent_project_id": faker.uuid4(), diff --git a/services/resource-usage-tracker/tests/unit/with_dbs/test_process_rabbitmq_message.py b/services/resource-usage-tracker/tests/unit/with_dbs/test_process_rabbitmq_message.py index da321f593f3d..5762515ca1e1 100644 --- a/services/resource-usage-tracker/tests/unit/with_dbs/test_process_rabbitmq_message.py +++ b/services/resource-usage-tracker/tests/unit/with_dbs/test_process_rabbitmq_message.py @@ -17,7 +17,7 @@ _process_stop_event, ) -from .conftest import assert_service_runs_db_row +from .conftest import assert_project_metadata_db_row, assert_service_runs_db_row pytest_simcore_core_services_selection = ["postgres", "rabbit"] pytest_simcore_ops_services_selection = [ @@ -48,6 +48,7 @@ async def test_process_event_functions( ) await _process_start_event(resource_tracker_repo, msg, publisher) output = await assert_service_runs_db_row(postgres_db, 
msg.service_run_id) + await assert_project_metadata_db_row(postgres_db, msg.project_id) assert output.stopped_at is None assert output.service_run_status == "RUNNING" first_occurence_of_last_heartbeat_at = output.last_heartbeat_at diff --git a/services/web/server/src/simcore_service_webserver/projects/_folders_db.py b/services/web/server/src/simcore_service_webserver/projects/_folders_db.py index 1ac57057c53f..46ee92fb31aa 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_folders_db.py +++ b/services/web/server/src/simcore_service_webserver/projects/_folders_db.py @@ -21,8 +21,6 @@ _logger = logging.getLogger(__name__) -_logger = logging.getLogger(__name__) - ### Models diff --git a/services/web/server/src/simcore_service_webserver/projects/_tags_api.py b/services/web/server/src/simcore_service_webserver/projects/_tags_api.py index ba4be3c5fb4f..0b2538a5910b 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_tags_api.py +++ b/services/web/server/src/simcore_service_webserver/projects/_tags_api.py @@ -6,8 +6,13 @@ from aiohttp import web from models_library.projects import ProjectID +from models_library.rabbitmq_messages import RabbitResourceTrackingProjectSyncMessage from models_library.users import UserID from models_library.workspaces import UserWorkspaceAccessRightsDB +from servicelib.aiohttp.db_asyncpg_engine import get_async_engine +from simcore_postgres_database.utils_tags import TagsRepo +from simcore_service_webserver.rabbitmq import get_rabbitmq_client +from sqlalchemy.ext.asyncio import AsyncEngine from ..workspaces import _workspaces_db as workspaces_db from ._access_rights_api import check_user_project_permission @@ -28,13 +33,29 @@ async def add_tag( project_id=project_uuid, user_id=user_id, product_name=product_name, - permission="write", # NOTE: before there was only read access necessary + permission="write", ) project: ProjectDict = await db.add_tag( project_uuid=f"{project_uuid}", user_id=user_id, tag_id=int(tag_id) ) + # Inform RUT about tag change + engine: AsyncEngine = get_async_engine(app) + tags_repo = TagsRepo(engine) + project_tags = await tags_repo.list_tag_ids_and_names_by_project_uuid( + project_uuid=project_uuid ) + + rabbit_client = get_rabbitmq_client(app) + await rabbit_client.publish( + RabbitResourceTrackingProjectSyncMessage.get_channel_name(), + RabbitResourceTrackingProjectSyncMessage( + project_id=project_uuid, project_tags=project_tags + ), + ) + + # Override project access rights if project["workspaceId"] is not None: workspace_db: UserWorkspaceAccessRightsDB = ( await workspaces_db.get_workspace_for_user( @@ -62,7 +83,7 @@ async def remove_tag( project_id=project_uuid, user_id=user_id, product_name=product_name, - permission="write", # NOTE: before there was only read access necessary + permission="write", ) project: ProjectDict = await db.remove_tag(
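Note (illustrative sketch, not part of the diff): the payload that add_tag publishes above on the io.simcore.service.tracking-project-sync channel can be reproduced as follows; the project uuid and the (tag_id, tag_name) pairs are made-up example values.

# illustrative only: builds the project-sync message published by add_tag() above
from uuid import uuid4
from models_library.rabbitmq_messages import RabbitResourceTrackingProjectSyncMessage

message = RabbitResourceTrackingProjectSyncMessage(
    project_id=uuid4(),  # example project uuid
    project_tags=[(3, "cancer"), (7, "pilot-study")],  # example (tag_id, tag_name) pairs
)
assert message.channel_name == "io.simcore.service.tracking-project-sync"
payload = message.json()  # this is what the RUT consumer parses with parse_raw_as(...)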
diff --git a/services/web/server/src/simcore_service_webserver/projects/db.py b/services/web/server/src/simcore_service_webserver/projects/db.py index 2281b807a719..db5c7ffbbba9 100644 --- a/services/web/server/src/simcore_service_webserver/projects/db.py +++ b/services/web/server/src/simcore_service_webserver/projects/db.py @@ -645,6 +645,7 @@ async def list_projects_uuids(self, user_id: int) -> list[str]: ) ] + # NOTE: deprecated, do not use this function in new code. async def get_project( self, project_uuid: str, @@ -780,6 +781,7 @@ async def get_pure_project_access_rights_without_workspace( ) return UserProjectAccessRightsDB.from_orm(row) + # NOTE: deprecated, do not use this function in new code (currently used only in some unit tests) async def replace_project( self, new_project_data: ProjectDict, diff --git a/services/web/server/src/simcore_service_webserver/tags/_api.py b/services/web/server/src/simcore_service_webserver/tags/_api.py index 6f3a74853e78..04975bab8e64 100644 --- a/services/web/server/src/simcore_service_webserver/tags/_api.py +++ b/services/web/server/src/simcore_service_webserver/tags/_api.py @@ -3,6 +3,8 @@ from aiohttp import web from models_library.basic_types import IdInt +from models_library.projects import ProjectID +from models_library.rabbitmq_messages import RabbitResourceTrackingProjectSyncMessage from models_library.users import UserID from servicelib.aiohttp.db_asyncpg_engine import get_async_engine from simcore_postgres_database.utils_tags import TagsRepo @@ -11,6 +13,24 @@ from .schemas import TagCreate, TagGet, TagUpdate +from ..rabbitmq import get_rabbitmq_client + + +async def inform_rut_about_tag_change( + tags_repo: TagsRepo, rabbit_client, project_id: ProjectID | None = None +) -> None: + # Inform the RUT about a tag change affecting the given project + if project_id is None: + return + + project_tags = await tags_repo.list_tag_ids_and_names_by_project_uuid( + project_uuid=project_id + ) + message = RabbitResourceTrackingProjectSyncMessage( + project_id=project_id, project_tags=project_tags + ) + await rabbit_client.publish(message.channel_name, message) + + async def create_tag( app: web.Application, user_id: UserID, new_tag: TagCreate ) -> TagGet: @@ -43,11 +63,17 @@ async def update_tag( engine: AsyncEngine = get_async_engine(app) repo = TagsRepo(engine) + _tags_updates_exclude_unset = tag_updates.dict(exclude_unset=True) tag = await repo.update( user_id=user_id, tag_id=tag_id, - **tag_updates.dict(exclude_unset=True), + **_tags_updates_exclude_unset, ) + + # If the tag name changed, inform the RUT so it can sync its project metadata + if _tags_updates_exclude_unset.get("name") is not None: + await inform_rut_about_tag_change( + tags_repo=repo, rabbit_client=get_rabbitmq_client(app) + ) + return TagGet.from_db(tag) @@ -55,4 +81,8 @@ async def delete_tag(app: web.Application, user_id: UserID, tag_id: IdInt): engine: AsyncEngine = get_async_engine(app) repo = TagsRepo(engine) + + # TODO: sync with RUT + # NOTE: careful - syncing this deletion would also remove the tag from historical records in the RUT + await repo.delete(user_id=user_id, tag_id=tag_id)
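Note (illustrative sketch, not part of the diff; assumes the resource-usage-tracker package is importable): convert_project_tags_to_db maps the (tag_id, tag_name) pairs carried by the RabbitMQ messages into the JSONB document stored in resource_tracker_project_metadata.project_tags, e.g.:

import asyncio

from simcore_service_resource_usage_tracker.services.utils import (
    convert_project_tags_to_db,
)


async def _demo() -> None:
    # (tag_id, tag_name) pairs, example values, as carried e.g. by
    # RabbitResourceTrackingStartedMessage.project_tags
    project_tags = [(3, "cancer"), (7, "pilot-study")]
    project_tags_db = await convert_project_tags_to_db(project_tags)
    # keys are the stringified tag ids, ready to be written to the JSONB column
    assert project_tags_db == {"3": {"name": "cancer"}, "7": {"name": "pilot-study"}}


asyncio.run(_demo())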