diff --git a/packages/pytest-simcore/src/pytest_simcore/db_entries_mocks.py b/packages/pytest-simcore/src/pytest_simcore/db_entries_mocks.py index ae3e05f7b36..eb1e1c68cdf 100644 --- a/packages/pytest-simcore/src/pytest_simcore/db_entries_mocks.py +++ b/packages/pytest-simcore/src/pytest_simcore/db_entries_mocks.py @@ -10,11 +10,15 @@ import pytest import sqlalchemy as sa from faker import Faker +from models_library.products import ProductName from models_library.projects import ProjectAtDB, ProjectID from models_library.projects_nodes_io import NodeID +from pytest_simcore.helpers.postgres_tools import insert_and_get_row_lifespan from simcore_postgres_database.models.comp_pipeline import StateType, comp_pipeline from simcore_postgres_database.models.comp_tasks import comp_tasks +from simcore_postgres_database.models.products import products from simcore_postgres_database.models.projects import ProjectType, projects +from simcore_postgres_database.models.projects_to_products import projects_to_products from simcore_postgres_database.models.services import services_access_rights from simcore_postgres_database.models.users import UserRole, UserStatus, users from simcore_postgres_database.utils_projects_nodes import ( @@ -63,9 +67,22 @@ def creator(**user_kwargs) -> dict[str, Any]: print(f"<-- deleted users {created_user_ids=}") +@pytest.fixture +async def product_db( + sqlalchemy_async_engine: AsyncEngine, product: dict[str, Any] +) -> AsyncIterator[dict[str, Any]]: + async with insert_and_get_row_lifespan( # pylint:disable=contextmanager-generator-missing-cleanup + sqlalchemy_async_engine, + table=products, + values=product, + pk_col=products.c.name, + ) as created_product: + yield created_product + + @pytest.fixture async def project( - sqlalchemy_async_engine: AsyncEngine, faker: Faker + sqlalchemy_async_engine: AsyncEngine, faker: Faker, product_name: ProductName ) -> AsyncIterator[Callable[..., Awaitable[ProjectAtDB]]]: created_project_ids: list[str] = [] @@ -113,6 +130,12 @@ async def creator( for node_id in inserted_project.workbench ], ) + await con.execute( + projects_to_products.insert().values( + project_uuid=f"{inserted_project.uuid}", + product_name=product_name, + ) + ) print(f"--> created {inserted_project=}") created_project_ids.append(f"{inserted_project.uuid}") return inserted_project diff --git a/services/director-v2/tests/conftest.py b/services/director-v2/tests/conftest.py index 8335706ad7b..ee63f868c35 100644 --- a/services/director-v2/tests/conftest.py +++ b/services/director-v2/tests/conftest.py @@ -43,6 +43,7 @@ "pytest_simcore.docker_registry", "pytest_simcore.docker_swarm", "pytest_simcore.environment_configs", + "pytest_simcore.faker_products_data", "pytest_simcore.faker_projects_data", "pytest_simcore.faker_users_data", "pytest_simcore.minio_service", @@ -355,7 +356,6 @@ async def wrapper(*args, **kwargs): @pytest.fixture def mock_osparc_variables_api_auth_rpc(mocker: MockerFixture) -> None: - fake_data = ApiKeyGet.model_validate(ApiKeyGet.model_json_schema()["examples"][0]) async def _create( diff --git a/services/director-v2/tests/integration/01/test_computation_api.py b/services/director-v2/tests/integration/01/test_computation_api.py index f16977bc1cf..af83d218f20 100644 --- a/services/director-v2/tests/integration/01/test_computation_api.py +++ b/services/director-v2/tests/integration/01/test_computation_api.py @@ -192,6 +192,7 @@ def test_invalid_computation( async def test_start_empty_computation_is_refused( async_client: httpx.AsyncClient, 
create_registered_user: Callable, + product_db: dict[str, Any], project: Callable[..., Awaitable[ProjectAtDB]], osparc_product_name: str, osparc_product_api_base_url: str, @@ -397,6 +398,7 @@ async def test_run_partial_computation( wait_for_catalog_service: Callable[[UserID, str], Awaitable[None]], async_client: httpx.AsyncClient, create_registered_user: Callable, + product_db: dict[str, Any], project: Callable[..., Awaitable[ProjectAtDB]], update_project_workbench_with_comp_tasks: Callable, fake_workbench_without_outputs: dict[str, Any], @@ -549,6 +551,7 @@ async def test_run_computation( wait_for_catalog_service: Callable[[UserID, str], Awaitable[None]], async_client: httpx.AsyncClient, create_registered_user: Callable, + product_db: dict[str, Any], project: Callable[..., Awaitable[ProjectAtDB]], fake_workbench_without_outputs: dict[str, Any], update_project_workbench_with_comp_tasks: Callable, @@ -667,6 +670,7 @@ async def test_run_computation( async def test_abort_computation( async_client: httpx.AsyncClient, create_registered_user: Callable, + product_db: dict[str, Any], project: Callable[..., Awaitable[ProjectAtDB]], fake_workbench_without_outputs: dict[str, Any], fake_workbench_computational_pipeline_details: PipelineDetails, @@ -746,6 +750,7 @@ async def test_abort_computation( async def test_update_and_delete_computation( async_client: httpx.AsyncClient, create_registered_user: Callable, + product_db: dict[str, Any], project: Callable[..., Awaitable[ProjectAtDB]], fake_workbench_without_outputs: dict[str, Any], fake_workbench_computational_pipeline_details_not_started: PipelineDetails, @@ -874,6 +879,7 @@ async def test_update_and_delete_computation( async def test_pipeline_with_no_computational_services_still_create_correct_comp_tasks_in_db( async_client: httpx.AsyncClient, create_registered_user: Callable, + product_db: dict[str, Any], project: Callable[..., Awaitable[ProjectAtDB]], jupyter_service: dict[str, Any], osparc_product_name: str, @@ -920,6 +926,7 @@ async def test_pipeline_with_no_computational_services_still_create_correct_comp async def test_pipeline_with_control_loop_made_of_dynamic_services_is_allowed( client: TestClient, create_registered_user: Callable, + product_db: dict[str, Any], project: Callable[..., Awaitable[ProjectAtDB]], jupyter_service: dict[str, Any], osparc_product_name: str, @@ -991,6 +998,7 @@ async def test_pipeline_with_control_loop_made_of_dynamic_services_is_allowed( async def test_pipeline_with_cycle_containing_a_computational_service_is_forbidden( client: TestClient, create_registered_user: Callable, + product_db: dict[str, Any], project: Callable[..., Awaitable[ProjectAtDB]], sleeper_service: dict[str, Any], jupyter_service: dict[str, Any], @@ -1075,6 +1083,7 @@ async def test_pipeline_with_cycle_containing_a_computational_service_is_forbidd async def test_burst_create_computations( async_client: httpx.AsyncClient, create_registered_user: Callable, + product_db: dict[str, Any], project: Callable[..., Awaitable[ProjectAtDB]], fake_workbench_without_outputs: dict[str, Any], update_project_workbench_with_comp_tasks: Callable, diff --git a/services/director-v2/tests/integration/02/test_dynamic_services_routes.py b/services/director-v2/tests/integration/02/test_dynamic_services_routes.py index e4a5cc39047..e6a7da0a8bc 100644 --- a/services/director-v2/tests/integration/02/test_dynamic_services_routes.py +++ b/services/director-v2/tests/integration/02/test_dynamic_services_routes.py @@ -97,7 +97,9 @@ def user_id(user_db: dict[str, Any]) -> UserID: 
@pytest.fixture async def project_id( - user_db: dict[str, Any], project: Callable[..., Awaitable[ProjectAtDB]] + user_db: dict[str, Any], + project: Callable[..., Awaitable[ProjectAtDB]], + product_db: dict[str, Any], ) -> str: prj = await project(user=user_db) return f"{prj.uuid}" diff --git a/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py b/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py index 0c8f4ecdef3..5d738a068ce 100644 --- a/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py +++ b/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py @@ -28,6 +28,7 @@ ) from models_library.api_schemas_directorv2.computations import ComputationGet from models_library.clusters import ClusterAuthentication +from models_library.products import ProductName from models_library.projects import ( Node, NodesDict, @@ -164,6 +165,7 @@ async def minimal_configuration( ensure_swarm_and_networks: None, minio_s3_settings_envs: EnvVarsDict, current_user: dict[str, Any], + product_db: dict[str, Any], osparc_product_name: str, ) -> AsyncIterator[None]: await wait_for_catalog_service(current_user["id"], osparc_product_name) @@ -917,6 +919,14 @@ async def _assert_retrieve_completed( ), "TIP: Message missing suggests that the data was never uploaded: look in services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py" +@pytest.fixture +def product_name(osparc_product_name: ProductName) -> ProductName: + """ + Overrides the product name to be used in these tests + """ + return osparc_product_name + + @pytest.mark.flaky(max_runs=3) async def test_nodeports_integration( cleanup_services_and_networks: None, diff --git a/services/director-v2/tests/integration/02/test_mixed_dynamic_sidecar_and_legacy_project.py b/services/director-v2/tests/integration/02/test_mixed_dynamic_sidecar_and_legacy_project.py index 61af3dd5823..a7a6e375a64 100644 --- a/services/director-v2/tests/integration/02/test_mixed_dynamic_sidecar_and_legacy_project.py +++ b/services/director-v2/tests/integration/02/test_mixed_dynamic_sidecar_and_legacy_project.py @@ -139,6 +139,7 @@ def user_dict(create_registered_user: Callable) -> dict[str, Any]: async def dy_static_file_server_project( minimal_configuration: None, user_dict: dict[str, Any], + product_db: dict[str, Any], project: Callable[..., Awaitable[ProjectAtDB]], dy_static_file_server_service: dict, dy_static_file_server_dynamic_sidecar_service: dict, diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_route_computations.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_route_computations.py index 6359149ec59..aa2ff573f87 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_route_computations.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_route_computations.py @@ -88,6 +88,7 @@ def minimal_configuration( faker: Faker, with_disabled_auto_scheduling: mock.Mock, with_disabled_scheduler_publisher: mock.Mock, + product_db: dict[str, Any], ): monkeypatch.setenv("DIRECTOR_V2_DYNAMIC_SIDECAR_ENABLED", "false") monkeypatch.setenv("COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED", "1") @@ -363,11 +364,6 @@ def _mocked_get_pricing_unit(request, pricing_plan_id: int) -> httpx.Response: yield respx_mock -@pytest.fixture -def product_name(faker: Faker) -> str: - return faker.name() - - @pytest.fixture def product_api_base_url(faker: Faker) -> AnyHttpUrl: return 
TypeAdapter(AnyHttpUrl).validate_python(faker.url()) @@ -379,7 +375,7 @@ async def test_computation_create_validators( fake_workbench_without_outputs: dict[str, Any], product_name: str, product_api_base_url: AnyHttpUrl, - faker: Faker, + product_db: dict[str, Any], ): user = create_registered_user() proj = await project(user, workbench=fake_workbench_without_outputs) @@ -911,7 +907,7 @@ async def test_get_computation_from_not_started_computation_task( node_states={ t.node_id: NodeState( modified=True, - currentStatus=RunningState.NOT_STARTED, + current_status=RunningState.NOT_STARTED, progress=None, dependencies={ NodeID(node) @@ -983,7 +979,7 @@ async def test_get_computation_from_published_computation_task( node_states={ t.node_id: NodeState( modified=True, - currentStatus=RunningState.PUBLISHED, + current_status=RunningState.PUBLISHED, dependencies={ NodeID(node) for node, next_nodes in fake_workbench_adjacency.items() diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_route_computations_tasks.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_route_computations_tasks.py index 2c539a7c2b6..1ce7319738d 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_route_computations_tasks.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_route_computations_tasks.py @@ -120,6 +120,7 @@ async def project_id( project: Callable[..., Awaitable[ProjectAtDB]], create_pipeline: Callable[..., Awaitable[CompPipelineAtDB]], create_tasks: Callable[..., Awaitable[list[CompTaskAtDB]]], + product_db: dict[str, Any], ) -> ProjectID: """project uuid of a saved project (w/ tasks up-to-date)""" @@ -133,6 +134,7 @@ async def project_id( ) # insert tasks -> comp_tasks comp_tasks = await create_tasks(user=user, project=proj) + assert comp_tasks return proj.uuid @@ -166,7 +168,7 @@ async def test_get_all_tasks_log_files( assert resp.status_code == status.HTTP_200_OK log_files = TypeAdapter(list[TaskLogFileGet]).validate_json(resp.text) assert log_files - assert all(l.download_link for l in log_files) + assert all(file.download_link for file in log_files) async def test_get_task_logs_file( diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py index b4358dfc789..6a208daf72a 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_rpc_computations.py @@ -40,6 +40,7 @@ async def test_rpc_list_computation_runs_and_tasks( create_tasks: Callable[..., Awaitable[list[CompTaskAtDB]]], create_comp_run: Callable[..., Awaitable[CompRunsAtDB]], rpc_client: RabbitMQRPCClient, + product_db: dict[str, Any], ): user = create_registered_user() proj = await project(user, workbench=fake_workbench_without_outputs) @@ -119,6 +120,7 @@ async def test_rpc_list_computation_runs_with_filtering( create_tasks: Callable[..., Awaitable[list[CompTaskAtDB]]], create_comp_run: Callable[..., Awaitable[CompRunsAtDB]], rpc_client: RabbitMQRPCClient, + product_db: dict[str, Any], ): user = create_registered_user() @@ -175,6 +177,7 @@ async def test_rpc_list_computation_runs_history( create_tasks: Callable[..., Awaitable[list[CompTaskAtDB]]], create_comp_run: Callable[..., Awaitable[CompRunsAtDB]], rpc_client: RabbitMQRPCClient, + product_db: dict[str, Any], ): user = create_registered_user() @@ -186,6 +189,7 @@ async def 
test_rpc_list_computation_runs_history( comp_tasks = await create_tasks( user=user, project=proj, state=StateType.PUBLISHED, progress=None ) + assert comp_tasks comp_runs_1 = await create_comp_run( user=user, project=proj, @@ -195,6 +199,7 @@ async def test_rpc_list_computation_runs_history( iteration=1, dag_adjacency_list=fake_workbench_adjacency, ) + assert comp_runs_1 comp_runs_2 = await create_comp_run( user=user, project=proj, @@ -204,6 +209,7 @@ async def test_rpc_list_computation_runs_history( iteration=2, dag_adjacency_list=fake_workbench_adjacency, ) + assert comp_runs_2 comp_runs_3 = await create_comp_run( user=user, project=proj, @@ -213,6 +219,7 @@ async def test_rpc_list_computation_runs_history( iteration=3, dag_adjacency_list=fake_workbench_adjacency, ) + assert comp_runs_3 output = await rpc_computations.list_computations_iterations_page( rpc_client, product_name="osparc", user_id=user["id"], project_ids=[proj.uuid] diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py index 64de723a48a..67edec5591e 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py @@ -9,7 +9,7 @@ import datetime import random from collections.abc import Awaitable, Callable -from typing import cast +from typing import Any, cast import arrow import pytest @@ -56,6 +56,7 @@ async def test_get( fake_project_id: ProjectID, publish_project: Callable[[], Awaitable[PublishedProject]], create_comp_run: Callable[..., Awaitable[CompRunsAtDB]], + product_db: dict[str, Any], ): with pytest.raises(ComputationalRunNotFoundError): await CompRunsRepository(sqlalchemy_async_engine).get( @@ -88,6 +89,7 @@ async def test_list( publish_project: Callable[[], Awaitable[PublishedProject]], run_metadata: RunMetadataDict, faker: Faker, + product_db: dict[str, Any], ): assert await CompRunsRepository(sqlalchemy_async_engine).list_() == [] @@ -269,6 +271,7 @@ async def test_create( run_metadata: RunMetadataDict, faker: Faker, publish_project: Callable[[], Awaitable[PublishedProject]], + product_db: dict[str, Any], ): with pytest.raises(ProjectNotFoundError): await CompRunsRepository(sqlalchemy_async_engine).create( @@ -331,6 +334,7 @@ async def test_update( run_metadata: RunMetadataDict, faker: Faker, publish_project: Callable[[], Awaitable[PublishedProject]], + product_db: dict[str, Any], ): # this updates nothing but also does not complain updated = await CompRunsRepository(sqlalchemy_async_engine).update( @@ -371,6 +375,7 @@ async def test_set_run_result( run_metadata: RunMetadataDict, faker: Faker, publish_project: Callable[[], Awaitable[PublishedProject]], + product_db: dict[str, Any], ): published_project = await publish_project() created = await CompRunsRepository(sqlalchemy_async_engine).create( @@ -419,6 +424,7 @@ async def test_mark_for_cancellation( run_metadata: RunMetadataDict, faker: Faker, publish_project: Callable[[], Awaitable[PublishedProject]], + product_db: dict[str, Any], ): published_project = await publish_project() created = await CompRunsRepository(sqlalchemy_async_engine).create( @@ -451,6 +457,7 @@ async def test_mark_for_scheduling( run_metadata: RunMetadataDict, faker: Faker, publish_project: Callable[[], Awaitable[PublishedProject]], + product_db: dict[str, Any], ): published_project = await publish_project() created = 
await CompRunsRepository(sqlalchemy_async_engine).create( @@ -485,6 +492,7 @@ async def test_mark_scheduling_done( run_metadata: RunMetadataDict, faker: Faker, publish_project: Callable[[], Awaitable[PublishedProject]], + product_db: dict[str, Any], ): published_project = await publish_project() created = await CompRunsRepository(sqlalchemy_async_engine).create( diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_manager.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_manager.py index f38b3302431..9c08e86c886 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_manager.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_manager.py @@ -82,6 +82,7 @@ async def test_manager_starts_and_auto_schedules_pipelines( mocked_schedule_all_pipelines: mock.Mock, initialized_app: FastAPI, sqlalchemy_async_engine: AsyncEngine, + product_db: dict[str, Any], ): await assert_comp_runs_empty(sqlalchemy_async_engine) mocked_schedule_all_pipelines.assert_called() @@ -93,6 +94,7 @@ async def test_schedule_all_pipelines_empty_db( initialized_app: FastAPI, scheduler_rabbit_client_parser: mock.AsyncMock, sqlalchemy_async_engine: AsyncEngine, + product_db: dict[str, Any], ): await assert_comp_runs_empty(sqlalchemy_async_engine) @@ -109,6 +111,7 @@ async def test_schedule_all_pipelines_concurently_runs_exclusively_and_raises( with_disabled_auto_scheduling: mock.Mock, initialized_app: FastAPI, mocker: MockerFixture, + product_db: dict[str, Any], ): CONCURRENCY = 5 # NOTE: this ensure no flakyness as empty scheduling is very fast @@ -320,6 +323,7 @@ async def test_empty_pipeline_is_not_scheduled( with_disabled_auto_scheduling: mock.Mock, with_disabled_scheduler_worker: mock.Mock, initialized_app: FastAPI, + product_db: dict[str, Any], create_registered_user: Callable[..., dict[str, Any]], project: Callable[..., Awaitable[ProjectAtDB]], create_pipeline: Callable[..., Awaitable[CompPipelineAtDB]], diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py index 1a390c200ab..3112d732360 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py @@ -965,6 +965,7 @@ async def with_started_project( instrumentation_rabbit_client_parser: mock.AsyncMock, resource_tracking_rabbit_client_parser: mock.AsyncMock, computational_pipeline_rabbit_client_parser: mock.AsyncMock, + product_db: dict[str, Any], ) -> RunningProject: with_disabled_auto_scheduling.assert_called_once() published_project = await publish_project() @@ -1210,6 +1211,7 @@ async def test_broken_pipeline_configuration_is_not_scheduled_and_aborted( sqlalchemy_async_engine: AsyncEngine, run_metadata: RunMetadataDict, computational_pipeline_rabbit_client_parser: mock.AsyncMock, + product_db: dict[str, Any], ): """A pipeline which comp_tasks are missing should not be scheduled. 
It shall be aborted and shown as such in the comp_runs db""" diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_worker.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_worker.py index 1970797e5d7..ecd611113f4 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_worker.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_worker.py @@ -9,6 +9,7 @@ import asyncio from collections.abc import Awaitable, Callable +from typing import Any from unittest import mock import pytest @@ -109,6 +110,7 @@ async def test_worker_scheduling_parallelism( initialized_app: FastAPI, publish_project: Callable[[], Awaitable[PublishedProject]], run_metadata: RunMetadataDict, + product_db: dict[str, Any], ): with_disabled_auto_scheduling.assert_called_once() diff --git a/services/director-v2/tests/unit/with_dbs/conftest.py b/services/director-v2/tests/unit/with_dbs/conftest.py index b95dfdb162b..1b019bd8e8f 100644 --- a/services/director-v2/tests/unit/with_dbs/conftest.py +++ b/services/director-v2/tests/unit/with_dbs/conftest.py @@ -342,6 +342,7 @@ async def _() -> PublishedProject: @pytest.fixture async def published_project( + product_db: dict[str, Any], publish_project: Callable[[], Awaitable[PublishedProject]], ) -> PublishedProject: return await publish_project() @@ -349,6 +350,7 @@ async def published_project( @pytest.fixture async def running_project( + product_db: dict[str, Any], create_registered_user: Callable[..., dict[str, Any]], project: Callable[..., Awaitable[ProjectAtDB]], create_pipeline: Callable[..., Awaitable[CompPipelineAtDB]], @@ -409,6 +411,7 @@ async def running_project_mark_for_cancellation( ], fake_workbench_without_outputs: dict[str, Any], fake_workbench_adjacency: dict[str, Any], + product_db: dict[str, Any], ) -> RunningProject: user = create_registered_user() created_project = await project(user, workbench=fake_workbench_without_outputs) diff --git a/services/director-v2/tests/unit/with_dbs/test_cli.py b/services/director-v2/tests/unit/with_dbs/test_cli.py index 2f7706cbda1..6faf75b6a99 100644 --- a/services/director-v2/tests/unit/with_dbs/test_cli.py +++ b/services/director-v2/tests/unit/with_dbs/test_cli.py @@ -43,6 +43,7 @@ def minimal_configuration( postgres_host_config: dict[str, str], monkeypatch: pytest.MonkeyPatch, faker: Faker, + product_db: dict[str, Any], ): monkeypatch.setenv("DIRECTOR_V2_DYNAMIC_SIDECAR_ENABLED", "false") monkeypatch.setenv("DIRECTOR_V2_POSTGRES_ENABLED", "1") @@ -62,6 +63,7 @@ def cli_runner(minimal_configuration: None) -> CliRunner: @pytest.fixture async def project_at_db( create_registered_user: Callable[..., dict[str, Any]], + product_db: dict[str, Any], project: Callable[..., Awaitable[ProjectAtDB]], fake_workbench_without_outputs: dict[str, Any], ) -> ProjectAtDB: diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_db_repositories_projects.py b/services/director-v2/tests/unit/with_dbs/test_modules_db_repositories_projects.py index 14ff015d790..67c4ac8d6f9 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_db_repositories_projects.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_db_repositories_projects.py @@ -72,6 +72,7 @@ def workbench() -> dict[str, Any]: async def project( mock_env: EnvVarsDict, create_registered_user: Callable[..., dict], + product_db: dict[str, Any], project: Callable[..., Awaitable[ProjectAtDB]], workbench: dict[str, Any], ) -> ProjectAtDB: diff --git 
a/services/director-v2/tests/unit/with_dbs/test_utils_rabbitmq.py b/services/director-v2/tests/unit/with_dbs/test_utils_rabbitmq.py index cb3d81a910d..17a36f316b4 100644 --- a/services/director-v2/tests/unit/with_dbs/test_utils_rabbitmq.py +++ b/services/director-v2/tests/unit/with_dbs/test_utils_rabbitmq.py @@ -88,6 +88,7 @@ async def project( user: dict[str, Any], fake_workbench_without_outputs: dict[str, Any], project: Callable[..., Awaitable[ProjectAtDB]], + product_db: dict[str, Any], ) -> ProjectAtDB: return await project(user, workbench=fake_workbench_without_outputs) diff --git a/services/web/server/tests/data/workbench_2connected_jupyterlabs.json b/services/web/server/tests/data/workbench_2connected_jupyterlabs.json new file mode 100644 index 00000000000..af2efefe7ea --- /dev/null +++ b/services/web/server/tests/data/workbench_2connected_jupyterlabs.json @@ -0,0 +1,37 @@ +{ + "e8eae2cd-ae0f-4ba6-ae0b-86eeadf99b42": { + "key": "simcore/services/dynamic/jupyter-math", + "version": "3.0.5", + "label": "JupyterLab Math (Python+Octave)", + "inputs": {}, + "inputsRequired": [], + "inputNodes": [] + }, + "f7d6dc1e-a6dc-44e1-9588-a2f4b05d3d9c": { + "key": "simcore/services/dynamic/jupyter-math", + "version": "3.0.5", + "label": "JupyterLab Math (Python+Octave)_2", + "inputs": { + "input_1": { + "nodeUuid": "e8eae2cd-ae0f-4ba6-ae0b-86eeadf99b42", + "output": "output_1" + }, + "input_2": { + "nodeUuid": "e8eae2cd-ae0f-4ba6-ae0b-86eeadf99b42", + "output": "output_2" + }, + "input_3": { + "nodeUuid": "e8eae2cd-ae0f-4ba6-ae0b-86eeadf99b42", + "output": "output_3" + }, + "input_4": { + "nodeUuid": "e8eae2cd-ae0f-4ba6-ae0b-86eeadf99b42", + "output": "output_4" + } + }, + "inputsRequired": [], + "inputNodes": [ + "e8eae2cd-ae0f-4ba6-ae0b-86eeadf99b42" + ] + } +} diff --git a/services/web/server/tests/unit/with_dbs/04/notifications/test_notifications__db_comp_tasks_listening_task.py b/services/web/server/tests/unit/with_dbs/04/notifications/test_notifications__db_comp_tasks_listening_task.py index e6b71ae72b8..b1f590c8982 100644 --- a/services/web/server/tests/unit/with_dbs/04/notifications/test_notifications__db_comp_tasks_listening_task.py +++ b/services/web/server/tests/unit/with_dbs/04/notifications/test_notifications__db_comp_tasks_listening_task.py @@ -5,11 +5,14 @@ # pylint:disable=too-many-arguments # pylint:disable=protected-access +import asyncio import json import logging import secrets from collections.abc import AsyncIterator, Awaitable, Callable from dataclasses import dataclass +from datetime import timedelta +from pathlib import Path from typing import Any from unittest import mock @@ -18,10 +21,13 @@ import simcore_service_webserver.db_listener import simcore_service_webserver.db_listener._db_comp_tasks_listening_task from aiohttp.test_utils import TestClient +from common_library.async_tools import delayed_start from faker import Faker from models_library.projects import ProjectAtDB +from models_library.projects_nodes import InputsDict from pytest_mock import MockType from pytest_mock.plugin import MockerFixture +from pytest_simcore.helpers.logging_tools import log_context from pytest_simcore.helpers.webserver_users import UserInfoDict from simcore_postgres_database.models.comp_pipeline import StateType from simcore_postgres_database.models.comp_tasks import NodeClass, comp_tasks @@ -30,6 +36,7 @@ create_comp_tasks_listening_task, ) from sqlalchemy.ext.asyncio import AsyncEngine +from tenacity import stop_after_attempt from tenacity.asyncio import AsyncRetrying from 
tenacity.before_sleep import before_sleep_log from tenacity.retry import retry_if_exception_type @@ -208,3 +215,164 @@ async def test_db_listener_triggers_on_event_with_multiple_tasks( ), f"_get_changed_comp_task_row was not called with task_id={updated_task_id}. Calls: {spied_get_changed_comp_task_row.call_args_list}" else: spied_get_changed_comp_task_row.assert_not_called() + + +@pytest.fixture +def fake_2connected_jupyterlabs_workbench(tests_data_dir: Path) -> dict[str, Any]: + fpath = tests_data_dir / "workbench_2connected_jupyterlabs.json" + assert fpath.exists() + return json.loads(fpath.read_text()) + + +@pytest.fixture +async def mock_dynamic_service_rpc( + mocker: MockerFixture, +) -> mock.AsyncMock: + """ + Mocks the dynamic service RPC calls to avoid actual service calls during tests. + """ + return mocker.patch( + "servicelib.rabbitmq.rpc_interfaces.dynamic_scheduler.services.retrieve_inputs", + autospec=True, + ) + + +async def _check_for_stability( + function: Callable[..., Awaitable[None]], *args, **kwargs +) -> None: + async for attempt in AsyncRetrying( + stop=stop_after_attempt(5), + wait=wait_fixed(1), + retry=retry_if_exception_type(), + reraise=True, + ): + with attempt: # noqa: SIM117 + with log_context( + logging.INFO, + msg=f"check stability of {function.__name__} {attempt.retry_state.retry_object.statistics}", + ) as log_ctx: + await function(*args, **kwargs) + log_ctx.logger.info( + "stable for %s...", attempt.retry_state.seconds_since_start + ) + + +@pytest.mark.parametrize("user_role", [UserRole.USER]) +async def test_db_listener_upgrades_projects_row_correctly( + with_started_listening_task: None, + mock_dynamic_service_rpc: mock.AsyncMock, + sqlalchemy_async_engine: AsyncEngine, + logged_user: UserInfoDict, + project: Callable[..., Awaitable[ProjectAtDB]], + fake_2connected_jupyterlabs_workbench: dict[str, Any], + pipeline: Callable[..., dict[str, Any]], + comp_task: Callable[..., dict[str, Any]], + spied_get_changed_comp_task_row: MockType, + faker: Faker, +): + some_project = await project( + logged_user, workbench=fake_2connected_jupyterlabs_workbench + ) + + # create the corresponding comp_task entries for the project workbench + pipeline(project_id=f"{some_project.uuid}") + tasks = [ + comp_task( + project_id=f"{some_project.uuid}", + node_id=node_id, + outputs=node_data.get("outputs", {}), + node_class=( + NodeClass.INTERACTIVE + if "dynamic" in node_data["key"] + else NodeClass.COMPUTATIONAL + ), + inputs=node_data.get("inputs", InputsDict()), + ) + for node_id, node_data in fake_2connected_jupyterlabs_workbench.items() + ] + assert len(tasks) == 2, "Expected two tasks for the two JupyterLab nodes" + first_jupyter_task = tasks[0] + second_jupyter_task = tasks[1] + assert ( + len(second_jupyter_task["inputs"]) > 0 + ), "Expected inputs for the second JupyterLab task" + number_of_inputs_linked = len(second_jupyter_task["inputs"]) + + # simulate a concurrent change in all the outputs of the first JupyterLab + async def _update_first_jupyter_task_output( + port_index: int, data: dict[str, Any] + ) -> None: + with log_context(logging.INFO, msg=f"Updating output {port_index + 1}"): + async with sqlalchemy_async_engine.begin() as conn: + result = await conn.execute( + comp_tasks.select() + .with_only_columns(comp_tasks.c.outputs) + .where(comp_tasks.c.task_id == first_jupyter_task["task_id"]) + .with_for_update() + ) + row = result.first() + current_outputs = row[0] if row and row[0] else {} + + # Update/add the new key while preserving existing keys + 
current_outputs[f"output_{port_index + 1}"] = data + + # Write back the updated outputs + await conn.execute( + comp_tasks.update() + .values(outputs=current_outputs) + .where(comp_tasks.c.task_id == first_jupyter_task["task_id"]) + ) + + @delayed_start(timedelta(seconds=2)) + async def _change_outputs_sequentially(sleep: float) -> None: + """ + Sequentially updates the outputs of the first JupyterLab task, which triggers the dynamic service RPC for the connected second task. + """ + for i in range(number_of_inputs_linked): + await _update_first_jupyter_task_output(i, {"data": i}) + await asyncio.sleep(sleep) + + # this runs in a task + sequential_task = asyncio.create_task(_change_outputs_sequentially(5)) + assert sequential_task is not None, "Failed to create the sequential task" + + async def _check_retrieve_rpc_called(expected_ports_retrieved: int) -> None: + async for attempt in AsyncRetrying( + stop=stop_after_delay(60), + wait=wait_fixed(1), + retry=retry_if_exception_type(AssertionError), + reraise=True, + ): + with attempt: # noqa: SIM117 + with log_context( + logging.INFO, + msg=f"Checking if dynamic service retrieve RPC was called and " + f"all expected ports were retrieved {expected_ports_retrieved} " + f"times, {attempt.retry_state.retry_object.statistics}", + ) as log_ctx: + if mock_dynamic_service_rpc.call_count > 0: + log_ctx.logger.info( + "call arguments: %s", + mock_dynamic_service_rpc.call_args_list, + ) + # Assert that the dynamic service RPC was called + assert ( + mock_dynamic_service_rpc.call_count > 0 + ), "Dynamic service retrieve RPC was not called" + # now check which ports were retrieved; we expect all of them + all_ports = set() + for call in mock_dynamic_service_rpc.call_args_list: + retrieved_ports = call[1]["port_keys"] + all_ports.update(retrieved_ports) + assert len(all_ports) == expected_ports_retrieved, ( + f"Expected {expected_ports_retrieved} ports to be retrieved, " + f"but got {len(all_ports)}: {all_ports}" + ) + log_ctx.logger.info( + "Dynamic service retrieve RPC was called with all expected ports!" + ) + + await _check_for_stability(_check_retrieve_rpc_called, number_of_inputs_linked) + await asyncio.wait_for(sequential_task, timeout=60) + assert sequential_task.done(), "Sequential task did not complete" + assert not sequential_task.cancelled(), "Sequential task was cancelled unexpectedly"
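
Note on the fixture wiring introduced by this patch: the `project` factory in `db_entries_mocks.py` now also inserts a row into `projects_to_products`, whose `product_name` column references the `products` table. Any test that creates a project therefore needs the referenced product row to exist first, which is what the new `product_db` fixture guarantees via `insert_and_get_row_lifespan` (insert on setup, delete on teardown) — hence `product_db` being threaded through so many test signatures above. A minimal sketch of how these fixtures are meant to compose in a test module follows; the test name, the empty workbench, and the final assertions are illustrative assumptions, not part of this change:

```python
# Sketch: composing the new fixtures (fixture names taken from the diff above).
from collections.abc import Awaitable, Callable
from typing import Any

from models_library.projects import ProjectAtDB


async def test_project_is_linked_to_a_product(  # hypothetical test, for illustration only
    create_registered_user: Callable[..., dict[str, Any]],
    product_db: dict[str, Any],  # inserts the row in `products`, deletes it on teardown
    project: Callable[..., Awaitable[ProjectAtDB]],
    product_name: str,  # provided by the `pytest_simcore.faker_products_data` plugin enabled above
):
    user = create_registered_user()
    # the factory also writes the `projects_to_products` link row, so the
    # product row created by `product_db` must be in place or the FK insert fails
    created_project = await project(user, workbench={})
    assert product_db["name"] == product_name
    assert f"{created_project.uuid}"
```

The relative order of `product_db` and `project` in the parameter list is irrelevant: pytest resolves all requested fixtures before the test body runs, so simply requesting both keeps the foreign key satisfied for the lifetime of the test.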