Skip to content

Commit 619dbd0

Browse files
committed
Port MultiBackendJobManager to DummyBackend fixtures
1 parent 4989d4b commit 619dbd0

File tree

3 files changed

+185
-175
lines changed

3 files changed

+185
-175
lines changed

openeo/extra/job_management.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,9 @@ def _job_update_loop(
497497
go through the necessary jobs to check for status updates,
498498
trigger status events, start new jobs when there is room for them, etc.
499499
"""
500+
if not self.backends:
501+
raise RuntimeError("No backends registered")
502+
500503
stats = stats if stats is not None else collections.defaultdict(int)
501504

502505
with ignore_connection_errors(context="get statuses"):

openeo/rest/_testing.py

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import collections
44
import json
55
import re
6-
from typing import Callable, Iterable, Iterator, Optional, Sequence, Tuple, Union
6+
from typing import Callable, Dict, Iterable, Iterator, Optional, Sequence, Tuple, Union
77

88
from openeo import Connection, DataCube
99
from openeo.rest.vectorcube import VectorCube
@@ -32,7 +32,9 @@ class DummyBackend:
3232
"validation_requests",
3333
"next_result",
3434
"next_validation_errors",
35+
"_forced_job_status",
3536
"job_status_updater",
37+
"job_id_generator",
3638
"extra_job_metadata_fields",
3739
)
3840

@@ -53,13 +55,20 @@ def __init__(
5355
self.next_result = self.DEFAULT_RESULT
5456
self.next_validation_errors = []
5557
self.extra_job_metadata_fields = []
58+
self._forced_job_status: Dict[str, str] = {}
5659

5760
# Job status update hook:
5861
# callable that is called on starting a job, and getting job metadata
5962
# allows to dynamically change how the status of a job evolves
6063
# By default: immediately set to "finished" once job is started
6164
self.job_status_updater = lambda job_id, current_status: "finished"
6265

66+
# Optional job id generator hook:
67+
# callable that generates a job id, e.g. based on the process graph.
68+
# When set to None, or the callable returns None, or it returns an existing job id:
69+
# things fall back to auto-increment job ids ("job-000", "job-001", "job-002", ...)
70+
self.job_id_generator: Optional[Callable[[dict], str]] = None
71+
6372
requests_mock.post(
6473
connection.build_url("/result"),
6574
content=self._handle_post_result,
@@ -75,6 +84,9 @@ def __init__(
7584
requests_mock.get(
7685
re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), json=self._handle_get_job_results
7786
)
87+
requests_mock.delete(
88+
re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), json=self._handle_delete_job_results
89+
)
7890
requests_mock.get(
7991
re.compile(connection.build_url("/jobs/(.*?)/results/result.data$")),
8092
content=self._handle_get_job_result_asset,
@@ -88,7 +100,7 @@ def at_url(cls, root_url: str, *, requests_mock, capabilities: Optional[dict] =
88100
including creation of connection and mocking of capabilities doc
89101
"""
90102
root_url = root_url.rstrip("/") + "/"
91-
requests_mock.get(root_url, json=build_capabilities(**(capabilities or None)))
103+
requests_mock.get(root_url, json=build_capabilities(**(capabilities or {})))
92104
connection = Connection(root_url)
93105
return cls(requests_mock=requests_mock, connection=connection)
94106

@@ -150,7 +162,14 @@ def _handle_post_jobs(self, request, context):
150162
"""handler of `POST /jobs` (create batch job)"""
151163
post_data = request.json()
152164
pg = post_data["process"]["process_graph"]
153-
job_id = f"job-{len(self.batch_jobs):03d}"
165+
166+
# Generate (new) job id
167+
job_id = self.job_id_generator and self.job_id_generator(process_graph=pg)
168+
if not job_id or job_id in self.batch_jobs:
169+
# As fallback: use auto-increment job ids ("job-000", "job-001", "job-002", ...)
170+
job_id = f"job-{len(self.batch_jobs):03d}"
171+
assert job_id not in self.batch_jobs
172+
154173
job_data = {"job_id": job_id, "pg": pg, "status": "created"}
155174
for field in ["title", "description"]:
156175
if field in post_data:
@@ -169,11 +188,16 @@ def _get_job_id(self, request) -> str:
169188
assert job_id in self.batch_jobs
170189
return job_id
171190

191+
def _get_job_status(self, job_id: str, current_status: str) -> str:
192+
if job_id in self._forced_job_status:
193+
return self._forced_job_status[job_id]
194+
return self.job_status_updater(job_id=job_id, current_status=current_status)
195+
172196
def _handle_post_job_results(self, request, context):
173197
"""Handler of `POST /job/{job_id}/results` (start batch job)."""
174198
job_id = self._get_job_id(request)
175199
assert self.batch_jobs[job_id]["status"] == "created"
176-
self.batch_jobs[job_id]["status"] = self.job_status_updater(
200+
self.batch_jobs[job_id]["status"] = self._get_job_status(
177201
job_id=job_id, current_status=self.batch_jobs[job_id]["status"]
178202
)
179203
context.status_code = 202
@@ -183,7 +207,7 @@ def _handle_get_job(self, request, context):
183207
job_id = self._get_job_id(request)
184208
# Allow updating status with `job_status_setter` once job got past status "created"
185209
if self.batch_jobs[job_id]["status"] != "created":
186-
self.batch_jobs[job_id]["status"] = self.job_status_updater(
210+
self.batch_jobs[job_id]["status"] = self._get_job_status(
187211
job_id=job_id, current_status=self.batch_jobs[job_id]["status"]
188212
)
189213
return {"id": job_id, "status": self.batch_jobs[job_id]["status"]}
@@ -197,6 +221,13 @@ def _handle_get_job_results(self, request, context):
197221
"assets": {"result.data": {"href": self.connection.build_url(f"/jobs/{job_id}/results/result.data")}},
198222
}
199223

224+
def _handle_delete_job_results(self, request, context):
225+
"""Handler of `DELETE /job/{job_id}/results` (cancel job)."""
226+
job_id = self._get_job_id(request)
227+
self.batch_jobs[job_id]["status"] = "canceled"
228+
self._forced_job_status[job_id] = "canceled"
229+
context.status_code = 204
230+
200231
def _handle_get_job_result_asset(self, request, context):
201232
"""Handler of `GET /job/{job_id}/results/result.data` (get batch job result asset)."""
202233
job_id = self._get_job_id(request)

0 commit comments

Comments
 (0)