Skip to content

Commit 9ab60d6

Browse files
committed
Issue #635 add basic "on_exists" modes to initialize_from_df
1 parent 9f50cc4 commit 9ab60d6

File tree

2 files changed

+115
-5
lines changed

2 files changed

+115
-5
lines changed

openeo/extra/job_management.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -267,8 +267,6 @@ def _normalize_df(df: pd.DataFrame) -> pd.DataFrame:
267267
:param df: The dataframe to normalize.
268268
:return: a new dataframe that is normalized.
269269
"""
270-
# TODO: this was originally an internal helper, but we need a clean public API for the user
271-
272270
# check for some required columns.
273271
required_with_default = [
274272
("status", "not_started"),
@@ -699,20 +697,30 @@ def __init__(self):
699697
super().__init__()
700698
self._df = None
701699

702-
def initialize_from_df(self, df: pd.DataFrame):
700+
def initialize_from_df(self, df: pd.DataFrame, on_exists: str = "error"):
703701
"""
704702
Initialize the job database from a given dataframe,
705703
which will be first normalized to be compatible
706704
with :py:class:`MultiBackendJobManager` usage.
707705
708-
:param df: data frame with some columns that
706+
:param df: dataframe with some columns your ``start_job`` callable expects
707+
:param on_exists: what to do when the job database already exists (persisted on disk):
708+
- "error": (default) raise an exception
709+
- "skip": work with existing database, ignore given dataframe and skip any initialization
710+
709711
:return: initialized job database.
710712
711713
.. versionadded:: 0.33.0
712714
"""
713715
# TODO: option to provide custom MultiBackendJobManager subclass with custom normalize?
714716
if self.exists():
715-
raise RuntimeError(f"Job database {self!r} already exists.")
717+
if on_exists == "skip":
718+
return self
719+
elif on_exists == "error":
720+
raise FileExistsError(f"Job database {self!r} already exists.")
721+
else:
722+
# TODO handle other on_exists modes: e.g. overwrite, merge, ...
723+
raise ValueError(f"Invalid on_exists={on_exists!r}")
716724
df = MultiBackendJobManager._normalize_df(df)
717725
self.persist(df)
718726
# Return self to allow chaining with constructor.

tests/extra/test_job_management.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,29 @@ def start_job(row, connection, **kwargs):
146146
metadata_path = manager.get_job_metadata_path(job_id="job-2022")
147147
assert metadata_path.exists()
148148

149+
@pytest.mark.parametrize("db_class", [CsvJobDatabase, ParquetJobDatabase])
def test_db_class(self, tmp_path, requests_mock, sleep_mock, db_class):
    """
    Run a basic job flow once per job database class.

    All five jobs (one per year) should end up "finished",
    spread over the "foo" and "bar" backends.
    """
    manager = self._create_basic_mocked_manager(requests_mock, tmp_path)

    def start_job(row, connection, **kwargs):
        # The job id is derived from the "year" column of the row.
        return BatchJob(job_id=f"job-{int(row['year'])}", connection=connection)

    years = pd.DataFrame({"year": [2018, 2019, 2020, 2021, 2022]})
    job_db = db_class(tmp_path / "jobs.db").initialize_from_df(years)

    manager.run_jobs(job_db=job_db, start_job=start_job)
    assert sleep_mock.call_count > 10

    tracked = job_db.read()
    assert len(tracked) == 5
    assert set(tracked["status"]) == {"finished"}
    assert set(tracked["backend_name"]) == {"foo", "bar"}
171+
149172
def test_basic_threading(self, tmp_path, requests_mock, sleep_mock):
150173
manager = self._create_basic_mocked_manager(requests_mock, tmp_path)
151174

@@ -626,6 +649,63 @@ def test_automatic_cancel_of_too_long_running_jobs(
626649
)
627650

628651

652+
class TestFullDataFrameJobDatabase:
    """
    Tests for ``initialize_from_df``, parameterized on the job database class.
    """

    @pytest.mark.parametrize("db_class", [CsvJobDatabase, ParquetJobDatabase])
    def test_initialize_from_df(self, tmp_path, db_class):
        """
        Initializing a new database persists the dataframe,
        extended with the normalized job tracking columns.
        """
        orig_df = pd.DataFrame({"some_number": [3, 2, 1]})
        path = tmp_path / "jobs.db"

        db = db_class(path)
        assert not path.exists()
        db.initialize_from_df(orig_df)
        assert path.exists()

        # Check the persisted database by reading it back through a fresh instance
        expected_columns = {
            "some_number",
            "status",
            "id",
            "start_time",
            "running_start_time",
            "cpu",
            "memory",
            "duration",
            "backend_name",
        }

        actual_columns = set(db_class(path).read().columns)
        assert actual_columns == expected_columns

    @pytest.mark.parametrize("db_class", [CsvJobDatabase, ParquetJobDatabase])
    def test_initialize_from_df_on_exists_error(self, tmp_path, db_class):
        """
        With ``on_exists="error"``, initializing an existing database fails
        and leaves the existing data untouched.
        """
        df = pd.DataFrame({"some_number": [3, 2, 1]})
        # Format-neutral file name: this test runs for CSV as well as Parquet.
        path = tmp_path / "jobs.db"
        _ = db_class(path).initialize_from_df(df, on_exists="error")
        assert path.exists()

        with pytest.raises(FileExistsError, match="Job database.* already exists"):
            _ = db_class(path).initialize_from_df(df, on_exists="error")

        assert set(db_class(path).read()["some_number"]) == {1, 2, 3}

    @pytest.mark.parametrize("db_class", [CsvJobDatabase, ParquetJobDatabase])
    def test_initialize_from_df_on_exists_skip(self, tmp_path, db_class):
        """
        With ``on_exists="skip"``, a second initialization is ignored
        and the data of the first initialization is kept.
        """
        # Format-neutral file name: this test runs for CSV as well as Parquet.
        path = tmp_path / "jobs.db"

        db = db_class(path).initialize_from_df(
            pd.DataFrame({"some_number": [3, 2, 1]}),
            on_exists="skip",
        )
        assert set(db.read()["some_number"]) == {1, 2, 3}

        db = db_class(path).initialize_from_df(
            pd.DataFrame({"some_number": [444, 555, 666]}),
            on_exists="skip",
        )
        assert set(db.read()["some_number"]) == {1, 2, 3}

    @pytest.mark.parametrize("db_class", [CsvJobDatabase, ParquetJobDatabase])
    def test_initialize_from_df_on_exists_invalid(self, tmp_path, db_class):
        """
        An unsupported ``on_exists`` mode is rejected with a ``ValueError``
        when the database already exists, without touching the existing data.
        """
        df = pd.DataFrame({"some_number": [3, 2, 1]})
        path = tmp_path / "jobs.db"
        # The mode is only checked when the database already exists,
        # so create it first.
        _ = db_class(path).initialize_from_df(df)
        assert path.exists()

        with pytest.raises(ValueError, match="Invalid on_exists='foobar'"):
            _ = db_class(path).initialize_from_df(df, on_exists="foobar")

        assert set(db_class(path).read()["some_number"]) == {1, 2, 3}
707+
708+
629709
class TestCsvJobDatabase:
630710

631711
def test_repr(self, tmp_path):
@@ -745,6 +825,28 @@ def test_initialize_from_df(self, tmp_path):
745825
assert raw_columns == expected_columns
746826
assert read_columns == expected_columns
747827

828+
def test_initialize_from_df_on_exists_error(self, tmp_path):
    """
    A second ``initialize_from_df`` with ``on_exists="error"``
    on the same CSV path must raise ``FileExistsError``.
    """
    db_path = tmp_path / "jobs.csv"
    frame = pd.DataFrame({"some_number": [3, 2, 1]})

    # First initialization creates the database ...
    CsvJobDatabase(db_path).initialize_from_df(frame, on_exists="error")

    # ... so the second one must refuse to run.
    with pytest.raises(FileExistsError, match="Job database.* already exists"):
        CsvJobDatabase(db_path).initialize_from_df(frame, on_exists="error")
834+
835+
def test_initialize_from_df_on_exists_skip(self, tmp_path):
    """
    With ``on_exists="skip"``, the first call initializes the CSV database
    and a later call with other data leaves the stored data as is.
    """
    path = tmp_path / "jobs.csv"

    # The first dataframe initializes the database,
    # the second one must be ignored: the stored numbers stay the same.
    for numbers in ([3, 2, 1], [444, 555, 666]):
        frame = pd.DataFrame({"some_number": numbers})
        db = CsvJobDatabase(path).initialize_from_df(frame, on_exists="skip")
        assert set(db.read()["some_number"]) == {1, 2, 3}
849+
748850

749851
class TestParquetJobDatabase:
750852

0 commit comments

Comments
 (0)