Skip to content

Commit f635528

Browse files
committed
splitting up text and detecting circular import
1 parent 5f3211b commit f635528

File tree

4 files changed

+396
-414
lines changed

4 files changed

+396
-414
lines changed

openeo/extra/job_management/_job_db.py

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -203,42 +203,3 @@ def persist(self, df: pd.DataFrame):
203203
self.df.to_parquet(self.path, index=False)
204204

205205

206-
def get_job_db(path: Union[str, Path]) -> JobDatabaseInterface:
207-
"""
208-
Factory to get a job database at a given path,
209-
guessing the database type from filename extension.
210-
211-
:param path: path to job database file.
212-
213-
.. versionadded:: 0.33.0
214-
"""
215-
path = Path(path)
216-
if path.suffix.lower() in {".csv"}:
217-
job_db = CsvJobDatabase(path=path)
218-
elif path.suffix.lower() in {".parquet", ".geoparquet"}:
219-
job_db = ParquetJobDatabase(path=path)
220-
else:
221-
raise ValueError(f"Could not guess job database type from {path!r}")
222-
return job_db
223-
224-
225-
def create_job_db(path: Union[str, Path], df: pd.DataFrame, *, on_exists: str = "error"):
226-
"""
227-
Factory to create a job database at given path,
228-
initialized from a given dataframe,
229-
and its database type guessed from filename extension.
230-
231-
:param path: Path to the job database file.
232-
:param df: DataFrame to store in the job database.
233-
:param on_exists: What to do when the job database already exists:
234-
- "error": (default) raise an exception
235-
- "skip": work with existing database, ignore given dataframe and skip any initialization
236-
237-
.. versionadded:: 0.33.0
238-
"""
239-
job_db = get_job_db(path)
240-
if isinstance(job_db, FullDataFrameJobDatabase):
241-
job_db.initialize_from_df(df=df, on_exists=on_exists)
242-
else:
243-
raise NotImplementedError(f"Initialization of {type(job_db)} is not supported.")
244-
return job_db

openeo/extra/job_management/_manager.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
_JobStartTask,
3232
)
3333
from openeo.extra.job_management._interface import JobDatabaseInterface
34+
#from openeo.extra.job_management._job_db import get_job_db #TODO circular import
3435

3536
from openeo.rest import OpenEoApiError
3637
from openeo.rest.auth.auth import BearerAuth
@@ -43,6 +44,8 @@
4344
# Sentinel value to indicate that a parameter was not set
4445
_UNSET = object()
4546

47+
def _start_job_default(row: pd.Series, connection: Connection, *args, **kwargs):
48+
raise NotImplementedError("No 'start_job' callable provided")
4649

4750
class _Backend(NamedTuple):
4851
"""Container for backend info/settings"""
@@ -434,7 +437,7 @@ def run_jobs(
434437
assert not kwargs, f"Unexpected keyword arguments: {kwargs!r}"
435438

436439
if isinstance(job_db, (str, Path)):
437-
job_db = get_job_db(path=job_db)
440+
job_db = get_job_db(path=job_db) #TODO circular import
438441

439442
if not isinstance(job_db, JobDatabaseInterface):
440443
raise ValueError(f"Unsupported job_db {job_db!r}")
@@ -832,8 +835,6 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
832835

833836
return jobs_done, jobs_error, jobs_cancel
834837

835-
def _start_job_default(row: pd.Series, connection: Connection, *args, **kwargs):
836-
raise NotImplementedError("No 'start_job' callable provided")
837838

838839
def _format_usage_stat(job_metadata: dict, field: str) -> str:
839840
value = deep_get(job_metadata, "usage", field, "value", default=0)

0 commit comments

Comments
 (0)