Commit 3e64e28

Merge branch 'issue656-job-manager-dtypes'
2 parents: 112ed75 + 5958bba

5 files changed: +345 / -318 lines

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - `MultiBackendJobManager`: Fix encoding issue of job metadata in `on_job_done` ([#657](https://github.com/Open-EO/openeo-python-client/issues/657))
 - `MultiBackendJobManager`: Avoid `SettingWithCopyWarning` ([#641](https://github.com/Open-EO/openeo-python-client/issues/641))
 - Avoid creating empty file if asset download request failed.
+- `MultiBackendJobManager`: avoid dtype loading mistakes in `CsvJobDatabase` on empty columns ([#656](https://github.com/Open-EO/openeo-python-client/issues/656))
 
 
 ## [0.34.0] - 2024-10-31
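
For context on the `CsvJobDatabase` entry above ([#656](https://github.com/Open-EO/openeo-python-client/issues/656)): when every value in a CSV column is empty, pandas infers a numeric dtype for it instead of a string/object one, after which string-oriented handling of job metadata breaks. A minimal sketch of the pitfall (toy data, not the full job database schema):

```python
import io
import pandas as pd

# A job database CSV where no job has started yet: the "id" and "start_time" columns are all empty.
csv = io.StringIO("id,status,start_time\n,not_started,\n,not_started,\n")

df = pd.read_csv(csv)
print(df["id"].dtype)  # float64: the all-empty column was inferred as numeric
# String operations on such a column now fail, e.g.
# df["id"].str.startswith("job-") raises because .str requires string values.
```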

openeo/extra/job_management.py

Lines changed: 38 additions & 20 deletions
@@ -1,6 +1,7 @@
 import abc
 import collections
 import contextlib
+import dataclasses
 import datetime
 import json
 import logging
@@ -9,7 +10,7 @@
 import warnings
 from pathlib import Path
 from threading import Thread
-from typing import Callable, Dict, List, NamedTuple, Optional, Union
+from typing import Any, Callable, Dict, List, Mapping, NamedTuple, Optional, Union
 
 import numpy
 import pandas as pd
@@ -104,6 +105,14 @@ def _start_job_default(row: pd.Series, connection: Connection, *args, **kwargs):
     raise NotImplementedError("No 'start_job' callable provided")
 
 
+@dataclasses.dataclass(frozen=True)
+class _ColumnProperties:
+    """Expected/required properties of a column in the job manager related dataframes"""
+
+    dtype: str = "object"
+    default: Any = None
+
+
 class MultiBackendJobManager:
     """
     Tracker for multiple jobs on multiple backends.
@@ -171,6 +180,23 @@ def start_job(
         Added ``cancel_running_job_after`` parameter.
     """
 
+    # Expected columns in the job DB dataframes.
+    # TODO: make this part of public API when settled?
+    _COLUMN_REQUIREMENTS: Mapping[str, _ColumnProperties] = {
+        "id": _ColumnProperties(dtype="str"),
+        "backend_name": _ColumnProperties(dtype="str"),
+        "status": _ColumnProperties(dtype="str", default="not_started"),
+        # TODO: use proper date/time dtype instead of legacy str for start times?
+        "start_time": _ColumnProperties(dtype="str"),
+        "running_start_time": _ColumnProperties(dtype="str"),
+        # TODO: these columns "cpu", "memory", "duration" are not referenced explicitly from MultiBackendJobManager,
+        # but are indirectly coupled through handling of VITO-specific "usage" metadata in `_track_statuses`.
+        # Since bfd99e34 they are not really required to be present anymore, can we make that more explicit?
+        "cpu": _ColumnProperties(dtype="str"),
+        "memory": _ColumnProperties(dtype="str"),
+        "duration": _ColumnProperties(dtype="str"),
+    }
+
     def __init__(
         self,
         poll_sleep: int = 60,
@@ -267,31 +293,16 @@ def _make_resilient(connection):
         connection.session.mount("https://", HTTPAdapter(max_retries=retries))
         connection.session.mount("http://", HTTPAdapter(max_retries=retries))
 
-    @staticmethod
-    def _normalize_df(df: pd.DataFrame) -> pd.DataFrame:
+    @classmethod
+    def _normalize_df(cls, df: pd.DataFrame) -> pd.DataFrame:
         """
         Normalize given pandas dataframe (creating a new one):
         ensure we have the required columns.
 
         :param df: The dataframe to normalize.
         :return: a new dataframe that is normalized.
         """
-        # check for some required columns.
-        required_with_default = [
-            ("status", "not_started"),
-            ("id", None),
-            ("start_time", None),
-            ("running_start_time", None),
-            # TODO: columns "cpu", "memory", "duration" are not referenced directly
-            # within MultiBackendJobManager making it confusing to claim they are required.
-            # However, they are through assumptions about job "usage" metadata in `_track_statuses`.
-            # => proposed solution: allow to configure usage columns when adding a backend
-            ("cpu", None),
-            ("memory", None),
-            ("duration", None),
-            ("backend_name", None),
-        ]
-        new_columns = {col: val for (col, val) in required_with_default if col not in df.columns}
+        new_columns = {col: req.default for (col, req) in cls._COLUMN_REQUIREMENTS.items() if col not in df.columns}
         df = df.assign(**new_columns)
 
         return df
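
The `_normalize_df` rewrite above derives missing columns and their defaults from the single `_COLUMN_REQUIREMENTS` mapping instead of a hard-coded list. A stripped-down sketch of that normalization step (plain tuples here instead of the private `_ColumnProperties` dataclass):

```python
import pandas as pd

# (dtype, default) per required column, in the spirit of _COLUMN_REQUIREMENTS
requirements = {
    "id": ("str", None),
    "status": ("str", "not_started"),
    "backend_name": ("str", None),
}

df = pd.DataFrame({"id": ["job-000", "job-001"]})

# Only add the columns that are actually missing, filled with their declared default.
new_columns = {col: default for (col, (_dtype, default)) in requirements.items() if col not in df.columns}
df = df.assign(**new_columns)

print(df.columns.tolist())    # ['id', 'status', 'backend_name']
print(df["status"].tolist())  # ['not_started', 'not_started']
```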
@@ -486,6 +497,9 @@ def _job_update_loop(
         go through the necessary jobs to check for status updates,
         trigger status events, start new jobs when there is room for them, etc.
         """
+        if not self.backends:
+            raise RuntimeError("No backends registered")
+
         stats = stats if stats is not None else collections.defaultdict(int)
 
         with ignore_connection_errors(context="get statuses"):
@@ -832,7 +846,11 @@ def _is_valid_wkt(self, wkt: str) -> bool:
             return False
 
     def read(self) -> pd.DataFrame:
-        df = pd.read_csv(self.path)
+        df = pd.read_csv(
+            self.path,
+            # TODO: possible to avoid hidden coupling with MultiBackendJobManager here?
+            dtype={c: r.dtype for (c, r) in MultiBackendJobManager._COLUMN_REQUIREMENTS.items()},
+        )
         if (
             "geometry" in df.columns
             and df["geometry"].dtype.name != "geometry"
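
The `read()` change above pins the dtypes declared in `_COLUMN_REQUIREMENTS`, so columns that happen to be empty in the CSV are not silently re-inferred as floats on reload. A rough round-trip sketch of that idea (plain dict and a throwaway file path instead of the private class attribute):

```python
import pandas as pd

# dtype mapping in the spirit of MultiBackendJobManager._COLUMN_REQUIREMENTS
dtypes = {"id": "str", "backend_name": "str", "status": "str", "start_time": "str"}

df = pd.DataFrame(
    {
        "id": [None, None],
        "backend_name": ["foo", "foo"],
        "status": ["not_started", "not_started"],
        "start_time": [None, None],
    }
)
df.to_csv("jobs.csv", index=False)

plain = pd.read_csv("jobs.csv")
pinned = pd.read_csv("jobs.csv", dtype=dtypes)
print(plain["id"].dtype, pinned["id"].dtype)  # float64 object
```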

openeo/rest/_testing.py

Lines changed: 73 additions & 11 deletions
@@ -3,7 +3,17 @@
 import collections
 import json
 import re
-from typing import Callable, Iterable, Iterator, Optional, Sequence, Tuple, Union
+from typing import (
+    Callable,
+    Dict,
+    Iterable,
+    Iterator,
+    Mapping,
+    Optional,
+    Sequence,
+    Tuple,
+    Union,
+)
 
 from openeo import Connection, DataCube
 from openeo.rest.vectorcube import VectorCube
@@ -32,7 +42,9 @@ class DummyBackend:
         "validation_requests",
         "next_result",
         "next_validation_errors",
+        "_forced_job_status",
         "job_status_updater",
+        "job_id_generator",
         "extra_job_metadata_fields",
     )
 
@@ -53,13 +65,20 @@ def __init__(
         self.next_result = self.DEFAULT_RESULT
         self.next_validation_errors = []
         self.extra_job_metadata_fields = []
+        self._forced_job_status: Dict[str, str] = {}
 
         # Job status update hook:
         # callable that is called on starting a job, and getting job metadata
         # allows to dynamically change how the status of a job evolves
         # By default: immediately set to "finished" once job is started
        self.job_status_updater = lambda job_id, current_status: "finished"
 
+        # Optional job id generator hook:
+        # callable that generates a job id, e.g. based on the process graph.
+        # When set to None, or the callable returns None, or it returns an existing job id:
+        # things fall back to auto-increment job ids ("job-000", "job-001", "job-002", ...)
+        self.job_id_generator: Optional[Callable[[dict], str]] = None
+
         requests_mock.post(
             connection.build_url("/result"),
             content=self._handle_post_result,
@@ -75,10 +94,18 @@ def __init__(
         requests_mock.get(
             re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), json=self._handle_get_job_results
         )
+        requests_mock.delete(
+            re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), json=self._handle_delete_job_results
+        )
         requests_mock.get(
             re.compile(connection.build_url("/jobs/(.*?)/results/result.data$")),
             content=self._handle_get_job_result_asset,
         )
+        requests_mock.get(
+            re.compile(connection.build_url(r"/jobs/(.*?)/logs($|\?.*)")),
+            # TODO: need to fine-tune dummy logs?
+            json={"logs": [], "links": []},
+        )
         requests_mock.post(connection.build_url("/validation"), json=self._handle_post_validation)
 
     @classmethod
@@ -88,7 +115,7 @@ def at_url(cls, root_url: str, *, requests_mock, capabilities: Optional[dict] =
         including creation of connection and mocking of capabilities doc
         """
         root_url = root_url.rstrip("/") + "/"
-        requests_mock.get(root_url, json=build_capabilities(**(capabilities or None)))
+        requests_mock.get(root_url, json=build_capabilities(**(capabilities or {})))
         connection = Connection(root_url)
         return cls(requests_mock=requests_mock, connection=connection)
 
@@ -150,7 +177,14 @@ def _handle_post_jobs(self, request, context):
         """handler of `POST /jobs` (create batch job)"""
         post_data = request.json()
         pg = post_data["process"]["process_graph"]
-        job_id = f"job-{len(self.batch_jobs):03d}"
+
+        # Generate (new) job id
+        job_id = self.job_id_generator and self.job_id_generator(process_graph=pg)
+        if not job_id or job_id in self.batch_jobs:
+            # As fallback: use auto-increment job ids ("job-000", "job-001", "job-002", ...)
+            job_id = f"job-{len(self.batch_jobs):03d}"
+        assert job_id not in self.batch_jobs
+
         job_data = {"job_id": job_id, "pg": pg, "status": "created"}
         for field in ["title", "description"]:
             if field in post_data:
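
The `job_id_generator` hook added above lets a test control the job ids the dummy backend hands out, with the auto-increment ids as fallback. A rough sketch of how a test might use it (the `requests_mock` fixture comes from the `requests-mock` pytest plugin; the process graph and id scheme are just illustrative):

```python
from openeo.rest._testing import DummyBackend


def test_custom_job_ids(requests_mock):
    dummy = DummyBackend.at_url("https://openeo.test", requests_mock=requests_mock)
    # Derive the job id from the process id of the (single) node in the process graph.
    dummy.job_id_generator = lambda process_graph: "job-" + next(iter(process_graph.values()))["process_id"]

    connection = dummy.connection
    job = connection.create_job({"add1": {"process_id": "add", "arguments": {"x": 1, "y": 2}, "result": True}})
    assert job.job_id == "job-add"
```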
@@ -169,11 +203,16 @@ def _get_job_id(self, request) -> str:
         assert job_id in self.batch_jobs
         return job_id
 
+    def _get_job_status(self, job_id: str, current_status: str) -> str:
+        if job_id in self._forced_job_status:
+            return self._forced_job_status[job_id]
+        return self.job_status_updater(job_id=job_id, current_status=current_status)
+
     def _handle_post_job_results(self, request, context):
         """Handler of `POST /job/{job_id}/results` (start batch job)."""
         job_id = self._get_job_id(request)
         assert self.batch_jobs[job_id]["status"] == "created"
-        self.batch_jobs[job_id]["status"] = self.job_status_updater(
+        self.batch_jobs[job_id]["status"] = self._get_job_status(
             job_id=job_id, current_status=self.batch_jobs[job_id]["status"]
         )
         context.status_code = 202
@@ -183,10 +222,14 @@ def _handle_get_job(self, request, context):
         job_id = self._get_job_id(request)
         # Allow updating status with `job_status_setter` once job got past status "created"
         if self.batch_jobs[job_id]["status"] != "created":
-            self.batch_jobs[job_id]["status"] = self.job_status_updater(
+            self.batch_jobs[job_id]["status"] = self._get_job_status(
                 job_id=job_id, current_status=self.batch_jobs[job_id]["status"]
             )
-        return {"id": job_id, "status": self.batch_jobs[job_id]["status"]}
+        return {
+            # TODO: add some more required fields like "process" and "created"?
+            "id": job_id,
+            "status": self.batch_jobs[job_id]["status"],
+        }
 
     def _handle_get_job_results(self, request, context):
         """Handler of `GET /job/{job_id}/results` (list batch job results)."""
@@ -197,6 +240,13 @@ def _handle_get_job_results(self, request, context):
             "assets": {"result.data": {"href": self.connection.build_url(f"/jobs/{job_id}/results/result.data")}},
         }
 
+    def _handle_delete_job_results(self, request, context):
+        """Handler of `DELETE /job/{job_id}/results` (cancel job)."""
+        job_id = self._get_job_id(request)
+        self.batch_jobs[job_id]["status"] = "canceled"
+        self._forced_job_status[job_id] = "canceled"
+        context.status_code = 204
+
     def _handle_get_job_result_asset(self, request, context):
         """Handler of `GET /job/{job_id}/results/result.data` (get batch job result asset)."""
         job_id = self._get_job_id(request)
@@ -261,18 +311,30 @@ def execute(self, cube: Union[DataCube, VectorCube], process_id: Optional[str] =
         cube.execute()
         return self.get_pg(process_id=process_id)
 
-    def setup_simple_job_status_flow(self, *, queued: int = 1, running: int = 4, final: str = "finished"):
+    def setup_simple_job_status_flow(
+        self,
+        *,
+        queued: int = 1,
+        running: int = 4,
+        final: str = "finished",
+        final_per_job: Optional[Mapping[str, str]] = None,
+    ):
         """
         Set up simple job status flow:
-        queued (a couple of times) -> running (a couple of times) -> finished/error.
+
+        queued (a couple of times) -> running (a couple of times) -> finished/error.
+
+        Final state can be specified generically with arg `final`
+        and, optionally, further fine-tuned per job with `final_per_job`.
         """
-        template = ["queued"] * queued + ["running"] * running + [final]
+        template = ["queued"] * queued + ["running"] * running
         job_stacks = collections.defaultdict(template.copy)
+        final_per_job = final_per_job or {}
 
         def get_status(job_id: str, current_status: str) -> str:
             stack = job_stacks[job_id]
-            # Pop first item each time, but repeat the last one at the end
-            return stack.pop(0) if len(stack) > 1 else stack[0]
+            # Pop first item each time, unless we're in final state
+            return stack.pop(0) if len(stack) > 0 else final_per_job.get(job_id, final)
 
         self.job_status_updater = get_status
 
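
Finally, a rough sketch of how a test could combine the reworked status flow (with per-job final states) and the new `DELETE /jobs/{id}/results` cancel handler (again assuming the `requests-mock` pytest fixture; test name and process graph are illustrative):

```python
from openeo.rest._testing import DummyBackend


def test_status_flow_and_cancel(requests_mock):
    dummy = DummyBackend.at_url("https://openeo.test", requests_mock=requests_mock)
    # One "queued" poll, two "running" polls, then the final state ("error" for job-001).
    dummy.setup_simple_job_status_flow(queued=1, running=2, final="finished", final_per_job={"job-001": "error"})

    connection = dummy.connection
    pg = {"add1": {"process_id": "add", "arguments": {"x": 1, "y": 2}, "result": True}}
    job0 = connection.create_job(pg)
    job1 = connection.create_job(pg)
    job0.start()  # consumes the "queued" step
    job1.start()

    assert [job0.status() for _ in range(4)] == ["running", "running", "finished", "finished"]
    assert [job1.status() for _ in range(4)] == ["running", "running", "error", "error"]

    # Cancelling via DELETE .../results forces the status to "canceled" from then on.
    job0.stop()
    assert job0.status() == "canceled"
```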
