@@ -632,11 +632,11 @@ async def apply(
632632 msg = f"scheduling pipeline { user_id = } :{ project_id = } :{ iteration = } " ,
633633 ):
634634 dag : nx .DiGraph = nx .DiGraph ()
635- comp_run = await CompRunsRepository .instance (self .db_engine ).get (
636- user_id , project_id , iteration
637- )
638- try :
639635
636+ try :
637+ comp_run = await CompRunsRepository .instance (self .db_engine ).get (
638+ user_id , project_id , iteration
639+ )
640640 dag = await self ._get_pipeline_dag (project_id )
641641 # 1. Update our list of tasks with data from backend (state, results)
642642 await self ._update_states_from_comp_backend (
@@ -692,55 +692,45 @@ async def apply(
692692 f"{ pipeline_result = } " ,
693693 )
694694 except PipelineNotFoundError :
695- _logger .warning (
696- "pipeline %s does not exist in comp_pipeline table, it will be removed from scheduler " ,
695+ _logger .exception (
696+ "pipeline %s is missing from `comp_pipelines` DB table, something is corrupted. Aborting scheduling " ,
697697 f"{ project_id = } " ,
698698 )
699+ # NOTE: no need to update task states here as pipeline is already broken
699700 await self ._set_run_result (
700- user_id , project_id , iteration , RunningState .ABORTED
701+ user_id , project_id , iteration , RunningState .FAILED
701702 )
702- except InvalidPipelineError as exc :
703- _logger .warning (
704- "pipeline %s appears to be misconfigured, it will be removed from scheduler. Please check pipeline: \n %s " ,
703+ except InvalidPipelineError :
704+ _logger .exception (
705+ "pipeline %s appears to be misconfigured, it will be removed from scheduler. Aborting scheduling " ,
705706 f"{ project_id = } " ,
706- exc ,
707707 )
708+ # NOTE: no need to update task states here as pipeline is already broken
708709 await self ._set_run_result (
709- user_id , project_id , iteration , RunningState .ABORTED
710+ user_id , project_id , iteration , RunningState .FAILED
710711 )
711712 except (
712713 DaskClientAcquisisitonError ,
713714 ComputationalBackendNotConnectedError ,
714715 ClustersKeeperNotAvailableError ,
715716 ):
716- # we somehow lost access to the dask scheduler for different reasons
717- # maybe network glitch in which case we might recover?
718- # maybe the dask scheduler was restarted in which case we lost everything
719- # maybe the private cluster machine was restarted in which case we lost everything
720- # maybe the private cluster was shutdown in which case we lost everything
721- # we should switch to WAITING_FOR_CLUSTER
722- _logger .warning (
723- "Unexpected error while connecting with computational backend, waiting for it to re-appear"
717+ _logger .exception (
718+ "Unexpectedly lost connection to the computational backend. "
719+ "TIP: this might be a network error or some service restarting. "
720+ "All scheduled tasks will be set to WAITING_FOR_CLUSTER state until we reconnect" ,
724721 )
725- # what if we keep the tasks status?
726- # just set the run result to waiting for result? and check when it last changed?
727- not_completed_tasks = {
722+ processing_tasks = {
728723 k : v
729724 for k , v in (
730725 await self ._get_pipeline_tasks (project_id , dag )
731726 ).items ()
732- if v .state
733- not in {
734- * COMPLETED_STATES ,
735- RunningState .WAITING_FOR_CLUSTER ,
736- RunningState .PUBLISHED ,
737- }
727+ if v .state in PROCESSING_STATES
738728 }
739729 comp_tasks_repo = CompTasksRepository (self .db_engine )
740730 await comp_tasks_repo .update_project_tasks_state (
741731 project_id ,
742732 comp_run .run_id ,
743- [t .node_id for t in not_completed_tasks .values ()],
733+ [t .node_id for t in processing_tasks .values ()],
744734 RunningState .WAITING_FOR_CLUSTER ,
745735 )
746736 await self ._set_run_result (
0 commit comments