Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions packages/models-library/src/models_library/rabbitmq_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,25 @@ def routing_key(self) -> str | None:
return None


class RabbitResourceTrackingProjectSyncMessage(RabbitMessageBase):
    """Message published on the ``io.simcore.service.tracking-project-sync``
    channel, carrying project metadata (id, name, tags) to be synced by the
    resource tracker.

    All project fields default to ``None`` — presumably a message may carry
    only a subset of the metadata; confirm with the consumer side.
    """

    channel_name: Literal["io.simcore.service.tracking-project-sync"] = Field(
        default="io.simcore.service.tracking-project-sync", const=True
    )

    project_id: ProjectID | None = None
    project_name: str | None = None
    # tags as (tag_id, tag_name) pairs
    project_tags: list[tuple[int, str]] | None = None
    created_at: datetime.datetime = Field(
        default_factory=lambda: arrow.utcnow().datetime,
        description="message creation datetime",
    )

    def routing_key(self) -> str | None:
        # no routing key for this channel (same as the base-message pattern above)
        return None

    # TODO(MD): add validation


class DynamicServiceRunningMessage(RabbitMessageBase):
channel_name: Literal["io.simcore.service.dynamic-service-running"] = Field(
default="io.simcore.service.dynamic-service-running", const=True
Expand Down Expand Up @@ -232,6 +251,7 @@ class RabbitResourceTrackingStartedMessage(RabbitResourceTrackingBaseMessage):

project_id: ProjectID
project_name: str
project_tags: list[tuple[int, str]]

node_id: NodeID
node_name: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class CreateServiceMetricsAdditionalParams(BaseModel):
simcore_user_agent: str
user_email: str
project_name: str
project_tags: list[tuple[int, str]]
node_name: str
service_key: ServiceKey
service_version: ServiceVersion
Expand All @@ -35,6 +36,7 @@ class Config:
"simcore_user_agent": "undefined",
"user_email": "[email protected]",
"project_name": "_!New Study",
"project_tags": [],
"node_name": "the service of a lifetime _ *!",
"service_key": ServiceKey("simcore/services/dynamic/test"),
"service_version": ServiceVersion("0.0.1"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""introducing rut project metadata

Revision ID: da145a5fe27b
Revises: 8bfe65a5e294
Create Date: 2024-11-12 16:03:41.595377+00:00

"""
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "da145a5fe27b"
down_revision = "8bfe65a5e294"
branch_labels = None
depends_on = None


def upgrade():
    """Create ``resource_tracker_project_metadata`` and link
    ``resource_tracker_service_runs.project_id`` to it via a foreign key."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "resource_tracker_project_metadata",
        sa.Column("project_id", sa.String(), nullable=False),
        sa.Column("project_name", sa.String(), nullable=False),
        sa.Column(
            "project_tags",
            postgresql.JSONB(astext_type=sa.Text()),
            # NOTE(review): tags are handled as a list elsewhere in this PR;
            # an empty-object default ('{}') may be intended to be '[]' — confirm
            server_default=sa.text("'{}'::jsonb"),
            nullable=False,
        ),
        sa.Column(
            "modified",
            sa.DateTime(timezone=True),
            server_default=sa.text("now()"),
            nullable=False,
        ),
        sa.PrimaryKeyConstraint("project_id"),
    )
    # RESTRICT on delete: a service run must never lose its project metadata
    op.create_foreign_key(
        "fk_resource_tracker_service_runs_project_metadata_project_id",
        "resource_tracker_service_runs",
        "resource_tracker_project_metadata",
        ["project_id"],
        ["project_id"],
        onupdate="CASCADE",
        ondelete="RESTRICT",
    )
    # ### end Alembic commands ###


def downgrade():
    """Drop the foreign key first (it references the table), then the
    ``resource_tracker_project_metadata`` table itself."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_constraint(
        "fk_resource_tracker_service_runs_project_metadata_project_id",
        "resource_tracker_service_runs",
        type_="foreignkey",
    )
    op.drop_table("resource_tracker_project_metadata")
    # ### end Alembic commands ###
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
""" resource_tracker_project_metadata table
"""

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB

from ._common import column_modified_datetime
from .base import metadata

# Snapshot of project metadata kept for tracking/billing, deliberately decoupled
# from the projects table so it survives project deletion.
resource_tracker_project_metadata = sa.Table(
    "resource_tracker_project_metadata",
    metadata,
    sa.Column(
        "project_id",  # UUID
        sa.String,
        nullable=False,
        primary_key=True,
    ),
    sa.Column(
        "project_name",
        sa.String,
        nullable=False,
        doc="we want to store the project name for tracking/billing purposes and be sure it stays there even when the project is deleted (that's also reason why we do not introduce foreign key)",
    ),
    sa.Column(
        "project_tags",
        JSONB,
        nullable=False,
        # NOTE(review): tags are modelled as a list of (id, name) pairs elsewhere;
        # the empty-OBJECT default ('{}') may be intended to be '[]' — confirm
        server_default=sa.text("'{}'::jsonb"),
        doc="we want to store the project tags for tracking/billing purposes and be sure it stays there even when the project is deleted (that's also reason why we do not introduce foreign key)",
    ),
    column_modified_datetime(timezone=True),
)
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ class ResourceTrackerServiceRunStatus(str, enum.Enum):
sa.Column(
"project_id", # UUID
sa.String,
sa.ForeignKey(
"resource_tracker_project_metadata.project_id",
name="fk_resource_tracker_service_runs_project_metadata_project_id",
onupdate="CASCADE",
ondelete="RESTRICT",
),
nullable=False,
doc="We want to store the project id for tracking/billing purposes and be sure it stays there even when the project is deleted (that's also reason why we do not introduce foreign key)",
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"""

from typing import TypedDict
from uuid import UUID

from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine

Expand All @@ -11,6 +12,7 @@
create_tag_stmt,
delete_tag_stmt,
get_tag_stmt,
list_tag_ids_and_names_by_project_uuid_stmt,
list_tags_stmt,
set_tag_access_rights_stmt,
update_tag_stmt,
Expand Down Expand Up @@ -175,6 +177,16 @@ async def get(
delete=row.delete,
)

async def list_tag_ids_and_names_by_project_uuid(
self, connection: AsyncConnection | None = None, *, project_uuid: UUID
) -> list[tuple[int, str]]:
stmt_list = list_tag_ids_and_names_by_project_uuid_stmt(
project_uuid=project_uuid
)
async with pass_or_acquire_connection(self.engine, connection) as conn:
result = await conn.stream(stmt_list)
return [(row.id, row.name) async for row in result]

async def update(
self,
connection: AsyncConnection | None = None,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import functools
from uuid import UUID

import sqlalchemy as sa
from simcore_postgres_database.models.groups import user_to_groups
from simcore_postgres_database.models.projects import projects
from simcore_postgres_database.models.projects_tags import projects_tags
from simcore_postgres_database.models.services_tags import services_tags
from simcore_postgres_database.models.tags import tags
Expand Down Expand Up @@ -60,7 +62,7 @@ def get_tag_stmt(
# aggregation ensures MOST PERMISSIVE policy of access-rights
sa.func.bool_or(tags_access_rights.c.read).label("read"),
sa.func.bool_or(tags_access_rights.c.write).label("write"),
sa.func.bool_or(tags_access_rights.c.delete).label("delete")
sa.func.bool_or(tags_access_rights.c.delete).label("delete"),
)
.select_from(
_join_user_to_given_tag(
Expand All @@ -73,14 +75,29 @@ def get_tag_stmt(
)


def list_tag_ids_and_names_by_project_uuid_stmt(
    project_uuid: UUID,
):
    """Build a SELECT of ``(tags.id, tags.name)`` for all tags linked to the
    project identified by *project_uuid* (via the projects_tags join table).

    Grouping by (id, name) de-duplicates rows should the join produce repeats.
    """
    tags_for_project = projects_tags.join(
        tags, tags.c.id == projects_tags.c.tag_id
    ).join(projects, projects_tags.c.project_id == projects.c.id)
    return (
        sa.select(tags.c.id, tags.c.name)
        .select_from(tags_for_project)
        # projects.uuid is stored as text, hence the explicit str conversion
        .where(projects.c.uuid == f"{project_uuid}")
        .group_by(tags.c.id, tags.c.name)
    )


def list_tags_stmt(*, user_id: int):
return (
sa.select(
*_TAG_COLUMNS,
# aggregation ensures MOST PERMISSIVE policy of access-rights
sa.func.bool_or(tags_access_rights.c.read).label("read"),
sa.func.bool_or(tags_access_rights.c.write).label("write"),
sa.func.bool_or(tags_access_rights.c.delete).label("delete")
sa.func.bool_or(tags_access_rights.c.delete).label("delete"),
)
.select_from(
_join_user_to_tags(
Expand All @@ -104,7 +121,7 @@ def count_groups_with_given_access_rights_stmt(
tag_id: int,
read: bool | None,
write: bool | None,
delete: bool | None
delete: bool | None,
):
"""
How many groups (from this user_id) are given EXACTLY these access permissions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
from servicelib.logging_utils import log_decorator
from servicelib.rabbitmq import RabbitMQRPCClient
from simcore_postgres_database.utils_projects_metadata import DBProjectNotFoundError
from simcore_postgres_database.utils_tags import TagsRepo
from sqlalchemy.ext.asyncio import AsyncEngine
from starlette import status
from starlette.requests import Request
from tenacity import retry
Expand All @@ -64,6 +66,7 @@
from ...models.comp_tasks import CompTaskAtDB
from ...modules.catalog import CatalogClient
from ...modules.comp_scheduler import BaseCompScheduler
from ...modules.db._asyncpg import get_asyncpg_engine
from ...modules.db.repositories.clusters import ClustersRepository
from ...modules.db.repositories.comp_pipelines import CompPipelinesRepository
from ...modules.db.repositories.comp_runs import CompRunsRepository
Expand Down Expand Up @@ -219,6 +222,7 @@ async def _try_start_pipeline(
project: ProjectAtDB,
users_repo: UsersRepository,
projects_metadata_repo: ProjectsMetadataRepository,
asyncpg_engine: AsyncEngine,
) -> None:
if not minimal_dag.nodes():
# 2 options here: either we have cycles in the graph or it's really done
Expand All @@ -240,6 +244,12 @@ async def _try_start_pipeline(
wallet_id = computation.wallet_info.wallet_id
wallet_name = computation.wallet_info.wallet_name

# Get project tags
repo = TagsRepo(asyncpg_engine)
project_tags = await repo.list_tag_ids_and_names_by_project_uuid(
project_uuid=project.uuid
)

await scheduler.run_new_pipeline(
computation.user_id,
computation.project_id,
Expand All @@ -251,6 +261,7 @@ async def _try_start_pipeline(
},
product_name=computation.product_name,
project_name=project.name,
project_tags=project_tags,
simcore_user_agent=computation.simcore_user_agent,
user_email=await users_repo.get_user_email(computation.user_id),
wallet_id=wallet_id,
Expand Down Expand Up @@ -376,6 +387,7 @@ async def create_computation( # noqa: PLR0913 # pylint: disable=too-many-positi
project=project,
users_repo=users_repo,
projects_metadata_repo=projects_metadata_repo,
asyncpg_engine=get_asyncpg_engine(request.app),
)

# filter the tasks by the effective pipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class ProjectMetadataDict(TypedDict, total=False):
class RunMetadataDict(TypedDict, total=False):
node_id_names_map: dict[NodeID, str]
project_name: str
project_tags: list[tuple[int, str]]
product_name: str
simcore_user_agent: str
user_email: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ async def _process_started_tasks(
project_name=run_metadata.get("project_metadata", {}).get( # type: ignore[arg-type]
"project_name", UNDEFINED_STR_METADATA
),
project_tags=run_metadata.get("project_tags", []),
node_id=t.node_id,
node_name=run_metadata.get("node_id_names_map", {}).get(
t.node_id, UNDEFINED_STR_METADATA
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,27 @@
from fastapi import FastAPI
from settings_library.postgres import PostgresSettings

from ._asyncpg import (
asyncpg_close_db_connection,
asyncpg_connect_to_db,
get_asyncpg_engine,
)
from .events import close_db_connection, connect_to_db


def setup(app: FastAPI, settings: PostgresSettings) -> None:
    """Register DB lifecycle hooks on the app: both the existing engine and
    the asyncpg-based engine are connected on startup and closed on shutdown.
    """

    async def _on_startup() -> None:
        await connect_to_db(app, settings)
        await asyncpg_connect_to_db(app, settings)

    async def _on_shutdown() -> None:
        # tear down in reverse order of creation
        await asyncpg_close_db_connection(app)
        await close_db_connection(app)

    app.add_event_handler("startup", _on_startup)
    app.add_event_handler("shutdown", _on_shutdown)


__all__: tuple[str, ...] = ("get_asyncpg_engine",)
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import logging

from fastapi import FastAPI
from servicelib.db_asyncpg_utils import create_async_engine_and_pg_database_ready
from servicelib.logging_utils import log_context
from settings_library.postgres import PostgresSettings
from simcore_postgres_database.utils_aiosqlalchemy import get_pg_engine_stateinfo

_logger = logging.getLogger(__name__)


async def asyncpg_connect_to_db(app: FastAPI, settings: PostgresSettings) -> None:
    """Create the asyncpg-based SQLAlchemy engine (waiting for the database to
    be ready) and store it on ``app.state.asyncpg_engine``.

    :raises: whatever ``create_async_engine_and_pg_database_ready`` raises when
        the database cannot be reached.
    """
    with log_context(
        _logger,
        logging.DEBUG,
        # fixed typo: "migraging" -> "migrating"
        f"Connecting and migrating {settings.dsn_with_async_sqlalchemy}",
    ):
        engine = await create_async_engine_and_pg_database_ready(settings)

    app.state.asyncpg_engine = engine
    _logger.debug(
        "Setup engine: %s",
        await get_pg_engine_stateinfo(engine),
    )


async def asyncpg_close_db_connection(app: FastAPI) -> None:
    """Dispose the asyncpg engine stored on ``app.state``, if any.

    Robust against shutdown running before (or after a failed) startup: the
    original read ``app.state.asyncpg_engine`` unconditionally inside the log
    f-string, which raises AttributeError when the attribute was never set.
    """
    engine = getattr(app.state, "asyncpg_engine", None)
    with log_context(_logger, logging.DEBUG, f"db disconnect of {engine}"):
        if engine:
            await engine.dispose()


def get_asyncpg_engine(app: FastAPI):
    """Return the asyncpg engine stored on ``app.state`` by
    :func:`asyncpg_connect_to_db` (AttributeError if setup never ran)."""
    return app.state.asyncpg_engine
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,7 @@ async def generate_tasks_list_from_project(
if pricing_info
else None,
hardware_info=hardware_info,
project_tags=project_tags,
)

list_comp_tasks.append(task_db)
Expand Down
Loading
Loading