Skip to content

Commit 4825bbc

Browse files
committed
remove circular dependency
1 parent a296031 commit 4825bbc

File tree

3 files changed

+116
-73
lines changed

3 files changed

+116
-73
lines changed

openeo/extra/job_management/__init__.py

Lines changed: 11 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
Callable,
1414
Dict,
1515
List,
16-
Mapping,
1716
NamedTuple,
1817
Optional,
1918
Tuple,
@@ -31,7 +30,9 @@
3130
_JobStartTask,
3231
)
3332
from openeo.extra.job_management.process_based_job_creator import ProcessBasedJobCreator
34-
from openeo.extra.job_management.dataframe_job_db import JobDatabaseInterface, FullDataFrameJobDatabase, ParquetJobDatabase, CsvJobDatabase
33+
from openeo.extra.job_management._job_database import FullDataFrameJobDatabase, JobDatabaseInterface, ParquetJobDatabase, CsvJobDatabase, create_job_db, get_job_db
34+
from openeo.extra.job_management._dataframe_utils import normalize_dataframe
35+
3536

3637
from openeo.rest import OpenEoApiError
3738
from openeo.rest.auth.auth import BearerAuth
@@ -44,6 +45,10 @@
4445
"FullDataFrameJobDatabase",
4546
"ParquetJobDatabase",
4647
"CsvJobDatabase",
48+
"ProcessBasedJobCreator",
49+
"create_job_db",
50+
"get_job_db",
51+
4752
]
4853

4954
class _Backend(NamedTuple):
@@ -73,6 +78,8 @@ class _ColumnProperties:
7378
default: Any = None
7479

7580

81+
82+
7683
class MultiBackendJobManager:
7784
"""
7885
Tracker for multiple jobs on multiple backends.
@@ -143,21 +150,7 @@ def start_job(
143150
# Expected columns in the job DB dataframes.
144151
# TODO: make this part of public API when settled?
145152
# TODO: move non official statuses to separate column (not_started, queued_for_start)
146-
_COLUMN_REQUIREMENTS: Mapping[str, _ColumnProperties] = {
147-
"id": _ColumnProperties(dtype="str"),
148-
"backend_name": _ColumnProperties(dtype="str"),
149-
"status": _ColumnProperties(dtype="str", default="not_started"),
150-
# TODO: use proper date/time dtype instead of legacy str for start times?
151-
"start_time": _ColumnProperties(dtype="str"),
152-
"running_start_time": _ColumnProperties(dtype="str"),
153-
# TODO: these columns "cpu", "memory", "duration" are not referenced explicitly from MultiBackendJobManager,
154-
# but are indirectly coupled through handling of VITO-specific "usage" metadata in `_track_statuses`.
155-
# Since bfd99e34 they are not really required to be present anymore, can we make that more explicit?
156-
"cpu": _ColumnProperties(dtype="str"),
157-
"memory": _ColumnProperties(dtype="str"),
158-
"duration": _ColumnProperties(dtype="str"),
159-
"costs": _ColumnProperties(dtype="float64"),
160-
}
153+
161154

162155
def __init__(
163156
self,
@@ -258,17 +251,8 @@ def _make_resilient(connection):
258251

259252
@classmethod
def _normalize_df(cls, df: pd.DataFrame) -> pd.DataFrame:
    """
    Normalize given pandas dataframe (creating a new one):
    ensure we have the required columns.

    Delegates to ``normalize_dataframe`` from
    ``openeo.extra.job_management._dataframe_utils``.

    :param df: The dataframe to normalize.
    :return: a new dataframe that is normalized.
    """
    return normalize_dataframe(df)
272256

273257
def start_job_thread(self, start_job: Callable[[], BatchJob], job_db: JobDatabaseInterface):
274258
"""
@@ -844,44 +828,3 @@ def ignore_connection_errors(context: Optional[str] = None, sleep: int = 5):
844828

845829

846830

847-
def get_job_db(path: Union[str, Path]) -> JobDatabaseInterface:
848-
"""
849-
Factory to get a job database at a given path,
850-
guessing the database type from filename extension.
851-
852-
:param path: path to job database file.
853-
854-
.. versionadded:: 0.33.0
855-
"""
856-
path = Path(path)
857-
if path.suffix.lower() in {".csv"}:
858-
job_db = CsvJobDatabase(path=path)
859-
elif path.suffix.lower() in {".parquet", ".geoparquet"}:
860-
job_db = ParquetJobDatabase(path=path)
861-
else:
862-
raise ValueError(f"Could not guess job database type from {path!r}")
863-
return job_db
864-
865-
866-
def create_job_db(path: Union[str, Path], df: pd.DataFrame, *, on_exists: str = "error"):
867-
"""
868-
Factory to create a job database at given path,
869-
initialized from a given dataframe,
870-
and its database type guessed from filename extension.
871-
872-
:param path: Path to the job database file.
873-
:param df: DataFrame to store in the job database.
874-
:param on_exists: What to do when the job database already exists:
875-
- "error": (default) raise an exception
876-
- "skip": work with existing database, ignore given dataframe and skip any initialization
877-
878-
.. versionadded:: 0.33.0
879-
"""
880-
job_db = get_job_db(path)
881-
if isinstance(job_db, FullDataFrameJobDatabase):
882-
job_db.initialize_from_df(df=df, on_exists=on_exists)
883-
else:
884-
raise NotImplementedError(f"Initialization of {type(job_db)} is not supported.")
885-
return job_db
886-
887-
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import pandas as pd
2+
3+
class _ColumnProperties:
    """
    Expected properties of a single column in a job database dataframe.

    :param dtype: dtype name for the column (e.g. ``"str"`` or ``"float64"``).
    :param default: value used to fill the column when it has to be added
        to a dataframe that does not have it yet.
    """

    def __init__(self, dtype: str, default: object = None):
        self.dtype = dtype
        self.default = default

    def __repr__(self) -> str:
        # Readable representation, so that dumps of the column requirements are debuggable.
        return f"{type(self).__name__}(dtype={self.dtype!r}, default={self.default!r})"
7+
8+
# Expected columns in the job DB dataframes.
# TODO: make this part of public API when settled?
# TODO: move non official statuses to separate column (not_started, queued_for_start)
COLUMN_REQUIREMENTS = {
    "id": _ColumnProperties(dtype="str"),
    "backend_name": _ColumnProperties(dtype="str"),
    "status": _ColumnProperties(dtype="str", default="not_started"),
    # TODO: use proper date/time dtype instead of legacy str for start times?
    "start_time": _ColumnProperties(dtype="str"),
    "running_start_time": _ColumnProperties(dtype="str"),
    # TODO: these columns "cpu", "memory", "duration" are not referenced explicitly from MultiBackendJobManager,
    #       but are indirectly coupled through handling of VITO-specific "usage" metadata in `_track_statuses`.
    #       Since bfd99e34 they are not really required to be present anymore, can we make that more explicit?
    "cpu": _ColumnProperties(dtype="str"),
    "memory": _ColumnProperties(dtype="str"),
    "duration": _ColumnProperties(dtype="str"),
    "costs": _ColumnProperties(dtype="float64"),
}
22+
23+
def normalize_dataframe(df: pd.DataFrame, *, column_requirements=None) -> pd.DataFrame:
    """
    Normalize given pandas dataframe (creating a new one):
    ensure we have the required columns.

    :param df: The dataframe to normalize.
    :param column_requirements: optional mapping of column name to an object
        with a ``default`` attribute (like ``_ColumnProperties``).
        When not given, the module level ``COLUMN_REQUIREMENTS`` is used.
    :return: a new dataframe that is normalized.
    """
    if column_requirements is None:
        column_requirements = COLUMN_REQUIREMENTS
    # Only add the columns that are missing: existing columns are left untouched.
    missing_columns = {
        name: props.default for name, props in column_requirements.items() if name not in df.columns
    }
    return df.assign(**missing_columns)

openeo/extra/job_management/dataframe_job_db.py renamed to openeo/extra/job_management/_job_database.py

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,20 @@
33
from pathlib import Path
44
from typing import (
55
Iterable,
6-
List,
76
Union,
7+
List,
88
)
99

10+
1011
import pandas as pd
1112
import shapely.errors
1213
import shapely.wkt
1314

15+
from openeo.extra.job_management._dataframe_utils import normalize_dataframe, COLUMN_REQUIREMENTS
16+
17+
18+
1419
_log = logging.getLogger(__name__)
15-
from openeo.extra.job_management import MultiBackendJobManager
1620

1721
class JobDatabaseInterface(metaclass=abc.ABCMeta):
1822
"""
@@ -74,6 +78,8 @@ def get_by_indices(self, indices: Iterable[Union[int, str]]) -> pd.DataFrame:
7478
"""
7579
...
7680

81+
82+
7783
class FullDataFrameJobDatabase(JobDatabaseInterface):
7884
def __init__(self):
7985
super().__init__()
@@ -103,7 +109,7 @@ def initialize_from_df(self, df: pd.DataFrame, *, on_exists: str = "error"):
103109
else:
104110
# TODO handle other on_exists modes: e.g. overwrite, merge, ...
105111
raise ValueError(f"Invalid on_exists={on_exists!r}")
106-
df = MultiBackendJobManager._normalize_df(df)
112+
df = normalize_dataframe(df)
107113
self.persist(df)
108114
# Return self to allow chaining with constructor.
109115
return self
@@ -161,6 +167,7 @@ def get_by_indices(self, indices: Iterable[Union[int, str]]) -> pd.DataFrame:
161167
return self._df.loc[list(known)]
162168

163169

170+
164171
class CsvJobDatabase(FullDataFrameJobDatabase):
165172
"""
166173
Persist/load job metadata with a CSV file.
@@ -196,7 +203,7 @@ def read(self) -> pd.DataFrame:
196203
df = pd.read_csv(
197204
self.path,
198205
# Force the dtypes declared in the shared COLUMN_REQUIREMENTS table (no coupling with MultiBackendJobManager anymore).
199-
dtype={c: r.dtype for (c, r) in MultiBackendJobManager._COLUMN_REQUIREMENTS.items()},
206+
dtype={c: r.dtype for (c, r) in COLUMN_REQUIREMENTS.items()},
200207
)
201208
if (
202209
"geometry" in df.columns
@@ -262,6 +269,66 @@ def read(self) -> pd.DataFrame:
262269
def persist(self, df: pd.DataFrame):
    """
    Merge the given dataframe into this job database (through ``_merge_into_df``)
    and write the resulting dataframe to the Parquet file at ``self.path``.

    :param df: dataframe with job metadata to persist.
    """
    self._merge_into_df(df)
    parquet_path = self.path
    # Make sure the target folder exists before writing.
    parquet_path.parent.mkdir(parents=True, exist_ok=True)
    self.df.to_parquet(parquet_path, index=False)
273+
274+
def get_job_db(path: Union[str, Path]) -> JobDatabaseInterface:
    """
    Factory to get a job database at a given path,
    guessing the database type from filename extension.

    :param path: path to job database file.
    :return: a :py:class:`CsvJobDatabase` for a ``.csv`` path,
        a :py:class:`ParquetJobDatabase` for a ``.parquet`` or ``.geoparquet`` path.
    :raises ValueError: when the database type can not be guessed from the extension.

    .. versionadded:: 0.33.0
    """
    path = Path(path)
    # Extension matching is case-insensitive.
    suffix = path.suffix.lower()
    if suffix == ".csv":
        return CsvJobDatabase(path=path)
    if suffix in {".parquet", ".geoparquet"}:
        return ParquetJobDatabase(path=path)
    raise ValueError(f"Could not guess job database type from {path!r}")


def create_job_db(
    path: Union[str, Path], df: pd.DataFrame, *, on_exists: str = "error"
) -> FullDataFrameJobDatabase:
    """
    Factory to create a job database at given path,
    initialized from a given dataframe,
    and its database type guessed from filename extension.

    :param path: Path to the job database file.
    :param df: DataFrame to store in the job database.
    :param on_exists: What to do when the job database already exists:

        - "error": (default) raise an exception
        - "skip": work with existing database, ignore given dataframe and skip any initialization

    :return: the initialized job database.
    :raises ValueError: when the database type can not be guessed from the extension of ``path``.
    :raises NotImplementedError: when the guessed database type
        does not support initialization from a dataframe.

    .. versionadded:: 0.33.0
    """
    job_db = get_job_db(path)
    if not isinstance(job_db, FullDataFrameJobDatabase):
        raise NotImplementedError(f"Initialization of {type(job_db)} is not supported.")
    job_db.initialize_from_df(df=df, on_exists=on_exists)
    return job_db
267334

0 commit comments

Comments
 (0)