@@ -30,6 +30,7 @@
 from networkx.classes.reportviews import InDegreeView
 from pydantic import PositiveInt
 from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
+from servicelib.logging_errors import create_troubleshootting_log_kwargs
 from servicelib.logging_utils import log_catch, log_context
 from servicelib.rabbitmq import RabbitMQClient, RabbitMQRPCClient
 from servicelib.redis import RedisClientSDK
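A note on the new import: `create_troubleshootting_log_kwargs` (the doubled "tt" is the helper's actual spelling in `servicelib.logging_errors`) produces the keyword arguments that every rewritten handler below splats into `_logger.exception(...)` or `_logger.info(...)`. The helper's signature is not shown in this diff, so the following is only a minimal sketch of the contract as inferred from the call sites, not the real implementation:

```python
# Sketch only: assumed contract of create_troubleshootting_log_kwargs, inferred
# from the call sites in this diff (message, error, error_context, optional tip).
from typing import Any


def _sketch_create_troubleshootting_log_kwargs(
    user_error_msg: str,
    *,
    error: BaseException,
    error_context: dict[str, Any] | None = None,
    tip: str | None = None,
) -> dict[str, Any]:
    context = error_context or {}
    lines = [user_error_msg, f"error={error!r}"]
    lines += [f"{key}={value}" for key, value in context.items()]
    if tip:
        lines.append(f"TIP: {tip}")
    # "msg" feeds the logger's message argument; "extra" rides on the LogRecord
    return {"msg": "\n".join(lines), "extra": {"error_context": context}}


# Usage mirrors the call sites below:
# _logger.exception(**_sketch_create_troubleshootting_log_kwargs(
#     "pipeline is corrupted", error=exc, error_context={"project_id": "..."}))
```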
@@ -641,7 +642,7 @@ async def apply(
             await self._update_states_from_comp_backend(
                 user_id, project_id, iteration, dag, comp_run
             )
-            # 1.bis get the updated tasks NOTE: we need to get them again as some states might have changed
+            # 1.1. get the updated tasks NOTE: we need to get them again as some states might have changed
             comp_tasks = await self._get_pipeline_tasks(project_id, dag)
             # 2. timeout if waiting for cluster has been there for more than X minutes
             comp_tasks = await self._timeout_if_waiting_for_cluster_too_long(
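The comment renumbering above ("1.bis" to "1.1.") also encodes a real ordering constraint: step 1 writes fresh task states to the DB, so any task list fetched earlier is stale. A toy illustration of why the re-read in step 1.1 is needed (hypothetical in-memory stand-in for the `comp_tasks` table):

```python
# Hypothetical in-memory stand-in for the comp_tasks table.
db = {"task-1": "PENDING"}

tasks = dict(db)              # snapshot taken BEFORE the backend sync
db["task-1"] = "STARTED"      # effect of _update_states_from_comp_backend
assert tasks["task-1"] == "PENDING"   # stale view of the pipeline

tasks = dict(db)              # step 1.1: re-read after the sync
assert tasks["task-1"] == "STARTED"   # now consistent with the DB
```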
@@ -693,19 +694,36 @@ async def apply(
                 f"{project_id=}",
                 f"{pipeline_result=}",
             )
-        except PipelineNotFoundError:
+        except PipelineNotFoundError as exc:
             _logger.exception(
-                "pipeline %s is missing from `comp_pipelines` DB table, something is corrupted. Aborting scheduling",
-                f"{project_id=}",
+                **create_troubleshootting_log_kwargs(
+                    f"pipeline {project_id} is missing from `comp_pipelines` DB table, something is corrupted. Aborting scheduling",
+                    error=exc,
+                    error_context={
+                        "user_id": f"{user_id}",
+                        "project_id": f"{project_id}",
+                        "iteration": f"{iteration}",
+                    },
+                    tip="Check that the project still exists",
+                )
             )
+
             # NOTE: no need to update task states here as pipeline is already broken
             await self._set_run_result(
                 user_id, project_id, iteration, RunningState.FAILED
             )
-        except InvalidPipelineError:
+        except InvalidPipelineError as exc:
             _logger.exception(
-                "pipeline %s appears to be misconfigured, it will be removed from scheduler. Aborting scheduling",
-                f"{project_id=}",
+                **create_troubleshootting_log_kwargs(
+                    f"pipeline {project_id} appears to be misconfigured. Aborting scheduling",
+                    error=exc,
+                    error_context={
+                        "user_id": f"{user_id}",
+                        "project_id": f"{project_id}",
+                        "iteration": f"{iteration}",
+                    },
+                    tip="Check that the project pipeline is valid and all tasks are present in the DB",
+                ),
             )
             # NOTE: no need to update task states here as pipeline is already broken
             await self._set_run_result(
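Both handlers above ship an identical `error_context` (`user_id`, `project_id`, `iteration`). Purely as an illustration of a possible follow-up (not part of this PR), the dict could be built once per `apply()` call and reused by every handler:

```python
# Hypothetical refactor: build the shared context once instead of repeating it.
def _shared_error_context(user_id: int, project_id: str, iteration: int) -> dict[str, str]:
    return {
        "user_id": f"{user_id}",
        "project_id": f"{project_id}",
        "iteration": f"{iteration}",
    }


assert _shared_error_context(3, "prj-uuid", 1) == {
    "user_id": "3",
    "project_id": "prj-uuid",
    "iteration": "1",
}
```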
@@ -715,11 +733,18 @@ async def apply(
             DaskClientAcquisisitonError,
             ComputationalBackendNotConnectedError,
             ClustersKeeperNotAvailableError,
-        ):
+        ) as exc:
             _logger.exception(
-                "Unexpectedly lost connection to the computational backend. "
-                "TIP: this might be a network error or some service restarting. "
-                "All scheduled tasks will be set to WAITING_FOR_CLUSTER state until we reconnect",
+                **create_troubleshootting_log_kwargs(
+                    "Unexpectedly lost connection to the computational backend. Tasks are set back to WAITING_FOR_CLUSTER state until we eventually reconnect",
+                    error=exc,
+                    error_context={
+                        "user_id": f"{user_id}",
+                        "project_id": f"{project_id}",
+                        "iteration": f"{iteration}",
+                    },
+                    tip="Check network connection and the status of the computational backend (clusters-keeper, dask-scheduler, dask-workers)",
+                )
             )
             processing_tasks = {
                 k: v
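Note that this handler deliberately does not fail the run: losing the backend is treated as transient, and the `processing_tasks` comprehension that continues past this hunk collects the in-flight tasks so they can be parked. A distilled, runnable toy of that park-and-retry decision (hypothetical names; the real code also filters by processing states and persists the change to the DB):

```python
# Toy park-and-retry: transient backend loss must not fail the pipeline.
WAITING_FOR_CLUSTER = "WAITING_FOR_CLUSTER"


def park_processing_tasks(task_states: dict[str, str]) -> dict[str, str]:
    # Every task currently in-flight goes back to the waiting state so a
    # later scheduler pass can resubmit it once the backend reconnects.
    return {
        node_id: WAITING_FOR_CLUSTER if state == "STARTED" else state
        for node_id, state in task_states.items()
    }


states = {"n1": "STARTED", "n2": "SUCCESS"}
assert park_processing_tasks(states) == {"n1": WAITING_FOR_CLUSTER, "n2": "SUCCESS"}
```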
@@ -816,11 +841,22 @@ async def _schedule_tasks_to_start(
                 ComputationalBackendNotConnectedError,
                 ComputationalSchedulerChangedError,
                 ClustersKeeperNotAvailableError,
-            ):
+            ) as exc:
                 _logger.exception(
-                    "Issue with computational backend. Tasks are set back "
-                    "to WAITING_FOR_CLUSTER state until scheduler comes back!",
+                    **create_troubleshootting_log_kwargs(
+                        "Computational backend is not connected. Tasks are set back "
+                        "to WAITING_FOR_CLUSTER state until scheduler comes back!",
+                        error=exc,
+                        error_context={
+                            "user_id": f"{user_id}",
+                            "project_id": f"{project_id}",
+                            "tasks_ready_to_start": f"{list(tasks_ready_to_start.keys())}",
+                            "comp_run_use_on_demand_clusters": f"{comp_run.use_on_demand_clusters}",
+                            "comp_run.run_id": f"{comp_run.run_id}",
+                        },
+                    )
                 )
+
                 await publish_project_log(
                     self.rabbitmq_client,
                     user_id,
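One detail worth noticing in the new `error_context` blocks: every value is eagerly rendered with an f-string, even lists of node IDs and the run ID. The diff does not state why, but a plausible reason is that pre-rendered strings keep the log record trivially serializable no matter which sink consumes it:

```python
from uuid import UUID

# Hypothetical values standing in for NodeID keys and their task objects.
tasks_ready_to_start = {UUID("12345678-1234-5678-1234-567812345678"): object()}

error_context = {
    "tasks_ready_to_start": f"{list(tasks_ready_to_start.keys())}",  # str, not list[UUID]
}
# Every value is already a plain string, so json.dumps() & friends cannot choke.
assert all(isinstance(v, str) for v in error_context.values())
```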
@@ -841,7 +877,17 @@ async def _schedule_tasks_to_start(
 
             except ComputationalBackendOnDemandNotReadyError as exc:
                 _logger.info(
-                    "The on demand computational backend is not ready yet: %s", exc
+                    **create_troubleshootting_log_kwargs(
+                        "The on demand computational backend is not ready yet. Tasks are set to WAITING_FOR_CLUSTER state until the cluster is ready!",
+                        error=exc,
+                        error_context={
+                            "user_id": f"{user_id}",
+                            "project_id": f"{project_id}",
+                            "tasks_ready_to_start": f"{list(tasks_ready_to_start.keys())}",
+                            "comp_run_use_on_demand_clusters": f"{comp_run.use_on_demand_clusters}",
+                            "comp_run.run_id": f"{comp_run.run_id}",
+                        },
+                    )
                 )
                 await publish_project_log(
                     self.rabbitmq_client,
@@ -863,9 +909,18 @@ async def _schedule_tasks_to_start(
                     comp_tasks[f"{task}"].state = RunningState.WAITING_FOR_CLUSTER
             except TaskSchedulingError as exc:
                 _logger.exception(
-                    "Project '%s''s task '%s' could not be scheduled",
-                    exc.project_id,
-                    exc.node_id,
+                    **create_troubleshootting_log_kwargs(
+                        "A task could not be scheduled, it is set to FAILED and the rest of the pipeline will be ABORTED",
+                        error=exc,
+                        error_context={
+                            "user_id": f"{user_id}",
+                            "project_id": f"{project_id}",
+                            "node_id": f"{exc.node_id}",
+                            "tasks_ready_to_start": f"{list(tasks_ready_to_start.keys())}",
+                            "comp_run_use_on_demand_clusters": f"{comp_run.use_on_demand_clusters}",
+                            "comp_run.run_id": f"{comp_run.run_id}",
+                        },
+                    )
                 )
                 await CompTasksRepository.instance(
                    self.db_engine
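Across these two hunks the helper is reused at different severities: a not-yet-ready on-demand cluster is an expected, self-healing condition (`_logger.info`), while an unschedulable task is a real failure (`_logger.exception`). The kwargs contract is identical either way, which is what makes the helper reusable. A compact, runnable sketch of that recovery ladder (toy exception types and state strings, not the real ones):

```python
from typing import Callable


# Toy severity/recovery ladder mirroring the handlers above.
class OnDemandClusterNotReady(Exception): ...      # expected, transient


class TaskSchedulingFailure(Exception):            # one task is broken
    def __init__(self, node_id: str) -> None:
        self.node_id = node_id


def schedule_once(states: dict[str, str], submit: Callable[[], None]) -> None:
    try:
        submit()
    except OnDemandClusterNotReady:
        for node_id in states:                     # park everything, retry later
            states[node_id] = "WAITING_FOR_CLUSTER"
    except TaskSchedulingFailure as exc:
        states[exc.node_id] = "FAILED"             # fail only the broken task


states = {"n1": "PENDING", "n2": "PENDING"}


def _not_ready() -> None:
    raise OnDemandClusterNotReady


schedule_once(states, _not_ready)
assert states == {"n1": "WAITING_FOR_CLUSTER", "n2": "WAITING_FOR_CLUSTER"}
```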
@@ -879,13 +934,19 @@ async def _schedule_tasks_to_start(
                     optional_stopped=arrow.utcnow().datetime,
                 )
                 comp_tasks[f"{exc.node_id}"].state = RunningState.FAILED
-            except Exception:
+            except Exception as exc:
                 _logger.exception(
-                    "Unexpected error for %s with %s on %s happened when scheduling %s:",
-                    f"{comp_run.user_id=}",
-                    f"{comp_run.project_uuid=}",
-                    f"{comp_run.use_on_demand_clusters=}",
-                    f"{tasks_ready_to_start.keys()=}",
+                    **create_troubleshootting_log_kwargs(
+                        "Unexpected error happened when scheduling tasks, all tasks to start are set to FAILED and the rest of the pipeline will be ABORTED",
+                        error=exc,
+                        error_context={
+                            "user_id": f"{comp_run.user_id}",
+                            "project_id": f"{comp_run.project_uuid}",
+                            "tasks_ready_to_start": f"{list(tasks_ready_to_start.keys())}",
+                            "comp_run_use_on_demand_clusters": f"{comp_run.use_on_demand_clusters}",
+                            "comp_run.run_id": f"{comp_run.run_id}",
+                        },
+                    )
                 )
                 await CompTasksRepository.instance(
                     self.db_engine
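Worth noting for all the rewritten call sites: `Logger.exception()` attaches the active traceback itself, so the helper never needs to format the stack; it only shapes `msg` and `extra`. A self-contained demonstration with the stdlib:

```python
import logging

logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger("scheduler-demo")

try:
    raise RuntimeError("boom while scheduling")  # stand-in failure
except Exception:
    # .exception() is .error() plus the current traceback: even when msg is
    # supplied via **kwargs, the stack trace still comes from the logging call.
    _logger.exception(msg="Unexpected error happened when scheduling tasks")
```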