Commit f7d3070

Issue #635 add get_job_db() and create_job_db() factories as well
1 parent 9ab60d6 commit f7d3070

3 files changed, +108 -9 lines changed

CHANGELOG.md

Lines changed: 5 additions & 1 deletion
@@ -10,7 +10,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Added
 
 - Added `DataCube.load_stac()` to also support creating a `load_stac` based cube without a connection ([#638](https://github.com/Open-EO/openeo-python-client/issues/638))
-- `MultiBackendJobManager`: Added `initialize_from_df(df)` (to `CsvJobDatabase` and `ParquetJobDatabase`) to initialize (and persist) the job database from a given DataFrame. ([#635](https://github.com/Open-EO/openeo-python-client/issues/635))
+- `MultiBackendJobManager`: Added `initialize_from_df(df)` (to `CsvJobDatabase` and `ParquetJobDatabase`) to initialize (and persist) the job database from a given DataFrame.
+  Also added `create_job_db()` factory to easily create a job database from a given dataframe and its type guessed from filename extension.
+  ([#635](https://github.com/Open-EO/openeo-python-client/issues/635))
+
+
 
 ### Changed
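
As a rough usage sketch of the `create_job_db()` factory described in this changelog entry (the filename and the `year` column are only illustrative, not part of this commit):

    import pandas as pd
    from openeo.extra.job_management import create_job_db

    # Hypothetical dataframe listing the jobs to run (column names are up to the user).
    df = pd.DataFrame({"year": [2021, 2022, 2023]})

    # The database type is guessed from the filename extension:
    # ".csv" gives a CsvJobDatabase, ".parquet"/".geoparquet" a ParquetJobDatabase.
    job_db = create_job_db(path="jobs.csv", df=df)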

openeo/extra/job_management.py

Lines changed: 43 additions & 8 deletions
@@ -441,13 +441,7 @@ def run_jobs(
         assert not kwargs, f"Unexpected keyword arguments: {kwargs!r}"
 
         if isinstance(job_db, (str, Path)):
-            job_db_path = Path(job_db)
-            if job_db_path.suffix.lower() == ".csv":
-                job_db = CsvJobDatabase(path=job_db_path)
-            elif job_db_path.suffix.lower() == ".parquet":
-                job_db = ParquetJobDatabase(path=job_db_path)
-            else:
-                raise ValueError(f"Unsupported job database file type {job_db_path!r}")
+            job_db = get_job_db(path=job_db)
 
         if not isinstance(job_db, JobDatabaseInterface):
             raise ValueError(f"Unsupported job_db {job_db!r}")
@@ -697,7 +691,7 @@ def __init__(self):
         super().__init__()
         self._df = None
 
-    def initialize_from_df(self, df: pd.DataFrame, on_exists: str = "error"):
+    def initialize_from_df(self, df: pd.DataFrame, *, on_exists: str = "error"):
         """
         Initialize the job database from a given dataframe,
         which will be first normalized to be compatible
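
Note that `on_exists` becomes keyword-only here (the added `*`). A small sketch of the changed call style (path and dataframe are placeholders):

    from pathlib import Path

    import pandas as pd
    from openeo.extra.job_management import CsvJobDatabase

    df = pd.DataFrame({"year": [2024]})
    db = CsvJobDatabase(path=Path("jobs.csv"))

    db.initialize_from_df(df, on_exists="skip")  # OK: on_exists passed by keyword
    # db.initialize_from_df(df, "skip")          # would now raise TypeError (keyword-only)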
@@ -851,3 +845,44 @@ def persist(self, df: pd.DataFrame):
         self._merge_into_df(df)
         self.path.parent.mkdir(parents=True, exist_ok=True)
         self.df.to_parquet(self.path, index=False)
+
+
+def get_job_db(path: Union[str, Path]) -> JobDatabaseInterface:
+    """
+    Factory to get a job database at a given path,
+    guessing the database type from filename extension.
+
+    :param path: path to job database file.
+
+    .. versionadded:: 0.33.0
+    """
+    path = Path(path)
+    if path.suffix.lower() in {".csv"}:
+        job_db = CsvJobDatabase(path=path)
+    elif path.suffix.lower() in {".parquet", ".geoparquet"}:
+        job_db = ParquetJobDatabase(path=path)
+    else:
+        raise ValueError(f"Could not guess job database type from {path!r}")
+    return job_db
+
+
+def create_job_db(path: Union[str, Path], df: pd.DataFrame, *, on_exists: str = "error"):
+    """
+    Factory to create a job database at given path,
+    initialized from a given dataframe,
+    and its database type guessed from filename extension.
+
+    :param path: Path to the job database file.
+    :param df: DataFrame to store in the job database.
+    :param on_exists: What to do when the job database already exists:
+        - "error": (default) raise an exception
+        - "skip": work with existing database, ignore given dataframe and skip any initialization
+
+    .. versionadded:: 0.33.0
+    """
+    job_db = get_job_db(path)
+    if isinstance(job_db, FullDataFrameJobDatabase):
+        job_db.initialize_from_df(df=df, on_exists=on_exists)
+    else:
+        raise NotImplementedError(f"Initialization of {type(job_db)} is not supported.")
+    return job_db
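
A hedged, self-contained sketch of how these two factories combine (paths and dataframe contents are illustrative):

    import tempfile
    from pathlib import Path

    import pandas as pd
    from openeo.extra.job_management import ParquetJobDatabase, create_job_db

    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "jobs.parquet"
        df = pd.DataFrame({"year": [2024, 2025]})

        # First call: the type is guessed via get_job_db() and the database is
        # initialized (and persisted) from the dataframe.
        db = create_job_db(path=path, df=df)
        assert isinstance(db, ParquetJobDatabase)
        assert path.exists()

        # Calling again on the same path raises by default (on_exists="error");
        # on_exists="skip" reuses the existing database and ignores the dataframe.
        db = create_job_db(path=path, df=df, on_exists="skip")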

tests/extra/test_job_management.py

Lines changed: 60 additions & 0 deletions
@@ -29,6 +29,8 @@
     CsvJobDatabase,
     MultiBackendJobManager,
     ParquetJobDatabase,
+    create_job_db,
+    get_job_db,
 )
 from openeo.util import rfc3339
 
@@ -169,6 +171,35 @@ def start_job(row, connection, **kwargs):
         assert set(result.status) == {"finished"}
         assert set(result.backend_name) == {"foo", "bar"}
 
+    @pytest.mark.parametrize(
+        ["filename", "expected_db_class"],
+        [
+            ("jobz.csv", CsvJobDatabase),
+            ("jobz.parquet", ParquetJobDatabase),
+        ],
+    )
+    def test_create_job_db(self, tmp_path, requests_mock, sleep_mock, filename, expected_db_class):
+        """
+        Basic run with `create_job_db()` usage
+        """
+        manager = self._create_basic_mocked_manager(requests_mock, tmp_path)
+
+        def start_job(row, connection, **kwargs):
+            year = int(row["year"])
+            return BatchJob(job_id=f"job-{year}", connection=connection)
+
+        df = pd.DataFrame({"year": [2018, 2019, 2020, 2021, 2022]})
+        output_file = tmp_path / filename
+        job_db = create_job_db(path=output_file, df=df)
+
+        manager.run_jobs(job_db=job_db, start_job=start_job)
+        assert sleep_mock.call_count > 10
+
+        result = job_db.read()
+        assert len(result) == 5
+        assert set(result.status) == {"finished"}
+        assert set(result.backend_name) == {"foo", "bar"}
+
     def test_basic_threading(self, tmp_path, requests_mock, sleep_mock):
         manager = self._create_basic_mocked_manager(requests_mock, tmp_path)
174205

@@ -897,3 +928,32 @@ def test_initialize_from_df(self, tmp_path):
 
         df_from_disk = ParquetJobDatabase(path).read()
         assert set(df_from_disk.columns) == expected_columns
+
+
+@pytest.mark.parametrize(
+    ["filename", "expected"],
+    [
+        ("jobz.csv", CsvJobDatabase),
+        ("jobz.parquet", ParquetJobDatabase),
+    ],
+)
+def test_get_job_db(tmp_path, filename, expected):
+    path = tmp_path / filename
+    db = get_job_db(path)
+    assert isinstance(db, expected)
+    assert not path.exists()
+
+
+@pytest.mark.parametrize(
+    ["filename", "expected"],
+    [
+        ("jobz.csv", CsvJobDatabase),
+        ("jobz.parquet", ParquetJobDatabase),
+    ],
+)
+def test_create_job_db(tmp_path, filename, expected):
+    df = pd.DataFrame({"year": [2023, 2024]})
+    path = tmp_path / filename
+    db = create_job_db(path=path, df=df)
+    assert isinstance(db, expected)
+    assert path.exists()
