9393
9494
9595async def _check_pipeline_not_running_or_raise_409 (
96- comp_tasks_repo : CompTasksRepository , computation : ComputationCreate
96+ comp_runs_repo : CompRunsRepository ,
97+ computation : ComputationCreate ,
9798) -> None :
98- pipeline_state = utils .get_pipeline_state_from_task_states (
99- await comp_tasks_repo .list_computational_tasks (computation .project_id )
100- )
101- if utils .is_pipeline_running (pipeline_state ):
102- raise HTTPException (
103- status_code = status .HTTP_409_CONFLICT ,
104- detail = f"Project { computation .project_id } already started, current state is { pipeline_state } " ,
99+ with contextlib .suppress (ComputationalRunNotFoundError ):
100+ last_run = await comp_runs_repo .get (
101+ user_id = computation .user_id , project_id = computation .project_id
105102 )
103+ pipeline_state = last_run .result
104+
105+ if utils .is_pipeline_running (pipeline_state ):
106+ raise HTTPException (
107+ status_code = status .HTTP_409_CONFLICT ,
108+ detail = f"Project { computation .project_id } already started, current state is { pipeline_state } " ,
109+ )
106110
107111
108112async def _check_pipeline_startable (
@@ -302,7 +306,7 @@ async def create_or_update_or_start_computation( # noqa: PLR0913 # pylint: disa
302306 project : ProjectAtDB = await project_repo .get_project (computation .project_id )
303307
304308 # check if current state allow to modify the computation
305- await _check_pipeline_not_running_or_raise_409 (comp_tasks_repo , computation )
309+ await _check_pipeline_not_running_or_raise_409 (comp_runs_repo , computation )
306310
307311 # create the complete DAG graph
308312 complete_dag = create_complete_dag (project .workbench )
@@ -353,20 +357,14 @@ async def create_or_update_or_start_computation( # noqa: PLR0913 # pylint: disa
353357 projects_metadata_repo = projects_metadata_repo ,
354358 )
355359
356- # filter the tasks by the effective pipeline
357- filtered_tasks = [
358- t
359- for t in comp_tasks
360- if f"{ t .node_id } " in set (minimal_computational_dag .nodes ())
361- ]
362- pipeline_state = utils .get_pipeline_state_from_task_states (filtered_tasks )
363-
364360 # get run details if any
365361 last_run : CompRunsAtDB | None = None
362+ pipeline_state = RunningState .NOT_STARTED
366363 with contextlib .suppress (ComputationalRunNotFoundError ):
367364 last_run = await comp_runs_repo .get (
368365 user_id = computation .user_id , project_id = computation .project_id
369366 )
367+ pipeline_state = last_run .result
370368
371369 return ComputationGet (
372370 id = computation .project_id ,
@@ -449,21 +447,10 @@ async def get_computation(
449447 # check that project actually exists
450448 await project_repo .get_project (project_id )
451449
452- pipeline_dag , all_tasks , filtered_tasks = await analyze_pipeline (
450+ pipeline_dag , all_tasks , _filtered_tasks = await analyze_pipeline (
453451 project_id , comp_pipelines_repo , comp_tasks_repo
454452 )
455453
456- pipeline_state : RunningState = utils .get_pipeline_state_from_task_states (
457- filtered_tasks
458- )
459-
460- _logger .debug (
461- "Computational task status by %s for %s has %s" ,
462- f"{ user_id = } " ,
463- f"{ project_id = } " ,
464- f"{ pipeline_state = } " ,
465- )
466-
467454 # create the complete DAG graph
468455 complete_dag = create_complete_dag_from_tasks (all_tasks )
469456 pipeline_details = await compute_pipeline_details (
@@ -472,8 +459,17 @@ async def get_computation(
472459
473460 # get run details if any
474461 last_run : CompRunsAtDB | None = None
462+ pipeline_state = RunningState .NOT_STARTED
475463 with contextlib .suppress (ComputationalRunNotFoundError ):
476464 last_run = await comp_runs_repo .get (user_id = user_id , project_id = project_id )
465+ pipeline_state = last_run .result
466+
467+ _logger .debug (
468+ "Computational task status by %s for %s has %s" ,
469+ f"{ user_id = } " ,
470+ f"{ project_id = } " ,
471+ f"{ pipeline_state = } " ,
472+ )
477473
478474 self_url = request .url .remove_query_params ("user_id" )
479475 return ComputationGet (
@@ -536,23 +532,18 @@ async def stop_computation(
536532 tasks : list [CompTaskAtDB ] = await comp_tasks_repo .list_tasks (project_id )
537533 # create the complete DAG graph
538534 complete_dag = create_complete_dag_from_tasks (tasks )
539- # filter the tasks by the effective pipeline
540- filtered_tasks = [
541- t for t in tasks if f"{ t .node_id } " in set (pipeline_dag .nodes ())
542- ]
543- pipeline_state = utils .get_pipeline_state_from_task_states (filtered_tasks )
544-
545- if utils .is_pipeline_running (pipeline_state ):
546- await stop_pipeline (
547- request .app , user_id = computation_stop .user_id , project_id = project_id
548- )
549-
550- # get run details if any
535+ # stop the pipeline if it is running
551536 last_run : CompRunsAtDB | None = None
537+ pipeline_state = RunningState .UNKNOWN
552538 with contextlib .suppress (ComputationalRunNotFoundError ):
553539 last_run = await comp_runs_repo .get (
554540 user_id = computation_stop .user_id , project_id = project_id
555541 )
542+ pipeline_state = last_run .result
543+ if utils .is_pipeline_running (last_run .result ):
544+ await stop_pipeline (
545+ request .app , user_id = computation_stop .user_id , project_id = project_id
546+ )
556547
557548 return ComputationGet (
558549 id = project_id ,
@@ -594,15 +585,20 @@ async def delete_computation(
594585 comp_tasks_repo : Annotated [
595586 CompTasksRepository , Depends (get_repository (CompTasksRepository ))
596587 ],
588+ comp_runs_repo : Annotated [
589+ CompRunsRepository , Depends (get_repository (CompRunsRepository ))
590+ ],
597591) -> None :
598592 try :
599593 # get the project
600594 project : ProjectAtDB = await project_repo .get_project (project_id )
601595 # check if current state allow to stop the computation
602- comp_tasks : list [CompTaskAtDB ] = await comp_tasks_repo .list_computational_tasks (
603- project_id
604- )
605- pipeline_state = utils .get_pipeline_state_from_task_states (comp_tasks )
596+ pipeline_state = RunningState .UNKNOWN
597+ with contextlib .suppress (ComputationalRunNotFoundError ):
598+ last_run = await comp_runs_repo .get (
599+ user_id = computation_stop .user_id , project_id = project_id
600+ )
601+ pipeline_state = last_run .result
606602 if utils .is_pipeline_running (pipeline_state ):
607603 if not computation_stop .force :
608604 raise HTTPException (
@@ -634,12 +630,10 @@ def return_last_value(retry_state: Any) -> Any:
634630 before_sleep = before_sleep_log (_logger , logging .INFO ),
635631 )
636632 async def check_pipeline_stopped () -> bool :
637- comp_tasks : list [CompTaskAtDB ] = (
638- await comp_tasks_repo .list_computational_tasks (project_id )
639- )
640- pipeline_state = utils .get_pipeline_state_from_task_states (
641- comp_tasks ,
633+ last_run = await comp_runs_repo .get (
634+ user_id = computation_stop .user_id , project_id = project_id
642635 )
636+ pipeline_state = last_run .result
643637 return utils .is_pipeline_stopped (pipeline_state )
644638
645639 # wait for the pipeline to be stopped
0 commit comments