Skip to content

Commit be579f0

Browse files
HansVRP and soxofaan committed
MultiBackendJobManager: keep "queued" under 10 for better CDSE compat (#839)
refs: #839, eu-cdse/openeo-cdse-infra#859

Co-authored-by: Stefaan Lippens <[email protected]>
1 parent e18c820 commit be579f0

File tree

2 files changed

+16
-8
lines changed

2 files changed

+16
-8
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111

1212
- `MultiBackendJobManager`: add `download_results` option to enable/disable the automated download of job results once completed by the job manager ([#744](https://github.com/Open-EO/openeo-python-client/issues/744))
1313
- Support UDF based spatial and temporal extents in `load_collection`, `load_stac` and `filter_temporal` ([#831](https://github.com/Open-EO/openeo-python-client/pull/831))
14+
- `MultiBackendJobManager`: keep the number of "queued" jobs per backend at 10 or fewer for better CDSE compatibility ([#839](https://github.com/Open-EO/openeo-python-client/pull/839), eu-cdse/openeo-cdse-infra#859)
1415

1516
### Changed
1617

openeo/extra/job_management/_manager.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ class _Backend(NamedTuple):
6161
# Maximum number of jobs to allow in parallel on a backend
6262
parallel_jobs: int
6363

64+
# Maximum number of jobs to allow in queue on a backend
65+
queueing_limit: int = 10
66+
6467

6568
@dataclasses.dataclass(frozen=True)
6669
class _ColumnProperties:
@@ -254,7 +257,8 @@ def add_backend(
254257
c = connection
255258
connection = lambda: c
256259
assert callable(connection)
257-
self.backends[name] = _Backend(get_connection=connection, parallel_jobs=parallel_jobs)
260+
# TODO: expose queueing_limit?
261+
self.backends[name] = _Backend(get_connection=connection, parallel_jobs=parallel_jobs, queueing_limit=10)
258262

259263
def _get_connection(self, backend_name: str, resilient: bool = True) -> Connection:
260264
"""Get a connection for the backend and optionally make it resilient (adds retry behavior)
@@ -552,18 +556,21 @@ def _job_update_loop(
552556

553557
not_started = job_db.get_by_status(statuses=["not_started"], max=200).copy()
554558
if len(not_started) > 0:
555-
# Check number of jobs running at each backend
559+
# Check number of jobs queued/running at each backend
556560
# TODO: should "created" be included in here? Calling this "running" is quite misleading then.
557561
# apparently (see #839/#840) this seemingly simple change makes a lot of MultiBackendJobManager tests flaky
558562
running = job_db.get_by_status(statuses=["created", "queued", "queued_for_start", "running"])
559-
stats["job_db get_by_status"] += 1
560-
per_backend = running.groupby("backend_name").size().to_dict()
561-
_log.info(f"Running per backend: {per_backend}")
563+
queued = running[running["status"] == "queued"]
564+
running_per_backend = running.groupby("backend_name").size().to_dict()
565+
queued_per_backend = queued.groupby("backend_name").size().to_dict()
566+
_log.info(f"{running_per_backend=} {queued_per_backend=}")
567+
562568
total_added = 0
563569
for backend_name in self.backends:
564-
backend_load = per_backend.get(backend_name, 0)
565-
if backend_load < self.backends[backend_name].parallel_jobs:
566-
to_add = self.backends[backend_name].parallel_jobs - backend_load
570+
queue_capacity = self.backends[backend_name].queueing_limit - queued_per_backend.get(backend_name, 0)
571+
run_capacity = self.backends[backend_name].parallel_jobs - running_per_backend.get(backend_name, 0)
572+
to_add = min(queue_capacity, run_capacity)
573+
if to_add > 0:
567574
for i in not_started.index[total_added : total_added + to_add]:
568575
self._launch_job(start_job, df=not_started, i=i, backend_name=backend_name, stats=stats)
569576
stats["job launch"] += 1

0 commit comments

Comments
 (0)