     Mapping,
     NamedTuple,
     Optional,
+    Tuple,
     Union,
 )
 
@@ -517,7 +518,7 @@ def _job_update_loop(
         stats = stats if stats is not None else collections.defaultdict(int)
 
         with ignore_connection_errors(context="get statuses"):
-            self._track_statuses(job_db, stats=stats)
+            jobs_done, jobs_error, jobs_cancel = self._track_statuses(job_db, stats=stats)
             stats["track_statuses"] += 1
 
         not_started = job_db.get_by_status(statuses=["not_started"], max=200).copy()
@@ -540,6 +541,17 @@ def _job_update_loop(
                         stats["job_db persist"] += 1
                         total_added += 1
 
+        # Act on jobs
+        for job, row in jobs_done:
+            self.on_job_done(job, row)
+
+        for job, row in jobs_error:
+            self.on_job_error(job, row)
+
+        for job, row in jobs_cancel:
+            self.on_job_cancel(job, row)
+
+
     def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = None):
         """Helper method for launching jobs
 
@@ -696,14 +708,19 @@ def ensure_job_dir_exists(self, job_id: str) -> Path:
         if not job_dir.exists():
             job_dir.mkdir(parents=True)
 
-    def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] = None):
+    def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] = None) -> Tuple[List, List, List]:
         """
         Tracks status (and stats) of running jobs (in place).
         Optionally cancels jobs when running too long.
         """
         stats = stats if stats is not None else collections.defaultdict(int)
 
         active = job_db.get_by_status(statuses=["created", "queued", "running"]).copy()
+
+        jobs_done = []
+        jobs_error = []
+        jobs_cancel = []
+
         for i in active.index:
             job_id = active.loc[i, "id"]
             backend_name = active.loc[i, "backend_name"]
@@ -722,20 +739,20 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
 
                 if new_status == "finished":
                     stats["job finished"] += 1
-                    self.on_job_done(the_job, active.loc[i])
+                    jobs_done.append((the_job, active.loc[i]))
 
                 if previous_status != "error" and new_status == "error":
                     stats["job failed"] += 1
-                    self.on_job_error(the_job, active.loc[i])
+                    jobs_error.append((the_job, active.loc[i]))
+
+                if new_status == "canceled":
+                    stats["job canceled"] += 1
+                    jobs_cancel.append((the_job, active.loc[i]))
 
                 if previous_status in {"created", "queued"} and new_status == "running":
                     stats["job started running"] += 1
                     active.loc[i, "running_start_time"] = rfc3339.utcnow()
 
-                if new_status == "canceled":
-                    stats["job canceled"] += 1
-                    self.on_job_cancel(the_job, active.loc[i])
-
                 if self._cancel_running_job_after and new_status == "running":
                     if (not active.loc[i, "running_start_time"] or pd.isna(active.loc[i, "running_start_time"])):
                         _log.warning(
@@ -763,6 +780,8 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
         stats["job_db persist"] += 1
         job_db.persist(active)
 
+        return jobs_done, jobs_error, jobs_cancel
+
 
 def _format_usage_stat(job_metadata: dict, field: str) -> str:
     value = deep_get(job_metadata, "usage", field, "value", default=0)
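
For orientation, the change above splits status handling into two phases: _track_statuses() now only collects (job, row) pairs into jobs_done / jobs_error / jobs_cancel and returns them, and _job_update_loop() invokes the on_job_done / on_job_error / on_job_cancel callbacks afterwards. The sketch below illustrates that collect-then-dispatch pattern in isolation; it is a simplified stand-in with made-up data and names, not the actual MultiBackendJobManager code.

# Minimal sketch of the collect-then-dispatch pattern introduced above.
# All names and data here are illustrative stand-ins, not the openeo API.
from typing import List, Tuple


class MiniJobManager:
    def __init__(self, status_changes: dict):
        # Maps job id -> (previous_status, new_status); a toy substitute for the job database.
        self.status_changes = status_changes

    def _track_statuses(self) -> Tuple[List, List, List]:
        # Phase 1: classify jobs per outcome, but do not act on them yet.
        jobs_done, jobs_error, jobs_cancel = [], [], []
        for job_id, (previous, new) in self.status_changes.items():
            row = {"id": job_id, "status": new}
            if new == "finished":
                jobs_done.append((job_id, row))
            if previous != "error" and new == "error":
                jobs_error.append((job_id, row))
            if new == "canceled":
                jobs_cancel.append((job_id, row))
        return jobs_done, jobs_error, jobs_cancel

    def _job_update_loop(self):
        # Phase 2: dispatch callbacks only after tracking is complete.
        jobs_done, jobs_error, jobs_cancel = self._track_statuses()
        for job, row in jobs_done:
            self.on_job_done(job, row)
        for job, row in jobs_error:
            self.on_job_error(job, row)
        for job, row in jobs_cancel:
            self.on_job_cancel(job, row)

    def on_job_done(self, job, row):
        print(f"done: {job}")

    def on_job_error(self, job, row):
        print(f"error: {job}")

    def on_job_cancel(self, job, row):
        print(f"canceled: {job}")


if __name__ == "__main__":
    manager = MiniJobManager({"job-1": ("running", "finished"), "job-2": ("queued", "error")})
    manager._job_update_loop()  # prints "done: job-1" then "error: job-2"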