
Commit 8a69ac3

Tighter encapsulation of "column requirements" of MultiBackendJobManager #741/#815
Using another solution to break the import cycle (and some doc/test tweaks along the way)
1 parent 308025a

File tree: 8 files changed, +89 −87 lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
```diff
@@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Changed
 
+- Internal reorganization of `openeo.extra.job_management` submodule to ease future development ([#741](https://github.com/Open-EO/openeo-python-client/issues/741))
+
 ### Removed
 
 ### Fixed
```

openeo/extra/job_management/_df_schema.py

Lines changed: 0 additions & 46 deletions
This file was deleted.

openeo/extra/job_management/_interface.py

Lines changed: 2 additions & 1 deletion
```diff
@@ -6,7 +6,8 @@
 
 class JobDatabaseInterface(metaclass=abc.ABCMeta):
     """
-    Interface for a database of job metadata to use with the :py:class:`MultiBackendJobManager`,
+    Interface for a database of job metadata to use with the
+    :py:class:`~openeo.extra.job_management._manager.MultiBackendJobManager`,
     allowing to regularly persist the job metadata while polling the job statuses
     and resume/restart the job tracking after it was interrupted.
```

openeo/extra/job_management/_job_db.py

Lines changed: 6 additions & 6 deletions
```diff
@@ -7,7 +7,7 @@
 import shapely.errors
 import shapely.wkt
 
-from openeo.extra.job_management._df_schema import _COLUMN_REQUIREMENTS, _normalize
+import openeo.extra.job_management._manager
 from openeo.extra.job_management._interface import JobDatabaseInterface
 
 _log = logging.getLogger(__name__)
```
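Note on the cycle-breaking approach above: a plain `import package.module` binds only the module object, which may still be partially initialized during a circular import; attribute lookups like `openeo.extra.job_management._manager.MultiBackendJobManager` are deferred until call time, when both modules are fully loaded. A minimal standalone sketch of the pattern (hypothetical modules `a`/`b`, not part of this codebase):

```python
# --- a.py ---
import b  # plain module import: succeeds even while b is only partially initialized

def alpha():
    return "alpha"

def use_b():
    return b.beta()  # attribute lookup is deferred until call time

# --- b.py ---
import a  # the reverse edge of the cycle: equally fine

def beta():
    return a.alpha()  # resolved lazily as well, at call time

# By contrast, `from a import alpha` in b.py would raise ImportError when a is
# imported first: a's import of b re-enters before `alpha` is bound on the
# half-initialized module a.
```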
```diff
@@ -22,7 +22,7 @@ def initialize_from_df(self, df: pd.DataFrame, *, on_exists: str = "error"):
         """
         Initialize the job database from a given dataframe,
         which will be first normalized to be compatible
-        with :py:class:`MultiBackendJobManager` usage.
+        with :py:class:`~openeo.extra.job_management._manager.MultiBackendJobManager` usage.
 
         :param df: dataframe with some columns your ``start_job`` callable expects
         :param on_exists: what to do when the job database already exists (persisted on disk):
@@ -42,7 +42,7 @@ def initialize_from_df(self, df: pd.DataFrame, *, on_exists: str = "error"):
         else:
             # TODO handle other on_exists modes: e.g. overwrite, merge, ...
             raise ValueError(f"Invalid on_exists={on_exists!r}")
-        df = _normalize(df)
+        df = openeo.extra.job_management._manager.MultiBackendJobManager._column_requirements.normalize_df(df)
         self.persist(df)
         # Return self to allow chaining with constructor.
         return self
@@ -104,7 +104,7 @@ class CsvJobDatabase(FullDataFrameJobDatabase):
     """
     Persist/load job metadata with a CSV file.
 
-    :implements: :py:class:`JobDatabaseInterface`
+    :implements: :py:class:`~openeo.extra.job_management._interface.JobDatabaseInterface`
     :param path: Path to local CSV file.
 
     .. note::
@@ -135,7 +135,7 @@ def read(self) -> pd.DataFrame:
         df = pd.read_csv(
             self.path,
             # TODO: possible to avoid hidden coupling with MultiBackendJobManager here?
-            dtype={c: r.dtype for (c, r) in _COLUMN_REQUIREMENTS.items()},
+            dtype=openeo.extra.job_management._manager.MultiBackendJobManager._column_requirements.dtype_mapping(),
         )
         if (
             "geometry" in df.columns
@@ -159,7 +159,7 @@ class ParquetJobDatabase(FullDataFrameJobDatabase):
     """
     Persist/load job metadata with a Parquet file.
 
-    :implements: :py:class:`JobDatabaseInterface`
+    :implements: :py:class:`~openeo.extra.job_management._interface.JobDatabaseInterface`
     :param path: Path to the Parquet file.
 
     .. note::
```

openeo/extra/job_management/_manager.py

Lines changed: 71 additions & 12 deletions
```diff
@@ -1,5 +1,6 @@
 import collections
 import contextlib
+import dataclasses
 import datetime
 import json
 import logging
@@ -12,6 +13,7 @@
     Callable,
     Dict,
     List,
+    Mapping,
     NamedTuple,
     Optional,
     Tuple,
@@ -23,10 +25,10 @@
 from requests.adapters import HTTPAdapter
 from urllib3.util import Retry
 
+# TODO avoid this (circular) dependency on _job_db?
+import openeo.extra.job_management._job_db
 from openeo import BatchJob, Connection
-from openeo.extra.job_management._df_schema import _normalize
 from openeo.extra.job_management._interface import JobDatabaseInterface
-from openeo.extra.job_management._job_db import get_job_db
 from openeo.extra.job_management._thread_worker import (
     _JobManagerWorkerThreadPool,
     _JobStartTask,
@@ -38,6 +40,7 @@
 _log = logging.getLogger(__name__)
 
 
+# TODO: eliminate this module constant (should be part of some constructor interface)
 MAX_RETRIES = 50
 
 
@@ -58,6 +61,45 @@ class _Backend(NamedTuple):
     parallel_jobs: int
 
 
+@dataclasses.dataclass(frozen=True)
+class _ColumnProperties:
+    """Expected/required properties of a column in the job manager related dataframes"""
+
+    dtype: str = "object"
+    default: Any = None
+
+
+class _ColumnRequirements:
+    """
+    Helper class to encapsulate the column requirements expected by MultiBackendJobManager.
+    The current implementation (e.g. _job_db) has some undesired coupling here,
+    but it turns out quite hard to eliminate.
+    The goal of this class is, currently, to at least make the coupling explicit
+    in a centralized way.
+    """
+
+    def __init__(self, requirements: Mapping[str, _ColumnProperties]):
+        self._requirements = dict(requirements)
+
+    def normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
+        """
+        Normalize given pandas dataframe (creating a new one):
+        ensure we have the required columns.
+
+        :param df: The dataframe to normalize.
+        :return: a new dataframe that is normalized.
+        """
+        new_columns = {col: req.default for (col, req) in self._requirements.items() if col not in df.columns}
+        df = df.assign(**new_columns)
+        return df
+
+    def dtype_mapping(self) -> Dict[str, str]:
+        """
+        Get mapping of column name to expected dtype string, e.g. to be used with pandas.read_csv(dtype=...)
+        """
+        return {col: req.dtype for (col, req) in self._requirements.items()}
+
+
 class MultiBackendJobManager:
     """
     Tracker for multiple jobs on multiple backends.
@@ -125,6 +167,27 @@ def start_job(
         Added ``cancel_running_job_after`` parameter.
     """
 
+    # Expected columns in the job DB dataframes.
+    # TODO: make this part of public API when settled?
+    # TODO: move non official statuses to separate column (not_started, queued_for_start)
+    _column_requirements: _ColumnRequirements = _ColumnRequirements(
+        {
+            "id": _ColumnProperties(dtype="str"),
+            "backend_name": _ColumnProperties(dtype="str"),
+            "status": _ColumnProperties(dtype="str", default="not_started"),
+            # TODO: use proper date/time dtype instead of legacy str for start times?
+            "start_time": _ColumnProperties(dtype="str"),
+            "running_start_time": _ColumnProperties(dtype="str"),
+            # TODO: these columns "cpu", "memory", "duration" are not referenced explicitly from MultiBackendJobManager,
+            # but are indirectly coupled through handling of VITO-specific "usage" metadata in `_track_statuses`.
+            # Since bfd99e34 they are not really required to be present anymore, can we make that more explicit?
+            "cpu": _ColumnProperties(dtype="str"),
+            "memory": _ColumnProperties(dtype="str"),
+            "duration": _ColumnProperties(dtype="str"),
+            "costs": _ColumnProperties(dtype="float64"),
+        }
+    )
+
     def __init__(
         self,
         poll_sleep: int = 60,
```
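For illustration, a small sketch of how this encapsulation is meant to be consumed (the asserted values follow directly from the requirements declared above; the input dataframe is hypothetical):

```python
import pandas as pd

from openeo.extra.job_management._manager import MultiBackendJobManager

# Hypothetical input: just the columns a `start_job` callable would consume.
df = pd.DataFrame({"year": [2021, 2022, 2023]})

# normalize_df() appends each missing required column with its declared default:
# "status" becomes "not_started", the other added columns default to None.
normalized = MultiBackendJobManager._column_requirements.normalize_df(df)
assert sorted(normalized.columns) == [
    "backend_name", "costs", "cpu", "duration", "id", "memory",
    "running_start_time", "start_time", "status", "year",
]
assert (normalized["status"] == "not_started").all()

# dtype_mapping() gives the per-column dtype strings,
# e.g. for pd.read_csv(dtype=...) as used in CsvJobDatabase.read().
assert MultiBackendJobManager._column_requirements.dtype_mapping()["costs"] == "float64"
```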
```diff
@@ -227,13 +290,9 @@ def _make_resilient(connection):
     @classmethod
     def _normalize_df(cls, df: pd.DataFrame) -> pd.DataFrame:
         """
-        Normalize given pandas dataframe (creating a new one):
-        ensure we have the required columns.
-
-        :param df: The dataframe to normalize.
-        :return: a new dataframe that is normalized.
+        Deprecated, but kept for backwards compatibility
         """
-        return _normalize(df)
+        return cls._column_requirements.normalize_df(df)
 
     def start_job_thread(self, start_job: Callable[[], BatchJob], job_db: JobDatabaseInterface):
         """
@@ -268,7 +327,7 @@ def start_job_thread(self, start_job: Callable[[], BatchJob], job_db: JobDatabas
         :param job_db:
             Job database to load/store existing job status data and other metadata from/to.
             Can be specified as a path to CSV or Parquet file,
-            or as a custom database object following the :py:class:`JobDatabaseInterface` interface.
+            or as a custom database object following the :py:class:`~openeo.extra.job_management._interface.JobDatabaseInterface` interface.
 
             .. note::
                 Support for Parquet files depends on the ``pyarrow`` package
@@ -373,7 +432,7 @@ def run_jobs(
         :param job_db:
             Job database to load/store existing job status data and other metadata from/to.
             Can be specified as a path to CSV or Parquet file,
-            or as a custom database object following the :py:class:`JobDatabaseInterface` interface.
+            or as a custom database object following the :py:class:`~openeo.extra.job_management._interface.JobDatabaseInterface` interface.
 
             .. note::
                 Support for Parquet files depends on the ``pyarrow`` package
@@ -389,7 +448,7 @@ def run_jobs(
         .. versionchanged:: 0.31.0
             Replace ``output_file`` argument with ``job_db`` argument,
             which can be a path to a CSV or Parquet file,
-            or a user-defined :py:class:`JobDatabaseInterface` object.
+            or a user-defined :py:class:`~openeo.extra.job_management._interface.JobDatabaseInterface` object.
             The deprecated ``output_file`` argument is still supported for now.
 
         .. versionchanged:: 0.33.0
@@ -408,7 +467,7 @@ def run_jobs(
         assert not kwargs, f"Unexpected keyword arguments: {kwargs!r}"
 
         if isinstance(job_db, (str, Path)):
-            job_db = get_job_db(path=job_db)  # TODO circular import
+            job_db = openeo.extra.job_management._job_db.get_job_db(path=job_db)
 
         if not isinstance(job_db, JobDatabaseInterface):
             raise ValueError(f"Unsupported job_db {job_db!r}")
```
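For context, `get_job_db` dispatches on the file extension, roughly along these lines (a simplified sketch, not the verbatim implementation; it assumes `CsvJobDatabase`/`ParquetJobDatabase` are importable from `openeo.extra.job_management` as in the tests):

```python
from pathlib import Path
from typing import Union

from openeo.extra.job_management import CsvJobDatabase, ParquetJobDatabase

def get_job_db_sketch(path: Union[str, Path]):
    # Pick a job database flavor from the file extension,
    # mirroring how run_jobs() accepts a plain str/Path as job_db.
    path = Path(path)
    if path.suffix.lower() == ".csv":
        return CsvJobDatabase(path=path)
    if path.suffix.lower() in {".parquet", ".geoparquet"}:
        return ParquetJobDatabase(path=path)
    raise ValueError(f"Could not guess job database type from {path!r}")
```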

openeo/extra/job_management/stac_job_db.py

Lines changed: 4 additions & 4 deletions
```diff
@@ -22,7 +22,7 @@ class STACAPIJobDatabase(JobDatabaseInterface):
 
     Unstable API, subject to change.
 
-    :implements: :py:class:`JobDatabaseInterface`
+    :implements: :py:class:`~openeo.extra.job_management._interface.JobDatabaseInterface`
     """
 
     def __init__(
@@ -56,10 +56,10 @@ def exists(self) -> bool:
 
     def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
         """
-        Normalize the given dataframe to be compatible with :py:class:`MultiBackendJobManager`
+        Normalize the given dataframe to be compatible with :py:class:`~openeo.extra.job_management._manager.MultiBackendJobManager`
         by adding the default columns and setting the index.
         """
-        df = MultiBackendJobManager._normalize_df(df)
+        df = MultiBackendJobManager._column_requirements.normalize_df(df)
         # If the user doesn't specify the item_id column, we will use the index.
         if "item_id" not in df.columns:
             df = df.reset_index(names=["item_id"])
@@ -69,7 +69,7 @@ def initialize_from_df(self, df: pd.DataFrame, *, on_exists: str = "error"):
         """
         Initialize the job database from a given dataframe,
         which will be first normalized to be compatible
-        with :py:class:`MultiBackendJobManager` usage.
+        with :py:class:`~openeo.extra.job_management._manager.MultiBackendJobManager` usage.
 
         :param df: dataframe with some columns your ``start_job`` callable expects
         :param on_exists: what to do when the job database already exists (persisted on disk):
```

tests/extra/job_management/test_job_db.py

Lines changed: 0 additions & 7 deletions
```diff
@@ -1,13 +1,6 @@
 import re
 
 import geopandas
-
-# TODO: can we avoid using httpretty?
-# We need it for testing the resilience, which uses an HTTPadapter with Retry
-# but requests-mock also uses an HTTPAdapter for the mocking and basically
-# erases the HTTPAdapter we have set up.
-# httpretty avoids this specific problem because it mocks at the socket level,
-# But I would rather not have two dependencies with almost the same goal.
 import pandas as pd
 import pytest
 import shapely.geometry
```

tests/extra/job_management/test_manager.py

Lines changed: 4 additions & 11 deletions
```diff
@@ -31,10 +31,7 @@
     ParquetJobDatabase,
     create_job_db,
 )
-from openeo.extra.job_management._manager import (
-    MAX_RETRIES,
-    MultiBackendJobManager,
-)
+from openeo.extra.job_management._manager import MAX_RETRIES, MultiBackendJobManager
 from openeo.extra.job_management._thread_worker import (
     Task,
     _JobManagerWorkerThreadPool,
@@ -296,7 +293,7 @@ def test_start_job_thread_basic(self, tmp_path, job_manager, job_manager_root_di
 
     def test_normalize_df(self):
         df = pd.DataFrame({"some_number": [3, 2, 1]})
-        df_normalized = MultiBackendJobManager._normalize_df(df)
+        df_normalized = MultiBackendJobManager._column_requirements.normalize_df(df)
         assert set(df_normalized.columns) == set(
             [
                 "some_number",
@@ -603,9 +600,7 @@ def get_status(job_id, current_status):
         job_db_path = tmp_path / "jobs.csv"
 
         # Mock sleep() to not actually sleep, but skip one hour at a time
-        with mock.patch.object(
-            openeo.extra.job_management._manager.time, "sleep", new=lambda s: time_machine.shift(60 * 60)
-        ):
+        with mock.patch("time.sleep", new=lambda s: time_machine.shift(60 * 60)):
             job_manager.run_jobs(df=df, start_job=self._create_year_job, job_db=job_db_path)
 
         final_df = CsvJobDatabase(job_db_path).read()
@@ -724,9 +719,7 @@ def get_status(job_id, current_status):
         job_db_path = tmp_path / "jobs.csv"
 
         # Mock sleep() to skip one hour at a time instead of actually sleeping
-        with mock.patch.object(
-            openeo.extra.job_management._manager.time, "sleep", new=lambda s: time_machine.shift(60 * 60)
-        ):
+        with mock.patch("time.sleep", new=lambda s: time_machine.shift(60 * 60)):
             job_manager.run_jobs(df=df, start_job=self._create_year_job, job_db=job_db_path)
 
         final_df = CsvJobDatabase(job_db_path).read()
```
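The simplified patch target works because the job manager calls the global `time.sleep`; combined with `time_machine`, every poll cycle then fast-forwards a fake clock instead of blocking. A self-contained sketch of that pattern (not the actual test code):

```python
import datetime as dt
import time
from unittest import mock

import time_machine

# Freeze the clock (tick=False), then make every sleep() call jump it one hour.
with time_machine.travel(dt.datetime(2024, 1, 1, tzinfo=dt.timezone.utc), tick=False) as traveller:
    with mock.patch("time.sleep", new=lambda seconds: traveller.shift(60 * 60)):
        t0 = time.time()
        time.sleep(60)  # returns immediately ...
        assert time.time() - t0 == 60 * 60  # ... but one (fake) hour has passed
```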
