Commit 441387c

influence the job_database_directly
1 parent c1edf0d commit 441387c

2 files changed: +116 -9 lines changed


openeo/extra/job_management.py

Lines changed: 37 additions & 9 deletions
@@ -665,28 +665,56 @@ def on_job_cancel(self, job: BatchJob, row):
         """
         pass
 
-    def _cancel_prolonged_job(self, job: BatchJob, row):
+    def _cancel_prolonged_job(self, job: BatchJob, row, df):
         """Cancel the job if it has been running for too long."""
         try:
-            running_start_time_str = row.get("running_start_time")
-            if not running_start_time_str or pd.isna(running_start_time_str):
-                _log.warning(f"Job {job.job_id} does not have a valid running start time. Cancellation skipped.")
-                return
+            # Ensure running start time is valid
+            running_start_time = self._ensure_running_start_time(job, row, df)
 
-            job_running_start_time = rfc3339.parse_datetime(running_start_time_str, with_timezone=True)
-            elapsed = datetime.datetime.now(tz=datetime.timezone.utc) - job_running_start_time
+            # Get the current time in RFC 3339 format (timezone-aware)
+            current_time_rfc3339 = rfc3339.utcnow()
+
+            # Parse the current time into a datetime object with timezone info
+            current_time = rfc3339.parse_datetime(current_time_rfc3339, with_timezone=True)
+
+            # Calculate the elapsed time between job start and now
+            elapsed = current_time - running_start_time
 
             if elapsed > self._cancel_running_job_after:
                 try:
                     _log.info(
-                        f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {job_running_start_time})"
+                        f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {running_start_time})"
                     )
                     job.stop()
                 except OpenEoApiError as e:
                     _log.error(f"Failed to cancel long-running job {job.job_id}: {e}")
         except Exception as e:
             _log.error(f"Unexpected error while handling job {job.job_id}: {e}")
 
+    def _ensure_running_start_time(self, job: BatchJob, row, df) -> datetime.datetime:
+        """
+        Ensures the running start time is valid. If missing, approximates with the current time.
+        Returns the parsed running start time as a datetime object.
+        """
+        running_start_time_str = row.get("running_start_time")
+
+        if not running_start_time_str or pd.isna(running_start_time_str):
+            _log.warning(
+                f"Job {job.job_id} does not have a valid running start time. Setting the current time as an approximation."
+            )
+            # Generate the current time in RFC 3339 format
+            current_time_rfc3339 = rfc3339.utcnow()
+
+            # Update the DataFrame safely using `.loc`
+            df.loc[df.index[row.name], "running_start_time"] = current_time_rfc3339
+
+            # Parse and return the datetime object with UTC timezone
+            return rfc3339.parse_datetime(current_time_rfc3339, with_timezone=True)
+
+        # Parse the existing time string and return it
+        return rfc3339.parse_datetime(running_start_time_str, with_timezone=True)
+
+
     def get_job_dir(self, job_id: str) -> Path:
         """Path to directory where job metadata, results and error logs are be saved."""
         return self._root_dir / f"job_{job_id}"
@@ -746,7 +774,7 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
                     self.on_job_cancel(the_job, active.loc[i])
 
                 if self._cancel_running_job_after and new_status == "running":
-                    self._cancel_prolonged_job(the_job, active.loc[i])
+                    self._cancel_prolonged_job(the_job, active.loc[i], active)
 
                 active.loc[i, "status"] = new_status
 
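The net effect of the new _ensure_running_start_time fallback can be illustrated in isolation. Below is a minimal sketch (not part of the commit) that reuses the same openeo.util.rfc3339 helpers the diff relies on; the one-row DataFrame and the variable names are made up for illustration:

import pandas as pd
from openeo.util import rfc3339

# A job-database row whose running_start_time was never filled in.
df = pd.DataFrame({"running_start_time": [None]})
row = df.loc[0]

value = row.get("running_start_time")
if not value or pd.isna(value):
    # Fall back to "now" (as an RFC 3339 string) and write it back into the
    # dataframe, mirroring the df.loc[...] update done by the new helper.
    value = rfc3339.utcnow()
    df.loc[df.index[row.name], "running_start_time"] = value

# A timezone-aware datetime comes back either way, so the elapsed-time check
# in _cancel_prolonged_job no longer has to skip rows with a missing value.
running_start_time = rfc3339.parse_datetime(value, with_timezone=True)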

tests/extra/test_job_management.py

Lines changed: 79 additions & 0 deletions
@@ -7,6 +7,7 @@
 from time import sleep
 from typing import Callable, Union
 from unittest import mock
+import datetime
 
 import dirty_equals
 import geopandas
@@ -554,6 +555,7 @@ def start_job(row, connection_provider, connection, **kwargs):
                 12 * 60 * 60,
                 "finished",
             ),
+
         ],
     )
     def test_automatic_cancel_of_too_long_running_jobs(
@@ -645,6 +647,83 @@ def test_status_logging(self, tmp_path, job_manager, job_manager_root_dir, sleep
         assert needle.search(caplog.text)
 
 
+    @pytest.mark.parametrize(
+        ["create_time", "start_time", "running_start_time", "end_time", "end_status", "cancel_after_seconds", "expected_status"],
+        [
+            # Scenario 1: Missing running_start_time (None)
+            (
+                "2024-09-01T09:00:00Z",  # Job creation time
+                "2024-09-01T09:00:00Z",  # Job start time (should be 1 hour after create_time)
+                None,  # Missing running_start_time
+                "2024-09-01T20:00:00Z",  # Job end time
+                "finished",  # Job final status
+                6 * 60 * 60,  # Cancel after 6 hours
+                "finished",  # Expected final status
+            ),
+            # Scenario 2: NaN running_start_time
+            (
+                "2024-09-01T09:00:00Z",
+                "2024-09-01T09:00:00Z",
+                float("nan"),  # NaN running_start_time
+                "2024-09-01T20:00:00Z",  # Job end time
+                "finished",  # Job final status
+                6 * 60 * 60,  # Cancel after 6 hours
+                "finished",  # Expected final status
+            ),
+        ]
+    )
+    def test_ensure_running_start_time_is_datetime(
+        self,
+        tmp_path,
+        time_machine,
+        create_time,
+        start_time,
+        running_start_time,
+        end_time,
+        end_status,
+        cancel_after_seconds,
+        expected_status,
+        dummy_backend_foo,
+        job_manager_root_dir,
+    ):
+        def get_status(job_id, current_status):
+            if rfc3339.utcnow() < start_time:
+                return "queued"
+            elif rfc3339.utcnow() < end_time:
+                return "running"
+            return end_status
+
+        # Set the job status updater function for the mock backend
+        dummy_backend_foo.job_status_updater = get_status
+
+        job_manager = MultiBackendJobManager(
+            root_dir=job_manager_root_dir, cancel_running_job_after=cancel_after_seconds
+        )
+        job_manager.add_backend("foo", connection=dummy_backend_foo.connection)
+
+        # Create a DataFrame representing the job database
+        df = pd.DataFrame({
+            "year": [2024],
+            "running_start_time": [running_start_time],  # Initial running_start_time
+        })
+
+        # Move the time machine to the job creation time
+        time_machine.move_to(create_time)
+
+        job_db_path = tmp_path / "jobs.csv"
+
+        # Mock sleep() to skip one hour at a time instead of actually sleeping
+        with mock.patch.object(openeo.extra.job_management.time, "sleep", new=lambda s: time_machine.shift(60 * 60)):
+            job_manager.run_jobs(df=df, start_job=self._create_year_job, job_db=job_db_path)
+
+        final_df = CsvJobDatabase(job_db_path).read()
+
+        # Validate running_start_time is a valid datetime object
+        filled_running_start_time = final_df.iloc[0]["running_start_time"]
+        assert isinstance(rfc3339.parse_datetime(filled_running_start_time), datetime.datetime)
+
+
+
 JOB_DB_DF_BASICS = pd.DataFrame(
     {
         "numbers": [3, 2, 1],
