Skip to content

Commit c6fe140

Browse files
HansVRP and soxofaan authored
MultiBackendJobManager: keep "queued" under 10 for better CDSE compat (#839)
refs: #839, eu-cdse/openeo-cdse-infra#859 --------- Co-authored-by: Stefaan Lippens <[email protected]>
1 parent 945259e commit c6fe140

File tree

2 files changed

+16
-8
lines changed

2 files changed

+16
-8
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111

1212
- `MultiBackendJobManager`: add `download_results` option to enable/disable the automated download of job results once completed by the job manager ([#744](https://github.com/Open-EO/openeo-python-client/issues/744))
1313
- Support UDF based spatial and temporal extents in `load_collection`, `load_stac` and `filter_temporal` ([#831](https://github.com/Open-EO/openeo-python-client/pull/831))
14+
- `MultiBackendJobManager`: limit the number of "queued" jobs per backend to 10 for better CDSE compatibility ([#839](https://github.com/Open-EO/openeo-python-client/pull/839), eu-cdse/openeo-cdse-infra#859)
1415

1516
### Changed
1617

openeo/extra/job_management/_manager.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ class _Backend(NamedTuple):
6060
# Maximum number of jobs to allow in parallel on a backend
6161
parallel_jobs: int
6262

63+
# Maximum number of jobs to allow in queue on a backend
64+
queueing_limit: int = 10
65+
6366

6467
@dataclasses.dataclass(frozen=True)
6568
class _ColumnProperties:
@@ -252,7 +255,8 @@ def add_backend(
252255
c = connection
253256
connection = lambda: c
254257
assert callable(connection)
255-
self.backends[name] = _Backend(get_connection=connection, parallel_jobs=parallel_jobs)
258+
# TODO: expose queueing_limit?
259+
self.backends[name] = _Backend(get_connection=connection, parallel_jobs=parallel_jobs, queueing_limit=10)
256260

257261
def _get_connection(self, backend_name: str, resilient: bool = True) -> Connection:
258262
"""Get a connection for the backend and optionally make it resilient (adds retry behavior)
@@ -540,18 +544,21 @@ def _job_update_loop(
540544

541545
not_started = job_db.get_by_status(statuses=["not_started"], max=200).copy()
542546
if len(not_started) > 0:
543-
# Check number of jobs running at each backend
547+
# Check number of jobs queued/running at each backend
544548
# TODO: should "created" be included in here? Calling this "running" is quite misleading then.
545549
# apparently (see #839/#840) this seemingly simple change makes a lot of MultiBackendJobManager tests flaky
546550
running = job_db.get_by_status(statuses=["created", "queued", "queued_for_start", "running"])
547-
stats["job_db get_by_status"] += 1
548-
per_backend = running.groupby("backend_name").size().to_dict()
549-
_log.info(f"Running per backend: {per_backend}")
551+
queued = running[running["status"] == "queued"]
552+
running_per_backend = running.groupby("backend_name").size().to_dict()
553+
queued_per_backend = queued.groupby("backend_name").size().to_dict()
554+
_log.info(f"{running_per_backend=} {queued_per_backend=}")
555+
550556
total_added = 0
551557
for backend_name in self.backends:
552-
backend_load = per_backend.get(backend_name, 0)
553-
if backend_load < self.backends[backend_name].parallel_jobs:
554-
to_add = self.backends[backend_name].parallel_jobs - backend_load
558+
queue_capacity = self.backends[backend_name].queueing_limit - queued_per_backend.get(backend_name, 0)
559+
run_capacity = self.backends[backend_name].parallel_jobs - running_per_backend.get(backend_name, 0)
560+
to_add = min(queue_capacity, run_capacity)
561+
if to_add > 0:
555562
for i in not_started.index[total_added : total_added + to_add]:
556563
self._launch_job(start_job, df=not_started, i=i, backend_name=backend_name, stats=stats)
557564
stats["job launch"] += 1

0 commit comments

Comments
 (0)