diff --git a/CHANGELOG.md b/CHANGELOG.md
index cde43590d..480ced93b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - `MultiBackendJobManager`: Fix encoding issue of job metadata in `on_job_done` ([#657](https://github.com/Open-EO/openeo-python-client/issues/657))
 - `MultiBackendJobManager`: Avoid `SettingWithCopyWarning` ([#641](https://github.com/Open-EO/openeo-python-client/issues/641))
 - Avoid creating empty file if asset download request failed.
+- `MultiBackendJobManager`: avoid dtype loading mistakes in `CsvJobDatabase` on empty columns ([#656](https://github.com/Open-EO/openeo-python-client/issues/656))
 
 
 ## [0.34.0] - 2024-10-31
diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py
index fa241ff6b..a84d8dadc 100644
--- a/openeo/extra/job_management.py
+++ b/openeo/extra/job_management.py
@@ -1,6 +1,7 @@
 import abc
 import collections
 import contextlib
+import dataclasses
 import datetime
 import json
 import logging
@@ -9,7 +10,7 @@
 import warnings
 from pathlib import Path
 from threading import Thread
-from typing import Callable, Dict, List, NamedTuple, Optional, Union
+from typing import Any, Callable, Dict, List, Mapping, NamedTuple, Optional, Union
 
 import numpy
 import pandas as pd
@@ -104,6 +105,14 @@ def _start_job_default(row: pd.Series, connection: Connection, *args, **kwargs):
     raise NotImplementedError("No 'start_job' callable provided")
 
 
+@dataclasses.dataclass(frozen=True)
+class _ColumnProperties:
+    """Expected/required properties of a column in the job manager related dataframes"""
+
+    dtype: str = "object"
+    default: Any = None
+
+
 class MultiBackendJobManager:
     """
     Tracker for multiple jobs on multiple backends.
@@ -171,6 +180,23 @@ def start_job(
         Added ``cancel_running_job_after`` parameter.
     """
 
+    # Expected columns in the job DB dataframes.
+    # TODO: make this part of public API when settled?
+    _COLUMN_REQUIREMENTS: Mapping[str, _ColumnProperties] = {
+        "id": _ColumnProperties(dtype="str"),
+        "backend_name": _ColumnProperties(dtype="str"),
+        "status": _ColumnProperties(dtype="str", default="not_started"),
+        # TODO: use proper date/time dtype instead of legacy str for start times?
+        "start_time": _ColumnProperties(dtype="str"),
+        "running_start_time": _ColumnProperties(dtype="str"),
+        # TODO: these columns "cpu", "memory", "duration" are not referenced explicitly from MultiBackendJobManager,
+        # but are indirectly coupled through handling of VITO-specific "usage" metadata in `_track_statuses`.
+        # Since bfd99e34 they are not really required to be present anymore, can we make that more explicit?
+        "cpu": _ColumnProperties(dtype="str"),
+        "memory": _ColumnProperties(dtype="str"),
+        "duration": _ColumnProperties(dtype="str"),
+    }
+
     def __init__(
         self,
         poll_sleep: int = 60,
@@ -267,8 +293,8 @@ def _make_resilient(connection):
         connection.session.mount("https://", HTTPAdapter(max_retries=retries))
         connection.session.mount("http://", HTTPAdapter(max_retries=retries))
 
-    @staticmethod
-    def _normalize_df(df: pd.DataFrame) -> pd.DataFrame:
+    @classmethod
+    def _normalize_df(cls, df: pd.DataFrame) -> pd.DataFrame:
         """
         Normalize given pandas dataframe (creating a new one):
         ensure we have the required columns.
@@ -276,22 +302,7 @@ def _normalize_df(df: pd.DataFrame) -> pd.DataFrame:
         :param df: The dataframe to normalize.
         :return: a new dataframe that is normalized.
         """
-        # check for some required columns.
-        required_with_default = [
-            ("status", "not_started"),
-            ("id", None),
-            ("start_time", None),
-            ("running_start_time", None),
-            # TODO: columns "cpu", "memory", "duration" are not referenced directly
-            # within MultiBackendJobManager making it confusing to claim they are required.
-            # However, they are through assumptions about job "usage" metadata in `_track_statuses`.
-            # => proposed solution: allow to configure usage columns when adding a backend
-            ("cpu", None),
-            ("memory", None),
-            ("duration", None),
-            ("backend_name", None),
-        ]
-        new_columns = {col: val for (col, val) in required_with_default if col not in df.columns}
+        new_columns = {col: req.default for (col, req) in cls._COLUMN_REQUIREMENTS.items() if col not in df.columns}
         df = df.assign(**new_columns)
 
         return df
@@ -486,6 +497,9 @@ def _job_update_loop(
         go through the necessary jobs to check for status updates,
         trigger status events, start new jobs when there is room for them, etc.
         """
+        if not self.backends:
+            raise RuntimeError("No backends registered")
+
         stats = stats if stats is not None else collections.defaultdict(int)
 
         with ignore_connection_errors(context="get statuses"):
@@ -832,7 +846,11 @@ def _is_valid_wkt(self, wkt: str) -> bool:
             return False
 
     def read(self) -> pd.DataFrame:
-        df = pd.read_csv(self.path)
+        df = pd.read_csv(
+            self.path,
+            # TODO: possible to avoid hidden coupling with MultiBackendJobManager here?
+            dtype={c: r.dtype for (c, r) in MultiBackendJobManager._COLUMN_REQUIREMENTS.items()},
+        )
         if (
             "geometry" in df.columns
             and df["geometry"].dtype.name != "geometry"
diff --git a/openeo/rest/_testing.py b/openeo/rest/_testing.py
index 7dc079d76..a1e036be1 100644
--- a/openeo/rest/_testing.py
+++ b/openeo/rest/_testing.py
@@ -3,7 +3,17 @@
 import collections
 import json
 import re
-from typing import Callable, Iterable, Iterator, Optional, Sequence, Tuple, Union
+from typing import (
+    Callable,
+    Dict,
+    Iterable,
+    Iterator,
+    Mapping,
+    Optional,
+    Sequence,
+    Tuple,
+    Union,
+)
 
 from openeo import Connection, DataCube
 from openeo.rest.vectorcube import VectorCube
@@ -32,7 +42,9 @@ class DummyBackend:
         "validation_requests",
         "next_result",
         "next_validation_errors",
+        "_forced_job_status",
         "job_status_updater",
+        "job_id_generator",
         "extra_job_metadata_fields",
     )
 
@@ -53,6 +65,7 @@ def __init__(
         self.next_result = self.DEFAULT_RESULT
         self.next_validation_errors = []
         self.extra_job_metadata_fields = []
+        self._forced_job_status: Dict[str, str] = {}
 
         # Job status update hook:
         # callable that is called on starting a job, and getting job metadata
@@ -60,6 +73,12 @@ def __init__(
         # By default: immediately set to "finished" once job is started
         self.job_status_updater = lambda job_id, current_status: "finished"
 
+        # Optional job id generator hook:
+        # callable that generates a job id, e.g. based on the process graph.
+        # When set to None, or the callable returns None, or it returns an existing job id:
+        # things fall back to auto-increment job ids ("job-000", "job-001", "job-002", ...)
+        self.job_id_generator: Optional[Callable[[dict], str]] = None
+
         requests_mock.post(
             connection.build_url("/result"),
             content=self._handle_post_result,
@@ -75,10 +94,18 @@ def __init__(
         requests_mock.get(
             re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), json=self._handle_get_job_results
         )
+        requests_mock.delete(
+            re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), json=self._handle_delete_job_results
+        )
        requests_mock.get(
             re.compile(connection.build_url("/jobs/(.*?)/results/result.data$")),
             content=self._handle_get_job_result_asset,
         )
+        requests_mock.get(
+            re.compile(connection.build_url(r"/jobs/(.*?)/logs($|\?.*)")),
+            # TODO: need to fine-tune dummy logs?
+            json={"logs": [], "links": []},
+        )
         requests_mock.post(connection.build_url("/validation"), json=self._handle_post_validation)
 
     @classmethod
@@ -88,7 +115,7 @@ def at_url(cls, root_url: str, *, requests_mock, capabilities: Optional[dict] =
         including creation of connection and mocking of capabilities doc
         """
         root_url = root_url.rstrip("/") + "/"
-        requests_mock.get(root_url, json=build_capabilities(**(capabilities or None)))
+        requests_mock.get(root_url, json=build_capabilities(**(capabilities or {})))
         connection = Connection(root_url)
         return cls(requests_mock=requests_mock, connection=connection)
 
@@ -150,7 +177,14 @@ def _handle_post_jobs(self, request, context):
         """handler of `POST /jobs` (create batch job)"""
         post_data = request.json()
         pg = post_data["process"]["process_graph"]
-        job_id = f"job-{len(self.batch_jobs):03d}"
+
+        # Generate (new) job id
+        job_id = self.job_id_generator and self.job_id_generator(process_graph=pg)
+        if not job_id or job_id in self.batch_jobs:
+            # As fallback: use auto-increment job ids ("job-000", "job-001", "job-002", ...)
+            job_id = f"job-{len(self.batch_jobs):03d}"
+        assert job_id not in self.batch_jobs
+
         job_data = {"job_id": job_id, "pg": pg, "status": "created"}
         for field in ["title", "description"]:
             if field in post_data:
@@ -169,11 +203,16 @@ def _get_job_id(self, request) -> str:
         assert job_id in self.batch_jobs
         return job_id
 
+    def _get_job_status(self, job_id: str, current_status: str) -> str:
+        if job_id in self._forced_job_status:
+            return self._forced_job_status[job_id]
+        return self.job_status_updater(job_id=job_id, current_status=current_status)
+
     def _handle_post_job_results(self, request, context):
         """Handler of `POST /job/{job_id}/results` (start batch job)."""
         job_id = self._get_job_id(request)
         assert self.batch_jobs[job_id]["status"] == "created"
-        self.batch_jobs[job_id]["status"] = self.job_status_updater(
+        self.batch_jobs[job_id]["status"] = self._get_job_status(
             job_id=job_id, current_status=self.batch_jobs[job_id]["status"]
         )
         context.status_code = 202
@@ -183,10 +222,14 @@ def _handle_get_job(self, request, context):
         job_id = self._get_job_id(request)
         # Allow updating status with `job_status_setter` once job got past status "created"
         if self.batch_jobs[job_id]["status"] != "created":
-            self.batch_jobs[job_id]["status"] = self.job_status_updater(
+            self.batch_jobs[job_id]["status"] = self._get_job_status(
                 job_id=job_id, current_status=self.batch_jobs[job_id]["status"]
             )
-        return {"id": job_id, "status": self.batch_jobs[job_id]["status"]}
+        return {
+            # TODO: add some more required fields like "process" and "created"?
+            "id": job_id,
+            "status": self.batch_jobs[job_id]["status"],
+        }
 
     def _handle_get_job_results(self, request, context):
         """Handler of `GET /job/{job_id}/results` (list batch job results)."""
@@ -197,6 +240,13 @@ def _handle_get_job_results(self, request, context):
             "assets": {"result.data": {"href": self.connection.build_url(f"/jobs/{job_id}/results/result.data")}},
         }
 
+    def _handle_delete_job_results(self, request, context):
+        """Handler of `DELETE /job/{job_id}/results` (cancel job)."""
+        job_id = self._get_job_id(request)
+        self.batch_jobs[job_id]["status"] = "canceled"
+        self._forced_job_status[job_id] = "canceled"
+        context.status_code = 204
+
     def _handle_get_job_result_asset(self, request, context):
         """Handler of `GET /job/{job_id}/results/result.data` (get batch job result asset)."""
         job_id = self._get_job_id(request)
@@ -261,18 +311,30 @@ def execute(self, cube: Union[DataCube, VectorCube], process_id: Optional[str] =
         cube.execute()
         return self.get_pg(process_id=process_id)
 
-    def setup_simple_job_status_flow(self, *, queued: int = 1, running: int = 4, final: str = "finished"):
+    def setup_simple_job_status_flow(
+        self,
+        *,
+        queued: int = 1,
+        running: int = 4,
+        final: str = "finished",
+        final_per_job: Optional[Mapping[str, str]] = None,
+    ):
         """
         Set up simple job status flow:
-        queued (a couple of times) -> running (a couple of times) -> finished/error.
+
+            queued (a couple of times) -> running (a couple of times) -> finished/error.
+
+        Final state can be specified generically with arg `final`
+        and, optionally, further fine-tuned per job with `final_per_job`.
         """
-        template = ["queued"] * queued + ["running"] * running + [final]
+        template = ["queued"] * queued + ["running"] * running
         job_stacks = collections.defaultdict(template.copy)
+        final_per_job = final_per_job or {}
 
         def get_status(job_id: str, current_status: str) -> str:
             stack = job_stacks[job_id]
-            # Pop first item each time, but repeat the last one at the end
-            return stack.pop(0) if len(stack) > 1 else stack[0]
+            # Pop first item each time, unless we're in final state
+            return stack.pop(0) if len(stack) > 0 else final_per_job.get(job_id, final)
 
         self.job_status_updater = get_status
 
diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py
index 49d131d5f..e484c27d3 100644
--- a/tests/extra/test_job_management.py
+++ b/tests/extra/test_job_management.py
@@ -2,6 +2,7 @@
 import json
 import re
 import threading
+from pathlib import Path
 from time import sleep
 from typing import Callable, Union
 from unittest import mock
@@ -45,45 +46,29 @@ def con(requests_mock) -> openeo.Connection:
     return con
 
 
-class FakeBackend:
-    """
-    Fake openEO backend with some basic job management functionality for testing job manager logic.
-    """
+def _job_id_from_year(process_graph) -> Union[str, None]:
+    """Job id generator that extracts the year from the process graph"""
+    try:
+        (year,) = (n["arguments"]["year"] for n in process_graph.values())
+        return f"job-{year}"
+    except Exception:
+        pass
 
-    # TODO: replace/merge with openeo.rest._testing.DummyBackend
 
-    def __init__(self, *, backend_root_url: str = "http://openeo.test", requests_mock):
-        self.url = backend_root_url.rstrip("/")
-        requests_mock.get(f"{self.url}/", json={"api_version": "1.1.0"})
-        self.job_db = {}
-        self.get_job_metadata_mock = requests_mock.get(
-            re.compile(rf"^{self.url}/jobs/[\w_-]*$"),
-            json=self._handle_get_job_metadata,
-        )
-        self.cancel_job_mock = requests_mock.delete(
-            re.compile(rf"^{self.url}/jobs/[\w_-]*/results$"),
-            json=self._handle_cancel_job,
-        )
-        requests_mock.get(re.compile(rf"^{self.url}/jobs/[\w_-]*/results"), json={"links": []})
-
-    def set_job_status(self, job_id: str, status: Union[str, Callable[[], str]]):
-        self.job_db.setdefault(job_id, {})["status"] = status
-
-    def get_job_status(self, job_id: str):
-        status = self.job_db[job_id]["status"]
-        if callable(status):
-            status = status()
-        return status
+@pytest.fixture
+def dummy_backend_foo(requests_mock) -> DummyBackend:
+    dummy = DummyBackend.at_url("https://foo.test", requests_mock=requests_mock)
+    dummy.setup_simple_job_status_flow(queued=3, running=5)
+    dummy.job_id_generator = _job_id_from_year
+    return dummy
 
-    def _handle_get_job_metadata(self, request, context):
-        job_id = request.path.split("/")[-1]
-        return {"id": job_id, "status": self.get_job_status(job_id)}
 
-    def _handle_cancel_job(self, request, context):
-        job_id = request.path.split("/")[-2]
-        assert self.get_job_status(job_id) == "running"
-        self.set_job_status(job_id, "canceled")
-        context.status_code = 204
+@pytest.fixture
+def dummy_backend_bar(requests_mock) -> DummyBackend:
+    dummy = DummyBackend.at_url("https://bar.test", requests_mock=requests_mock)
+    dummy.setup_simple_job_status_flow(queued=5, running=8)
+    dummy.job_id_generator = _job_id_from_year
+    return dummy
 
 
 @pytest.fixture
@@ -91,16 +76,31 @@ def sleep_mock():
     with mock.patch("time.sleep") as sleep:
         yield sleep
 
+
 class TestMultiBackendJobManager:
 
+    @pytest.fixture
+    def job_manager_root_dir(self, tmp_path):
+        return tmp_path / "job_mgr_root"
+
+    @pytest.fixture
+    def job_manager(self, job_manager_root_dir, dummy_backend_foo, dummy_backend_bar):
+        manager = MultiBackendJobManager(root_dir=job_manager_root_dir)
+        manager.add_backend("foo", connection=dummy_backend_foo.connection)
+        manager.add_backend("bar", connection=dummy_backend_bar.connection)
+        return manager
 
+    @staticmethod
+    def _create_year_job(row, connection, **kwargs):
+        """Job creation callable to use with MultiBackendJobManager run_jobs"""
+        year = int(row["year"])
+        pg = {"yearify": {"process_id": "yearify", "arguments": {"year": year}, "result": True}}
+        return connection.create_job(pg)
 
-    def test_basic_legacy(self, tmp_path, requests_mock, sleep_mock):
+    def test_basic_legacy(self, tmp_path, job_manager, job_manager_root_dir, sleep_mock):
         """
         Legacy `run_jobs()` usage with explicit dataframe and output file
         """
-        manager = self._create_basic_mocked_manager(requests_mock, tmp_path)
-
         df = pd.DataFrame(
             {
                 "year": [2018, 2019, 2020, 2021, 2022],
@@ -108,13 +108,9 @@ def test_basic_legacy(self, tmp_path, requests_mock, sleep_mock):
                 "geometry": ["POINT (1 2)"] * 5,
             }
         )
-        output_file = tmp_path / "jobs.csv"
+        job_db_path = tmp_path / "jobs.csv"
 
-        def start_job(row, connection, **kwargs):
-            year = int(row["year"])
-            return BatchJob(job_id=f"job-{year}", connection=connection)
-
-        run_stats = manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
+        run_stats = job_manager.run_jobs(df=df, start_job=self._create_year_job, output_file=job_db_path)
         assert run_stats == dirty_equals.IsPartialDict(
             {
                 "sleep": dirty_equals.IsInt(gt=10),
@@ -126,23 +122,26 @@ def start_job(row, connection, **kwargs):
             }
         )
 
-        result = pd.read_csv(output_file)
-        assert len(result) == 5
-        assert set(result.status) == {"finished"}
-        assert set(result.backend_name) == {"foo", "bar"}
+        assert [(r.id, r.status, r.backend_name) for r in pd.read_csv(job_db_path).itertuples()] == [
+            ("job-2018", "finished", "foo"),
+            ("job-2019", "finished", "foo"),
+            ("job-2020", "finished", "bar"),
+            ("job-2021", "finished", "bar"),
+            ("job-2022", "finished", "foo"),
+        ]
 
-        # We expect that the job metadata was saved, so verify that it exists.
-        # Checking for one of the jobs is enough.
-        metadata_path = manager.get_job_metadata_path(job_id="job-2022")
-        assert metadata_path.exists()
+        # Check downloaded results and metadata.
+        assert set(p.relative_to(job_manager_root_dir) for p in job_manager_root_dir.glob("**/*.*")) == {
+            Path(f"job_{job_id}") / filename
+            for job_id in ["job-2018", "job-2019", "job-2020", "job-2021", "job-2022"]
+            for filename in ["job-results.json", f"job_{job_id}.json", "result.data"]
+        }
 
-    def test_basic(self, tmp_path, requests_mock, sleep_mock):
+    def test_basic(self, tmp_path, job_manager, job_manager_root_dir, sleep_mock):
         """
         `run_jobs()` usage with a `CsvJobDatabase` (and no explicit dataframe or output file)
         """
-        manager = self._create_basic_mocked_manager(requests_mock, tmp_path)
-
         df = pd.DataFrame(
@@ -150,15 +149,11 @@ def test_basic(self, tmp_path, requests_mock, sleep_mock):
                 "geometry": ["POINT (1 2)"] * 5,
             }
         )
-        output_file = tmp_path / "jobs.csv"
+        job_db_path = tmp_path / "jobs.csv"
 
-        def start_job(row, connection, **kwargs):
-            year = int(row["year"])
-            return BatchJob(job_id=f"job-{year}", connection=connection)
+        job_db = CsvJobDatabase(job_db_path).initialize_from_df(df)
 
-        job_db = CsvJobDatabase(output_file).initialize_from_df(df)
-
-        run_stats = manager.run_jobs(job_db=job_db, start_job=start_job)
+        run_stats = job_manager.run_jobs(job_db=job_db, start_job=self._create_year_job)
         assert run_stats == dirty_equals.IsPartialDict(
             {
                 "sleep": dirty_equals.IsInt(gt=10),
@@ -170,32 +165,32 @@ def start_job(row, connection, **kwargs):
             }
         )
 
-        result = pd.read_csv(output_file)
-        assert len(result) == 5
-        assert set(result.status) == {"finished"}
-        assert set(result.backend_name) == {"foo", "bar"}
+        assert [(r.id, r.status, r.backend_name) for r in pd.read_csv(job_db_path).itertuples()] == [
+            ("job-2018", "finished", "foo"),
+            ("job-2019", "finished", "foo"),
+            ("job-2020", "finished", "bar"),
+            ("job-2021", "finished", "bar"),
+            ("job-2022", "finished", "foo"),
+        ]
 
-        # We expect that the job metadata was saved, so verify that it exists.
-        # Checking for one of the jobs is enough.
-        metadata_path = manager.get_job_metadata_path(job_id="job-2022")
-        assert metadata_path.exists()
+        # Check downloaded results and metadata.
+        assert set(p.relative_to(job_manager_root_dir) for p in job_manager_root_dir.glob("**/*.*")) == {
+            Path(f"job_{job_id}") / filename
+            for job_id in ["job-2018", "job-2019", "job-2020", "job-2021", "job-2022"]
+            for filename in ["job-results.json", f"job_{job_id}.json", "result.data"]
+        }
 
     @pytest.mark.parametrize("db_class", [CsvJobDatabase, ParquetJobDatabase])
-    def test_db_class(self, tmp_path, requests_mock, sleep_mock, db_class):
+    def test_db_class(self, tmp_path, job_manager, job_manager_root_dir, sleep_mock, db_class):
         """
         Basic run parameterized on database class
         """
-        manager = self._create_basic_mocked_manager(requests_mock, tmp_path)
-
-        def start_job(row, connection, **kwargs):
-            year = int(row["year"])
-            return BatchJob(job_id=f"job-{year}", connection=connection)
 
         df = pd.DataFrame({"year": [2018, 2019, 2020, 2021, 2022]})
         output_file = tmp_path / "jobs.db"
         job_db = db_class(output_file).initialize_from_df(df)
-        run_stats = manager.run_jobs(job_db=job_db, start_job=start_job)
+        run_stats = job_manager.run_jobs(job_db=job_db, start_job=self._create_year_job)
         assert run_stats == dirty_equals.IsPartialDict(
             {
                 "start_job call": 5,
@@ -216,21 +211,16 @@ def start_job(row, connection, **kwargs):
             ("jobz.parquet", ParquetJobDatabase),
         ],
     )
-    def test_create_job_db(self, tmp_path, requests_mock, sleep_mock, filename, expected_db_class):
+    def test_create_job_db(self, tmp_path, job_manager, job_manager_root_dir, sleep_mock, filename, expected_db_class):
         """
         Basic run with `create_job_db()` usage
         """
-        manager = self._create_basic_mocked_manager(requests_mock, tmp_path)
-
-        def start_job(row, connection, **kwargs):
-            year = int(row["year"])
-            return BatchJob(job_id=f"job-{year}", connection=connection)
 
         df = pd.DataFrame({"year": [2018, 2019, 2020, 2021, 2022]})
         output_file = tmp_path / filename
         job_db = create_job_db(path=output_file, df=df)
-        run_stats = manager.run_jobs(job_db=job_db, start_job=start_job)
+        run_stats = job_manager.run_jobs(job_db=job_db, start_job=self._create_year_job)
         assert run_stats == dirty_equals.IsPartialDict(
             {
                 "start_job call": 5,
@@ -244,9 +234,7 @@ def start_job(row, connection, **kwargs):
         assert set(result.status) == {"finished"}
         assert set(result.backend_name) == {"foo", "bar"}
 
-    def test_basic_threading(self, tmp_path, requests_mock, sleep_mock):
-        manager = self._create_basic_mocked_manager(requests_mock, tmp_path)
-
+    def test_basic_threading(self, tmp_path, job_manager, job_manager_root_dir, sleep_mock):
         df = pd.DataFrame(
             {
                 "year": [2018, 2019, 2020, 2021, 2022],
@@ -254,89 +242,31 @@ def test_basic_threading(self, tmp_path, requests_mock, sleep_mock):
                 "geometry": ["POINT (1 2)"] * 5,
             }
         )
-        output_file = tmp_path / "jobs.csv"
-
-        def start_job(row, connection, **kwargs):
-            year = int(row["year"])
-            return BatchJob(job_id=f"job-{year}", connection=connection)
+        job_db_path = tmp_path / "jobs.csv"
 
-        job_db = CsvJobDatabase(output_file).initialize_from_df(df)
+        job_db = CsvJobDatabase(job_db_path).initialize_from_df(df)
 
-        manager.start_job_thread(start_job=start_job, job_db=job_db)
+        job_manager.start_job_thread(start_job=self._create_year_job, job_db=job_db)
         # Trigger context switch to job thread
         sleep(1)
-        manager.stop_job_thread()
+        job_manager.stop_job_thread()
         # TODO #645 how to collect stats with the threaded run_job?
         assert sleep_mock.call_count > 10
 
-        result = pd.read_csv(output_file)
-        assert len(result) == 5
-        assert set(result.status) == {"finished"}
-        assert set(result.backend_name) == {"foo", "bar"}
+        assert [(r.id, r.status, r.backend_name) for r in pd.read_csv(job_db_path).itertuples()] == [
+            ("job-2018", "finished", "foo"),
+            ("job-2019", "finished", "foo"),
+            ("job-2020", "finished", "bar"),
+            ("job-2021", "finished", "bar"),
+            ("job-2022", "finished", "foo"),
+        ]
 
-        # We expect that the job metadata was saved, so verify that it exists.
-        # Checking for one of the jobs is enough.
-        metadata_path = manager.get_job_metadata_path(job_id="job-2022")
-        assert metadata_path.exists()
-
-    def _create_basic_mocked_manager(self, requests_mock, tmp_path):
-        # TODO: separate aspects of job manager and dummy backends
-        requests_mock.get("http://foo.test/", json={"api_version": "1.1.0"})
-        requests_mock.get("http://bar.test/", json={"api_version": "1.1.0"})
-
-        def mock_job_status(job_id, queued=1, running=2):
-            """Mock job status polling sequence"""
-            response_list = sum(
-                [
-                    [
-                        {
-                            "json": {
-                                "id": job_id,
-                                "title": f"Job {job_id}",
-                                "status": "queued",
-                            }
-                        }
-                    ]
-                    * queued,
-                    [
-                        {
-                            "json": {
-                                "id": job_id,
-                                "title": f"Job {job_id}",
-                                "status": "running",
-                            }
-                        }
-                    ]
-                    * running,
-                    [
-                        {
-                            "json": {
-                                "id": job_id,
-                                "title": f"Job {job_id}",
-                                "status": "finished",
-                            }
-                        }
-                    ],
-                ],
-                [],
-            )
-            for backend in ["http://foo.test", "http://bar.test"]:
-                requests_mock.get(f"{backend}/jobs/{job_id}", response_list)
-                # It also needs the job results endpoint, though that can be a dummy implementation.
-                # When the job is finished the system tries to download the results and that is what
-                # needs this endpoint.
-                requests_mock.get(f"{backend}/jobs/{job_id}/results", json={"links": []})
-
-        mock_job_status("job-2018", queued=1, running=2)
-        mock_job_status("job-2019", queued=2, running=3)
-        mock_job_status("job-2020", queued=3, running=4)
-        mock_job_status("job-2021", queued=3, running=5)
-        mock_job_status("job-2022", queued=5, running=6)
-        root_dir = tmp_path / "job_mgr_root"
-        manager = MultiBackendJobManager(root_dir=root_dir)
-        manager.add_backend("foo", connection=openeo.connect("http://foo.test"))
-        manager.add_backend("bar", connection=openeo.connect("http://bar.test"))
-        return manager
+        # Check downloaded results and metadata.
+        assert set(p.relative_to(job_manager_root_dir) for p in job_manager_root_dir.glob("**/*.*")) == {
+            Path(f"job_{job_id}") / filename
+            for job_id in ["job-2018", "job-2019", "job-2020", "job-2021", "job-2022"]
+            for filename in ["job-results.json", f"job_{job_id}.json", "result.data"]
+        }
 
     def test_normalize_df(self):
         df = pd.DataFrame({"some_number": [3, 2, 1]})
@@ -355,7 +285,9 @@ def test_normalize_df(self):
             ]
         )
 
-    def test_manager_must_exit_when_all_jobs_done(self, tmp_path, requests_mock, sleep_mock):
+    def test_manager_must_exit_when_all_jobs_done(
+        self, tmp_path, sleep_mock, job_manager, job_manager_root_dir, dummy_backend_foo, dummy_backend_bar
+    ):
         """Make sure the MultiBackendJobManager does not hang after all processes finish.
 
         Regression test for:
@@ -364,84 +296,12 @@ def test_manager_must_exit_when_all_jobs_done(self, tmp_path, requests_mock, sle
 
         Cause was that the run_jobs had an infinite loop when jobs ended with status error.
         """
-        requests_mock.get("http://foo.test/", json={"api_version": "1.1.0"})
-        requests_mock.get("http://bar.test/", json={"api_version": "1.1.0"})
-
-        def mock_job_status(job_id, succeeds: bool):
-            """Mock job status polling sequence.
-            We set up one job with finishes with status error
-            """
-            response_list = sum(
-                [
-                    [
-                        {
-                            "json": {
-                                "id": job_id,
-                                "title": f"Job {job_id}",
-                                "status": "queued",
-                            }
-                        }
-                    ],
-                    [
-                        {
-                            "json": {
-                                "id": job_id,
-                                "title": f"Job {job_id}",
-                                "status": "running",
-                            }
-                        }
-                    ],
-                    [
-                        {
-                            "json": {
-                                "id": job_id,
-                                "title": f"Job {job_id}",
-                                "status": "finished" if succeeds else "error",
-                            }
-                        }
-                    ],
-                ],
-                [],
-            )
-            for backend in ["http://foo.test", "http://bar.test"]:
-                requests_mock.get(f"{backend}/jobs/{job_id}", response_list)
-                # It also needs job results endpoint for succesful jobs and the
-                # log endpoint for a failed job. Both are dummy implementations.
-                # When the job is finished the system tries to download the
-                # results or the logs and that is what needs these endpoints.
-                if succeeds:
-                    requests_mock.get(f"{backend}/jobs/{job_id}/results", json={"links": []})
-                else:
-                    response = {
-                        "level": "error",
-                        "logs": [
-                            {
-                                "id": "1",
-                                "code": "SampleError",
-                                "level": "error",
-                                "message": "Error for testing",
-                                "time": "2019-08-24T14:15:22Z",
-                                "data": None,
-                                "path": [],
-                                "usage": {},
-                                "links": [],
-                            }
-                        ],
-                        "links": [],
-                    }
-                    requests_mock.get(f"{backend}/jobs/{job_id}/logs?level=error", json=response)
-
-        mock_job_status("job-2018", succeeds=True)
-        mock_job_status("job-2019", succeeds=True)
-        mock_job_status("job-2020", succeeds=True)
-        mock_job_status("job-2021", succeeds=True)
-        mock_job_status("job-2022", succeeds=False)
-
-        root_dir = tmp_path / "job_mgr_root"
-        manager = MultiBackendJobManager(root_dir=root_dir)
-
-        manager.add_backend("foo", connection=openeo.connect("http://foo.test"))
-        manager.add_backend("bar", connection=openeo.connect("http://bar.test"))
+        dummy_backend_foo.setup_simple_job_status_flow(
+            queued=2, running=3, final="finished", final_per_job={"job-2022": "error"}
+        )
+        dummy_backend_bar.setup_simple_job_status_flow(
+            queued=2, running=3, final="finished", final_per_job={"job-2022": "error"}
+        )
 
         df = pd.DataFrame(
             {
                 "year": [2018, 2019, 2020, 2021, 2022],
@@ -450,16 +310,13 @@ def mock_job_status(job_id, succeeds: bool):
                 "geometry": ["POINT (1 2)"] * 5,
             }
         )
-        output_file = tmp_path / "jobs.csv"
-
-        def start_job(row, connection, **kwargs):
-            year = int(row["year"])
-            return BatchJob(job_id=f"job-{year}", connection=connection)
+        job_db_path = tmp_path / "jobs.csv"
+        job_db = CsvJobDatabase(job_db_path).initialize_from_df(df)
 
         is_done_file = tmp_path / "is_done.txt"
 
         def start_worker_thread():
-            manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
+            job_manager.run_jobs(job_db=job_db, start_job=self._create_year_job)
             is_done_file.write_text("Done!")
 
         thread = threading.Thread(target=start_worker_thread, name="Worker process", daemon=True)
@@ -474,17 +331,21 @@ def start_worker_thread():
                 "MultiBackendJobManager did not finish on its own and was killed. " + "Infinite loop is probable."
             )
 
-        # Also check that we got sensible end results.
-        result = pd.read_csv(output_file)
-        assert len(result) == 5
-        assert set(result.status) == {"finished", "error"}
-        assert set(result.backend_name) == {"foo", "bar"}
+        # Also check that we got sensible end results in the job db.
+        assert [(r.id, r.status, r.backend_name) for r in pd.read_csv(job_db_path).itertuples()] == [
+            ("job-2018", "finished", "foo"),
+            ("job-2019", "finished", "foo"),
+            ("job-2020", "finished", "bar"),
+            ("job-2021", "finished", "bar"),
+            ("job-2022", "error", "foo"),
+        ]
 
-        # We expect that the job metadata was saved for a successful job,
-        # so verify that it exists.
-        # Checking it for one of the jobs is enough.
-        metadata_path = manager.get_job_metadata_path(job_id="job-2021")
-        assert metadata_path.exists()
+        # Check downloaded results and metadata.
+        assert set(p.relative_to(job_manager_root_dir) for p in job_manager_root_dir.glob("**/*.*")) == {
+            Path(f"job_{job_id}") / filename
+            for job_id in ["job-2018", "job-2019", "job-2020", "job-2021"]
+            for filename in ["job-results.json", f"job_{job_id}.json", "result.data"]
+        }
 
     def test_on_error_log(self, tmp_path, requests_mock):
         backend = "http://foo.test"
@@ -573,9 +434,9 @@ def start_job(row, connection_provider, connection, **kwargs):
             year = int(row["year"])
             return BatchJob(job_id=f"job-{year}", connection=connection)
 
-        output_file = tmp_path / "jobs.csv"
+        job_db_path = tmp_path / "jobs.csv"
 
-        run_stats = manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
+        run_stats = manager.run_jobs(df=df, start_job=start_job, output_file=job_db_path)
         assert run_stats == dirty_equals.IsPartialDict(
             {
                 "start_job call": 1,
@@ -583,10 +444,9 @@ def start_job(row, connection_provider, connection, **kwargs):
         )
 
         # Sanity check: the job succeeded
-        result = pd.read_csv(output_file)
-        assert len(result) == 1
-        assert set(result.status) == {"finished"}
-        assert set(result.backend_name) == {"foo"}
+        assert [(r.id, r.status, r.backend_name) for r in pd.read_csv(job_db_path).itertuples()] == [
+            ("job-2018", "finished", "foo"),
+        ]
 
     @httpretty.activate(allow_net_connect=False, verbose=True)
     @pytest.mark.parametrize("http_error_status", [502, 503, 504])
@@ -646,61 +506,115 @@ def start_job(row, connection_provider, connection, **kwargs):
             year = int(row["year"])
             return BatchJob(job_id=f"job-{year}", connection=connection)
 
-        output_file = tmp_path / "jobs.csv"
+        job_db_path = tmp_path / "jobs.csv"
 
         with pytest.raises(requests.exceptions.RetryError) as exc:
-            manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
+            manager.run_jobs(df=df, start_job=start_job, output_file=job_db_path)
 
         # TODO #645 how to still check stats when run_jobs raised exception?
         assert sleep_mock.call_count > 3
 
         # Sanity check: the job has status "error"
-        result = pd.read_csv(output_file)
-        assert len(result) == 1
-        assert set(result.status) == {"running"}
-        assert set(result.backend_name) == {"foo"}
+        assert [(r.id, r.status, r.backend_name) for r in pd.read_csv(job_db_path).itertuples()] == [
+            ("job-2018", "running", "foo"),
+        ]
 
     @pytest.mark.parametrize(
-        ["start_time", "end_time", "end_status", "cancel_after_seconds", "expected_status"],
+        ["create_time", "start_time", "end_time", "end_status", "cancel_after_seconds", "expected_status"],
         [
-            ("2024-09-01T10:00:00Z", "2024-09-01T20:00:00Z", "finished", 6 * 60 * 60, "canceled"),
-            ("2024-09-01T10:00:00Z", "2024-09-01T20:00:00Z", "finished", 12 * 60 * 60, "finished"),
+            (
+                "2024-09-01T9:00:00Z",
+                "2024-09-01T10:00:00Z",
+                "2024-09-01T20:00:00Z",
+                "finished",
+                6 * 60 * 60,
+                "canceled",
+            ),
+            (
+                "2024-09-01T09:00:00Z",
+                "2024-09-01T10:00:00Z",
+                "2024-09-01T20:00:00Z",
+                "finished",
+                12 * 60 * 60,
+                "finished",
+            ),
         ],
     )
     def test_automatic_cancel_of_too_long_running_jobs(
         self,
-        requests_mock,
         tmp_path,
         time_machine,
+        create_time,
         start_time,
         end_time,
         end_status,
         cancel_after_seconds,
         expected_status,
+        dummy_backend_foo,
+        job_manager_root_dir,
     ):
-        fake_backend = FakeBackend(requests_mock=requests_mock)
+        def get_status(job_id, current_status):
+            if rfc3339.utcnow() < start_time:
+                return "queued"
+            elif rfc3339.utcnow() < end_time:
+                return "running"
+            return end_status
 
-        # For simplicity, set up pre-existing job with status "running" (instead of job manager creating+starting it)
-        job_id = "job-123"
-        fake_backend.set_job_status(job_id, lambda: "running" if rfc3339.utcnow() < end_time else end_status)
+        dummy_backend_foo.job_status_updater = get_status
 
-        manager = MultiBackendJobManager(root_dir=tmp_path, cancel_running_job_after=cancel_after_seconds)
-        manager.add_backend("foo", connection=openeo.connect(fake_backend.url))
+        job_manager = MultiBackendJobManager(
+            root_dir=job_manager_root_dir, cancel_running_job_after=cancel_after_seconds
+        )
+        job_manager.add_backend("foo", connection=dummy_backend_foo.connection)
 
-        # Initialize data frame with status "created" (to make sure the start of "running" state is recorded)
-        df = pd.DataFrame({"id": [job_id], "backend_name": ["foo"], "status": ["created"]})
+        df = pd.DataFrame({"year": [2024]})
 
-        time_machine.move_to(start_time)
+        time_machine.move_to(create_time)
+        job_db_path = tmp_path / "jobs.csv"
         # Mock sleep() to not actually sleep, but skip one hour at a time
         with mock.patch.object(openeo.extra.job_management.time, "sleep", new=lambda s: time_machine.shift(60 * 60)):
-            manager.run_jobs(df=df, start_job=lambda **kwargs: None, job_db=tmp_path / "jobs.csv")
+            job_manager.run_jobs(df=df, start_job=self._create_year_job, job_db=job_db_path)
 
-        final_df = CsvJobDatabase(tmp_path / "jobs.csv").read()
+        final_df = CsvJobDatabase(job_db_path).read()
         assert final_df.iloc[0].to_dict() == dirty_equals.IsPartialDict(
-            id="job-123", status=expected_status, running_start_time="2024-09-01T10:00:00Z"
+            id="job-2024", status=expected_status, running_start_time="2024-09-01T10:00:00Z"
         )
-        assert fake_backend.cancel_job_mock.called == (expected_status == "canceled")
+        assert dummy_backend_foo.batch_jobs == {
+            "job-2024": {
+                "job_id": "job-2024",
+                "pg": {"yearify": {"process_id": "yearify", "arguments": {"year": 2024}, "result": True}},
+                "status": expected_status,
+            }
+        }
+
+    def test_empty_csv_handling(self, tmp_path, sleep_mock, recwarn, job_manager):
+        """
+        Check how starting from an empty CSV is handled:
+        will empty columns accepts string values without warning/error?
+        """
+        df = pd.DataFrame({"year": [2021, 2022]})
+
+        job_db_path = tmp_path / "jobs.csv"
+        # Initialize job db and trigger writing it to CSV file
+        _ = CsvJobDatabase(job_db_path).initialize_from_df(df)
+
+        assert job_db_path.exists()
+        # Simple check for empty columns in the CSV file
+        assert ",,,,," in job_db_path.read_text()
+
+        # Start over with existing file
+        job_db = CsvJobDatabase(job_db_path)
+
+        run_stats = job_manager.run_jobs(job_db=job_db, start_job=self._create_year_job)
+        assert run_stats == dirty_equals.IsPartialDict({"start_job call": 2, "job finished": 2})
+
+        assert [(r.id, r.status) for r in pd.read_csv(job_db_path).itertuples()] == [
+            ("job-2021", "finished"),
+            ("job-2022", "finished"),
+        ]
+
+        assert [(w.category, w.message, str(w)) for w in recwarn.list] == []
diff --git a/tests/rest/test_testing.py b/tests/rest/test_testing.py
index 8feb63aa0..0c28fb391 100644
--- a/tests/rest/test_testing.py
+++ b/tests/rest/test_testing.py
@@ -49,7 +49,7 @@ def test_setup_simple_job_status_flow(self, dummy_backend, con120, final):
         job = con120.create_job(DUMMY_PG_ADD35)
         assert dummy_backend.batch_jobs["job-000"]["status"] == "created"
 
-        # Note that first status update (to queued here) is triggered from `start()`, not `status()` like below
+        # Note that first status update (to "queued" here) is triggered from `start()`, not `status()` like below
         job.start()
         assert dummy_backend.batch_jobs["job-000"]["status"] == "queued"
 
@@ -62,3 +62,35 @@ def test_setup_simple_job_status_flow(self, dummy_backend, con120, final):
         assert job.status() == final
         assert job.status() == final
         assert job.status() == final
+
+    def test_setup_simple_job_status_flow_final_per_job(self, dummy_backend, con120):
+        """Test per-job specific final status"""
+        dummy_backend.setup_simple_job_status_flow(
+            queued=2, running=3, final="finished", final_per_job={"job-001": "error"}
+        )
+        job0 = con120.create_job(DUMMY_PG_ADD35)
+        job1 = con120.create_job(DUMMY_PG_ADD35)
+        job2 = con120.create_job(DUMMY_PG_ADD35)
+        assert dummy_backend.batch_jobs["job-000"]["status"] == "created"
+        assert dummy_backend.batch_jobs["job-001"]["status"] == "created"
+        assert dummy_backend.batch_jobs["job-002"]["status"] == "created"
+
+        # Note that first status update (to "queued" here) is triggered from `start()`, not `status()` like below
+        job0.start()
+        job1.start()
+        job2.start()
+        assert dummy_backend.batch_jobs["job-000"]["status"] == "queued"
+        assert dummy_backend.batch_jobs["job-001"]["status"] == "queued"
+        assert dummy_backend.batch_jobs["job-002"]["status"] == "queued"
+
+        # Now go through rest of status flow, through `status()` calls
+        for expected_status in ["queued", "running", "running", "running"]:
+            assert job0.status() == expected_status
+            assert job1.status() == expected_status
+            assert job2.status() == expected_status
+
+        # Differentiation in final state
+        for _ in range(3):
+            assert job0.status() == "finished"
+            assert job1.status() == "error"
+            assert job2.status() == "finished"
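Editorial note (a sketch, not part of the patch above): the core fix here is that `CsvJobDatabase.read()` now passes an explicit `dtype` per column, driven by the new `_COLUMN_REQUIREMENTS` mapping, because a CSV column that is still entirely empty is otherwise parsed by pandas as float64 (all NaN), and later writing strings (job ids, backend names, statuses) into it can trigger dtype warnings, which is what the added `test_empty_csv_handling` exercises. The snippet below is a minimal standalone illustration of that pandas behaviour, using only plain pandas (the file layout and column names are hypothetical, not the openeo API).

# Minimal illustration of the empty-column dtype pitfall (plain pandas, assumed example data).
import io

import pandas as pd

# A job-database-like CSV whose "id" column has not been filled in yet.
csv_text = "id,status\n,not_started\n,not_started\n"

df_default = pd.read_csv(io.StringIO(csv_text))
print(df_default["id"].dtype)  # float64: the all-empty column is parsed as numeric NaN

df_typed = pd.read_csv(io.StringIO(csv_text), dtype={"id": "str"})
print(df_typed["id"].dtype)  # object: the column stays string-friendly for job ids added later

Forcing the dtype at read time, as the patched `CsvJobDatabase.read()` does with the `_COLUMN_REQUIREMENTS` dtypes, avoids the mixed-dtype assignment that previously caused warnings when resuming from a freshly initialized (mostly empty) job database CSV.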