Skip to content

Commit d5180c3

Browse files
authored
Merge pull request #675 from Open-EO/issue664-harden-prolonged-cancel
hardening the cancelation functionality
2 parents c161f08 + 75c3797 commit d5180c3

File tree

2 files changed

+98
-6
lines changed

2 files changed

+98
-6
lines changed

openeo/extra/job_management/__init__.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -658,16 +658,25 @@ def on_job_cancel(self, job: BatchJob, row):
658658

659659
def _cancel_prolonged_job(self, job: BatchJob, row):
660660
"""Cancel the job if it has been running for too long."""
661-
job_running_start_time = rfc3339.parse_datetime(row["running_start_time"], with_timezone=True)
662-
elapsed = datetime.datetime.now(tz=datetime.timezone.utc) - job_running_start_time
663-
if elapsed > self._cancel_running_job_after:
664-
try:
661+
try:
662+
# Ensure running start time is valid
663+
job_running_start_time = rfc3339.parse_datetime(row.get("running_start_time"), with_timezone=True)
664+
665+
# Parse the current time into a datetime object with timezone info
666+
current_time = rfc3339.parse_datetime(rfc3339.utcnow(), with_timezone=True)
667+
668+
# Calculate the elapsed time between job start and now
669+
elapsed = current_time - job_running_start_time
670+
671+
if elapsed > self._cancel_running_job_after:
672+
665673
_log.info(
666674
f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {job_running_start_time})"
667675
)
668676
job.stop()
669-
except OpenEoApiError as e:
670-
_log.error(f"Failed to cancel long-running job {job.job_id}: {e}")
677+
678+
except Exception as e:
679+
_log.error(f"Unexpected error while handling job {job.job_id}: {e}")
671680

672681
def get_job_dir(self, job_id: str) -> Path:
673682
"""Path to directory where job metadata, results and error logs are be saved."""
@@ -728,6 +737,13 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
728737
self.on_job_cancel(the_job, active.loc[i])
729738

730739
if self._cancel_running_job_after and new_status == "running":
740+
if (not active.loc[i, "running_start_time"] or pd.isna(active.loc[i, "running_start_time"])):
741+
_log.warning(
742+
f"Unknown 'running_start_time' for running job {job_id}. Using current time as an approximation."
743+
)
744+
stats["job started running"] += 1
745+
active.loc[i, "running_start_time"] = rfc3339.utcnow()
746+
731747
self._cancel_prolonged_job(the_job, active.loc[i])
732748

733749
active.loc[i, "status"] = new_status

tests/extra/job_management/test_job_management.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from time import sleep
88
from typing import Callable, Union
99
from unittest import mock
10+
import datetime
1011

1112
import dirty_equals
1213
import geopandas
@@ -554,6 +555,7 @@ def start_job(row, connection_provider, connection, **kwargs):
554555
12 * 60 * 60,
555556
"finished",
556557
),
558+
557559
],
558560
)
559561
def test_automatic_cancel_of_too_long_running_jobs(
@@ -645,6 +647,80 @@ def test_status_logging(self, tmp_path, job_manager, job_manager_root_dir, sleep
645647
assert needle.search(caplog.text)
646648

647649

650+
@pytest.mark.parametrize(
651+
["create_time", "start_time", "running_start_time", "end_time", "end_status", "cancel_after_seconds"],
652+
[
653+
# Scenario 1: Missing running_start_time (None)
654+
(
655+
"2024-09-01T09:00:00Z", # Job creation time
656+
"2024-09-01T09:00:00Z", # Job start time (should be 1 hour after create_time)
657+
None, # Missing running_start_time
658+
"2024-09-01T20:00:00Z", # Job end time
659+
"finished", # Job final status
660+
6 * 60 * 60, # Cancel after 6 hours
661+
),
662+
# Scenario 2: NaN running_start_time
663+
(
664+
"2024-09-01T09:00:00Z",
665+
"2024-09-01T09:00:00Z",
666+
float("nan"), # NaN running_start_time
667+
"2024-09-01T20:00:00Z", # Job end time
668+
"finished", # Job final status
669+
6 * 60 * 60, # Cancel after 6 hours
670+
),
671+
]
672+
)
673+
def test_ensure_running_start_time_is_datetime(
674+
self,
675+
tmp_path,
676+
time_machine,
677+
create_time,
678+
start_time,
679+
running_start_time,
680+
end_time,
681+
end_status,
682+
cancel_after_seconds,
683+
dummy_backend_foo,
684+
job_manager_root_dir,
685+
):
686+
def get_status(job_id, current_status):
687+
if rfc3339.utcnow() < start_time:
688+
return "queued"
689+
elif rfc3339.utcnow() < end_time:
690+
return "running"
691+
return end_status
692+
693+
# Set the job status updater function for the mock backend
694+
dummy_backend_foo.job_status_updater = get_status
695+
696+
job_manager = MultiBackendJobManager(
697+
root_dir=job_manager_root_dir, cancel_running_job_after=cancel_after_seconds
698+
)
699+
job_manager.add_backend("foo", connection=dummy_backend_foo.connection)
700+
701+
# Create a DataFrame representing the job database
702+
df = pd.DataFrame({
703+
"year": [2024],
704+
"running_start_time": [running_start_time], # Initial running_start_time
705+
})
706+
707+
# Move the time machine to the job creation time
708+
time_machine.move_to(create_time)
709+
710+
job_db_path = tmp_path / "jobs.csv"
711+
712+
# Mock sleep() to skip one hour at a time instead of actually sleeping
713+
with mock.patch.object(openeo.extra.job_management.time, "sleep", new=lambda s: time_machine.shift(60 * 60)):
714+
job_manager.run_jobs(df=df, start_job=self._create_year_job, job_db=job_db_path)
715+
716+
final_df = CsvJobDatabase(job_db_path).read()
717+
718+
# Validate running_start_time is a valid datetime object
719+
filled_running_start_time = final_df.iloc[0]["running_start_time"]
720+
assert isinstance(rfc3339.parse_datetime(filled_running_start_time), datetime.datetime)
721+
722+
723+
648724
JOB_DB_DF_BASICS = pd.DataFrame(
649725
{
650726
"numbers": [3, 2, 1],

0 commit comments

Comments
 (0)