@@ -665,56 +665,29 @@ def on_job_cancel(self, job: BatchJob, row):
665665 """
666666 pass
667667
668- def _cancel_prolonged_job (self , job : BatchJob , row , df ):
668+ def _cancel_prolonged_job (self , job : BatchJob , row ):
669669 """Cancel the job if it has been running for too long."""
670670 try :
671671 # Ensure running start time is valid
672- running_start_time = self ._ensure_running_start_time (job , row , df )
673-
674- # Get the current time in RFC 3339 format (timezone-aware)
675- current_time_rfc3339 = rfc3339 .utcnow ()
676-
672+ job_running_start_time = rfc3339 .parse_datetime (row .get ("running_start_time" ), with_timezone = True )
673+
677674 # Parse the current time into a datetime object with timezone info
678- current_time = rfc3339 .parse_datetime (current_time_rfc3339 , with_timezone = True )
675+ current_time = rfc3339 .parse_datetime (rfc3339 . utcnow () , with_timezone = True )
679676
680677 # Calculate the elapsed time between job start and now
681- elapsed = current_time - running_start_time
678+ elapsed = current_time - job_running_start_time
682679
683680 if elapsed > self ._cancel_running_job_after :
684681 try :
685682 _log .info (
686- f"Cancelling long-running job { job .job_id } (after { elapsed } , running since { running_start_time } )"
683+ f"Cancelling long-running job { job .job_id } (after { elapsed } , running since { job_running_start_time } )"
687684 )
688685 job .stop ()
689686 except OpenEoApiError as e :
690687 _log .error (f"Failed to cancel long-running job { job .job_id } : { e } " )
691688 except Exception as e :
692689 _log .error (f"Unexpected error while handling job { job .job_id } : { e } " )
693690
694- def _ensure_running_start_time (self , job : BatchJob , row , df ) -> datetime .datetime :
695- """
696- Ensures the running start time is valid. If missing, approximates with the current time.
697- Returns the parsed running start time as a datetime object.
698- """
699- running_start_time_str = row .get ("running_start_time" )
700-
701- if not running_start_time_str or pd .isna (running_start_time_str ):
702- _log .warning (
703- f"Job { job .job_id } does not have a valid running start time. Setting the current time as an approximation."
704- )
705- # Generate the current time in RFC 3339 format
706- current_time_rfc3339 = rfc3339 .utcnow ()
707-
708- # Update the DataFrame safely using `.loc`
709- df .loc [df .index [row .name ], "running_start_time" ] = current_time_rfc3339
710-
711- # Parse and return the datetime object with UTC timezone
712- return rfc3339 .parse_datetime (current_time_rfc3339 , with_timezone = True )
713-
714- # Parse the existing time string and return it
715- return rfc3339 .parse_datetime (running_start_time_str , with_timezone = True )
716-
717-
718691 def get_job_dir (self , job_id : str ) -> Path :
719692 """Path to directory where job metadata, results and error logs are be saved."""
720693 return self ._root_dir / f"job_{ job_id } "
@@ -774,7 +747,14 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
774747 self .on_job_cancel (the_job , active .loc [i ])
775748
776749 if self ._cancel_running_job_after and new_status == "running" :
777- self ._cancel_prolonged_job (the_job , active .loc [i ], active )
750+ if (not active .loc [i , "running_start_time" ] or pd .isna (active .loc [i , "running_start_time" ])):
751+ _log .warning (
752+ f"Unknown 'running_start_time' for running job { job_id } . Using current time as an approximation."
753+ )
754+ stats ["job started running" ] += 1
755+ active .loc [i , "running_start_time" ] = rfc3339 .utcnow ()
756+
757+ self ._cancel_prolonged_job (the_job , active .loc [i ])
778758
779759 active .loc [i , "status" ] = new_status
780760
0 commit comments