Skip to content

Commit ed73677

Browse files
committed
centralizing
1 parent 4825bbc commit ed73677

File tree

3 files changed

+36
-39
lines changed

3 files changed

+36
-39
lines changed

openeo/extra/job_management/__init__.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,7 @@
3030
_JobStartTask,
3131
)
3232
from openeo.extra.job_management.process_based_job_creator import ProcessBasedJobCreator
33-
from openeo.extra.job_management._job_database import FullDataFrameJobDatabase, JobDatabaseInterface, ParquetJobDatabase, CsvJobDatabase, create_job_db, get_job_db
34-
from openeo.extra.job_management._dataframe_utils import normalize_dataframe
35-
33+
from openeo.extra.job_management._job_database import FullDataFrameJobDatabase, JobDatabaseInterface, ParquetJobDatabase, CsvJobDatabase, create_job_db, get_job_db, normalize_dataframe
3634

3735
from openeo.rest import OpenEoApiError
3836
from openeo.rest.auth.auth import BearerAuth

openeo/extra/job_management/_dataframe_utils.py

Lines changed: 0 additions & 33 deletions
This file was deleted.

openeo/extra/job_management/_job_database.py

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,19 @@
1212
import shapely.errors
1313
import shapely.wkt
1414

15-
from openeo.extra.job_management._dataframe_utils import normalize_dataframe, COLUMN_REQUIREMENTS
16-
1715

1816

1917
_log = logging.getLogger(__name__)
2018

19+
import pandas as pd
20+
21+
class _ColumnProperties:
22+
def __init__(self, dtype: str, default=None):
23+
self.dtype = dtype
24+
self.default = default
25+
26+
27+
2128
class JobDatabaseInterface(metaclass=abc.ABCMeta):
2229
"""
2330
Interface for a database of job metadata to use with the :py:class:`MultiBackendJobManager`,
@@ -78,7 +85,20 @@ def get_by_indices(self, indices: Iterable[Union[int, str]]) -> pd.DataFrame:
7885
"""
7986
...
8087

81-
88+
# Expected columns in the job DB dataframes.
# TODO: make this part of public API when settled?
# TODO: move non official statuses to separate column (not_started, queued_for_start)
COLUMN_REQUIREMENTS = {
    # One entry per expected column: name -> (dtype, default value).
    # Only "status" has a non-None default.
    column: _ColumnProperties(dtype=dtype, default=default)
    for (column, dtype, default) in [
        ("id", "str", None),
        ("backend_name", "str", None),
        ("status", "str", "not_started"),
        ("start_time", "str", None),
        ("running_start_time", "str", None),
        ("cpu", "str", None),
        ("memory", "str", None),
        ("duration", "str", None),
        ("costs", "float64", None),
    ]
}
82102

83103
class FullDataFrameJobDatabase(JobDatabaseInterface):
84104
def __init__(self):
@@ -271,6 +291,18 @@ def persist(self, df: pd.DataFrame):
271291
self.path.parent.mkdir(parents=True, exist_ok=True)
272292
self.df.to_parquet(self.path, index=False)
273293

294+
def normalize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """
    Build a normalized version of the given pandas dataframe:
    every column listed in ``COLUMN_REQUIREMENTS`` that is absent
    is added, filled with that column's default value.
    Columns that are already present are left as they are.

    :param df: The dataframe to normalize.
    :return: a new dataframe that is normalized.
    """
    missing_defaults = {}
    for name, properties in COLUMN_REQUIREMENTS.items():
        if name not in df.columns:
            missing_defaults[name] = properties.default
    # `assign` produces a new dataframe object instead of modifying the input.
    return df.assign(**missing_defaults)
305+
274306
def create_job_db(path: Union[str, Path], df: pd.DataFrame, *, on_exists: str = "error"):
275307
"""
276308
Factory to create a job database at given path,

0 commit comments

Comments
 (0)