Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from models_library.utils.fastapi_encoders import jsonable_encoder
from pydantic import AnyHttpUrl, parse_obj_as
from servicelib.async_utils import run_sequentially_in_context
from servicelib.logging_utils import log_decorator
from servicelib.rabbitmq import RabbitMQRPCClient
from simcore_postgres_database.utils_projects_metadata import DBProjectNotFoundError
from starlette import status
Expand Down Expand Up @@ -150,6 +151,7 @@ async def _check_pipeline_startable(
_UNKNOWN_NODE: Final[str] = "unknown node"


@log_decorator(_logger)
async def _get_project_metadata(
project_id: ProjectID,
project_repo: ProjectsRepository,
Expand All @@ -160,7 +162,7 @@ async def _get_project_metadata(
project_id
)
if project_ancestors.parent_project_uuid is None:
# no parents here
_logger.debug("no parent found for project %s", project_id)
return {}

assert project_ancestors.parent_node_id is not None # nosec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async def get(
)
row: RowProxy | None = await result.first()
if not row:
raise ComputationalRunNotFoundError()
raise ComputationalRunNotFoundError
return CompRunsAtDB.from_orm(row)

async def list(
Expand Down Expand Up @@ -80,7 +80,7 @@ async def create(
project_id: ProjectID,
cluster_id: ClusterID,
iteration: PositiveInt | None = None,
metadata: RunMetadataDict | None,
metadata: RunMetadataDict,
use_on_demand_clusters: bool,
) -> CompRunsAtDB:
try:
Expand All @@ -102,13 +102,13 @@ async def create(
.values(
user_id=user_id,
project_uuid=f"{project_id}",
cluster_id=cluster_id
if cluster_id != DEFAULT_CLUSTER_ID
else None,
cluster_id=(
cluster_id if cluster_id != DEFAULT_CLUSTER_ID else None
),
iteration=iteration,
result=RUNNING_STATE_TO_DB[RunningState.PUBLISHED],
started=datetime.datetime.now(tz=datetime.timezone.utc),
metadata=jsonable_encoder(metadata) if metadata else None,
started=datetime.datetime.now(tz=datetime.UTC),
metadata=jsonable_encoder(metadata),
use_on_demand_clusters=use_on_demand_clusters,
)
.returning(literal_column("*"))
Expand Down Expand Up @@ -146,7 +146,7 @@ async def set_run_result(
) -> CompRunsAtDB | None:
values: dict[str, Any] = {"result": RUNNING_STATE_TO_DB[result_state]}
if final_state:
values.update({"ended": datetime.datetime.now(tz=datetime.timezone.utc)})
values.update({"ended": datetime.datetime.now(tz=datetime.UTC)})
return await self.update(
user_id,
project_id,
Expand Down
77 changes: 54 additions & 23 deletions services/director-v2/tests/unit/with_dbs/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@
import datetime
import json
from collections.abc import Awaitable, Callable, Iterator
from typing import Any
from typing import Any, cast
from uuid import uuid4

import pytest
import sqlalchemy as sa
from _helpers import PublishedProject, RunningProject
from faker import Faker
from fastapi.encoders import jsonable_encoder
from models_library.clusters import Cluster
from models_library.projects import ProjectAtDB
from models_library.projects import ProjectAtDB, ProjectID
from models_library.projects_nodes_io import NodeID
from pydantic.main import BaseModel
from simcore_postgres_database.models.cluster_to_groups import cluster_to_groups
Expand All @@ -25,7 +26,11 @@
from simcore_postgres_database.models.comp_runs import comp_runs
from simcore_postgres_database.models.comp_tasks import comp_tasks
from simcore_service_director_v2.models.comp_pipelines import CompPipelineAtDB
from simcore_service_director_v2.models.comp_runs import CompRunsAtDB, RunMetadataDict
from simcore_service_director_v2.models.comp_runs import (
CompRunsAtDB,
ProjectMetadataDict,
RunMetadataDict,
)
from simcore_service_director_v2.models.comp_tasks import CompTaskAtDB, Image
from simcore_service_director_v2.utils.computations import to_node_class
from simcore_service_director_v2.utils.dask import generate_dask_job_id
Expand Down Expand Up @@ -84,28 +89,36 @@ def creator(
"project_id": f"{project.uuid}",
"node_id": f"{node_id}",
"schema": {"inputs": {}, "outputs": {}},
"inputs": {
key: json.loads(value.json(by_alias=True, exclude_unset=True))
if isinstance(value, BaseModel)
else value
for key, value in node_data.inputs.items()
}
if node_data.inputs
else {},
"outputs": {
key: json.loads(value.json(by_alias=True, exclude_unset=True))
if isinstance(value, BaseModel)
else value
for key, value in node_data.outputs.items()
}
if node_data.outputs
else {},
"inputs": (
{
key: (
json.loads(value.json(by_alias=True, exclude_unset=True))
if isinstance(value, BaseModel)
else value
)
for key, value in node_data.inputs.items()
}
if node_data.inputs
else {}
),
"outputs": (
{
key: (
json.loads(value.json(by_alias=True, exclude_unset=True))
if isinstance(value, BaseModel)
else value
)
for key, value in node_data.outputs.items()
}
if node_data.outputs
else {}
),
"image": Image(name=node_data.key, tag=node_data.version).dict( # type: ignore
by_alias=True, exclude_unset=True
), # type: ignore
"node_class": to_node_class(node_data.key),
"internal_id": internal_id + 1,
"submit": datetime.datetime.now(tz=datetime.timezone.utc),
"submit": datetime.datetime.now(tz=datetime.UTC),
"job_id": generate_dask_job_id(
service_key=node_data.key,
service_version=node_data.version,
Expand Down Expand Up @@ -135,9 +148,26 @@ def creator(
)


@pytest.fixture
def project_metadata(faker: Faker) -> ProjectMetadataDict:
return ProjectMetadataDict(
parent_node_id=cast(NodeID, faker.uuid4(cast_to=None)),
parent_node_name=faker.pystr(),
parent_project_id=cast(ProjectID, faker.uuid4(cast_to=None)),
parent_project_name=faker.pystr(),
root_parent_project_id=cast(ProjectID, faker.uuid4(cast_to=None)),
root_parent_project_name=faker.pystr(),
root_parent_node_id=cast(NodeID, faker.uuid4(cast_to=None)),
root_parent_node_name=faker.pystr(),
)


@pytest.fixture
def run_metadata(
osparc_product_name: str, simcore_user_agent: str, faker: Faker
osparc_product_name: str,
simcore_user_agent: str,
project_metadata: ProjectMetadataDict,
faker: Faker,
) -> RunMetadataDict:
return RunMetadataDict(
node_id_names_map={},
Expand All @@ -147,6 +177,7 @@ def run_metadata(
user_email=faker.email(),
wallet_id=faker.pyint(min_value=1),
wallet_name=faker.name(),
project_metadata=project_metadata,
)


Expand All @@ -171,7 +202,7 @@ def creator(
with postgres_db.connect() as conn:
result = conn.execute(
comp_runs.insert()
.values(**run_config)
.values(**jsonable_encoder(run_config))
.returning(sa.literal_column("*"))
)
new_run = CompRunsAtDB.from_orm(result.first())
Expand Down Expand Up @@ -298,7 +329,7 @@ async def running_project(
project=created_project,
state=StateType.RUNNING,
progress=0.0,
start=datetime.datetime.now(tz=datetime.timezone.utc),
start=datetime.datetime.now(tz=datetime.UTC),
),
runs=runs(user=user, project=created_project, result=StateType.RUNNING),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,22 +381,17 @@ async def test_misconfigured_pipeline_is_not_scheduled(
)
run_entry = CompRunsAtDB.parse_obj(await result.first())
assert run_entry.result == RunningState.ABORTED
assert run_entry.metadata == run_metadata


async def _assert_start_pipeline(
aiopg_engine, published_project: PublishedProject, scheduler: BaseCompScheduler
aiopg_engine,
published_project: PublishedProject,
scheduler: BaseCompScheduler,
run_metadata: RunMetadataDict,
) -> list[CompTaskAtDB]:
exp_published_tasks = deepcopy(published_project.tasks)
assert published_project.project.prj_owner
run_metadata = RunMetadataDict(
node_id_names_map={},
project_name="",
product_name="",
simcore_user_agent="",
user_email="",
wallet_id=231,
wallet_name="",
)
await scheduler.run_new_pipeline(
user_id=published_project.project.prj_owner,
project_id=published_project.project.uuid,
Expand Down Expand Up @@ -618,11 +613,12 @@ async def test_proper_pipeline_is_scheduled( # noqa: PLR0915
mocked_clean_task_output_and_log_files_if_invalid: None,
instrumentation_rabbit_client_parser: mock.AsyncMock,
resource_tracking_rabbit_client_parser: mock.AsyncMock,
run_metadata: RunMetadataDict,
):
_mock_send_computation_tasks(published_project.tasks, mocked_dask_client)

expected_published_tasks = await _assert_start_pipeline(
aiopg_engine, published_project, scheduler
aiopg_engine, published_project, scheduler, run_metadata
)

# -------------------------------------------------------------------------------
Expand Down Expand Up @@ -990,10 +986,11 @@ async def test_task_progress_triggers(
published_project: PublishedProject,
mocked_parse_output_data_fct: None,
mocked_clean_task_output_and_log_files_if_invalid: None,
run_metadata: RunMetadataDict,
):
_mock_send_computation_tasks(published_project.tasks, mocked_dask_client)
expected_published_tasks = await _assert_start_pipeline(
aiopg_engine, published_project, scheduler
aiopg_engine, published_project, scheduler, run_metadata
)
# -------------------------------------------------------------------------------
# 1. first run will move comp_tasks to PENDING so the worker can take them
Expand Down Expand Up @@ -1286,10 +1283,11 @@ async def test_running_pipeline_triggers_heartbeat(
aiopg_engine: aiopg.sa.engine.Engine,
published_project: PublishedProject,
resource_tracking_rabbit_client_parser: mock.AsyncMock,
run_metadata: RunMetadataDict,
):
_mock_send_computation_tasks(published_project.tasks, mocked_dask_client)
expected_published_tasks = await _assert_start_pipeline(
aiopg_engine, published_project, scheduler
aiopg_engine, published_project, scheduler, run_metadata
)
# -------------------------------------------------------------------------------
# 1. first run will move comp_tasks to PENDING so the worker can take them
Expand Down
Loading