diff --git a/services/director-v2/src/simcore_service_director_v2/core/settings.py b/services/director-v2/src/simcore_service_director_v2/core/settings.py index 03f256b01b0e..28208ec34ff9 100644 --- a/services/director-v2/src/simcore_service_director_v2/core/settings.py +++ b/services/director-v2/src/simcore_service_director_v2/core/settings.py @@ -6,6 +6,7 @@ from functools import cached_property from typing import Annotated, cast +from common_library.basic_types import DEFAULT_FACTORY from common_library.pydantic_validators import validate_numeric_string_as_timedelta from fastapi import FastAPI from models_library.basic_types import LogLevel, PortInt @@ -50,38 +51,53 @@ class ComputationalBackendSettings(BaseCustomSettings): - COMPUTATIONAL_BACKEND_ENABLED: bool = Field( - default=True, - ) - COMPUTATIONAL_BACKEND_SCHEDULING_CONCURRENCY: PositiveInt = Field( - default=50, - description="defines how many pipelines the application can schedule concurrently", - ) - COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED: bool = Field( - default=True, - ) - COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_URL: AnyUrl = Field( - ..., - description="This is the cluster that will be used by default" - " when submitting computational services (typically " - "tcp://dask-scheduler:8786, tls://dask-scheduler:8786 for the internal cluster", - ) - COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_AUTH: ClusterAuthentication = Field( - default=..., - description="this is the cluster authentication that will be used by default", - ) - COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_FILE_LINK_TYPE: FileLinkType = Field( - FileLinkType.S3, - description=f"Default file link type to use with the internal cluster '{list(FileLinkType)}'", - ) - COMPUTATIONAL_BACKEND_DEFAULT_FILE_LINK_TYPE: FileLinkType = Field( - FileLinkType.PRESIGNED, - description=f"Default file link type to use with computational backend '{list(FileLinkType)}'", - ) - COMPUTATIONAL_BACKEND_ON_DEMAND_CLUSTERS_FILE_LINK_TYPE: FileLinkType = Field( - FileLinkType.PRESIGNED, - description=f"Default file link type to use with computational backend on-demand clusters '{list(FileLinkType)}'", - ) + COMPUTATIONAL_BACKEND_ENABLED: bool = True + COMPUTATIONAL_BACKEND_SCHEDULING_CONCURRENCY: Annotated[ + PositiveInt, + Field( + description="defines how many pipelines the application can schedule concurrently" + ), + ] = 50 + COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED: bool = True + COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_URL: Annotated[ + AnyUrl, + Field( + description="This is the cluster that will be used by default" + " when submitting computational services (typically " + "tcp://dask-scheduler:8786, tls://dask-scheduler:8786 for the internal cluster", + ), + ] + COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_AUTH: Annotated[ + ClusterAuthentication, + Field( + description="this is the cluster authentication that will be used by default" + ), + ] + COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_FILE_LINK_TYPE: Annotated[ + FileLinkType, + Field( + description=f"Default file link type to use with the internal cluster '{list(FileLinkType)}'" + ), + ] = FileLinkType.S3 + COMPUTATIONAL_BACKEND_DEFAULT_FILE_LINK_TYPE: Annotated[ + FileLinkType, + Field( + description=f"Default file link type to use with computational backend '{list(FileLinkType)}'" + ), + ] = FileLinkType.PRESIGNED + COMPUTATIONAL_BACKEND_ON_DEMAND_CLUSTERS_FILE_LINK_TYPE: Annotated[ + FileLinkType, + Field( + description=f"Default file link type to use with computational backend on-demand clusters '{list(FileLinkType)}'" + ), + ] = 
FileLinkType.PRESIGNED + COMPUTATIONAL_BACKEND_MAX_WAITING_FOR_CLUSTER_TIMEOUT: Annotated[ + datetime.timedelta, + Field( + description="maximum time a pipeline can wait for a cluster to start" + "(default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formatting)." + ), + ] = datetime.timedelta(minutes=10) @cached_property def default_cluster(self) -> BaseCluster: @@ -111,91 +127,107 @@ class AppSettings(BaseApplicationSettings, MixinLoggingSettings): ), ] = LogLevel.INFO - DIRECTOR_V2_LOG_FORMAT_LOCAL_DEV_ENABLED: bool = Field( - default=False, - validation_alias=AliasChoices( - "DIRECTOR_V2_LOG_FORMAT_LOCAL_DEV_ENABLED", - "LOG_FORMAT_LOCAL_DEV_ENABLED", + DIRECTOR_V2_LOG_FORMAT_LOCAL_DEV_ENABLED: Annotated[ + bool, + Field( + validation_alias=AliasChoices( + "DIRECTOR_V2_LOG_FORMAT_LOCAL_DEV_ENABLED", + "LOG_FORMAT_LOCAL_DEV_ENABLED", + ), + description="Enables local development log format. WARNING: make sure it is disabled if you want to have structured logs!", ), - description="Enables local development log format. WARNING: make sure it is disabled if you want to have structured logs!", - ) - DIRECTOR_V2_LOG_FILTER_MAPPING: dict[LoggerName, list[MessageSubstring]] = Field( - default_factory=dict, - validation_alias=AliasChoices( - "DIRECTOR_V2_LOG_FILTER_MAPPING", "LOG_FILTER_MAPPING" + ] = False + DIRECTOR_V2_LOG_FILTER_MAPPING: Annotated[ + dict[LoggerName, list[MessageSubstring]], + Field( + default_factory=dict, + validation_alias=AliasChoices( + "DIRECTOR_V2_LOG_FILTER_MAPPING", "LOG_FILTER_MAPPING" + ), + description="is a dictionary that maps specific loggers (such as 'uvicorn.access' or 'gunicorn.access') to a list of log message patterns that should be filtered out.", ), - description="is a dictionary that maps specific loggers (such as 'uvicorn.access' or 'gunicorn.access') to a list of log message patterns that should be filtered out.", - ) + ] = DEFAULT_FACTORY DIRECTOR_V2_DEV_FEATURES_ENABLED: bool = False - DIRECTOR_V2_DEV_FEATURE_R_CLONE_MOUNTS_ENABLED: bool = Field( - default=False, - description=( - "Under development feature. If enabled state " - "is saved using rclone docker volumes." + DIRECTOR_V2_DEV_FEATURE_R_CLONE_MOUNTS_ENABLED: Annotated[ + bool, + Field( + description=( + "Under development feature. If enabled state " + "is saved using rclone docker volumes." 
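Note on the settings change above: the snippet below is a minimal, self-contained sketch (plain pydantic-settings BaseSettings, not the service's BaseCustomSettings or the real ComputationalBackendSettings) of how the new Annotated[..., Field(...)] = default style declares the added COMPUTATIONAL_BACKEND_MAX_WAITING_FOR_CLUSTER_TIMEOUT and how the duration can be overridden from the environment. The class name is a placeholder.

import datetime
import os
from typing import Annotated

from pydantic import Field
from pydantic_settings import BaseSettings


class BackendSettingsSketch(BaseSettings):
    # illustrative stand-in for ComputationalBackendSettings
    COMPUTATIONAL_BACKEND_MAX_WAITING_FOR_CLUSTER_TIMEOUT: Annotated[
        datetime.timedelta,
        Field(description="maximum time a pipeline can wait for a cluster to start"),
    ] = datetime.timedelta(minutes=10)


# the default applies when the environment variable is not set
assert (
    BackendSettingsSketch().COMPUTATIONAL_BACKEND_MAX_WAITING_FOR_CLUSTER_TIMEOUT
    == datetime.timedelta(minutes=10)
)

# pydantic accepts ISO 8601 durations from the environment
# (plain seconds are accepted as well, per the field description above)
os.environ["COMPUTATIONAL_BACKEND_MAX_WAITING_FOR_CLUSTER_TIMEOUT"] = "PT5M"
assert (
    BackendSettingsSketch().COMPUTATIONAL_BACKEND_MAX_WAITING_FOR_CLUSTER_TIMEOUT
    == datetime.timedelta(minutes=5)
)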
+ ) ), - ) + ] = False # for passing self-signed certificate to spawned services - DIRECTOR_V2_SELF_SIGNED_SSL_SECRET_ID: str = Field( - default="", - description="ID of the docker secret containing the self-signed certificate", - ) - DIRECTOR_V2_SELF_SIGNED_SSL_SECRET_NAME: str = Field( - default="", - description="Name of the docker secret containing the self-signed certificate", - ) - DIRECTOR_V2_SELF_SIGNED_SSL_FILENAME: str = Field( - default="", - description="Filepath to self-signed osparc.crt file *as mounted inside the container*, empty strings disables it", - ) + DIRECTOR_V2_SELF_SIGNED_SSL_SECRET_ID: Annotated[ + str, + Field( + description="ID of the docker secret containing the self-signed certificate" + ), + ] = "" + DIRECTOR_V2_SELF_SIGNED_SSL_SECRET_NAME: Annotated[ + str, + Field( + description="Name of the docker secret containing the self-signed certificate" + ), + ] = "" + DIRECTOR_V2_SELF_SIGNED_SSL_FILENAME: Annotated[ + str, + Field( + description="Filepath to self-signed osparc.crt file *as mounted inside the container*, empty strings disables it" + ), + ] = "" DIRECTOR_V2_PROMETHEUS_INSTRUMENTATION_ENABLED: bool = True DIRECTOR_V2_PROFILING: bool = False - DIRECTOR_V2_REMOTE_DEBUGGING_PORT: PortInt | None = Field(default=None) + DIRECTOR_V2_REMOTE_DEBUGGING_PORT: PortInt | None = None # extras - SWARM_STACK_NAME: str = Field(default="undefined-please-check") - SERVICE_TRACKING_HEARTBEAT: datetime.timedelta = Field( - default=DEFAULT_RESOURCE_USAGE_HEARTBEAT_INTERVAL, - description="Service scheduler heartbeat (everytime a heartbeat is sent into RabbitMQ)" - " (default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formating)", - ) + SWARM_STACK_NAME: str = "undefined-please-check" + SERVICE_TRACKING_HEARTBEAT: Annotated[ + datetime.timedelta, + Field( + description="Service scheduler heartbeat (everytime a heartbeat is sent into RabbitMQ)" + " (default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formating)" + ), + ] = DEFAULT_RESOURCE_USAGE_HEARTBEAT_INTERVAL - SIMCORE_SERVICES_NETWORK_NAME: str | None = Field( - default=None, - description="used to find the right network name", - ) - SIMCORE_SERVICES_PREFIX: str | None = Field( - "simcore/services", - description="useful when developing with an alternative registry namespace", - ) + SIMCORE_SERVICES_NETWORK_NAME: Annotated[ + str | None, Field(description="used to find the right network name") + ] = None + SIMCORE_SERVICES_PREFIX: Annotated[ + str | None, + Field( + description="useful when developing with an alternative registry namespace" + ), + ] = "simcore/services" - DIRECTOR_V2_NODE_PORTS_400_REQUEST_TIMEOUT_ATTEMPTS: NonNegativeInt = Field( - default=NODE_PORTS_400_REQUEST_TIMEOUT_ATTEMPTS_DEFAULT_VALUE, - description="forwarded to sidecars which use nodeports", - ) + DIRECTOR_V2_NODE_PORTS_400_REQUEST_TIMEOUT_ATTEMPTS: Annotated[ + NonNegativeInt, Field(description="forwarded to sidecars which use nodeports") + ] = NODE_PORTS_400_REQUEST_TIMEOUT_ATTEMPTS_DEFAULT_VALUE # debug settings - CLIENT_REQUEST: ClientRequestSettings = Field( - json_schema_extra={"auto_default_from_env": True} - ) + CLIENT_REQUEST: Annotated[ + ClientRequestSettings, Field(json_schema_extra={"auto_default_from_env": True}) + ] = DEFAULT_FACTORY # App modules settings --------------------- DIRECTOR_V2_STORAGE: Annotated[ StorageSettings, Field(json_schema_extra={"auto_default_from_env": True}) ] - DIRECTOR_V2_NODE_PORTS_STORAGE_AUTH: 
StorageAuthSettings | None = Field( - json_schema_extra={"auto_default_from_env": True} - ) + DIRECTOR_V2_NODE_PORTS_STORAGE_AUTH: Annotated[ + StorageAuthSettings | None, + Field(json_schema_extra={"auto_default_from_env": True}), + ] = None DIRECTOR_V2_CATALOG: Annotated[ CatalogSettings | None, Field(json_schema_extra={"auto_default_from_env": True}) ] - DIRECTOR_V0: DirectorV0Settings = Field( - json_schema_extra={"auto_default_from_env": True} - ) + DIRECTOR_V0: Annotated[ + DirectorV0Settings, Field(json_schema_extra={"auto_default_from_env": True}) + ] = DEFAULT_FACTORY DYNAMIC_SERVICES: Annotated[ DynamicServicesSettings, @@ -206,35 +238,47 @@ class AppSettings(BaseApplicationSettings, MixinLoggingSettings): PostgresSettings, Field(json_schema_extra={"auto_default_from_env": True}) ] - REDIS: RedisSettings = Field(json_schema_extra={"auto_default_from_env": True}) + REDIS: Annotated[ + RedisSettings, Field(json_schema_extra={"auto_default_from_env": True}) + ] = DEFAULT_FACTORY - DIRECTOR_V2_RABBITMQ: RabbitSettings = Field( - json_schema_extra={"auto_default_from_env": True} - ) + DIRECTOR_V2_RABBITMQ: Annotated[ + RabbitSettings, Field(json_schema_extra={"auto_default_from_env": True}) + ] = DEFAULT_FACTORY - TRAEFIK_SIMCORE_ZONE: str = Field("internal_simcore_stack") + TRAEFIK_SIMCORE_ZONE: str = "internal_simcore_stack" - DIRECTOR_V2_COMPUTATIONAL_BACKEND: ComputationalBackendSettings = Field( - json_schema_extra={"auto_default_from_env": True} - ) + DIRECTOR_V2_COMPUTATIONAL_BACKEND: Annotated[ + ComputationalBackendSettings, + Field(json_schema_extra={"auto_default_from_env": True}), + ] = DEFAULT_FACTORY - DIRECTOR_V2_DOCKER_REGISTRY: RegistrySettings = Field( - json_schema_extra={"auto_default_from_env": True}, - description="settings for the private registry deployed with the platform", - ) - DIRECTOR_V2_DOCKER_HUB_REGISTRY: RegistrySettings | None = Field( - default=None, description="public DockerHub registry settings" - ) + DIRECTOR_V2_DOCKER_REGISTRY: Annotated[ + RegistrySettings, + Field( + json_schema_extra={"auto_default_from_env": True}, + description="settings for the private registry deployed with the platform", + ), + ] = DEFAULT_FACTORY + DIRECTOR_V2_DOCKER_HUB_REGISTRY: Annotated[ + RegistrySettings | None, Field(description="public DockerHub registry settings") + ] = None - DIRECTOR_V2_RESOURCE_USAGE_TRACKER: ResourceUsageTrackerSettings = Field( - json_schema_extra={"auto_default_from_env": True}, - description="resource usage tracker service client's plugin", - ) + DIRECTOR_V2_RESOURCE_USAGE_TRACKER: Annotated[ + ResourceUsageTrackerSettings, + Field( + json_schema_extra={"auto_default_from_env": True}, + description="resource usage tracker service client's plugin", + ), + ] = DEFAULT_FACTORY - DIRECTOR_V2_TRACING: TracingSettings | None = Field( - json_schema_extra={"auto_default_from_env": True}, - description="settings for opentelemetry tracing", - ) + DIRECTOR_V2_TRACING: Annotated[ + TracingSettings | None, + Field( + json_schema_extra={"auto_default_from_env": True}, + description="settings for opentelemetry tracing", + ), + ] = None @field_validator("LOG_LEVEL", mode="before") @classmethod diff --git a/services/director-v2/src/simcore_service_director_v2/models/comp_tasks.py b/services/director-v2/src/simcore_service_director_v2/models/comp_tasks.py index 9d1513db3c9e..e13607d22441 100644 --- a/services/director-v2/src/simcore_service_director_v2/models/comp_tasks.py +++ b/services/director-v2/src/simcore_service_director_v2/models/comp_tasks.py @@ 
-276,3 +276,15 @@ class ComputationTaskForRpcDBGet(BaseModel): image: dict[str, Any] started_at: dt.datetime | None ended_at: dt.datetime | None + + @field_validator("state", mode="before") + @classmethod + def _convert_from_state_type_enum_if_needed(cls, v): + if isinstance(v, str): + # try to convert to a StateType, if it fails the validations will continue + # and pydantic will try to convert it to a RunninState later on + with suppress(ValueError): + v = StateType(v) + if isinstance(v, StateType): + return RunningState(DB_TO_RUNNING_STATE[v]) + return v diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py index e0d574ebc0f4..078d4510b914 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py @@ -21,6 +21,7 @@ import arrow import networkx as nx +from common_library.user_messages import user_message from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID, NodeIDStr from models_library.projects_state import RunningState @@ -30,6 +31,7 @@ from networkx.classes.reportviews import InDegreeView from pydantic import PositiveInt from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE +from servicelib.logging_errors import create_troubleshootting_log_kwargs from servicelib.logging_utils import log_catch, log_context from servicelib.rabbitmq import RabbitMQClient, RabbitMQRPCClient from servicelib.redis import RedisClientSDK @@ -76,7 +78,10 @@ _Previous = CompTaskAtDB _Current = CompTaskAtDB -_MAX_WAITING_FOR_CLUSTER_TIMEOUT_IN_MIN: Final[int] = 10 + +_MAX_WAITING_TIME_FOR_UNKNOWN_TASKS: Final[datetime.timedelta] = datetime.timedelta( + seconds=30 +) def _auto_schedule_callback( @@ -117,11 +122,6 @@ class SortedTasks: potentially_lost: list[CompTaskAtDB] -_MAX_WAITING_TIME_FOR_UNKNOWN_TASKS: Final[datetime.timedelta] = datetime.timedelta( - seconds=30 -) - - async def _triage_changed_tasks( changed_tasks: list[tuple[_Previous, _Current]], ) -> SortedTasks: @@ -248,7 +248,10 @@ async def _set_run_result( self.rabbitmq_client, user_id=user_id, project_id=project_id, - log=f"Pipeline run {run_result.value} for iteration {iteration} is done with {run_result.value} state", + log=user_message( + f"Project pipeline execution for iteration {iteration} has completed with status: {run_result.value}", + _version=1, + ), log_level=logging.INFO, ) await publish_pipeline_scheduling_state( @@ -273,9 +276,12 @@ async def _set_processing_done( ) async def _set_states_following_failed_to_aborted( - self, project_id: ProjectID, dag: nx.DiGraph, run_id: PositiveInt + self, + project_id: ProjectID, + dag: nx.DiGraph, + tasks: dict[NodeIDStr, CompTaskAtDB], + run_id: PositiveInt, ) -> dict[NodeIDStr, CompTaskAtDB]: - tasks = await self._get_pipeline_tasks(project_id, dag) # Perform a reverse topological sort to ensure tasks are ordered from last to first sorted_node_ids = list(reversed(list(nx.topological_sort(dag)))) tasks = { @@ -629,20 +635,28 @@ async def apply( msg=f"scheduling pipeline {user_id=}:{project_id=}:{iteration=}", ): dag: nx.DiGraph = nx.DiGraph() + try: comp_run = await CompRunsRepository.instance(self.db_engine).get( user_id, project_id, iteration ) dag = await self._get_pipeline_dag(project_id) + # 1. 
Update our list of tasks with data from backend (state, results) await self._update_states_from_comp_backend( user_id, project_id, iteration, dag, comp_run ) - # 2. Any task following a FAILED task shall be ABORTED + # 1.1. get the updated tasks NOTE: we need to get them again as some states might have changed + comp_tasks = await self._get_pipeline_tasks(project_id, dag) + # 2. timeout if waiting for cluster has been there for more than X minutes + comp_tasks = await self._timeout_if_waiting_for_cluster_too_long( + user_id, project_id, comp_run, comp_tasks + ) + # 3. Any task following a FAILED task shall be ABORTED comp_tasks = await self._set_states_following_failed_to_aborted( - project_id, dag, comp_run.run_id + project_id, dag, comp_tasks, comp_run.run_id ) - # 3. do we want to stop the pipeline now? + # 4. do we want to stop the pipeline now? if comp_run.cancelled: comp_tasks = await self._schedule_tasks_to_stop( user_id, project_id, comp_tasks, comp_run @@ -664,10 +678,7 @@ async def apply( iteration=iteration, ), ) - # 4. timeout if waiting for cluster has been there for more than X minutes - comp_tasks = await self._timeout_if_waiting_for_cluster_too_long( - user_id, project_id, comp_run.run_id, comp_tasks - ) + # 5. send a heartbeat await self._send_running_tasks_heartbeat( user_id, project_id, comp_run.run_id, iteration, dag @@ -687,42 +698,75 @@ async def apply( f"{project_id=}", f"{pipeline_result=}", ) - except PipelineNotFoundError: - _logger.warning( - "pipeline %s does not exist in comp_pipeline table, it will be removed from scheduler", - f"{project_id=}", + except PipelineNotFoundError as exc: + _logger.exception( + **create_troubleshootting_log_kwargs( + f"pipeline {project_id} is missing from `comp_pipelines` DB table, something is corrupted. Aborting scheduling", + error=exc, + error_context={ + "user_id": f"{user_id}", + "project_id": f"{project_id}", + "iteration": f"{iteration}", + }, + tip="Check that the project still exists", + ) ) + + # NOTE: no need to update task states here as pipeline is already broken await self._set_run_result( - user_id, project_id, iteration, RunningState.ABORTED + user_id, project_id, iteration, RunningState.FAILED ) except InvalidPipelineError as exc: - _logger.warning( - "pipeline %s appears to be misconfigured, it will be removed from scheduler. Please check pipeline:\n%s", - f"{project_id=}", - exc, + _logger.exception( + **create_troubleshootting_log_kwargs( + f"pipeline {project_id} appears to be misconfigured. Aborting scheduling", + error=exc, + error_context={ + "user_id": f"{user_id}", + "project_id": f"{project_id}", + "iteration": f"{iteration}", + }, + tip="Check that the project pipeline is valid and all tasks are present in the DB", + ), ) + # NOTE: no need to update task states here as pipeline is already broken await self._set_run_result( - user_id, project_id, iteration, RunningState.ABORTED + user_id, project_id, iteration, RunningState.FAILED ) - except (DaskClientAcquisisitonError, ClustersKeeperNotAvailableError): + except ( + DaskClientAcquisisitonError, + ComputationalBackendNotConnectedError, + ClustersKeeperNotAvailableError, + ) as exc: _logger.exception( - "Unexpected error while connecting with computational backend, aborting pipeline" - ) - tasks: dict[NodeIDStr, CompTaskAtDB] = await self._get_pipeline_tasks( - project_id, dag + **create_troubleshootting_log_kwargs( + "Unexpectedly lost connection to the computational backend. 
Tasks are set back to WAITING_FOR_CLUSTER state until we eventually reconnect", + error=exc, + error_context={ + "user_id": f"{user_id}", + "project_id": f"{project_id}", + "iteration": f"{iteration}", + }, + tip="Check network connection and the status of the computational backend (clusters-keeper, dask-scheduler, dask-workers)", + ) ) + processing_tasks = { + k: v + for k, v in ( + await self._get_pipeline_tasks(project_id, dag) + ).items() + if v.state in PROCESSING_STATES + } comp_tasks_repo = CompTasksRepository(self.db_engine) await comp_tasks_repo.update_project_tasks_state( project_id, comp_run.run_id, - [t.node_id for t in tasks.values()], - RunningState.FAILED, + [t.node_id for t in processing_tasks.values()], + RunningState.WAITING_FOR_CLUSTER, ) await self._set_run_result( - user_id, project_id, iteration, RunningState.FAILED + user_id, project_id, iteration, RunningState.WAITING_FOR_CLUSTER ) - except ComputationalBackendNotConnectedError: - _logger.exception("Computational backend is not connected!") finally: await self._set_processing_done(user_id, project_id, iteration) @@ -755,7 +799,7 @@ async def _schedule_tasks_to_stop( return comp_tasks - async def _schedule_tasks_to_start( # noqa: C901 + async def _schedule_tasks_to_start( self, user_id: UserID, project_id: ProjectID, @@ -800,10 +844,32 @@ async def _schedule_tasks_to_start( # noqa: C901 except ( ComputationalBackendNotConnectedError, ComputationalSchedulerChangedError, - ): + ClustersKeeperNotAvailableError, + ) as exc: _logger.exception( - "Issue with computational backend. Tasks are set back " - "to WAITING_FOR_CLUSTER state until scheduler comes back!", + **create_troubleshootting_log_kwargs( + "Computational backend is not connected. Tasks are set back " + "to WAITING_FOR_CLUSTER state until scheduler comes back!", + error=exc, + error_context={ + "user_id": f"{user_id}", + "project_id": f"{project_id}", + "tasks_ready_to_start": f"{list(tasks_ready_to_start.keys())}", + "comp_run_use_on_demand_clusters": f"{comp_run.use_on_demand_clusters}", + "comp_run.run_id": f"{comp_run.run_id}", + }, + ) + ) + + await publish_project_log( + self.rabbitmq_client, + user_id, + project_id, + log=user_message( + "An unexpected error occurred during task scheduling. Please contact oSparc support if this issue persists.", + _version=1, + ), + log_level=logging.ERROR, ) await CompTasksRepository.instance( self.db_engine @@ -818,7 +884,17 @@ async def _schedule_tasks_to_start( # noqa: C901 except ComputationalBackendOnDemandNotReadyError as exc: _logger.info( - "The on demand computational backend is not ready yet: %s", exc + **create_troubleshootting_log_kwargs( + "The on demand computational backend is not ready yet. Tasks are set to WAITING_FOR_CLUSTER state until the cluster is ready!", + error=exc, + error_context={ + "user_id": f"{user_id}", + "project_id": f"{project_id}", + "tasks_ready_to_start": f"{list(tasks_ready_to_start.keys())}", + "comp_run_use_on_demand_clusters": f"{comp_run.use_on_demand_clusters}", + "comp_run.run_id": f"{comp_run.run_id}", + }, + ) ) await publish_project_log( self.rabbitmq_client, @@ -838,34 +914,20 @@ async def _schedule_tasks_to_start( # noqa: C901 ) for task in tasks_ready_to_start: comp_tasks[f"{task}"].state = RunningState.WAITING_FOR_CLUSTER - except ClustersKeeperNotAvailableError: - _logger.exception("Unexpected error while starting tasks:") - await publish_project_log( - self.rabbitmq_client, - user_id, - project_id, - log="Unexpected error while scheduling computational tasks! 
TIP: contact osparc support.", - log_level=logging.ERROR, - ) - - await CompTasksRepository.instance( - self.db_engine - ).update_project_tasks_state( - project_id, - comp_run.run_id, - list(tasks_ready_to_start.keys()), - RunningState.FAILED, - optional_progress=1.0, - optional_stopped=arrow.utcnow().datetime, - ) - for task in tasks_ready_to_start: - comp_tasks[f"{task}"].state = RunningState.FAILED - raise except TaskSchedulingError as exc: _logger.exception( - "Project '%s''s task '%s' could not be scheduled", - exc.project_id, - exc.node_id, + **create_troubleshootting_log_kwargs( + "A task could not be scheduled, it is set to FAILED and the rest of the pipeline will be ABORTED", + error=exc, + error_context={ + "user_id": f"{user_id}", + "project_id": f"{project_id}", + "node_id": f"{exc.node_id}", + "tasks_ready_to_start": f"{list(tasks_ready_to_start.keys())}", + "comp_run_use_on_demand_clusters": f"{comp_run.use_on_demand_clusters}", + "comp_run.run_id": f"{comp_run.run_id}", + }, + ) ) await CompTasksRepository.instance( self.db_engine @@ -879,13 +941,19 @@ async def _schedule_tasks_to_start( # noqa: C901 optional_stopped=arrow.utcnow().datetime, ) comp_tasks[f"{exc.node_id}"].state = RunningState.FAILED - except Exception: + except Exception as exc: _logger.exception( - "Unexpected error for %s with %s on %s happened when scheduling %s:", - f"{comp_run.user_id=}", - f"{comp_run.project_uuid=}", - f"{comp_run.use_on_demand_clusters=}", - f"{tasks_ready_to_start.keys()=}", + **create_troubleshootting_log_kwargs( + "Unexpected error happened when scheduling tasks, all tasks to start are set to FAILED and the rest of the pipeline will be ABORTED", + error=exc, + error_context={ + "user_id": f"{comp_run.user_id}", + "project_id": f"{comp_run.project_uuid}", + "tasks_ready_to_start": f"{list(tasks_ready_to_start.keys())}", + "comp_run_use_on_demand_clusters": f"{comp_run.use_on_demand_clusters}", + "comp_run.run_id": f"{comp_run.run_id}", + }, + ) ) await CompTasksRepository.instance( self.db_engine @@ -907,33 +975,39 @@ async def _timeout_if_waiting_for_cluster_too_long( self, user_id: UserID, project_id: ProjectID, - run_id: PositiveInt, + comp_run: CompRunsAtDB, comp_tasks: dict[NodeIDStr, CompTaskAtDB], ) -> dict[NodeIDStr, CompTaskAtDB]: - if all( - c.state is RunningState.WAITING_FOR_CLUSTER for c in comp_tasks.values() - ): + if comp_run.result is RunningState.WAITING_FOR_CLUSTER: + tasks_waiting_for_cluster = [ + t + for t in comp_tasks.values() + if t.state is RunningState.WAITING_FOR_CLUSTER + ] # get latest modified task latest_modified_of_all_tasks = max( - comp_tasks.values(), key=lambda task: task.modified + tasks_waiting_for_cluster, key=lambda task: task.modified ).modified if ( arrow.utcnow().datetime - latest_modified_of_all_tasks - ) > datetime.timedelta(minutes=_MAX_WAITING_FOR_CLUSTER_TIMEOUT_IN_MIN): + ) > self.settings.COMPUTATIONAL_BACKEND_MAX_WAITING_FOR_CLUSTER_TIMEOUT: await CompTasksRepository.instance( self.db_engine ).update_project_tasks_state( project_id, - run_id, - [NodeID(idstr) for idstr in comp_tasks], + comp_run.run_id, + [task.node_id for task in tasks_waiting_for_cluster], RunningState.FAILED, optional_progress=1.0, optional_stopped=arrow.utcnow().datetime, ) - for task in comp_tasks.values(): + for task in tasks_waiting_for_cluster: task.state = RunningState.FAILED - msg = "Timed-out waiting for computational cluster! Please try again and/or contact Osparc support." 
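For clarity, here is a simplified, dependency-free sketch of the timeout rule that the reworked _timeout_if_waiting_for_cluster_too_long applies above: the check only runs while the whole run is reported as WAITING_FOR_CLUSTER, the clock starts from the most recently modified waiting task, and only those waiting tasks are failed once the configurable COMPUTATIONAL_BACKEND_MAX_WAITING_FOR_CLUSTER_TIMEOUT is exceeded. The _TaskSketch dataclass and plain-string states are placeholders, not the repository's CompTaskAtDB/RunningState types, and the real method additionally updates the DB and publishes a user-facing log message.

import datetime
from dataclasses import dataclass


@dataclass
class _TaskSketch:  # placeholder for CompTaskAtDB
    node_id: str
    state: str
    modified: datetime.datetime


def tasks_to_fail_after_cluster_timeout(
    run_state: str,
    tasks: list[_TaskSketch],
    max_wait: datetime.timedelta,
    now: datetime.datetime,
) -> list[_TaskSketch]:
    # the rule only kicks in while the whole run is WAITING_FOR_CLUSTER
    if run_state != "WAITING_FOR_CLUSTER":
        return []
    waiting = [t for t in tasks if t.state == "WAITING_FOR_CLUSTER"]
    if not waiting:
        return []
    # measure from the most recently modified waiting task
    latest_modified = max(t.modified for t in waiting)
    return waiting if (now - latest_modified) > max_wait else []


_now = datetime.datetime.now(datetime.timezone.utc)
_tasks = [
    _TaskSketch("n1", "WAITING_FOR_CLUSTER", _now - datetime.timedelta(minutes=12)),
    _TaskSketch("n2", "SUCCESS", _now),
]
# with the 10 minutes default, only the waiting task is declared failed
assert [
    t.node_id
    for t in tasks_to_fail_after_cluster_timeout(
        "WAITING_FOR_CLUSTER", _tasks, datetime.timedelta(minutes=10), _now
    )
] == ["n1"]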
+ msg = user_message( + "The system has timed out while waiting for computational resources. Please try running your project again or contact oSparc support if this issue persists.", + _version=1, + ) _logger.error(msg) await publish_project_log( self.rabbitmq_client, diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py index 7e67e852a0af..2f471360930e 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py @@ -323,11 +323,14 @@ async def list_for_user__only_latest_iterations( total_count = await conn.scalar(count_query) items = [ - ComputationRunRpcGet.model_validate( - { - **row, - "state": DB_TO_RUNNING_STATE[row.state], - } + ComputationRunRpcGet( + project_uuid=row.project_uuid, + iteration=row.iteration, + state=DB_TO_RUNNING_STATE[row.state], + info=row.info, + submitted_at=row.submitted_at, + started_at=row.started_at, + ended_at=row.ended_at, ) async for row in await conn.stream(list_query) ] @@ -525,7 +528,7 @@ async def create( iteration = await _get_next_iteration(conn, user_id, project_id) result = await conn.execute( - comp_runs.insert() # pylint: disable=no-value-for-parameter + comp_runs.insert() .values( user_id=user_id, project_uuid=f"{project_id}", diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py index ec997b011579..1195930d8bd7 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py @@ -23,7 +23,7 @@ from .....models.comp_tasks import CompTaskAtDB, ComputationTaskForRpcDBGet from .....modules.resource_usage_tracker_client import ResourceUsageTrackerClient from .....utils.computations import to_node_class -from .....utils.db import DB_TO_RUNNING_STATE, RUNNING_STATE_TO_DB +from .....utils.db import RUNNING_STATE_TO_DB from ....catalog import CatalogClient from ...tables import NodeClass, StateType, comp_run_snapshot_tasks, comp_tasks from .._base import BaseRepository @@ -130,12 +130,7 @@ async def list_computational_tasks_rpc_domain( total_count = await conn.scalar(count_query) items = [ - ComputationTaskForRpcDBGet.model_validate( - { - **row, - "state": DB_TO_RUNNING_STATE[row["state"]], # Convert the state - } - ) + ComputationTaskForRpcDBGet.model_validate(row, from_attributes=True) async for row in await conn.stream(list_query) ] return cast(int, total_count), items diff --git a/services/director-v2/src/simcore_service_director_v2/utils/dask.py b/services/director-v2/src/simcore_service_director_v2/utils/dask.py index 0d29f9e58918..7a0d1fff7b9e 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/dask.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/dask.py @@ -481,7 +481,7 @@ def check_scheduler_is_still_the_same( ) -def check_communication_with_scheduler_is_open(client: distributed.Client): +def check_communication_with_scheduler_is_open(client: distributed.Client) -> None: if ( client.scheduler_comm and client.scheduler_comm.comm is not None @@ -490,12 +490,9 @@ def check_communication_with_scheduler_is_open(client: 
distributed.Client): raise ComputationalBackendNotConnectedError -def check_scheduler_status(client: distributed.Client): +def check_scheduler_status(client: distributed.Client) -> None: client_status = client.status if client_status not in "running": - _logger.error( - "The computational backend is not connected!", - ) raise ComputationalBackendNotConnectedError diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/conftest.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/conftest.py index 0804a848d356..5f78823f2880 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/conftest.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/conftest.py @@ -8,6 +8,7 @@ # pylint: disable=too-many-statements +import datetime from unittest import mock import pytest @@ -69,3 +70,15 @@ def with_disabled_scheduler_publisher(mocker: MockerFixture) -> mock.Mock: "simcore_service_director_v2.modules.comp_scheduler._manager.request_pipeline_scheduling", autospec=True, ) + + +@pytest.fixture +def with_short_max_wait_for_clusters_keeper( + monkeypatch: pytest.MonkeyPatch, mocker: MockerFixture +) -> datetime.timedelta: + short_time = datetime.timedelta(seconds=5) + setenvs_from_dict( + monkeypatch, + {"COMPUTATIONAL_BACKEND_MAX_WAITING_FOR_CLUSTER_TIMEOUT": f"{short_time}"}, + ) + return short_time diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py index 4e298139cb58..638d10f88197 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py @@ -1272,7 +1272,7 @@ async def test_broken_pipeline_configuration_is_not_scheduled_and_aborted( await assert_comp_runs( sqlalchemy_async_engine, expected_total=1, - expected_state=RunningState.ABORTED, + expected_state=RunningState.FAILED, where_statement=(comp_runs.c.user_id == user["id"]) & (comp_runs.c.project_uuid == f"{sleepers_project.uuid}"), ) @@ -2048,9 +2048,10 @@ async def test_pipeline_with_on_demand_cluster_with_not_ready_backend_waits( "get_or_create_exception", [ClustersKeeperNotAvailableError], ) -async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_fails( +async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_waits_and_eventually_timesout_fails( with_disabled_auto_scheduling: mock.Mock, with_disabled_scheduler_publisher: mock.Mock, + with_short_max_wait_for_clusters_keeper: datetime.timedelta, initialized_app: FastAPI, scheduler_api: BaseCompScheduler, sqlalchemy_async_engine: AsyncEngine, @@ -2061,8 +2062,6 @@ async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_fails( computational_pipeline_rabbit_client_parser: mock.AsyncMock, fake_collection_run_id: CollectionRunID, ): - # needs to change: https://github.com/ITISFoundation/osparc-simcore/issues/6817 - mocked_get_or_create_cluster.side_effect = get_or_create_exception # running the pipeline will trigger a call to the clusters-keeper assert published_project.project.prj_owner @@ -2100,8 +2099,8 @@ async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_fails( expected_progress=None, run_id=run_in_db.run_id, ) - # now it should switch to failed, the run still runs until the next iteration - expected_failed_tasks = [ + # now it should switch to waiting for cluster and waits + expected_waiting_for_cluster_tasks = [ published_project.tasks[1], 
published_project.tasks[3], ] @@ -2116,7 +2115,7 @@ async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_fails( await assert_comp_runs( sqlalchemy_async_engine, expected_total=1, - expected_state=RunningState.FAILED, + expected_state=RunningState.WAITING_FOR_CLUSTER, where_statement=and_( comp_runs.c.user_id == published_project.project.prj_owner, comp_runs.c.project_uuid == f"{published_project.project.uuid}", @@ -2130,12 +2129,44 @@ async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_fails( await assert_comp_tasks_and_comp_run_snapshot_tasks( sqlalchemy_async_engine, project_uuid=published_project.project.uuid, - task_ids=[t.node_id for t in expected_failed_tasks], - expected_state=RunningState.FAILED, - expected_progress=1.0, + task_ids=[t.node_id for t in expected_waiting_for_cluster_tasks], + expected_state=RunningState.WAITING_FOR_CLUSTER, + expected_progress=None, + run_id=run_in_db.run_id, + ) + # again will trigger the call again + await scheduler_api.apply( + user_id=run_in_db.user_id, + project_id=run_in_db.project_uuid, + iteration=run_in_db.iteration, + ) + mocked_get_or_create_cluster.assert_called() + assert mocked_get_or_create_cluster.call_count == 1 + mocked_get_or_create_cluster.reset_mock() + await assert_comp_runs( + sqlalchemy_async_engine, + expected_total=1, + expected_state=RunningState.WAITING_FOR_CLUSTER, + where_statement=and_( + comp_runs.c.user_id == published_project.project.prj_owner, + comp_runs.c.project_uuid == f"{published_project.project.uuid}", + ), + ) + await _assert_message_received( + computational_pipeline_rabbit_client_parser, + 0, + ComputationalPipelineStatusMessage.model_validate_json, + ) + await assert_comp_tasks_and_comp_run_snapshot_tasks( + sqlalchemy_async_engine, + project_uuid=published_project.project.uuid, + task_ids=[t.node_id for t in expected_waiting_for_cluster_tasks], + expected_state=RunningState.WAITING_FOR_CLUSTER, + expected_progress=None, run_id=run_in_db.run_id, ) - # again will not re-trigger the call to clusters-keeper + await asyncio.sleep(with_short_max_wait_for_clusters_keeper.total_seconds() + 1) + # again will trigger the call again, but now it will start failing, first the task will be mark as FAILED await scheduler_api.apply( user_id=run_in_db.user_id, project_id=run_in_db.project_uuid, @@ -2153,13 +2184,13 @@ async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_fails( ) await _assert_message_received( computational_pipeline_rabbit_client_parser, - 0, + 1, ComputationalPipelineStatusMessage.model_validate_json, ) await assert_comp_tasks_and_comp_run_snapshot_tasks( sqlalchemy_async_engine, project_uuid=published_project.project.uuid, - task_ids=[t.node_id for t in expected_failed_tasks], + task_ids=[t.node_id for t in expected_waiting_for_cluster_tasks], expected_state=RunningState.FAILED, expected_progress=1.0, run_id=run_in_db.run_id,
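Related to the ComputationTaskForRpcDBGet change earlier in this diff: the stand-alone sketch below shows why the repository can now call model_validate(row, from_attributes=True) without converting the DB state by hand, since the mode="before" validator coerces the raw DB value first. The enums and the mapping here are trimmed placeholders, not the real StateType, RunningState, or DB_TO_RUNNING_STATE definitions.

from contextlib import suppress
from enum import Enum

from pydantic import BaseModel, field_validator


class StateType(str, Enum):  # trimmed placeholder for the DB-side enum
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"


class RunningState(str, Enum):  # trimmed placeholder for the domain enum
    STARTED = "STARTED"
    SUCCESS = "SUCCESS"


DB_TO_RUNNING_STATE = {
    StateType.RUNNING: RunningState.STARTED,
    StateType.SUCCESS: RunningState.SUCCESS,
}


class TaskRpcSketch(BaseModel):  # placeholder for ComputationTaskForRpcDBGet
    state: RunningState

    @field_validator("state", mode="before")
    @classmethod
    def _convert_from_state_type_enum_if_needed(cls, v):
        if isinstance(v, str):
            # try the DB enum first; if it does not match, let pydantic
            # validate the value as a RunningState directly
            with suppress(ValueError):
                v = StateType(v)
        if isinstance(v, StateType):
            return RunningState(DB_TO_RUNNING_STATE[v])
        return v


# a raw DB value is mapped through DB_TO_RUNNING_STATE ...
assert TaskRpcSketch.model_validate({"state": "RUNNING"}).state is RunningState.STARTED
# ... while a value that already is a RunningState falls through to normal validation
assert (
    TaskRpcSketch.model_validate({"state": RunningState.STARTED}).state
    is RunningState.STARTED
)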