Skip to content

Commit 68a0f13

Browse files
committed
limit per backend
1 parent 81d4178 commit 68a0f13

File tree

1 file changed

+23
-8
lines changed

1 file changed

+23
-8
lines changed

openeo/extra/job_management/_manager.py

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242

4343
# TODO: eliminate this module constant (should be part of some constructor interface)
4444
MAX_RETRIES = 50
45-
45+
QUEUE_LIMIT_PER_BACKEND = 10
4646

4747
# Sentinel value to indicate that a parameter was not set
4848
_UNSET = object()
@@ -539,19 +539,34 @@ def _job_update_loop(
539539
stats["track_statuses"] += 1
540540

541541
not_started = job_db.get_by_status(statuses=["not_started"], max=200).copy()
542-
pending = job_db.get_by_status(statuses=["created", "queued", "queued_for_start"], max=200).copy()
542+
queued = job_db.get_by_status(statuses=["queued"], max=200)
543543

544-
if len(not_started) > 0 and len(pending) < 10:
544+
if len(not_started) > 0:
545545
# Check number of jobs running at each backend
546546
running = job_db.get_by_status(statuses=["running"])
547547
stats["job_db get_by_status"] += 1
548-
per_backend = running.groupby("backend_name").size().to_dict()
549-
_log.info(f"Running per backend: {per_backend}")
548+
running_per_backend = running.groupby("backend_name").size().to_dict()
549+
queued_per_backend = queued.groupby("backend_name").size().to_dict()
550+
_log.info(f"Running per backend: {running_per_backend}")
551+
_log.info(f"Queued per backend: {queued_per_backend}")
552+
550553
total_added = 0
551554
for backend_name in self.backends:
552-
backend_load = per_backend.get(backend_name, 0)
553-
if backend_load < self.backends[backend_name].parallel_jobs:
554-
to_add = self.backends[backend_name].parallel_jobs - backend_load
555+
backend_running = running_per_backend.get(backend_name, 0)
556+
backend_queued = queued_per_backend.get(backend_name, 0)
557+
558+
# capacity, check per backend (max 10 queued jobs per user/backend)
559+
backend_capacity = self.backends[backend_name].parallel_jobs
560+
has_capacity = backend_running < backend_capacity
561+
under_queued_limit = backend_queued < QUEUE_LIMIT_PER_BACKEND
562+
563+
if has_capacity and under_queued_limit:
564+
565+
#calcualte the number of jobs we can add, also based on the queue size
566+
available_slots = max(0, backend_capacity - backend_running)
567+
remaining_queue_space = max(0, QUEUE_LIMIT_PER_BACKEND - backend_queued)
568+
to_add = min(available_slots, remaining_queue_space)
569+
555570
for i in not_started.index[total_added : total_added + to_add]:
556571
self._launch_job(start_job, df=not_started, i=i, backend_name=backend_name, stats=stats)
557572
stats["job launch"] += 1

0 commit comments

Comments
 (0)