Commit f676d77

Issue #656: make sure CsvJobDatabase uses expected dtypes
1 parent: 202408b

3 files changed (+67 -20 lines)

CHANGELOG.md

Lines changed: 1 addition & 0 deletions

```diff
@@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 - `MultiBackendJobManager`: Fix issue with duplicate job starting across multiple backends ([#654](https://github.com/Open-EO/openeo-python-client/pull/654))
 - `MultiBackendJobManager`: Fix encoding issue of job metadata in `on_job_done` ([#657](https://github.com/Open-EO/openeo-python-client/issues/657))
+- `MultiBackendJobManager`: avoid dtype loading mistakes in `CsvJobDatabase` on empty columns ([#656](https://github.com/Open-EO/openeo-python-client/issues/656))
 
 
 ## [0.34.0] - 2024-10-31
```
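The changelog entry refers to a pandas pitfall: when a CSV column contains only empty values, `pd.read_csv` parses it as a float64 column of NaN, so later writing string values (like a job status) into it can misbehave. A minimal sketch of that behavior and the dtype-based workaround (illustration only, not part of the commit):

```python
# Minimal sketch (illustration, not part of this commit) of the dtype pitfall
# that issue #656 is about: an all-empty CSV column is parsed as float64.
import io

import pandas as pd

csv_text = "id,status\n,\n,\n"  # two rows, all values empty

df = pd.read_csv(io.StringIO(csv_text))
print(df["status"].dtype)  # float64: empty cells were parsed as NaN

# Forcing the expected dtype on read keeps the column object/str-typed,
# so string values like "not_started" can be written into it safely.
df = pd.read_csv(io.StringIO(csv_text), dtype={"id": "str", "status": "str"})
print(df["status"].dtype)  # object
```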

openeo/extra/job_management.py

Lines changed: 35 additions & 20 deletions

```diff
@@ -1,6 +1,7 @@
 import abc
 import collections
 import contextlib
+import dataclasses
 import datetime
 import json
 import logging
@@ -9,7 +10,7 @@
 import warnings
 from pathlib import Path
 from threading import Thread
-from typing import Callable, Dict, List, NamedTuple, Optional, Union
+from typing import Any, Callable, Dict, List, Mapping, NamedTuple, Optional, Union
 
 import numpy
 import pandas as pd
@@ -104,6 +105,14 @@ def _start_job_default(row: pd.Series, connection: Connection, *args, **kwargs):
     raise NotImplementedError("No 'start_job' callable provided")
 
 
+@dataclasses.dataclass(frozen=True)
+class _ColumnProperties:
+    """Expected/required properties of a column in the job manager related dataframes"""
+
+    dtype: str = "object"
+    default: Any = None
+
+
 class MultiBackendJobManager:
     """
     Tracker for multiple jobs on multiple backends.
@@ -171,6 +180,24 @@ def start_job(
     Added ``cancel_running_job_after`` parameter.
     """
 
+    # Expected columns in the job DB dataframes.
+    # Mapping of column name to (dtype, default value).
+    # TODO: make this part of public API when settled?
+    _COLUMN_REQUIREMENTS: Mapping[str, _ColumnProperties] = {
+        "id": _ColumnProperties(dtype="str"),
+        "backend_name": _ColumnProperties(dtype="str"),
+        "status": _ColumnProperties(dtype="str", default="not_started"),
+        # TODO: use proper date/time dtype instead of legacy str for start times?
+        "start_time": _ColumnProperties(dtype="str"),
+        "running_start_time": _ColumnProperties(dtype="str"),
+        # TODO: these columns "cpu", "memory", "duration" are not referenced explicitly from MultiBackendJobManager,
+        # but are indirectly coupled through handling of VITO-specific "usage" metadata in `_track_statuses`.
+        # Since bfd99e34 they are not really required to be present anymore, can we make that more explicit?
+        "cpu": _ColumnProperties(dtype="str"),
+        "memory": _ColumnProperties(dtype="str"),
+        "duration": _ColumnProperties(dtype="str"),
+    }
+
     def __init__(
         self,
         poll_sleep: int = 60,
```
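The frozen dataclass gives a single declarative source of truth that both the CSV loading (dtypes) and the dataframe normalization (defaults) can derive their mappings from. A standalone sketch of that pattern, mirroring the names from the commit but simplified outside the real classes:

```python
# Standalone sketch of how the _COLUMN_REQUIREMENTS mapping is meant to be
# consumed (names mirror the commit; simplified, not the real classes).
import dataclasses
from typing import Any, Mapping


@dataclasses.dataclass(frozen=True)
class _ColumnProperties:
    dtype: str = "object"
    default: Any = None


requirements: Mapping[str, _ColumnProperties] = {
    "id": _ColumnProperties(dtype="str"),
    "status": _ColumnProperties(dtype="str", default="not_started"),
}

# One mapping feeds pd.read_csv(..., dtype=...) ...
dtypes = {col: props.dtype for col, props in requirements.items()}
# ... and the same mapping yields per-column defaults for normalization.
defaults = {col: props.default for col, props in requirements.items()}

assert dtypes == {"id": "str", "status": "str"}
assert defaults == {"id": None, "status": "not_started"}
```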
```diff
@@ -267,31 +294,16 @@ def _make_resilient(connection):
         connection.session.mount("https://", HTTPAdapter(max_retries=retries))
         connection.session.mount("http://", HTTPAdapter(max_retries=retries))
 
-    @staticmethod
-    def _normalize_df(df: pd.DataFrame) -> pd.DataFrame:
+    @classmethod
+    def _normalize_df(cls, df: pd.DataFrame) -> pd.DataFrame:
         """
         Normalize given pandas dataframe (creating a new one):
         ensure we have the required columns.
 
         :param df: The dataframe to normalize.
         :return: a new dataframe that is normalized.
         """
-        # check for some required columns.
-        required_with_default = [
-            ("status", "not_started"),
-            ("id", None),
-            ("start_time", None),
-            ("running_start_time", None),
-            # TODO: columns "cpu", "memory", "duration" are not referenced directly
-            # within MultiBackendJobManager making it confusing to claim they are required.
-            # However, they are through assumptions about job "usage" metadata in `_track_statuses`.
-            # => proposed solution: allow to configure usage columns when adding a backend
-            ("cpu", None),
-            ("memory", None),
-            ("duration", None),
-            ("backend_name", None),
-        ]
-        new_columns = {col: val for (col, val) in required_with_default if col not in df.columns}
+        new_columns = {col: req.default for (col, req) in cls._COLUMN_REQUIREMENTS.items() if col not in df.columns}
         df = df.assign(**new_columns)
 
         return df
```
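For reference, `DataFrame.assign` broadcasts scalar values and returns a new dataframe, which is what makes this normalization non-destructive. A quick illustration with hypothetical data (not from the commit):

```python
# Quick illustration of the df.assign(**new_columns) pattern above:
# missing columns are added with their (scalar) default, broadcast to all rows.
import pandas as pd

df = pd.DataFrame({"year": [2021, 2022]})
new_columns = {"status": "not_started", "id": None}

normalized = df.assign(**new_columns)
print(list(normalized.columns))       # ['year', 'status', 'id']
print(normalized["status"].tolist())  # ['not_started', 'not_started']
assert "status" not in df.columns     # the original dataframe is unchanged
```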
```diff
@@ -832,7 +844,10 @@ def _is_valid_wkt(self, wkt: str) -> bool:
         return False
 
     def read(self) -> pd.DataFrame:
-        df = pd.read_csv(self.path)
+        df = pd.read_csv(
+            self.path,
+            dtype={c: r.dtype for (c, r) in MultiBackendJobManager._COLUMN_REQUIREMENTS.items()},
+        )
         if (
             "geometry" in df.columns
             and df["geometry"].dtype.name != "geometry"
```

tests/extra/test_job_management.py

Lines changed: 31 additions & 0 deletions

```diff
@@ -702,6 +702,37 @@ def test_automatic_cancel_of_too_long_running_jobs(
 
         assert fake_backend.cancel_job_mock.called == (expected_status == "canceled")
 
+    def test_empty_csv_handling(self, tmp_path, requests_mock, sleep_mock, recwarn):
+        """
+        Check how starting from an empty CSV is handled: will columns accept string values without warning/error?
+        """
+        manager = self._create_basic_mocked_manager(requests_mock, tmp_path)
+
+        df = pd.DataFrame({"year": [2021, 2022]})
+
+        job_db_path = tmp_path / "jobs.csv"
+        # Initialize job db and trigger writing it to CSV file
+        _ = CsvJobDatabase(job_db_path).initialize_from_df(df)
+
+        assert job_db_path.exists()
+        # Simple check for empty columns in the CSV file
+        assert ",,,,," in job_db_path.read_text()
+
+        # Start over with existing file
+        job_db = CsvJobDatabase(job_db_path)
+
+        def start_job(row, connection, **kwargs):
+            year = int(row["year"])
+            return BatchJob(job_id=f"job-{year}", connection=connection)
+
+        run_stats = manager.run_jobs(job_db=job_db, start_job=start_job)
+        assert run_stats == dirty_equals.IsPartialDict({"start_job call": 2, "job finished": 2})
+
+        result = pd.read_csv(job_db_path)
+        assert [(r.id, r.status) for r in result.itertuples()] == [("job-2021", "finished"), ("job-2022", "finished")]
+
+        assert [(w.category, w.message, str(w)) for w in recwarn.list] == []
+
 
 
 
```
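The `recwarn` assertion at the end is the crux: without the dtype fix, writing status strings into float64 columns loaded from the empty CSV would trigger pandas warnings. A sketch of that failure mode, assuming pandas 2.1+ (which emits a `FutureWarning` on incompatible-dtype setitem):

```python
# Sketch of the failure mode the recwarn assertion guards against
# (assumption: pandas >= 2.1 warns when a str is set into a float64 column).
import warnings

import pandas as pd

# float64 column, like one loaded from an all-empty CSV column
df = pd.DataFrame({"status": [float("nan"), float("nan")]})
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    df.loc[0, "status"] = "queued"  # incompatible dtype: str into float64

print([w.category.__name__ for w in caught])  # e.g. ['FutureWarning']
```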