@@ -632,10 +632,11 @@ async def apply(
632632 msg = f"scheduling pipeline { user_id = } :{ project_id = } :{ iteration = } " ,
633633 ):
634634 dag : nx .DiGraph = nx .DiGraph ()
635+ comp_run = await CompRunsRepository .instance (self .db_engine ).get (
636+ user_id , project_id , iteration
637+ )
635638 try :
636- comp_run = await CompRunsRepository .instance (self .db_engine ).get (
637- user_id , project_id , iteration
638- )
639+
639640 dag = await self ._get_pipeline_dag (project_id )
640641 # 1. Update our list of tasks with data from backend (state, results)
641642 await self ._update_states_from_comp_backend (
@@ -707,25 +708,44 @@ async def apply(
707708 await self ._set_run_result (
708709 user_id , project_id , iteration , RunningState .ABORTED
709710 )
710- except (DaskClientAcquisisitonError , ClustersKeeperNotAvailableError ):
711- _logger .exception (
712- "Unexpected error while connecting with computational backend, aborting pipeline"
713- )
714- tasks : dict [NodeIDStr , CompTaskAtDB ] = await self ._get_pipeline_tasks (
715- project_id , dag
711+ except (
712+ DaskClientAcquisisitonError ,
713+ ComputationalBackendNotConnectedError ,
714+ ClustersKeeperNotAvailableError ,
715+ ):
716+ # we somehow lost access to the dask scheduler for different reasons
717+ # maybe network glitch in which case we might recover?
718+ # maybe the dask scheduler was restarted in which case we lost everything
719+ # maybe the private cluster machine was restarted in which case we lost everything
720+ # maybe the private cluster was shutdown in which case we lost everything
721+ # we should switch to WAITING_FOR_CLUSTER
722+ _logger .warning (
723+ "Unexpected error while connecting with computational backend, waiting for it to re-appear"
716724 )
725+ # what if we keep the tasks status?
726+ # just set the run result to waiting for result? and check when it last changed?
727+ not_completed_tasks = {
728+ k : v
729+ for k , v in (
730+ await self ._get_pipeline_tasks (project_id , dag )
731+ ).items ()
732+ if v .state
733+ not in {
734+ * COMPLETED_STATES ,
735+ RunningState .WAITING_FOR_CLUSTER ,
736+ RunningState .PUBLISHED ,
737+ }
738+ }
717739 comp_tasks_repo = CompTasksRepository (self .db_engine )
718740 await comp_tasks_repo .update_project_tasks_state (
719741 project_id ,
720742 comp_run .run_id ,
721- [t .node_id for t in tasks .values ()],
722- RunningState .FAILED ,
743+ [t .node_id for t in not_completed_tasks .values ()],
744+ RunningState .WAITING_FOR_CLUSTER ,
723745 )
724746 await self ._set_run_result (
725- user_id , project_id , iteration , RunningState .FAILED
747+ user_id , project_id , iteration , RunningState .WAITING_FOR_CLUSTER
726748 )
727- except ComputationalBackendNotConnectedError :
728- _logger .exception ("Computational backend is not connected!" )
729749 finally :
730750 await self ._set_processing_done (user_id , project_id , iteration )
731751
0 commit comments