Skip to content

Commit b645a14

Browse files
committed
Port MultiBackendJobManager tests to DummyBackend fixtures
1 parent 4989d4b commit b645a14

File tree

4 files changed

+320
-308
lines changed

4 files changed

+320
-308
lines changed

openeo/extra/job_management.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,9 @@ def _job_update_loop(
497497
go through the necessary jobs to check for status updates,
498498
trigger status events, start new jobs when there is room for them, etc.
499499
"""
500+
if not self.backends:
501+
raise RuntimeError("No backends registered")
502+
500503
stats = stats if stats is not None else collections.defaultdict(int)
501504

502505
with ignore_connection_errors(context="get statuses"):

openeo/rest/_testing.py

Lines changed: 73 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,17 @@
33
import collections
44
import json
55
import re
6-
from typing import Callable, Iterable, Iterator, Optional, Sequence, Tuple, Union
6+
from typing import (
7+
Callable,
8+
Dict,
9+
Iterable,
10+
Iterator,
11+
Mapping,
12+
Optional,
13+
Sequence,
14+
Tuple,
15+
Union,
16+
)
717

818
from openeo import Connection, DataCube
919
from openeo.rest.vectorcube import VectorCube
@@ -32,7 +42,9 @@ class DummyBackend:
3242
"validation_requests",
3343
"next_result",
3444
"next_validation_errors",
45+
"_forced_job_status",
3546
"job_status_updater",
47+
"job_id_generator",
3648
"extra_job_metadata_fields",
3749
)
3850

@@ -53,13 +65,20 @@ def __init__(
5365
self.next_result = self.DEFAULT_RESULT
5466
self.next_validation_errors = []
5567
self.extra_job_metadata_fields = []
68+
self._forced_job_status: Dict[str, str] = {}
5669

5770
# Job status update hook:
5871
# callable that is called on starting a job, and getting job metadata
5972
# allows to dynamically change how the status of a job evolves
6073
# By default: immediately set to "finished" once job is started
6174
self.job_status_updater = lambda job_id, current_status: "finished"
6275

76+
# Optional job id generator hook:
77+
# callable that generates a job id, e.g. based on the process graph.
78+
# When set to None, or the callable returns None, or it returns an existing job id:
79+
# things fall back to auto-increment job ids ("job-000", "job-001", "job-002", ...)
80+
self.job_id_generator: Optional[Callable[[dict], str]] = None
81+
6382
requests_mock.post(
6483
connection.build_url("/result"),
6584
content=self._handle_post_result,
@@ -75,10 +94,18 @@ def __init__(
7594
requests_mock.get(
7695
re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), json=self._handle_get_job_results
7796
)
97+
requests_mock.delete(
98+
re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), json=self._handle_delete_job_results
99+
)
78100
requests_mock.get(
79101
re.compile(connection.build_url("/jobs/(.*?)/results/result.data$")),
80102
content=self._handle_get_job_result_asset,
81103
)
104+
requests_mock.get(
105+
re.compile(connection.build_url(r"/jobs/(.*?)/logs($|\?.*)")),
106+
# TODO: need to fine-tune dummy logs?
107+
json={"logs": [], "links": []},
108+
)
82109
requests_mock.post(connection.build_url("/validation"), json=self._handle_post_validation)
83110

84111
@classmethod
@@ -88,7 +115,7 @@ def at_url(cls, root_url: str, *, requests_mock, capabilities: Optional[dict] =
88115
including creation of connection and mocking of capabilities doc
89116
"""
90117
root_url = root_url.rstrip("/") + "/"
91-
requests_mock.get(root_url, json=build_capabilities(**(capabilities or None)))
118+
requests_mock.get(root_url, json=build_capabilities(**(capabilities or {})))
92119
connection = Connection(root_url)
93120
return cls(requests_mock=requests_mock, connection=connection)
94121

@@ -150,7 +177,14 @@ def _handle_post_jobs(self, request, context):
150177
"""handler of `POST /jobs` (create batch job)"""
151178
post_data = request.json()
152179
pg = post_data["process"]["process_graph"]
153-
job_id = f"job-{len(self.batch_jobs):03d}"
180+
181+
# Generate (new) job id
182+
job_id = self.job_id_generator and self.job_id_generator(process_graph=pg)
183+
if not job_id or job_id in self.batch_jobs:
184+
# As fallback: use auto-increment job ids ("job-000", "job-001", "job-002", ...)
185+
job_id = f"job-{len(self.batch_jobs):03d}"
186+
assert job_id not in self.batch_jobs
187+
154188
job_data = {"job_id": job_id, "pg": pg, "status": "created"}
155189
for field in ["title", "description"]:
156190
if field in post_data:
@@ -169,11 +203,16 @@ def _get_job_id(self, request) -> str:
169203
assert job_id in self.batch_jobs
170204
return job_id
171205

206+
def _get_job_status(self, job_id: str, current_status: str) -> str:
207+
if job_id in self._forced_job_status:
208+
return self._forced_job_status[job_id]
209+
return self.job_status_updater(job_id=job_id, current_status=current_status)
210+
172211
def _handle_post_job_results(self, request, context):
173212
"""Handler of `POST /job/{job_id}/results` (start batch job)."""
174213
job_id = self._get_job_id(request)
175214
assert self.batch_jobs[job_id]["status"] == "created"
176-
self.batch_jobs[job_id]["status"] = self.job_status_updater(
215+
self.batch_jobs[job_id]["status"] = self._get_job_status(
177216
job_id=job_id, current_status=self.batch_jobs[job_id]["status"]
178217
)
179218
context.status_code = 202
@@ -183,10 +222,14 @@ def _handle_get_job(self, request, context):
183222
job_id = self._get_job_id(request)
184223
# Allow updating status with `job_status_setter` once job got past status "created"
185224
if self.batch_jobs[job_id]["status"] != "created":
186-
self.batch_jobs[job_id]["status"] = self.job_status_updater(
225+
self.batch_jobs[job_id]["status"] = self._get_job_status(
187226
job_id=job_id, current_status=self.batch_jobs[job_id]["status"]
188227
)
189-
return {"id": job_id, "status": self.batch_jobs[job_id]["status"]}
228+
return {
229+
# TODO: add some more required fields like "process" and "created"?
230+
"id": job_id,
231+
"status": self.batch_jobs[job_id]["status"],
232+
}
190233

191234
def _handle_get_job_results(self, request, context):
192235
"""Handler of `GET /job/{job_id}/results` (list batch job results)."""
@@ -197,6 +240,13 @@ def _handle_get_job_results(self, request, context):
197240
"assets": {"result.data": {"href": self.connection.build_url(f"/jobs/{job_id}/results/result.data")}},
198241
}
199242

243+
def _handle_delete_job_results(self, request, context):
244+
"""Handler of `DELETE /job/{job_id}/results` (cancel job)."""
245+
job_id = self._get_job_id(request)
246+
self.batch_jobs[job_id]["status"] = "canceled"
247+
self._forced_job_status[job_id] = "canceled"
248+
context.status_code = 204
249+
200250
def _handle_get_job_result_asset(self, request, context):
201251
"""Handler of `GET /job/{job_id}/results/result.data` (get batch job result asset)."""
202252
job_id = self._get_job_id(request)
@@ -261,18 +311,30 @@ def execute(self, cube: Union[DataCube, VectorCube], process_id: Optional[str] =
261311
cube.execute()
262312
return self.get_pg(process_id=process_id)
263313

264-
def setup_simple_job_status_flow(self, *, queued: int = 1, running: int = 4, final: str = "finished"):
314+
def setup_simple_job_status_flow(
315+
self,
316+
*,
317+
queued: int = 1,
318+
running: int = 4,
319+
final: str = "finished",
320+
final_per_job: Optional[Mapping[str, str]] = None,
321+
):
265322
"""
266323
Set up simple job status flow:
267-
queued (a couple of times) -> running (a couple of times) -> finished/error.
324+
325+
queued (a couple of times) -> running (a couple of times) -> finished/error.
326+
327+
Final state can be specified generically with arg `final`
328+
and, optionally, further fine-tuned per job with `final_per_job`.
268329
"""
269-
template = ["queued"] * queued + ["running"] * running + [final]
330+
template = ["queued"] * queued + ["running"] * running
270331
job_stacks = collections.defaultdict(template.copy)
332+
final_per_job = final_per_job or {}
271333

272334
def get_status(job_id: str, current_status: str) -> str:
273335
stack = job_stacks[job_id]
274-
# Pop first item each time, but repeat the last one at the end
275-
return stack.pop(0) if len(stack) > 1 else stack[0]
336+
# Pop first item each time, unless we're in final state
337+
return stack.pop(0) if len(stack) > 0 else final_per_job.get(job_id, final)
276338

277339
self.job_status_updater = get_status
278340

0 commit comments

Comments (0)