diff --git a/packages/models-library/src/models_library/rabbitmq_messages.py b/packages/models-library/src/models_library/rabbitmq_messages.py index 44d2b7ddc9a..3be08148628 100644 --- a/packages/models-library/src/models_library/rabbitmq_messages.py +++ b/packages/models-library/src/models_library/rabbitmq_messages.py @@ -316,3 +316,13 @@ class WalletCreditsLimitReachedMessage(RabbitMessageBase): def routing_key(self) -> str | None: return f"{self.wallet_id}.{self.credits_limit}" + + +class ComputationalPipelineStatusMessage(RabbitMessageBase, ProjectMessageBase): + channel_name: Literal["io.simcore.service.computation.pipeline-status"] = ( + "io.simcore.service.computation.pipeline-status" + ) + run_result: RunningState + + def routing_key(self) -> str | None: + return f"{self.project_id}" diff --git a/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py b/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py index 4a0bd5dfc20..f8798aa3e63 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py +++ b/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py @@ -93,16 +93,20 @@ async def _check_pipeline_not_running_or_raise_409( - comp_tasks_repo: CompTasksRepository, computation: ComputationCreate + comp_runs_repo: CompRunsRepository, + computation: ComputationCreate, ) -> None: - pipeline_state = utils.get_pipeline_state_from_task_states( - await comp_tasks_repo.list_computational_tasks(computation.project_id) - ) - if utils.is_pipeline_running(pipeline_state): - raise HTTPException( - status_code=status.HTTP_409_CONFLICT, - detail=f"Project {computation.project_id} already started, current state is {pipeline_state}", + with contextlib.suppress(ComputationalRunNotFoundError): + last_run = await comp_runs_repo.get( + user_id=computation.user_id, project_id=computation.project_id ) + pipeline_state = last_run.result + + if utils.is_pipeline_running(pipeline_state): + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=f"Project {computation.project_id} already started, current state is {pipeline_state}", + ) async def _check_pipeline_startable( @@ -302,7 +306,7 @@ async def create_or_update_or_start_computation( # noqa: PLR0913 # pylint: disa project: ProjectAtDB = await project_repo.get_project(computation.project_id) # check if current state allow to modify the computation - await _check_pipeline_not_running_or_raise_409(comp_tasks_repo, computation) + await _check_pipeline_not_running_or_raise_409(comp_runs_repo, computation) # create the complete DAG graph complete_dag = create_complete_dag(project.workbench) @@ -353,20 +357,14 @@ async def create_or_update_or_start_computation( # noqa: PLR0913 # pylint: disa projects_metadata_repo=projects_metadata_repo, ) - # filter the tasks by the effective pipeline - filtered_tasks = [ - t - for t in comp_tasks - if f"{t.node_id}" in set(minimal_computational_dag.nodes()) - ] - pipeline_state = utils.get_pipeline_state_from_task_states(filtered_tasks) - # get run details if any last_run: CompRunsAtDB | None = None + pipeline_state = RunningState.NOT_STARTED with contextlib.suppress(ComputationalRunNotFoundError): last_run = await comp_runs_repo.get( user_id=computation.user_id, project_id=computation.project_id ) + pipeline_state = last_run.result return ComputationGet( id=computation.project_id, @@ -449,21 +447,10 @@ async def get_computation( # check that project actually exists await 
project_repo.get_project(project_id) - pipeline_dag, all_tasks, filtered_tasks = await analyze_pipeline( + pipeline_dag, all_tasks, _filtered_tasks = await analyze_pipeline( project_id, comp_pipelines_repo, comp_tasks_repo ) - pipeline_state: RunningState = utils.get_pipeline_state_from_task_states( - filtered_tasks - ) - - _logger.debug( - "Computational task status by %s for %s has %s", - f"{user_id=}", - f"{project_id=}", - f"{pipeline_state=}", - ) - # create the complete DAG graph complete_dag = create_complete_dag_from_tasks(all_tasks) pipeline_details = await compute_pipeline_details( @@ -472,8 +459,17 @@ async def get_computation( # get run details if any last_run: CompRunsAtDB | None = None + pipeline_state = RunningState.NOT_STARTED with contextlib.suppress(ComputationalRunNotFoundError): last_run = await comp_runs_repo.get(user_id=user_id, project_id=project_id) + pipeline_state = last_run.result + + _logger.debug( + "Computational task status by %s for %s has %s", + f"{user_id=}", + f"{project_id=}", + f"{pipeline_state=}", + ) self_url = request.url.remove_query_params("user_id") return ComputationGet( @@ -536,23 +532,18 @@ async def stop_computation( tasks: list[CompTaskAtDB] = await comp_tasks_repo.list_tasks(project_id) # create the complete DAG graph complete_dag = create_complete_dag_from_tasks(tasks) - # filter the tasks by the effective pipeline - filtered_tasks = [ - t for t in tasks if f"{t.node_id}" in set(pipeline_dag.nodes()) - ] - pipeline_state = utils.get_pipeline_state_from_task_states(filtered_tasks) - - if utils.is_pipeline_running(pipeline_state): - await stop_pipeline( - request.app, user_id=computation_stop.user_id, project_id=project_id - ) - - # get run details if any + # stop the pipeline if it is running last_run: CompRunsAtDB | None = None + pipeline_state = RunningState.UNKNOWN with contextlib.suppress(ComputationalRunNotFoundError): last_run = await comp_runs_repo.get( user_id=computation_stop.user_id, project_id=project_id ) + pipeline_state = last_run.result + if utils.is_pipeline_running(last_run.result): + await stop_pipeline( + request.app, user_id=computation_stop.user_id, project_id=project_id + ) return ComputationGet( id=project_id, @@ -594,15 +585,20 @@ async def delete_computation( comp_tasks_repo: Annotated[ CompTasksRepository, Depends(get_repository(CompTasksRepository)) ], + comp_runs_repo: Annotated[ + CompRunsRepository, Depends(get_repository(CompRunsRepository)) + ], ) -> None: try: # get the project project: ProjectAtDB = await project_repo.get_project(project_id) # check if current state allow to stop the computation - comp_tasks: list[CompTaskAtDB] = await comp_tasks_repo.list_computational_tasks( - project_id - ) - pipeline_state = utils.get_pipeline_state_from_task_states(comp_tasks) + pipeline_state = RunningState.UNKNOWN + with contextlib.suppress(ComputationalRunNotFoundError): + last_run = await comp_runs_repo.get( + user_id=computation_stop.user_id, project_id=project_id + ) + pipeline_state = last_run.result if utils.is_pipeline_running(pipeline_state): if not computation_stop.force: raise HTTPException( @@ -634,12 +630,10 @@ def return_last_value(retry_state: Any) -> Any: before_sleep=before_sleep_log(_logger, logging.INFO), ) async def check_pipeline_stopped() -> bool: - comp_tasks: list[CompTaskAtDB] = ( - await comp_tasks_repo.list_computational_tasks(project_id) - ) - pipeline_state = utils.get_pipeline_state_from_task_states( - comp_tasks, + last_run = await comp_runs_repo.get( + user_id=computation_stop.user_id, 
project_id=project_id ) + pipeline_state = last_run.result return utils.is_pipeline_stopped(pipeline_state) # wait for the pipeline to be stopped diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py index 430bb4a871e..a52a0b43938 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py @@ -1,3 +1,4 @@ +import contextlib import logging from typing import Final @@ -13,10 +14,11 @@ from servicelib.utils import limited_gather from sqlalchemy.ext.asyncio import AsyncEngine +from ...core.errors import ComputationalRunNotFoundError from ...models.comp_pipelines import CompPipelineAtDB from ...models.comp_runs import RunMetadataDict from ...models.comp_tasks import CompTaskAtDB -from ...utils.rabbitmq import publish_project_log +from ...utils.rabbitmq import publish_pipeline_scheduling_state, publish_project_log from ..db import get_db_engine from ..db.repositories.comp_pipelines import CompPipelinesRepository from ..db.repositories.comp_runs import CompRunsRepository @@ -57,6 +59,18 @@ async def run_new_pipeline( ) return + with contextlib.suppress(ComputationalRunNotFoundError): + # if a run already exists and is still running, do not schedule it again + last_run = await CompRunsRepository.instance(db_engine).get( + user_id=user_id, project_id=project_id + ) + if last_run.result.is_running(): + _logger.warning( + "Run for project %s is already running; not scheduling it again.", + f"{project_id=}", + ) + return + new_run = await CompRunsRepository.instance(db_engine).create( user_id=user_id, project_id=project_id, @@ -92,6 +106,9 @@ async def run_new_pipeline( log=f"Project pipeline scheduled using {'on-demand clusters' if use_on_demand_clusters else 'pre-defined clusters'}, starting soon...", log_level=logging.INFO, ) + await publish_pipeline_scheduling_state( + rabbitmq_client, user_id, project_id, new_run.result + ) async def stop_pipeline( @@ -128,8 +145,7 @@ async def _get_pipeline_at_db( project_id: ProjectID, db_engine: AsyncEngine ) -> CompPipelineAtDB: comp_pipeline_repo = CompPipelinesRepository.instance(db_engine) - pipeline_at_db = await comp_pipeline_repo.get_pipeline(project_id) - return pipeline_at_db + return await comp_pipeline_repo.get_pipeline(project_id) async def _get_pipeline_tasks_at_db( diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_publisher.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_publisher.py index 42c4b1d7938..f358f5eb3b9 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_publisher.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_publisher.py @@ -16,9 +16,10 @@ async def request_pipeline_scheduling( project_id: ProjectID, iteration: Iteration, ) -> None: - # NOTE: we should use the transaction and the asyncpg engine here to ensure 100% consistency - # https://github.com/ITISFoundation/osparc-simcore/issues/6818 - # async with transaction_context(get_asyncpg_engine(app)) as connection: + # NOTE: the DB must be updated before the message is published, otherwise the worker may pick up the scheduling request before the DB reflects it + await CompRunsRepository.instance(db_engine).mark_for_scheduling( + user_id=user_id, project_id=project_id, iteration=iteration
+ ) await rabbitmq_client.publish( SchedulePipelineRabbitMessage.get_channel_name(), SchedulePipelineRabbitMessage( @@ -27,6 +28,3 @@ iteration=iteration, ), ) - await CompRunsRepository.instance(db_engine).mark_for_scheduling( - user_id=user_id, project_id=project_id, iteration=iteration - ) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py index 9d40b534db9..e0d574ebc0f 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py @@ -52,6 +52,7 @@ from ...models.comp_tasks import CompTaskAtDB from ...utils.computations import get_pipeline_state_from_task_states from ...utils.rabbitmq import ( + publish_pipeline_scheduling_state, publish_project_log, publish_service_resource_tracking_heartbeat, publish_service_resource_tracking_started, @@ -208,10 +209,13 @@ async def _update_run_result_from_tasks( project_id: ProjectID, iteration: Iteration, pipeline_tasks: dict[NodeIDStr, CompTaskAtDB], + current_result: RunningState, ) -> RunningState: pipeline_state_from_tasks = get_pipeline_state_from_task_states( list(pipeline_tasks.values()), ) + if pipeline_state_from_tasks == current_result: + return pipeline_state_from_tasks _logger.debug( "pipeline %s is currently in %s", f"{user_id=}_{project_id=}_{iteration=}", @@ -238,17 +242,35 @@ async def _set_run_result( final_state=(run_result in COMPLETED_STATES), ) - async def _set_schedule_done( + if run_result in COMPLETED_STATES: + # send an event to notify that the pipeline is done + await publish_project_log( + self.rabbitmq_client, + user_id=user_id, + project_id=project_id, + log=f"Pipeline run for iteration {iteration} is done with {run_result.value} state", + log_level=logging.INFO, + ) + await publish_pipeline_scheduling_state( + self.rabbitmq_client, user_id, project_id, run_result + ) + + async def _set_processing_done( self, user_id: UserID, project_id: ProjectID, iteration: Iteration, ) -> None: - await CompRunsRepository.instance(self.db_engine).mark_as_processed( - user_id=user_id, - project_id=project_id, - iteration=iteration, - ) + with log_context( + _logger, + logging.DEBUG, + msg=f"mark pipeline run for {iteration=} for {user_id=} and {project_id=} as processed", + ): + await CompRunsRepository.instance(self.db_engine).mark_as_processed( + user_id=user_id, + project_id=project_id, + iteration=iteration, + ) async def _set_states_following_failed_to_aborted( self, project_id: ProjectID, dag: nx.DiGraph, run_id: PositiveInt @@ -622,7 +644,7 @@ async def apply( ) # 3. do we want to stop the pipeline now? if comp_run.cancelled: - await self._schedule_tasks_to_stop( + comp_tasks = await self._schedule_tasks_to_stop( user_id, project_id, comp_tasks, comp_run ) else: @@ -653,7 +675,7 @@ # 6. Update the run result pipeline_result = await self._update_run_result_from_tasks( - user_id, project_id, iteration, comp_tasks + user_id, project_id, iteration, comp_tasks, comp_run.result ) # 7. Are we done scheduling that pipeline?
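Reviewer note on the hunks above: `_update_run_result_from_tasks` now receives the last known `current_result` and short-circuits when the state derived from the task states has not changed, so the DB write and the new `ComputationalPipelineStatusMessage` are only emitted on actual transitions. A minimal, self-contained sketch of that change-detection pattern, assuming only a loose imitation of the real code (`_derive_state`, `_publish` and this trimmed-down `RunningState` are illustrative stand-ins, not the actual implementations):

from enum import Enum


class RunningState(str, Enum):
    # subset of the real models_library enum, for illustration only
    PUBLISHED = "PUBLISHED"
    STARTED = "STARTED"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"


def _derive_state(task_states: list[RunningState]) -> RunningState:
    # toy stand-in for get_pipeline_state_from_task_states
    if any(s is RunningState.FAILED for s in task_states):
        return RunningState.FAILED
    if task_states and all(s is RunningState.SUCCESS for s in task_states):
        return RunningState.SUCCESS
    return RunningState.STARTED


def _publish(state: RunningState) -> None:
    # stand-in for the DB update + publish_pipeline_scheduling_state
    print(f"pipeline state changed -> {state}")


def update_run_result(
    current_result: RunningState, task_states: list[RunningState]
) -> RunningState:
    new_result = _derive_state(task_states)
    if new_result == current_result:
        # unchanged: skip the DB write and the RabbitMQ notification
        return new_result
    _publish(new_result)
    return new_result


# example: only the second call publishes; the first and third are no-ops
state = RunningState.STARTED
state = update_run_result(state, [RunningState.STARTED, RunningState.SUCCESS])
state = update_run_result(state, [RunningState.SUCCESS, RunningState.SUCCESS])
state = update_run_result(state, [RunningState.SUCCESS, RunningState.SUCCESS])

This is also why the tests below can assert an exact message count per state transition: a scheduler iteration that observes no change produces no message.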
@@ -702,7 +724,7 @@ async def apply( except ComputationalBackendNotConnectedError: _logger.exception("Computational backend is not connected!") finally: - await self._set_schedule_done(user_id, project_id, iteration) + await self._set_processing_done(user_id, project_id, iteration) async def _schedule_tasks_to_stop( self, @@ -710,20 +732,29 @@ project_id: ProjectID, comp_tasks: dict[NodeIDStr, CompTaskAtDB], comp_run: CompRunsAtDB, - ) -> None: - # get any running task and stop them + ) -> dict[NodeIDStr, CompTaskAtDB]: + # NOTE: tasks that were not yet started can be marked as ABORTED straight away; + # the tasks that are already processing need some time to stop + # and we need to stop them in the backend + tasks_instantly_stoppable = [ + t for t in comp_tasks.values() if t.state in TASK_TO_START_STATES + ] comp_tasks_repo = CompTasksRepository.instance(self.db_engine) await ( comp_tasks_repo.mark_project_published_waiting_for_cluster_tasks_as_aborted( project_id, comp_run.run_id ) ) + for task in tasks_instantly_stoppable: + comp_tasks[f"{task.node_id}"].state = RunningState.ABORTED # stop any remaining running task, these are already submitted if tasks_to_stop := [ t for t in comp_tasks.values() if t.state in PROCESSING_STATES ]: await self._stop_tasks(user_id, tasks_to_stop, comp_run) + return comp_tasks + async def _schedule_tasks_to_start( # noqa: C901 self, user_id: UserID, diff --git a/services/director-v2/src/simcore_service_director_v2/utils/rabbitmq.py b/services/director-v2/src/simcore_service_director_v2/utils/rabbitmq.py index 6f6e1693193..fb20fc0ee2e 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/rabbitmq.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/rabbitmq.py @@ -5,6 +5,7 @@ from models_library.projects_nodes_io import NodeID from models_library.projects_state import RunningState from models_library.rabbitmq_messages import ( + ComputationalPipelineStatusMessage, InstrumentationRabbitMessage, LoggerRabbitMessage, ProgressRabbitMessageNode, @@ -197,3 +198,17 @@ async def publish_project_log( log_level=log_level, ) await rabbitmq_client.publish(message.channel_name, message) + + +async def publish_pipeline_scheduling_state( + rabbitmq_client: RabbitMQClient, + user_id: UserID, + project_id: ProjectID, + state: RunningState, +) -> None: + message = ComputationalPipelineStatusMessage.model_construct( + user_id=user_id, + project_id=project_id, + run_result=state, + ) + await rabbitmq_client.publish(message.channel_name, message) diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_route_computations.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_route_computations.py index 368b6c273b5..6359149ec59 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_route_computations.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_api_route_computations.py @@ -854,7 +854,7 @@ async def test_get_computation_from_empty_project( assert returned_computation expected_computation = ComputationGet( id=proj.uuid, - state=RunningState.UNKNOWN, + state=RunningState.NOT_STARTED, pipeline_details=PipelineDetails( adjacency_list={}, node_states={}, progress=None ), diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py index c4d360adba2..1a390c200ab 100644 ---
a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py @@ -5,7 +5,8 @@ # pylint:disable=protected-access # pylint:disable=too-many-arguments # pylint:disable=no-name-in-module -# pylint: disable=too-many-statements +# pylint:disable=too-many-positional-arguments +# pylint:disable=too-many-statements import asyncio @@ -35,6 +36,7 @@ from models_library.projects_nodes_io import NodeID from models_library.projects_state import RunningState from models_library.rabbitmq_messages import ( + ComputationalPipelineStatusMessage, InstrumentationRabbitMessage, RabbitResourceTrackingBaseMessage, RabbitResourceTrackingHeartbeatMessage, @@ -46,6 +48,7 @@ from pydantic import TypeAdapter from pytest_mock.plugin import MockerFixture from servicelib.rabbitmq import RabbitMQClient +from servicelib.rabbitmq._constants import BIND_TO_ALL_TOPICS from simcore_postgres_database.models.comp_runs import comp_runs from simcore_postgres_database.models.comp_tasks import NodeClass from simcore_service_director_v2.core.errors import ( @@ -160,6 +163,7 @@ async def _assert_start_pipeline( sqlalchemy_async_engine: AsyncEngine, published_project: PublishedProject, run_metadata: RunMetadataDict, + computational_pipeline_rabbit_client_parser: mock.AsyncMock, ) -> tuple[CompRunsAtDB, list[CompTaskAtDB]]: exp_published_tasks = deepcopy(published_project.tasks) assert published_project.project.prj_owner @@ -181,6 +185,11 @@ async def _assert_start_pipeline( comp_runs.c.project_uuid == f"{published_project.project.uuid}", ), ) + await _assert_message_received( + computational_pipeline_rabbit_client_parser, + 1, + ComputationalPipelineStatusMessage.model_validate_json, + ) await assert_comp_tasks_and_comp_run_snapshot_tasks( sqlalchemy_async_engine, project_uuid=published_project.project.uuid, @@ -198,6 +207,7 @@ async def _assert_publish_in_dask_backend( published_tasks: list[CompTaskAtDB], mocked_dask_client: mock.MagicMock, scheduler: BaseCompScheduler, + computational_pipeline_rabbit_client_parser: mock.AsyncMock, ) -> tuple[list[CompTaskAtDB], dict[NodeID, Callable[[], None]]]: expected_pending_tasks = [ published_tasks[1], @@ -285,6 +295,11 @@ async def _return_tasks_pending(job_ids: list[str]) -> list[RunningState]: where_statement=(comp_runs.c.user_id == published_project.project.prj_owner) & (comp_runs.c.project_uuid == f"{published_project.project.uuid}"), ) + await _assert_message_received( + computational_pipeline_rabbit_client_parser, + 1, + ComputationalPipelineStatusMessage.model_validate_json, + ) await assert_comp_tasks_and_comp_run_snapshot_tasks( sqlalchemy_async_engine, project_uuid=published_project.project.uuid, @@ -336,11 +351,42 @@ async def resource_tracking_rabbit_client_parser( await client.unsubscribe(queue_name) +@pytest.fixture +async def computational_pipeline_rabbit_client_parser( + create_rabbitmq_client: Callable[[str], RabbitMQClient], mocker: MockerFixture +) -> AsyncIterator[mock.AsyncMock]: + client = create_rabbitmq_client("computational_pipeline_pytest_consumer") + mock = mocker.AsyncMock(return_value=True) + queue_name, _ = await client.subscribe( + ComputationalPipelineStatusMessage.get_channel_name(), + mock, + topics=[BIND_TO_ALL_TOPICS], + ) + yield mock + await client.unsubscribe(queue_name) + + async def _assert_message_received( mocked_message_parser: mock.AsyncMock, expected_call_count: int, message_parser: Callable, ) -> list: + if expected_call_count == 0: + # ensure 
it remains so for a few seconds + mocked_message_parser.assert_not_called() + async for attempt in AsyncRetrying( + wait=wait_fixed(1), + stop=stop_after_delay(3), + retry=retry_if_exception_type(AssertionError), + reraise=True, + ): + with attempt: + print( + f"--> waiting for rabbitmq message [{attempt.retry_state.attempt_number}, {attempt.retry_state.idle_for}]" + ) + mocked_message_parser.assert_not_called() + + return [] async for attempt in AsyncRetrying( wait=wait_fixed(0.1), stop=stop_after_delay(5), @@ -351,7 +397,9 @@ async def _assert_message_received( print( f"--> waiting for rabbitmq message [{attempt.retry_state.attempt_number}, {attempt.retry_state.idle_for}]" ) - assert mocked_message_parser.call_count == expected_call_count + assert ( + mocked_message_parser.call_count == expected_call_count + ), mocked_message_parser.call_args_list print( f"<-- rabbitmq message received after [{attempt.retry_state.attempt_number}, {attempt.retry_state.idle_for}]" ) @@ -422,6 +470,7 @@ async def test_proper_pipeline_is_scheduled( # noqa: PLR0915 mocked_clean_task_output_and_log_files_if_invalid: mock.Mock, instrumentation_rabbit_client_parser: mock.AsyncMock, resource_tracking_rabbit_client_parser: mock.AsyncMock, + computational_pipeline_rabbit_client_parser: mock.AsyncMock, run_metadata: RunMetadataDict, ): with_disabled_auto_scheduling.assert_called_once() @@ -435,6 +484,7 @@ async def test_proper_pipeline_is_scheduled( # noqa: PLR0915 sqlalchemy_async_engine=sqlalchemy_async_engine, published_project=published_project, run_metadata=run_metadata, + computational_pipeline_rabbit_client_parser=computational_pipeline_rabbit_client_parser, ) with_disabled_scheduler_publisher.assert_called() @@ -446,6 +496,7 @@ async def test_proper_pipeline_is_scheduled( # noqa: PLR0915 expected_published_tasks, mocked_dask_client, scheduler_api, + computational_pipeline_rabbit_client_parser, ) # ------------------------------------------------------------------------------- @@ -478,6 +529,11 @@ async def _return_1st_task_running(job_ids: list[str]) -> list[RunningState]: comp_runs.c.project_uuid == f"{published_project.project.uuid}", ), ) + await _assert_message_received( + computational_pipeline_rabbit_client_parser, + 1, + ComputationalPipelineStatusMessage.model_validate_json, + ) await assert_comp_tasks_and_comp_run_snapshot_tasks( sqlalchemy_async_engine, project_uuid=published_project.project.uuid, @@ -854,6 +910,11 @@ async def _return_3rd_task_success(job_ids: list[str]) -> list[RunningState]: comp_runs.c.project_uuid == f"{published_project.project.uuid}", ), ) + await _assert_message_received( + computational_pipeline_rabbit_client_parser, + 1, + ComputationalPipelineStatusMessage.model_validate_json, + ) await assert_comp_tasks_and_comp_run_snapshot_tasks( sqlalchemy_async_engine, @@ -903,6 +964,7 @@ async def with_started_project( scheduler_api: BaseCompScheduler, instrumentation_rabbit_client_parser: mock.AsyncMock, resource_tracking_rabbit_client_parser: mock.AsyncMock, + computational_pipeline_rabbit_client_parser: mock.AsyncMock, ) -> RunningProject: with_disabled_auto_scheduling.assert_called_once() published_project = await publish_project() @@ -914,6 +976,7 @@ async def with_started_project( sqlalchemy_async_engine=sqlalchemy_async_engine, published_project=published_project, run_metadata=run_metadata, + computational_pipeline_rabbit_client_parser=computational_pipeline_rabbit_client_parser, ) with_disabled_scheduler_publisher.assert_called_once() @@ -929,6 +992,7 @@ async def 
with_started_project( expected_published_tasks, mocked_dask_client, scheduler_api, + computational_pipeline_rabbit_client_parser, ) # @@ -966,6 +1030,11 @@ async def _return_1st_task_running(job_ids: list[str]) -> list[RunningState]: comp_runs.c.project_uuid == f"{published_project.project.uuid}", ), ) + await _assert_message_received( + computational_pipeline_rabbit_client_parser, + 1, + ComputationalPipelineStatusMessage.model_validate_json, + ) await assert_comp_tasks_and_comp_run_snapshot_tasks( sqlalchemy_async_engine, project_uuid=published_project.project.uuid, @@ -1140,6 +1209,7 @@ async def test_broken_pipeline_configuration_is_not_scheduled_and_aborted( fake_workbench_adjacency: dict[str, Any], sqlalchemy_async_engine: AsyncEngine, run_metadata: RunMetadataDict, + computational_pipeline_rabbit_client_parser: mock.AsyncMock, ): """A pipeline which comp_tasks are missing should not be scheduled. It shall be aborted and shown as such in the comp_runs db""" @@ -1172,6 +1242,11 @@ async def test_broken_pipeline_configuration_is_not_scheduled_and_aborted( & (comp_runs.c.project_uuid == f"{sleepers_project.uuid}"), ) )[0] + await _assert_message_received( + computational_pipeline_rabbit_client_parser, + 1, + ComputationalPipelineStatusMessage.model_validate_json, + ) # # Trigger scheduling manually. since the pipeline is broken, it shall be aborted @@ -1188,6 +1263,11 @@ async def test_broken_pipeline_configuration_is_not_scheduled_and_aborted( where_statement=(comp_runs.c.user_id == user["id"]) & (comp_runs.c.project_uuid == f"{sleepers_project.uuid}"), ) + await _assert_message_received( + computational_pipeline_rabbit_client_parser, + 1, + ComputationalPipelineStatusMessage.model_validate_json, + ) async def test_task_progress_triggers( @@ -1201,6 +1281,7 @@ async def test_task_progress_triggers( mocked_parse_output_data_fct: mock.Mock, mocked_clean_task_output_and_log_files_if_invalid: mock.Mock, run_metadata: RunMetadataDict, + computational_pipeline_rabbit_client_parser: mock.AsyncMock, ): _with_mock_send_computation_tasks(published_project.tasks, mocked_dask_client) _run_in_db, expected_published_tasks = await _assert_start_pipeline( @@ -1208,7 +1289,9 @@ async def test_task_progress_triggers( sqlalchemy_async_engine=sqlalchemy_async_engine, published_project=published_project, run_metadata=run_metadata, + computational_pipeline_rabbit_client_parser=computational_pipeline_rabbit_client_parser, ) + # ------------------------------------------------------------------------------- # 1. 
first run will move comp_tasks to PENDING so the dask-worker can take them expected_pending_tasks, _ = await _assert_publish_in_dask_backend( @@ -1217,6 +1300,7 @@ async def test_task_progress_triggers( expected_published_tasks, mocked_dask_client, scheduler_api, + computational_pipeline_rabbit_client_parser, ) # send some progress @@ -1272,6 +1356,7 @@ async def test_handling_of_disconnected_scheduler_dask( published_project: PublishedProject, backend_error: ComputationalSchedulerError, run_metadata: RunMetadataDict, + computational_pipeline_rabbit_client_parser: mock.AsyncMock, ): # this will create a non connected backend issue that will trigger re-connection mocked_dask_client_send_task = mocker.patch( @@ -1289,6 +1374,11 @@ async def test_handling_of_disconnected_scheduler_dask( run_metadata=run_metadata, use_on_demand_clusters=False, ) + await _assert_message_received( + computational_pipeline_rabbit_client_parser, + 1, + ComputationalPipelineStatusMessage.model_validate_json, + ) # since there is no cluster, there is no dask-scheduler, # the tasks shall all still be in PUBLISHED state now @@ -1337,6 +1427,11 @@ async def test_handling_of_disconnected_scheduler_dask( expected_progress=1, run_id=run_in_db.run_id, ) + await _assert_message_received( + computational_pipeline_rabbit_client_parser, + 1, + ComputationalPipelineStatusMessage.model_validate_json, + ) # then we have another scheduler run await scheduler_api.apply( user_id=run_in_db.user_id, @@ -1372,6 +1467,7 @@ class RebootState: expected_task_state_group2: RunningState expected_task_progress_group2: float expected_run_state: RunningState + expected_pipeline_state_notification: int @pytest.mark.parametrize( @@ -1386,6 +1482,7 @@ class RebootState: expected_task_state_group2=RunningState.ABORTED, expected_task_progress_group2=1, expected_run_state=RunningState.FAILED, + expected_pipeline_state_notification=1, ), id="reboot with lost tasks", ), @@ -1398,6 +1495,7 @@ class RebootState: expected_task_state_group2=RunningState.ABORTED, expected_task_progress_group2=1, expected_run_state=RunningState.ABORTED, + expected_pipeline_state_notification=1, ), id="reboot with aborted tasks", ), @@ -1410,6 +1508,7 @@ class RebootState: expected_task_state_group2=RunningState.ABORTED, expected_task_progress_group2=1, expected_run_state=RunningState.FAILED, + expected_pipeline_state_notification=1, ), id="reboot with failed tasks", ), @@ -1424,6 +1523,7 @@ class RebootState: expected_task_state_group2=RunningState.STARTED, expected_task_progress_group2=0, expected_run_state=RunningState.STARTED, + expected_pipeline_state_notification=0, ), id="reboot with running tasks", ), @@ -1436,6 +1536,7 @@ class RebootState: expected_task_state_group2=RunningState.SUCCESS, expected_task_progress_group2=1, expected_run_state=RunningState.SUCCESS, + expected_pipeline_state_notification=1, ), id="reboot with completed tasks", ), @@ -1452,6 +1553,7 @@ async def test_handling_scheduled_tasks_after_director_reboots( mocked_parse_output_data_fct: mock.Mock, mocked_clean_task_output_fct: mock.Mock, reboot_state: RebootState, + computational_pipeline_rabbit_client_parser: mock.AsyncMock, ): """After the dask client is rebooted, or that the director-v2 reboots the dv-2 internal scheduler shall continue scheduling correctly. 
Even though the task might have continued to run @@ -1534,6 +1636,11 @@ async def mocked_get_task_result(_job_id: str) -> TaskOutputData: comp_runs.c.project_uuid == f"{running_project.project.uuid}", ), ) + await _assert_message_received( + computational_pipeline_rabbit_client_parser, + reboot_state.expected_pipeline_state_notification, + ComputationalPipelineStatusMessage.model_validate_json, + ) async def test_handling_cancellation_of_jobs_after_reboot( @@ -1545,6 +1652,7 @@ async def test_handling_cancellation_of_jobs_after_reboot( scheduler_api: BaseCompScheduler, mocked_parse_output_data_fct: mock.Mock, mocked_clean_task_output_fct: mock.Mock, + computational_pipeline_rabbit_client_parser: mock.AsyncMock, ): """A running pipeline was cancelled by a user and the DV-2 was restarted BEFORE It could actually cancel the task. On reboot the DV-2 shall recover @@ -1564,6 +1672,11 @@ async def test_handling_cancellation_of_jobs_after_reboot( ), ) )[0] + await _assert_message_received( + computational_pipeline_rabbit_client_parser, + 0, + ComputationalPipelineStatusMessage.model_validate_json, + ) await assert_comp_tasks_and_comp_run_snapshot_tasks( sqlalchemy_async_engine, @@ -1660,6 +1773,11 @@ async def _return_random_task_result(job_id) -> TaskOutputData: ), ) mocked_clean_task_output_fct.assert_called() + await _assert_message_received( + computational_pipeline_rabbit_client_parser, + 1, + ComputationalPipelineStatusMessage.model_validate_json, + ) @pytest.fixture @@ -1682,6 +1800,7 @@ async def test_running_pipeline_triggers_heartbeat( published_project: PublishedProject, resource_tracking_rabbit_client_parser: mock.AsyncMock, run_metadata: RunMetadataDict, + computational_pipeline_rabbit_client_parser: mock.AsyncMock, ): _with_mock_send_computation_tasks(published_project.tasks, mocked_dask_client) run_in_db, expected_published_tasks = await _assert_start_pipeline( @@ -1689,6 +1808,7 @@ async def test_running_pipeline_triggers_heartbeat( sqlalchemy_async_engine=sqlalchemy_async_engine, published_project=published_project, run_metadata=run_metadata, + computational_pipeline_rabbit_client_parser=computational_pipeline_rabbit_client_parser, ) # ------------------------------------------------------------------------------- # 1. first run will move comp_tasks to PENDING so the dask-worker can take them @@ -1698,6 +1818,7 @@ async def test_running_pipeline_triggers_heartbeat( expected_published_tasks, mocked_dask_client, scheduler_api, + computational_pipeline_rabbit_client_parser, ) # ------------------------------------------------------------------------------- # 2. 
the "worker" starts processing a task @@ -1796,6 +1917,7 @@ async def test_pipeline_with_on_demand_cluster_with_not_ready_backend_waits( run_metadata: RunMetadataDict, mocked_get_or_create_cluster: mock.Mock, faker: Faker, + computational_pipeline_rabbit_client_parser: mock.AsyncMock, ): mocked_get_or_create_cluster.side_effect = ( ComputationalBackendOnDemandNotReadyError( @@ -1824,6 +1946,11 @@ async def test_pipeline_with_on_demand_cluster_with_not_ready_backend_waits( ), ) )[0] + await _assert_message_received( + computational_pipeline_rabbit_client_parser, + 1, + ComputationalPipelineStatusMessage.model_validate_json, + ) await assert_comp_tasks_and_comp_run_snapshot_tasks( sqlalchemy_async_engine, project_uuid=published_project.project.uuid, @@ -1855,6 +1982,11 @@ async def test_pipeline_with_on_demand_cluster_with_not_ready_backend_waits( comp_runs.c.project_uuid == f"{published_project.project.uuid}", ), ) + await _assert_message_received( + computational_pipeline_rabbit_client_parser, + 1, + ComputationalPipelineStatusMessage.model_validate_json, + ) await assert_comp_tasks_and_comp_run_snapshot_tasks( sqlalchemy_async_engine, project_uuid=published_project.project.uuid, @@ -1905,6 +2037,7 @@ async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_fails( run_metadata: RunMetadataDict, mocked_get_or_create_cluster: mock.Mock, get_or_create_exception: Exception, + computational_pipeline_rabbit_client_parser: mock.AsyncMock, ): # needs to change: https://github.com/ITISFoundation/osparc-simcore/issues/6817 @@ -1931,6 +2064,11 @@ async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_fails( ), ) )[0] + await _assert_message_received( + computational_pipeline_rabbit_client_parser, + 1, + ComputationalPipelineStatusMessage.model_validate_json, + ) await assert_comp_tasks_and_comp_run_snapshot_tasks( sqlalchemy_async_engine, project_uuid=published_project.project.uuid, @@ -1961,6 +2099,11 @@ async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_fails( comp_runs.c.project_uuid == f"{published_project.project.uuid}", ), ) + await _assert_message_received( + computational_pipeline_rabbit_client_parser, + 1, + ComputationalPipelineStatusMessage.model_validate_json, + ) await assert_comp_tasks_and_comp_run_snapshot_tasks( sqlalchemy_async_engine, project_uuid=published_project.project.uuid, @@ -1985,6 +2128,11 @@ async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_fails( comp_runs.c.project_uuid == f"{published_project.project.uuid}", ), ) + await _assert_message_received( + computational_pipeline_rabbit_client_parser, + 0, + ComputationalPipelineStatusMessage.model_validate_json, + ) await assert_comp_tasks_and_comp_run_snapshot_tasks( sqlalchemy_async_engine, project_uuid=published_project.project.uuid, @@ -1993,3 +2141,78 @@ async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_fails( expected_progress=1.0, run_id=run_in_db.run_id, ) + + +async def test_run_new_pipeline_called_twice_prevents_duplicate_runs( + with_disabled_auto_scheduling: mock.Mock, + with_disabled_scheduler_publisher: mock.Mock, + initialized_app: FastAPI, + sqlalchemy_async_engine: AsyncEngine, + published_project: PublishedProject, + run_metadata: RunMetadataDict, + computational_pipeline_rabbit_client_parser: mock.AsyncMock, +): + # Ensure we start with an empty database + await assert_comp_runs_empty(sqlalchemy_async_engine) + + # First call to run_new_pipeline - should succeed + assert published_project.project.prj_owner + await 
run_new_pipeline( + initialized_app, + user_id=published_project.project.prj_owner, + project_id=published_project.project.uuid, + run_metadata=run_metadata, + use_on_demand_clusters=False, + ) + + # Verify first run was created and published + runs_after_first_call = await assert_comp_runs( + sqlalchemy_async_engine, + expected_total=1, + expected_state=RunningState.PUBLISHED, + where_statement=and_( + comp_runs.c.user_id == published_project.project.prj_owner, + comp_runs.c.project_uuid == f"{published_project.project.uuid}", + ), + ) + first_run = runs_after_first_call[0] + + # Verify first RabbitMQ message was sent + await _assert_message_received( + computational_pipeline_rabbit_client_parser, + 1, + ComputationalPipelineStatusMessage.model_validate_json, + ) + + # Second call to run_new_pipeline - should be ignored since first run is still running + await run_new_pipeline( + initialized_app, + user_id=published_project.project.prj_owner, + project_id=published_project.project.uuid, + run_metadata=run_metadata, + use_on_demand_clusters=False, + ) + + # Verify still only one run exists with same run_id + runs_after_second_call = await assert_comp_runs( + sqlalchemy_async_engine, + expected_total=1, + expected_state=RunningState.PUBLISHED, + where_statement=and_( + comp_runs.c.user_id == published_project.project.prj_owner, + comp_runs.c.project_uuid == f"{published_project.project.uuid}", + ), + ) + second_run = runs_after_second_call[0] + + # Verify it's the same run (same run_id, same created timestamp) + assert first_run.run_id == second_run.run_id + assert first_run.created == second_run.created + assert first_run.iteration == second_run.iteration + + # Verify no additional RabbitMQ message was sent (still only 1 total) + await _assert_message_received( + computational_pipeline_rabbit_client_parser, + 0, # No new messages expected + ComputationalPipelineStatusMessage.model_validate_json, + ) diff --git a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py index b1b92d4af16..438dff9ef1f 100644 --- a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py +++ b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py @@ -7,6 +7,7 @@ from aiohttp import web from models_library.groups import GroupID from models_library.rabbitmq_messages import ( + ComputationalPipelineStatusMessage, EventRabbitMessage, LoggerRabbitMessage, ProgressRabbitMessageNode, @@ -97,6 +98,21 @@ async def _progress_message_parser(app: web.Application, data: bytes) -> bool: return True +async def _computational_pipeline_status_message_parser( + app: web.Application, data: bytes +) -> bool: + rabbit_message = ComputationalPipelineStatusMessage.model_validate_json(data) + project = await _projects_service.get_project_for_user( + app, + f"{rabbit_message.project_id}", + rabbit_message.user_id, + include_state=True, + ) + await _projects_service.notify_project_state_update(app, project) + + return True + + async def _log_message_parser(app: web.Application, data: bytes) -> bool: rabbit_message = LoggerRabbitMessage.model_validate_json(data) await send_message_to_user( @@ -171,6 +187,11 @@ async def _osparc_credits_message_parser(app: web.Application, data: bytes) -> b _osparc_credits_message_parser, {"topics": []}, ), + SubcribeArgumentsTuple( + 
ComputationalPipelineStatusMessage.get_channel_name(), + _computational_pipeline_status_message_parser, + {"topics": []}, + ), ) diff --git a/services/web/server/src/simcore_service_webserver/notifications/project_logs.py b/services/web/server/src/simcore_service_webserver/notifications/project_logs.py index 5b971da7fbe..dde5c310bb9 100644 --- a/services/web/server/src/simcore_service_webserver/notifications/project_logs.py +++ b/services/web/server/src/simcore_service_webserver/notifications/project_logs.py @@ -4,6 +4,7 @@ from aiohttp import web from models_library.projects import ProjectID from models_library.rabbitmq_messages import ( + ComputationalPipelineStatusMessage, LoggerRabbitMessage, ProgressRabbitMessageNode, ProgressRabbitMessageProject, @@ -21,6 +22,7 @@ LoggerRabbitMessage, ProgressRabbitMessageNode, ProgressRabbitMessageProject, + ComputationalPipelineStatusMessage, ]
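To manually verify the new event end to end while reviewing, the subscription pattern from the `computational_pipeline_rabbit_client_parser` test fixture above can be turned into a small standalone consumer. A sketch, assuming an already-connected `RabbitMQClient` and the `(data: bytes) -> bool` handler convention used by the webserver message parsers; the handler body and the 60-second observation window are illustrative only:

import asyncio

from models_library.rabbitmq_messages import ComputationalPipelineStatusMessage
from servicelib.rabbitmq import RabbitMQClient
from servicelib.rabbitmq._constants import BIND_TO_ALL_TOPICS


async def _on_pipeline_status(data: bytes) -> bool:
    # same parsing as _computational_pipeline_status_message_parser in the webserver
    message = ComputationalPipelineStatusMessage.model_validate_json(data)
    print(
        f"project {message.project_id} (user {message.user_id}) "
        f"pipeline is now {message.run_result}"
    )
    return True  # ack the message


async def observe_pipeline_states(client: RabbitMQClient) -> None:
    # messages are routed by project_id (see routing_key()); bind to all of them
    queue_name, _ = await client.subscribe(
        ComputationalPipelineStatusMessage.get_channel_name(),
        _on_pipeline_status,
        topics=[BIND_TO_ALL_TOPICS],
    )
    try:
        await asyncio.sleep(60)  # watch the stream for a minute
    finally:
        await client.unsubscribe(queue_name)

The webserver consumer registered above subscribes with `{"topics": []}` instead, because its exclusive queue filters per-project on the routing key; binding to all topics as in this sketch is only convenient for ad-hoc observation.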