Skip to content

Commit cda0a87

Browse files
committed
do not fail right away when the clusters-keeper restarts
1 parent 797cad0 commit cda0a87

File tree

1 file changed

+20
-30
lines changed
  • services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler

1 file changed

+20
-30
lines changed

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py

Lines changed: 20 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -632,11 +632,11 @@ async def apply(
632632
msg=f"scheduling pipeline {user_id=}:{project_id=}:{iteration=}",
633633
):
634634
dag: nx.DiGraph = nx.DiGraph()
635-
comp_run = await CompRunsRepository.instance(self.db_engine).get(
636-
user_id, project_id, iteration
637-
)
638-
try:
639635

636+
try:
637+
comp_run = await CompRunsRepository.instance(self.db_engine).get(
638+
user_id, project_id, iteration
639+
)
640640
dag = await self._get_pipeline_dag(project_id)
641641
# 1. Update our list of tasks with data from backend (state, results)
642642
await self._update_states_from_comp_backend(
@@ -692,55 +692,45 @@ async def apply(
692692
f"{pipeline_result=}",
693693
)
694694
except PipelineNotFoundError:
695-
_logger.warning(
696-
"pipeline %s does not exist in comp_pipeline table, it will be removed from scheduler",
695+
_logger.exception(
696+
"pipeline %s is missing from `comp_pipelines` DB table, something is corrupted. Aborting scheduling",
697697
f"{project_id=}",
698698
)
699+
# NOTE: no need to update task states here as pipeline is already broken
699700
await self._set_run_result(
700-
user_id, project_id, iteration, RunningState.ABORTED
701+
user_id, project_id, iteration, RunningState.FAILED
701702
)
702-
except InvalidPipelineError as exc:
703-
_logger.warning(
704-
"pipeline %s appears to be misconfigured, it will be removed from scheduler. Please check pipeline:\n%s",
703+
except InvalidPipelineError:
704+
_logger.exception(
705+
"pipeline %s appears to be misconfigured, it will be removed from scheduler. Aborting scheduling",
705706
f"{project_id=}",
706-
exc,
707707
)
708+
# NOTE: no need to update task states here as pipeline is already broken
708709
await self._set_run_result(
709-
user_id, project_id, iteration, RunningState.ABORTED
710+
user_id, project_id, iteration, RunningState.FAILED
710711
)
711712
except (
712713
DaskClientAcquisisitonError,
713714
ComputationalBackendNotConnectedError,
714715
ClustersKeeperNotAvailableError,
715716
):
716-
# we somehow lost access to the dask scheduler for different reasons
717-
# maybe network glitch in which case we might recover?
718-
# maybe the dask scheduler was restarted in which case we lost everything
719-
# maybe the private cluster machine was restarted in which case we lost everything
720-
# maybe the private cluster was shutdown in which case we lost everything
721-
# we should switch to WAITING_FOR_CLUSTER
722-
_logger.warning(
723-
"Unexpected error while connecting with computational backend, waiting for it to re-appear"
717+
_logger.exception(
718+
"Unexpectedly lost connection to the computational backend. "
719+
"TIP: this might be a network error or some service restarting. "
720+
"All scheduled tasks will be set to WAITING_FOR_CLUSTER state until we reconnect",
724721
)
725-
# what if we keep the tasks status?
726-
# just set the run result to waiting for result? and check when it last changed?
727-
not_completed_tasks = {
722+
processing_tasks = {
728723
k: v
729724
for k, v in (
730725
await self._get_pipeline_tasks(project_id, dag)
731726
).items()
732-
if v.state
733-
not in {
734-
*COMPLETED_STATES,
735-
RunningState.WAITING_FOR_CLUSTER,
736-
RunningState.PUBLISHED,
737-
}
727+
if v.state in PROCESSING_STATES
738728
}
739729
comp_tasks_repo = CompTasksRepository(self.db_engine)
740730
await comp_tasks_repo.update_project_tasks_state(
741731
project_id,
742732
comp_run.run_id,
743-
[t.node_id for t in not_completed_tasks.values()],
733+
[t.node_id for t in processing_tasks.values()],
744734
RunningState.WAITING_FOR_CLUSTER,
745735
)
746736
await self._set_run_result(

0 commit comments

Comments
 (0)