8484_Current = CompTaskAtDB
8585_MAX_WAITING_FOR_CLUSTER_TIMEOUT_IN_MIN : Final [int ] = 10
8686_SCHEDULER_INTERVAL : Final [datetime .timedelta ] = datetime .timedelta (seconds = 5 )
87- _TASK_NAME_TEMPLATE : Final [
88- str
89- ] = "computational-scheduler-{user_id}:{project_id}:{iteration}"
87+ _TASK_NAME_TEMPLATE : Final [str ] = (
88+ "computational-scheduler-{user_id}:{project_id}:{iteration}"
89+ )
9090
9191PipelineSchedulingTask : TypeAlias = asyncio .Task
9292PipelineSchedulingWakeUpEvent : TypeAlias = asyncio .Event
@@ -219,9 +219,9 @@ async def run_new_pipeline(
219219 task , wake_up_event = self ._start_scheduling (
220220 user_id , project_id , new_run .iteration
221221 )
222- self ._scheduled_pipelines [
223- ( user_id , project_id , new_run . iteration )
224- ] = ScheduledPipelineParams ( scheduler_task = task , scheduler_waker = wake_up_event )
222+ self ._scheduled_pipelines [( user_id , project_id , new_run . iteration )] = (
223+ ScheduledPipelineParams ( scheduler_task = task , scheduler_waker = wake_up_event )
224+ )
225225 await publish_project_log (
226226 self .rabbitmq_client ,
227227 user_id ,
@@ -653,20 +653,17 @@ async def _start_tasks(
653653 scheduled_tasks : dict [NodeID , CompTaskAtDB ],
654654 comp_run : CompRunsAtDB ,
655655 wake_up_callback : Callable [[], None ],
656- ) -> None :
657- ...
656+ ) -> None : ...
658657
659658 @abstractmethod
660659 async def _get_tasks_status (
661660 self , user_id : UserID , tasks : list [CompTaskAtDB ], comp_run : CompRunsAtDB
662- ) -> list [RunningState ]:
663- ...
661+ ) -> list [RunningState ]: ...
664662
665663 @abstractmethod
666664 async def _stop_tasks (
667665 self , user_id : UserID , tasks : list [CompTaskAtDB ], comp_run : CompRunsAtDB
668- ) -> None :
669- ...
666+ ) -> None : ...
670667
671668 @abstractmethod
672669 async def _process_completed_tasks (
@@ -675,8 +672,7 @@ async def _process_completed_tasks(
675672 tasks : list [CompTaskAtDB ],
676673 iteration : Iteration ,
677674 comp_run : CompRunsAtDB ,
678- ) -> None :
679- ...
675+ ) -> None : ...
680676
681677 @staticmethod
682678 def _build_exclusive_lock_key (* args , ** kwargs ) -> str :
@@ -816,8 +812,10 @@ async def _schedule_tasks_to_stop(
816812 project_id
817813 )
818814 # stop any remaining running task, these are already submitted
819- tasks_to_stop = [t for t in comp_tasks .values () if t .state in PROCESSING_STATES ]
820- await self ._stop_tasks (user_id , tasks_to_stop , comp_run )
815+ if tasks_to_stop := [
816+ t for t in comp_tasks .values () if t .state in PROCESSING_STATES
817+ ]:
818+ await self ._stop_tasks (user_id , tasks_to_stop , comp_run )
821819
822820 async def _schedule_tasks_to_start ( # noqa: C901
823821 self ,
@@ -877,9 +875,9 @@ async def _schedule_tasks_to_start( # noqa: C901
877875 RunningState .WAITING_FOR_CLUSTER ,
878876 )
879877 for task in tasks_ready_to_start :
880- comp_tasks [
881- NodeIDStr ( f" { task } " )
882- ]. state = RunningState . WAITING_FOR_CLUSTER
878+ comp_tasks [NodeIDStr ( f" { task } " )]. state = (
879+ RunningState . WAITING_FOR_CLUSTER
880+ )
883881
884882 except ComputationalBackendOnDemandNotReadyError as exc :
885883 _logger .info (
@@ -901,9 +899,9 @@ async def _schedule_tasks_to_start( # noqa: C901
901899 RunningState .WAITING_FOR_CLUSTER ,
902900 )
903901 for task in tasks_ready_to_start :
904- comp_tasks [
905- NodeIDStr ( f" { task } " )
906- ]. state = RunningState . WAITING_FOR_CLUSTER
902+ comp_tasks [NodeIDStr ( f" { task } " )]. state = (
903+ RunningState . WAITING_FOR_CLUSTER
904+ )
907905 except ClustersKeeperNotAvailableError :
908906 _logger .exception ("Unexpected error while starting tasks:" )
909907 await publish_project_log (
0 commit comments