diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py
index 23a93de2e6a2..a7a027a89452 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py
@@ -575,8 +575,8 @@ def _try_assign_task_to_ec2_instance(
             _logger.debug(
                 "%s",
                 f"assigned task with {task_required_resources=}, {task_required_ec2_instance=} to "
-                f"{instance.ec2_instance.id=}:{instance.ec2_instance.type}, "
-                f"remaining resources:{instance.available_resources}/{instance.ec2_instance.resources}",
+                f"{instance.ec2_instance.id=}:{instance.ec2_instance.type=}, "
+                f"{instance.available_resources=}, {instance.ec2_instance.resources=}",
             )
             return True
     return False
@@ -599,8 +599,8 @@ def _try_assign_task_to_ec2_instance_type(
             _logger.debug(
                 "%s",
                 f"assigned task with {task_required_resources=}, {task_required_ec2_instance=} to "
-                f"{instance.instance_type}, "
-                f"remaining resources:{instance.available_resources}/{instance.instance_type.resources}",
+                f"{instance.instance_type=}, "
+                f"{instance.available_resources=}, {instance.instance_type.resources=}",
             )
             return True
     return False
@@ -1217,7 +1217,10 @@ async def _scale_down_unused_cluster_instances(
 ) -> Cluster:
     if any(not instance.has_assigned_tasks() for instance in cluster.active_nodes):
         # ask the provider to try to retire nodes actively
-        with log_catch(_logger, reraise=False):
+        with (
+            log_catch(_logger, reraise=False),
+            log_context(_logger, logging.INFO, "actively ask to retire unused nodes"),
+        ):
             await auto_scaling_mode.try_retire_nodes(app)
     cluster = await _deactivate_empty_nodes(app, cluster)
     return await _try_scale_down_cluster(app, cluster)
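Note on the autoscaling change above: Python 3.10+ allows several context managers inside one parenthesized `with`, so the retire attempt is now wrapped both in an exception guard and in an INFO-level log span. Below is a minimal, self-contained sketch of that pattern; `log_catch` and `log_context` here are simplified stand-ins for the servicelib helpers the diff uses, and `try_retire_nodes` is a hypothetical placeholder:

```python
import asyncio
import logging
from collections.abc import Iterator
from contextlib import contextmanager

_logger = logging.getLogger(__name__)


@contextmanager
def log_catch(logger: logging.Logger, *, reraise: bool = True) -> Iterator[None]:
    # simplified stand-in: log any exception and optionally swallow it
    try:
        yield
    except Exception:  # noqa: BLE001
        logger.exception("unhandled error, continuing")
        if reraise:
            raise


@contextmanager
def log_context(logger: logging.Logger, level: int, msg: str) -> Iterator[None]:
    # simplified stand-in: bracket the wrapped block with start/done logs
    logger.log(level, "Starting: %s", msg)
    yield
    logger.log(level, "Done: %s", msg)


async def try_retire_nodes() -> None:
    # hypothetical placeholder for auto_scaling_mode.try_retire_nodes(app)
    ...


async def scale_down_unused() -> None:
    # same shape as the patched _scale_down_unused_cluster_instances:
    # failures are logged and swallowed, and the attempt is visible in the logs
    with (
        log_catch(_logger, reraise=False),
        log_context(_logger, logging.INFO, "actively ask to retire unused nodes"),
    ):
        await try_retire_nodes()


asyncio.run(scale_down_unused())
```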
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py
index 3a4ae4a6f97f..dc904a5b435f 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py
@@ -12,7 +12,6 @@
     TaskProgressEvent,
 )
 from dask_task_models_library.container_tasks.io import TaskOutputData
-from dask_task_models_library.container_tasks.utils import parse_dask_job_id
 from models_library.clusters import BaseCluster
 from models_library.errors import ErrorDict
 from models_library.projects import ProjectID
@@ -370,9 +369,8 @@ async def _process_completed_tasks(
                 self._process_task_result(
                     task,
                     result,
-                    comp_run.metadata,
                     iteration,
-                    comp_run.run_id,
+                    comp_run,
                 )
                 for task, result in zip(tasks, tasks_results, strict=True)
             ),
@@ -380,7 +378,7 @@
         ):
             with log_catch(_logger, reraise=False):
                 task_can_be_cleaned, job_id = await future
-                if task_can_be_cleaned:
+                if task_can_be_cleaned and job_id:
                     await client.release_task_result(job_id)
 
     async def _handle_successful_run(
@@ -411,11 +409,9 @@
     async def _handle_computational_retrieval_error(
         self,
         task: CompTaskAtDB,
-        user_id: UserID,
         result: ComputationalBackendTaskResultsNotReadyError,
         log_error_context: dict[str, Any],
     ) -> tuple[RunningState, SimcorePlatformStatus, list[ErrorDict], bool]:
-        assert task.job_id  # nosec
         _logger.warning(
             **create_troubleshooting_log_kwargs(
                 f"Retrieval of task {task.job_id} result timed-out",
@@ -448,10 +444,7 @@
                     type=_TASK_RETRIEVAL_ERROR_TYPE,
                     ctx={
                         _TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY: f"{check_time}",
-                        "user_id": user_id,
-                        "project_id": f"{task.project_id}",
-                        "node_id": f"{task.node_id}",
-                        "job_id": task.job_id,
+                        **log_error_context,
                     },
                 )
             )
@@ -472,7 +465,6 @@ async def _handle_computational_backend_not_connected_error(
         result: ComputationalBackendNotConnectedError,
         log_error_context: dict[str, Any],
     ) -> tuple[RunningState, SimcorePlatformStatus, list[ErrorDict], bool]:
-        assert task.job_id  # nosec
         _logger.warning(
             **create_troubleshooting_log_kwargs(
                 f"Computational backend disconnected when retrieving task {task.job_id} result",
@@ -492,8 +484,6 @@ async def _handle_task_error(
         result: BaseException,
         log_error_context: dict[str, Any],
     ) -> tuple[RunningState, SimcorePlatformStatus, list[ErrorDict], bool]:
-        assert task.job_id  # nosec
-
         # the task itself failed, check why
         if isinstance(result, TaskCancelledError):
             _logger.info(
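A quick note on the error-handler hunks above: the `_handle_*` helpers now receive a prebuilt `log_error_context`, and the retrieval-error branch spreads it into the error's `ctx` instead of re-listing `user_id`/`project_id`/`node_id`/`job_id` at each call site. A minimal sketch of that single-source-of-truth pattern (all values and the `make_error_ctx` helper are illustrative, not the scheduler's actual code):

```python
from typing import Any

# built once per task from comp_run + task (values are illustrative)
log_error_context: dict[str, Any] = {
    "user_id": 42,
    "project_id": "d9e54235-...",
    "node_id": "5c64a3f1-...",
    "job_id": "dask-job-abc123",
}


def make_error_ctx(check_time: str, base_context: dict[str, Any]) -> dict[str, Any]:
    # hypothetical helper mirroring the patched ctx={...} construction:
    # the shared context is merged in with ** instead of key-by-key copies
    return {
        "check_time": f"{check_time}",
        **base_context,
    }


ctx = make_error_ctx("2025-01-01T00:00:00Z", log_error_context)
assert ctx["job_id"] == "dask-job-abc123"
```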
@@ -529,102 +519,100 @@ async def _process_task_result(
         self,
         task: TaskStateTracker,
         result: BaseException | TaskOutputData,
-        run_metadata: RunMetadataDict,
         iteration: Iteration,
-        run_id: PositiveInt,
-    ) -> tuple[bool, str]:
+        comp_run: CompRunsAtDB,
+    ) -> tuple[bool, str | None]:
         """Returns True and the job ID if the task was successfully processed and can be released from the Dask cluster."""
-        _logger.debug("received %s result: %s", f"{task=}", f"{result=}")
-        assert task.current.job_id  # nosec
-        (
-            _service_key,
-            _service_version,
-            user_id,
-            project_id,
-            node_id,
-        ) = parse_dask_job_id(task.current.job_id)
-
-        assert task.current.project_id == project_id  # nosec
-        assert task.current.node_id == node_id  # nosec
-        log_error_context = {
-            "user_id": user_id,
-            "project_id": project_id,
-            "node_id": node_id,
-            "job_id": task.current.job_id,
-        }
-
-        if isinstance(result, TaskOutputData):
-            (
-                task_final_state,
-                simcore_platform_status,
-                task_errors,
-                task_completed,
-            ) = await self._handle_successful_run(
-                task.current, result, log_error_context
-            )
+        with log_context(
+            _logger, logging.DEBUG, msg=f"{comp_run.run_id=}, {task=}, {result=}"
+        ):
+            log_error_context = {
+                "user_id": comp_run.user_id,
+                "project_id": f"{comp_run.project_uuid}",
+                "node_id": f"{task.current.node_id}",
+                "job_id": task.current.job_id,
+            }
+
+            if isinstance(result, TaskOutputData):
+                (
+                    task_final_state,
+                    simcore_platform_status,
+                    task_errors,
+                    task_completed,
+                ) = await self._handle_successful_run(
+                    task.current, result, log_error_context
+                )
 
-        elif isinstance(result, ComputationalBackendTaskResultsNotReadyError):
-            (
-                task_final_state,
-                simcore_platform_status,
-                task_errors,
-                task_completed,
-            ) = await self._handle_computational_retrieval_error(
-                task.current, user_id, result, log_error_context
-            )
-        elif isinstance(result, ComputationalBackendNotConnectedError):
-            (
-                task_final_state,
-                simcore_platform_status,
-                task_errors,
-                task_completed,
-            ) = await self._handle_computational_backend_not_connected_error(
-                task.current, result, log_error_context
-            )
-        else:
-            (
-                task_final_state,
-                simcore_platform_status,
-                task_errors,
-                task_completed,
-            ) = await self._handle_task_error(task.current, result, log_error_context)
-
-        # we need to remove any invalid files in the storage
-        await clean_task_output_and_log_files_if_invalid(
-            self.db_engine, user_id, project_id, node_id
-        )
+            elif isinstance(result, ComputationalBackendTaskResultsNotReadyError):
+                (
+                    task_final_state,
+                    simcore_platform_status,
+                    task_errors,
+                    task_completed,
+                ) = await self._handle_computational_retrieval_error(
+                    task.current, result, log_error_context
+                )
+            elif isinstance(result, ComputationalBackendNotConnectedError):
+                (
+                    task_final_state,
+                    simcore_platform_status,
+                    task_errors,
+                    task_completed,
+                ) = await self._handle_computational_backend_not_connected_error(
+                    task.current, result, log_error_context
+                )
+            else:
+                (
+                    task_final_state,
+                    simcore_platform_status,
+                    task_errors,
+                    task_completed,
+                ) = await self._handle_task_error(
+                    task.current, result, log_error_context
+                )
 
-        if task_completed:
-            # resource tracking
-            await publish_service_resource_tracking_stopped(
-                self.rabbitmq_client,
-                ServiceRunID.get_resource_tracking_run_id_for_computational(
-                    user_id, project_id, node_id, iteration
-                ),
-                simcore_platform_status=simcore_platform_status,
-            )
-            # instrumentation
-            await publish_service_stopped_metrics(
-                self.rabbitmq_client,
-                user_id=user_id,
-                simcore_user_agent=run_metadata.get(
-                    "simcore_user_agent", UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
-                ),
-                task=task.current,
-                task_final_state=task_final_state,
-            )
+            # we need to remove any invalid files in the storage
+            await clean_task_output_and_log_files_if_invalid(
+                self.db_engine,
+                comp_run.user_id,
+                comp_run.project_uuid,
+                task.current.node_id,
+            )
 
-        await CompTasksRepository(self.db_engine).update_project_tasks_state(
-            task.current.project_id,
-            run_id,
-            [task.current.node_id],
-            task_final_state if task_completed else task.previous.state,
-            errors=task_errors,
-            optional_progress=1 if task_completed else None,
-            optional_stopped=arrow.utcnow().datetime if task_completed else None,
-        )
+            if task_completed:
+                # resource tracking
+                await publish_service_resource_tracking_stopped(
+                    self.rabbitmq_client,
+                    ServiceRunID.get_resource_tracking_run_id_for_computational(
+                        comp_run.user_id,
+                        comp_run.project_uuid,
+                        task.current.node_id,
+                        iteration,
+                    ),
+                    simcore_platform_status=simcore_platform_status,
+                )
+                # instrumentation
+                await publish_service_stopped_metrics(
+                    self.rabbitmq_client,
+                    user_id=comp_run.user_id,
+                    simcore_user_agent=comp_run.metadata.get(
+                        "simcore_user_agent", UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
+                    ),
+                    task=task.current,
+                    task_final_state=task_final_state,
+                )
+
+            await CompTasksRepository(self.db_engine).update_project_tasks_state(
+                task.current.project_id,
+                comp_run.run_id,
+                [task.current.node_id],
+                task_final_state if task_completed else task.previous.state,
+                errors=task_errors,
+                optional_progress=1 if task_completed else None,
+                optional_stopped=arrow.utcnow().datetime if task_completed else None,
+            )
 
-        return task_completed, task.current.job_id
+            return task_completed, task.current.job_id
 
     async def _task_progress_change_handler(
         self, event: tuple[UnixTimestamp, Any]
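Two related changes in `_scheduler_dask.py` are worth spelling out: `_process_task_result` now receives the whole `comp_run: CompRunsAtDB` row instead of pieces pre-parsed from the dask job id, and its return type widens to `tuple[bool, str | None]` because `job_id` is no longer asserted non-null. The caller must therefore narrow the optional before releasing the result, which is what the `if task_can_be_cleaned and job_id:` guard does. A minimal sketch of that caller-side narrowing (all names here are illustrative stand-ins, not the scheduler's real API):

```python
import asyncio


async def process_task_result(job_id: str | None) -> tuple[bool, str | None]:
    # stand-in for _process_task_result: the job id may be None,
    # so it is passed through instead of being asserted non-null
    task_can_be_cleaned = True
    return task_can_be_cleaned, job_id


async def release_task_result(job_id: str) -> None:
    # stand-in for client.release_task_result
    print(f"releasing {job_id}")


async def main() -> None:
    for job_id in ("dask-job-123", None):
        task_can_be_cleaned, returned_job_id = await process_task_result(job_id)
        # mirrors the patched guard: the extra `and returned_job_id` check both
        # avoids releasing a non-existent result and narrows str | None to str
        if task_can_be_cleaned and returned_job_id:
            await release_task_result(returned_job_id)


asyncio.run(main())
```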
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_pipelines.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_pipelines.py
index b01886564c98..d6faaf3d3624 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_pipelines.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_pipelines.py
@@ -42,16 +42,12 @@ async def upsert_pipeline(
             **pipeline_at_db.model_dump(mode="json", by_alias=True)
         )
         # FIXME: This is not a nice thing. this part of the information should be kept in comp_runs.
-        update_exclusion_policy = set()
-        if not dag_graph.nodes():
-            update_exclusion_policy.add("dag_adjacency_list")
         on_update_stmt = insert_stmt.on_conflict_do_update(
             index_elements=[comp_pipeline.c.project_id],
             set_=pipeline_at_db.model_dump(
                 mode="json",
                 by_alias=True,
                 exclude_unset=True,
-                exclude=update_exclusion_policy,
             ),
         )
         async with self.db_engine.begin() as conn:
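The last file drops the conditional `update_exclusion_policy`, so an upsert now always overwrites `dag_adjacency_list` on conflict, even when the incoming DAG is empty; per the FIXME above, tracking that history is meant to move into comp_runs. For reference, the general shape of such a PostgreSQL upsert in SQLAlchemy, using an illustrative table rather than the real `comp_pipeline` schema:

```python
from typing import Any

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import Insert, insert

metadata = sa.MetaData()

# illustrative table only; the real comp_pipeline table is defined in the
# simcore postgres-database package
comp_pipeline = sa.Table(
    "comp_pipeline",
    metadata,
    sa.Column("project_id", sa.String, primary_key=True),
    sa.Column("dag_adjacency_list", sa.JSON),
    sa.Column("state", sa.String),
)


def build_upsert_statement(values: dict[str, Any]) -> Insert:
    # INSERT ... ON CONFLICT (project_id) DO UPDATE SET ...:
    # every non-key column is overwritten, with no exclusion policy
    insert_stmt = insert(comp_pipeline).values(**values)
    return insert_stmt.on_conflict_do_update(
        index_elements=[comp_pipeline.c.project_id],
        set_={k: v for k, v in values.items() if k != "project_id"},
    )
```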