Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions openeo/extra/job_management/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,16 +658,25 @@ def on_job_cancel(self, job: BatchJob, row):

def _cancel_prolonged_job(self, job: BatchJob, row):
    """Cancel the given job if it has been in "running" state for too long.

    "Too long" means longer than the ``cancel_running_job_after`` timedelta
    configured on this job manager (``self._cancel_running_job_after``).

    :param job: active batch job to inspect; stopped via ``job.stop()`` when prolonged.
    :param row: job database row; its ``running_start_time`` field is read
        to determine how long the job has been running.
    """
    try:
        # Parse the recorded start time; raises if the value is missing or
        # not a valid RFC 3339 datetime, which is handled below.
        job_running_start_time = rfc3339.parse_datetime(row.get("running_start_time"), with_timezone=True)

        # Round-trip "now" through rfc3339 so both datetimes are timezone-aware
        # and naive/aware subtraction errors are impossible.
        current_time = rfc3339.parse_datetime(rfc3339.utcnow(), with_timezone=True)

        # Elapsed wall-clock time since the job entered "running" state.
        elapsed = current_time - job_running_start_time

        if elapsed > self._cancel_running_job_after:
            _log.info(
                f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {job_running_start_time})"
            )
            job.stop()

    except Exception as e:
        # Catch-all (invalid/missing start time, API errors from job.stop(), ...):
        # a failed cancel attempt must not break the overall job-tracking loop.
        _log.error(f"Unexpected error while handling job {job.job_id}: {e}")

def get_job_dir(self, job_id: str) -> Path:
"""Path to directory where job metadata, results and error logs are be saved."""
Expand Down Expand Up @@ -728,6 +737,13 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
self.on_job_cancel(the_job, active.loc[i])

if self._cancel_running_job_after and new_status == "running":
if (not active.loc[i, "running_start_time"] or pd.isna(active.loc[i, "running_start_time"])):
_log.warning(
f"Unknown 'running_start_time' for running job {job_id}. Using current time as an approximation."
)
stats["job started running"] += 1
active.loc[i, "running_start_time"] = rfc3339.utcnow()

self._cancel_prolonged_job(the_job, active.loc[i])

active.loc[i, "status"] = new_status
Expand Down
76 changes: 76 additions & 0 deletions tests/extra/job_management/test_job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from time import sleep
from typing import Callable, Union
from unittest import mock
import datetime

import dirty_equals
import geopandas
Expand Down Expand Up @@ -554,6 +555,7 @@ def start_job(row, connection_provider, connection, **kwargs):
12 * 60 * 60,
"finished",
),

],
)
def test_automatic_cancel_of_too_long_running_jobs(
Expand Down Expand Up @@ -645,6 +647,80 @@ def test_status_logging(self, tmp_path, job_manager, job_manager_root_dir, sleep
assert needle.search(caplog.text)


@pytest.mark.parametrize(
    ["create_time", "start_time", "running_start_time", "end_time", "end_status", "cancel_after_seconds"],
    [
        # Scenario 1: Missing running_start_time (None)
        (
            "2024-09-01T09:00:00Z",  # Job creation time
            "2024-09-01T09:00:00Z",  # Job start time (should be 1 hour after create_time)
            None,  # Missing running_start_time
            "2024-09-01T20:00:00Z",  # Job end time
            "finished",  # Job final status
            6 * 60 * 60,  # Cancel after 6 hours
        ),
        # Scenario 2: NaN running_start_time
        (
            "2024-09-01T09:00:00Z",
            "2024-09-01T09:00:00Z",
            float("nan"),  # NaN running_start_time
            "2024-09-01T20:00:00Z",  # Job end time
            "finished",  # Job final status
            6 * 60 * 60,  # Cancel after 6 hours
        ),
    ]
)
def test_ensure_running_start_time_is_datetime(
    self,
    tmp_path,
    time_machine,
    create_time,
    start_time,
    running_start_time,
    end_time,
    end_status,
    cancel_after_seconds,
    dummy_backend_foo,
    job_manager_root_dir,
):
    # Verify that the job manager backfills an invalid/missing
    # "running_start_time" (None or NaN) with a parseable RFC 3339 datetime
    # while tracking a running job.
    def get_status(job_id, current_status):
        # Simulated backend status as a function of (time-machine) clock:
        # queued before start_time, running until end_time, then end_status.
        if rfc3339.utcnow() < start_time:
            return "queued"
        elif rfc3339.utcnow() < end_time:
            return "running"
        return end_status

    # Set the job status updater function for the mock backend
    dummy_backend_foo.job_status_updater = get_status

    job_manager = MultiBackendJobManager(
        root_dir=job_manager_root_dir, cancel_running_job_after=cancel_after_seconds
    )
    job_manager.add_backend("foo", connection=dummy_backend_foo.connection)

    # Create a DataFrame representing the job database
    df = pd.DataFrame({
        "year": [2024],
        "running_start_time": [running_start_time],  # Initial (invalid) running_start_time
    })

    # Move the time machine to the job creation time
    time_machine.move_to(create_time)

    job_db_path = tmp_path / "jobs.csv"

    # Mock sleep() to skip one hour at a time instead of actually sleeping
    with mock.patch.object(openeo.extra.job_management.time, "sleep", new=lambda s: time_machine.shift(60 * 60)):
        job_manager.run_jobs(df=df, start_job=self._create_year_job, job_db=job_db_path)

    final_df = CsvJobDatabase(job_db_path).read()

    # Validate running_start_time is a valid datetime object
    filled_running_start_time = final_df.iloc[0]["running_start_time"]
    assert isinstance(rfc3339.parse_datetime(filled_running_start_time), datetime.datetime)



JOB_DB_DF_BASICS = pd.DataFrame(
{
"numbers": [3, 2, 1],
Expand Down
Loading