diff --git a/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py b/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py
index 49fd757e8867..72bdf37e6c7c 100644
--- a/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py
+++ b/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py
@@ -37,6 +37,7 @@
 from models_library.utils.fastapi_encoders import jsonable_encoder
 from pydantic import AnyHttpUrl, parse_obj_as
 from servicelib.async_utils import run_sequentially_in_context
+from servicelib.logging_utils import log_decorator
 from servicelib.rabbitmq import RabbitMQRPCClient
 from simcore_postgres_database.utils_projects_metadata import DBProjectNotFoundError
 from starlette import status
@@ -150,6 +151,7 @@ async def _check_pipeline_startable(
 _UNKNOWN_NODE: Final[str] = "unknown node"


+@log_decorator(_logger)
 async def _get_project_metadata(
     project_id: ProjectID,
     project_repo: ProjectsRepository,
@@ -160,7 +162,7 @@ async def _get_project_metadata(
             project_id
         )
         if project_ancestors.parent_project_uuid is None:
-            # no parents here
+            _logger.debug("no parent found for project %s", project_id)
             return {}

         assert project_ancestors.parent_node_id is not None  # nosec
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py
index f52607e4bc0a..4f9a8e42b53c 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py
@@ -50,7 +50,7 @@ async def get(
             )
             row: RowProxy | None = await result.first()
             if not row:
-                raise ComputationalRunNotFoundError()
+                raise ComputationalRunNotFoundError
             return CompRunsAtDB.from_orm(row)

     async def list(
@@ -80,7 +80,7 @@ async def create(
         project_id: ProjectID,
         cluster_id: ClusterID,
         iteration: PositiveInt | None = None,
-        metadata: RunMetadataDict | None,
+        metadata: RunMetadataDict,
         use_on_demand_clusters: bool,
     ) -> CompRunsAtDB:
         try:
@@ -102,13 +102,13 @@ async def create(
                 .values(
                     user_id=user_id,
                     project_uuid=f"{project_id}",
-                    cluster_id=cluster_id
-                    if cluster_id != DEFAULT_CLUSTER_ID
-                    else None,
+                    cluster_id=(
+                        cluster_id if cluster_id != DEFAULT_CLUSTER_ID else None
+                    ),
                     iteration=iteration,
                     result=RUNNING_STATE_TO_DB[RunningState.PUBLISHED],
-                    started=datetime.datetime.now(tz=datetime.timezone.utc),
-                    metadata=jsonable_encoder(metadata) if metadata else None,
+                    started=datetime.datetime.now(tz=datetime.UTC),
+                    metadata=jsonable_encoder(metadata),
                     use_on_demand_clusters=use_on_demand_clusters,
                 )
                 .returning(literal_column("*"))
@@ -146,7 +146,7 @@ async def set_run_result(
     ) -> CompRunsAtDB | None:
         values: dict[str, Any] = {"result": RUNNING_STATE_TO_DB[result_state]}
         if final_state:
-            values.update({"ended": datetime.datetime.now(tz=datetime.timezone.utc)})
+            values.update({"ended": datetime.datetime.now(tz=datetime.UTC)})
         return await self.update(
             user_id,
             project_id,
diff --git a/services/director-v2/tests/unit/with_dbs/conftest.py b/services/director-v2/tests/unit/with_dbs/conftest.py
index 09b727449f28..8dd5527f00a3 100644
--- a/services/director-v2/tests/unit/with_dbs/conftest.py
+++ b/services/director-v2/tests/unit/with_dbs/conftest.py
@@ -8,15 +8,16 @@
 import datetime
 import json
 from collections.abc import Awaitable, Callable, Iterator
-from typing import Any
+from typing import Any, cast
 from uuid import uuid4

 import pytest
 import sqlalchemy as sa
 from _helpers import PublishedProject, RunningProject
 from faker import Faker
+from fastapi.encoders import jsonable_encoder
 from models_library.clusters import Cluster
-from models_library.projects import ProjectAtDB
+from models_library.projects import ProjectAtDB, ProjectID
 from models_library.projects_nodes_io import NodeID
 from pydantic.main import BaseModel
 from simcore_postgres_database.models.cluster_to_groups import cluster_to_groups
@@ -25,7 +26,11 @@
 from simcore_postgres_database.models.comp_runs import comp_runs
 from simcore_postgres_database.models.comp_tasks import comp_tasks
 from simcore_service_director_v2.models.comp_pipelines import CompPipelineAtDB
-from simcore_service_director_v2.models.comp_runs import CompRunsAtDB, RunMetadataDict
+from simcore_service_director_v2.models.comp_runs import (
+    CompRunsAtDB,
+    ProjectMetadataDict,
+    RunMetadataDict,
+)
 from simcore_service_director_v2.models.comp_tasks import CompTaskAtDB, Image
 from simcore_service_director_v2.utils.computations import to_node_class
 from simcore_service_director_v2.utils.dask import generate_dask_job_id
@@ -84,28 +89,36 @@ def creator(
                 "project_id": f"{project.uuid}",
                 "node_id": f"{node_id}",
                 "schema": {"inputs": {}, "outputs": {}},
-                "inputs": {
-                    key: json.loads(value.json(by_alias=True, exclude_unset=True))
-                    if isinstance(value, BaseModel)
-                    else value
-                    for key, value in node_data.inputs.items()
-                }
-                if node_data.inputs
-                else {},
-                "outputs": {
-                    key: json.loads(value.json(by_alias=True, exclude_unset=True))
-                    if isinstance(value, BaseModel)
-                    else value
-                    for key, value in node_data.outputs.items()
-                }
-                if node_data.outputs
-                else {},
+                "inputs": (
+                    {
+                        key: (
+                            json.loads(value.json(by_alias=True, exclude_unset=True))
+                            if isinstance(value, BaseModel)
+                            else value
+                        )
+                        for key, value in node_data.inputs.items()
+                    }
+                    if node_data.inputs
+                    else {}
+                ),
+                "outputs": (
+                    {
+                        key: (
+                            json.loads(value.json(by_alias=True, exclude_unset=True))
+                            if isinstance(value, BaseModel)
+                            else value
+                        )
+                        for key, value in node_data.outputs.items()
+                    }
+                    if node_data.outputs
+                    else {}
+                ),
                 "image": Image(name=node_data.key, tag=node_data.version).dict(  # type: ignore
                     by_alias=True, exclude_unset=True
                 ),  # type: ignore
                 "node_class": to_node_class(node_data.key),
                 "internal_id": internal_id + 1,
-                "submit": datetime.datetime.now(tz=datetime.timezone.utc),
+                "submit": datetime.datetime.now(tz=datetime.UTC),
                 "job_id": generate_dask_job_id(
                     service_key=node_data.key,
                     service_version=node_data.version,
@@ -135,9 +148,26 @@ def creator(
     )


+@pytest.fixture
+def project_metadata(faker: Faker) -> ProjectMetadataDict:
+    return ProjectMetadataDict(
+        parent_node_id=cast(NodeID, faker.uuid4(cast_to=None)),
+        parent_node_name=faker.pystr(),
+        parent_project_id=cast(ProjectID, faker.uuid4(cast_to=None)),
+        parent_project_name=faker.pystr(),
+        root_parent_project_id=cast(ProjectID, faker.uuid4(cast_to=None)),
+        root_parent_project_name=faker.pystr(),
+        root_parent_node_id=cast(NodeID, faker.uuid4(cast_to=None)),
+        root_parent_node_name=faker.pystr(),
+    )
+
+
 @pytest.fixture
 def run_metadata(
-    osparc_product_name: str, simcore_user_agent: str, faker: Faker
+    osparc_product_name: str,
+    simcore_user_agent: str,
+    project_metadata: ProjectMetadataDict,
+    faker: Faker,
 ) -> RunMetadataDict:
     return RunMetadataDict(
         node_id_names_map={},
@@ -147,6 +177,7 @@ def run_metadata(
         user_email=faker.email(),
         wallet_id=faker.pyint(min_value=1),
         wallet_name=faker.name(),
+        project_metadata=project_metadata,
     )


@@ -171,7 +202,7 @@ def creator(
         with postgres_db.connect() as conn:
             result = conn.execute(
                 comp_runs.insert()
-                .values(**run_config)
+                .values(**jsonable_encoder(run_config))
                 .returning(sa.literal_column("*"))
             )
             new_run = CompRunsAtDB.from_orm(result.first())
@@ -298,7 +329,7 @@ async def running_project(
             project=created_project,
             state=StateType.RUNNING,
             progress=0.0,
-            start=datetime.datetime.now(tz=datetime.timezone.utc),
+            start=datetime.datetime.now(tz=datetime.UTC),
         ),
         runs=runs(user=user, project=created_project, result=StateType.RUNNING),
     )
diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py
index 2968e96e5db6..d15ab46a4986 100644
--- a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py
+++ b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py
@@ -381,22 +381,17 @@ async def test_misconfigured_pipeline_is_not_scheduled(
     )
     run_entry = CompRunsAtDB.parse_obj(await result.first())
     assert run_entry.result == RunningState.ABORTED
+    assert run_entry.metadata == run_metadata


 async def _assert_start_pipeline(
-    aiopg_engine, published_project: PublishedProject, scheduler: BaseCompScheduler
+    aiopg_engine,
+    published_project: PublishedProject,
+    scheduler: BaseCompScheduler,
+    run_metadata: RunMetadataDict,
 ) -> list[CompTaskAtDB]:
     exp_published_tasks = deepcopy(published_project.tasks)
     assert published_project.project.prj_owner

-    run_metadata = RunMetadataDict(
-        node_id_names_map={},
-        project_name="",
-        product_name="",
-        simcore_user_agent="",
-        user_email="",
-        wallet_id=231,
-        wallet_name="",
-    )

     await scheduler.run_new_pipeline(
@@ -618,11 +613,12 @@ async def test_proper_pipeline_is_scheduled(  # noqa: PLR0915
     mocked_clean_task_output_and_log_files_if_invalid: None,
     instrumentation_rabbit_client_parser: mock.AsyncMock,
     resource_tracking_rabbit_client_parser: mock.AsyncMock,
+    run_metadata: RunMetadataDict,
 ):
     _mock_send_computation_tasks(published_project.tasks, mocked_dask_client)
     expected_published_tasks = await _assert_start_pipeline(
-        aiopg_engine, published_project, scheduler
+        aiopg_engine, published_project, scheduler, run_metadata
     )

     # -------------------------------------------------------------------------------

@@ -990,10 +986,11 @@ async def test_task_progress_triggers(
     published_project: PublishedProject,
     mocked_parse_output_data_fct: None,
     mocked_clean_task_output_and_log_files_if_invalid: None,
+    run_metadata: RunMetadataDict,
 ):
     _mock_send_computation_tasks(published_project.tasks, mocked_dask_client)
     expected_published_tasks = await _assert_start_pipeline(
-        aiopg_engine, published_project, scheduler
+        aiopg_engine, published_project, scheduler, run_metadata
     )
     # -------------------------------------------------------------------------------
     # 1. first run will move comp_tasks to PENDING so the worker can take them
@@ -1286,10 +1283,11 @@ async def test_running_pipeline_triggers_heartbeat(
     aiopg_engine: aiopg.sa.engine.Engine,
     published_project: PublishedProject,
     resource_tracking_rabbit_client_parser: mock.AsyncMock,
+    run_metadata: RunMetadataDict,
 ):
     _mock_send_computation_tasks(published_project.tasks, mocked_dask_client)
     expected_published_tasks = await _assert_start_pipeline(
-        aiopg_engine, published_project, scheduler
+        aiopg_engine, published_project, scheduler, run_metadata
     )
     # -------------------------------------------------------------------------------
     # 1. first run will move comp_tasks to PENDING so the worker can take them