
Commit b272791

Merge pull request #595 from Open-EO/issue571-job-manager-db
Start supporting custom job databases in MultiBackendJobManager
2 parents 7e297f6 + a906340

File tree: 4 files changed (+137, −42 lines)

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Add experimental `openeo.testing.results` subpackage with reusable test utilities for comparing batch job results with reference data
 - `MultiBackendJobManager`: add initial support for storing job metadata in Parquet file (instead of CSV) ([#571](https://github.com/Open-EO/openeo-python-client/issues/571))
 - Add `Connection.authenticate_oidc_access_token()` to set up authorization headers with an access token that is obtained "out-of-band" ([#598](https://github.com/Open-EO/openeo-python-client/issues/598))
+- Add `JobDatabaseInterface` to allow custom job metadata storage with `MultiBackendJobManager` ([#571](https://github.com/Open-EO/openeo-python-client/issues/571))

 ### Changed
```

docs/cookbook/job_manager.rst

Lines changed: 9 additions & 2 deletions
```diff
@@ -5,5 +5,12 @@ Multi Backend Job Manager
 .. warning::
     This is a new experimental API, subject to change.

-.. automodule:: openeo.extra.job_management
-    :members: MultiBackendJobManager, ignore_connection_errors
+.. autoclass:: openeo.extra.job_management.MultiBackendJobManager
+    :members:
+
+.. autoclass:: openeo.extra.job_management.JobDatabaseInterface
+    :members:
+
+.. autoclass:: openeo.extra.job_management.CsvJobDatabase
+
+.. autoclass:: openeo.extra.job_management.ParquetJobDatabase
```

openeo/extra/job_management.py

Lines changed: 103 additions & 24 deletions
```diff
@@ -1,8 +1,10 @@
+import abc
 import contextlib
 import datetime
 import json
 import logging
 import time
+import warnings
 from pathlib import Path
 from typing import Callable, Dict, NamedTuple, Optional, Union
```

```diff
@@ -30,6 +32,40 @@ class _Backend(NamedTuple):

 MAX_RETRIES = 5

+
+class JobDatabaseInterface(metaclass=abc.ABCMeta):
+    """
+    Interface for a database of job metadata to use with the :py:class:`MultiBackendJobManager`,
+    allowing to regularly persist the job metadata while polling the job statuses
+    and resume/restart the job tracking after it was interrupted.
+
+    .. versionadded:: 0.31.0
+    """
+
+    @abc.abstractmethod
+    def exists(self) -> bool:
+        """Does the job database already exist, to read job data from?"""
+        ...
+
+    @abc.abstractmethod
+    def read(self) -> pd.DataFrame:
+        """
+        Read job data from the database as pandas DataFrame.
+
+        :return: loaded job data.
+        """
+        ...
+
+    @abc.abstractmethod
+    def persist(self, df: pd.DataFrame):
+        """
+        Store job data to the database.
+
+        :param df: job data to store.
+        """
+        ...
+
+
 class MultiBackendJobManager:
     """
     Tracker for multiple jobs on multiple backends.
```
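These three abstract methods are the whole contract. As an illustration of what the new abstraction enables (not part of this commit; the class name and in-memory storage are hypothetical), a minimal custom implementation could look like:

```python
import pandas as pd

from openeo.extra.job_management import JobDatabaseInterface


class InMemoryJobDatabase(JobDatabaseInterface):
    """Hypothetical job database that keeps the job metadata DataFrame in memory."""

    def __init__(self):
        self._df = None

    def exists(self) -> bool:
        # There is something to resume from once `persist()` was called at least once.
        return self._df is not None

    def read(self) -> pd.DataFrame:
        # Return a copy so callers can't mutate the stored data in place.
        return self._df.copy()

    def persist(self, df: pd.DataFrame):
        self._df = df.copy()
```
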
```diff
@@ -207,7 +243,8 @@ def run_jobs(
         self,
         df: pd.DataFrame,
         start_job: Callable[[], BatchJob],
-        output_file: Union[str, Path],
+        job_db: Union[str, Path, JobDatabaseInterface, None] = None,
+        **kwargs,
     ):
         """Runs jobs, specified in a dataframe, and tracks parameters.
```

```diff
@@ -241,30 +278,52 @@ def run_jobs(
         any of them out, then remember to include the ``*args`` and ``**kwargs`` parameters.
         Otherwise you will have an exception because :py:meth:`run_jobs` passes unknown parameters to ``start_job``.

-        :param output_file:
-            Path to output file (CSV or Parquet) containing the status and metadata of the jobs.
+        :param job_db:
+            Job database to load/store existing job status data and other metadata from/to.
+            Can be specified as a path to CSV or Parquet file,
+            or as a custom database object following the :py:class:`JobDatabaseInterface` interface.

         .. note::
             Support for Parquet files depends on the ``pyarrow`` package
             as :ref:`optional dependency <installation-optional-dependencies>`.

         .. versionchanged:: 0.31.0
-            Added support for providing a Parquet ``output_file``
+            Added support for persisting the job metadata in Parquet format.
+
+        .. versionchanged:: 0.31.0
+            Replace ``output_file`` argument with ``job_db`` argument,
+            which can be a path to a CSV or Parquet file,
+            or a user-defined :py:class:`JobDatabaseInterface` object.
+            The deprecated ``output_file`` argument is still supported for now.
         """
         # TODO: Defining start_jobs as a Protocol might make its usage more clear, and avoid complicated docstrings,
         # but Protocols are only supported in Python 3.8 and higher.
-        output_file = Path(output_file)

-        if output_file.suffix.lower() == ".csv":
-            job_db = _CsvJobDatabase(output_file)
-        elif output_file.suffix.lower() == ".parquet":
-            job_db = _ParquetJobDatabase(output_file)
-        else:
-            raise ValueError(f"Unsupported output file type: {output_file}")
+        # Backwards compatibility for deprecated `output_file` argument
+        if "output_file" in kwargs:
+            if job_db is not None:
+                raise ValueError("Only one of `output_file` and `job_db` should be provided")
+            warnings.warn(
+                "The `output_file` argument is deprecated. Use `job_db` instead.", DeprecationWarning, stacklevel=2
+            )
+            job_db = kwargs.pop("output_file")
+        assert not kwargs, f"Unexpected keyword arguments: {kwargs!r}"
+
+        if isinstance(job_db, (str, Path)):
+            job_db_path = Path(job_db)
+            if job_db_path.suffix.lower() == ".csv":
+                job_db = CsvJobDatabase(path=job_db_path)
+            elif job_db_path.suffix.lower() == ".parquet":
+                job_db = ParquetJobDatabase(path=job_db_path)
+            else:
+                raise ValueError(f"Unsupported job database file type {job_db_path!r}")
+
+        if not isinstance(job_db, JobDatabaseInterface):
+            raise ValueError(f"Unsupported job_db {job_db!r}")

-        if output_file.exists() and output_file.is_file():
+        if job_db.exists():
             # Resume from existing db
-            _log.info(f"Resuming `run_jobs` from {output_file.absolute()}")
+            _log.info(f"Resuming `run_jobs` from existing {job_db}")
             df = job_db.read()
             status_histogram = df.groupby("status").size().to_dict()
             _log.info(f"Status histogram: {status_histogram}")
```
```diff
@@ -462,28 +521,32 @@ def _format_usage_stat(job_metadata: dict, field: str) -> str:


 @contextlib.contextmanager
-def ignore_connection_errors(context: Optional[str] = None):
+def ignore_connection_errors(context: Optional[str] = None, sleep: int = 5):
     """Context manager to ignore connection errors."""
+    # TODO: move this out of this module and make it a more public utility?
     try:
         yield
     except requests.exceptions.ConnectionError as e:
         _log.warning(f"Ignoring connection error (context {context or 'n/a'}): {e}")
         # Back off a bit
-        time.sleep(5)
+        time.sleep(sleep)


-class _JobDatabaseInterface:
-    def read(self) -> pd.DataFrame:
-        raise NotImplementedError()
-
-    def persist(self, df: pd.DataFrame):
-        raise NotImplementedError()
+class CsvJobDatabase(JobDatabaseInterface):
+    """
+    Persist/load job metadata with a CSV file.

+    :implements: :py:class:`JobDatabaseInterface`
+    :param path: Path to local CSV file.

-class _CsvJobDatabase(_JobDatabaseInterface):
+    .. versionadded:: 0.31.0
+    """
     def __init__(self, path: Union[str, Path]):
         self.path = Path(path)

+    def exists(self) -> bool:
+        return self.path.exists()
+
     def _is_valid_wkt(self, wkt: str) -> bool:
         try:
             shapely.wkt.loads(wkt)
```
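The hunk above also makes the back-off delay of ``ignore_connection_errors`` configurable. A small usage sketch (the ``job`` object is assumed to be an existing ``BatchJob``):

```python
from openeo.extra.job_management import ignore_connection_errors

# A transient ConnectionError inside the block is logged and swallowed,
# followed by a 30 second back-off instead of the default 5 seconds.
# Note: `status` stays unset when the error is swallowed.
with ignore_connection_errors(context="status polling", sleep=30):
    status = job.status()
```
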
```diff
@@ -493,23 +556,39 @@ def _is_valid_wkt(self, wkt: str) -> bool:

     def read(self) -> pd.DataFrame:
         df = pd.read_csv(self.path)
-        # Workaround for loading of geopandas "geometry" column.
+        # `df.to_csv` in `persist()` will encode geometries as WKT, so we decode that here.
         if (
             "geometry" in df.columns
             and df["geometry"].dtype.name != "geometry"
             and self._is_valid_wkt(df["geometry"].iloc[0])
         ):
             df["geometry"] = df["geometry"].apply(shapely.wkt.loads)
         return df
+
     def persist(self, df: pd.DataFrame):
         self.path.parent.mkdir(parents=True, exist_ok=True)
         df.to_csv(self.path, index=False)


-class _ParquetJobDatabase(_JobDatabaseInterface):
+class ParquetJobDatabase(JobDatabaseInterface):
+    """
+    Persist/load job metadata with a Parquet file.
+
+    :implements: :py:class:`JobDatabaseInterface`
+    :param path: Path to the Parquet file.
+
+    .. versionadded:: 0.31.0
+
+    .. note::
+        Support for Parquet files depends on the ``pyarrow`` package
+        as :ref:`optional dependency <installation-optional-dependencies>`.
+    """
     def __init__(self, path: Union[str, Path]):
         self.path = Path(path)

+    def exists(self) -> bool:
+        return self.path.exists()
+
     def read(self) -> pd.DataFrame:
         return pd.read_parquet(self.path)
```

tests/extra/test_job_management.py

Lines changed: 24 additions & 16 deletions
```diff
@@ -18,9 +18,9 @@
 from openeo import BatchJob
 from openeo.extra.job_management import (
     MAX_RETRIES,
+    CsvJobDatabase,
     MultiBackendJobManager,
-    _CsvJobDatabase,
-    _ParquetJobDatabase,
+    ParquetJobDatabase,
 )
```

```diff
@@ -466,7 +466,7 @@ def test_read_wkt(self, tmp_path):
         )
         path = tmp_path / "jobs.csv"
         wkt_df.to_csv(path, index=False)
-        df = _CsvJobDatabase(path).read()
+        df = CsvJobDatabase(path).read()
         assert isinstance(df.geometry[0], shpt.Point)

     def test_read_non_wkt(self, tmp_path):
```

```diff
@@ -478,29 +478,37 @@ def test_read_non_wkt(self, tmp_path):
         )
         path = tmp_path / "jobs.csv"
         non_wkt_df.to_csv(path, index=False)
-        df = _CsvJobDatabase(path).read()
+        df = CsvJobDatabase(path).read()
         assert isinstance(df.geometry[0], str)

-    def test_persist(self, tmp_path):
-        df = pd.DataFrame(
+    def test_persist_and_read(self, tmp_path):
+        orig = pd.DataFrame(
             {
-                "some_number": [3, 2, 1],
+                "numbers": [3, 2, 1],
+                "names": ["apple", "banana", "coconut"],
             }
         )
-
         path = tmp_path / "jobs.csv"
-        _CsvJobDatabase(path).persist(df)
-        assert _CsvJobDatabase(path).read().equals(df)
+        CsvJobDatabase(path).persist(orig)
+        assert path.exists()
+
+        loaded = CsvJobDatabase(path).read()
+        assert list(loaded.dtypes) == list(orig.dtypes)
+        assert loaded.equals(orig)


 class TestParquetJobDatabase:
-    def test_read_persist(self, tmp_path):
-        df = pd.DataFrame(
+    def test_persist_and_read(self, tmp_path):
+        orig = pd.DataFrame(
             {
-                "some_number": [3, 2, 1],
+                "numbers": [3, 2, 1],
+                "names": ["apple", "banana", "coconut"],
             }
         )
-
         path = tmp_path / "jobs.parquet"
-        _ParquetJobDatabase(path).persist(df)
-        assert _ParquetJobDatabase(path).read().equals(df)
+        ParquetJobDatabase(path).persist(orig)
+        assert path.exists()
+
+        loaded = ParquetJobDatabase(path).read()
+        assert list(loaded.dtypes) == list(orig.dtypes)
+        assert loaded.equals(orig)
```
