Skip to content

Commit 41e5512

Browse files
committed
PR #839 simplify run/queueing capacity logic a bit
1 parent d11a0e1 commit 41e5512

File tree

1 file changed

+15
-23
lines changed

1 file changed

+15
-23
lines changed

openeo/extra/job_management/_manager.py

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242

4343
# TODO: eliminate this module constant (should be part of some constructor interface)
4444
MAX_RETRIES = 50
45-
QUEUE_LIMIT_PER_BACKEND = 10
4645

4746
# Sentinel value to indicate that a parameter was not set
4847
_UNSET = object()
@@ -60,6 +59,9 @@ class _Backend(NamedTuple):
6059
# Maximum number of jobs to allow in parallel on a backend
6160
parallel_jobs: int
6261

62+
# Maximum number of jobs to allow in queue on a backend
63+
queueing_limit: int = 10
64+
6365

6466
@dataclasses.dataclass(frozen=True)
6567
class _ColumnProperties:
@@ -252,7 +254,8 @@ def add_backend(
252254
c = connection
253255
connection = lambda: c
254256
assert callable(connection)
255-
self.backends[name] = _Backend(get_connection=connection, parallel_jobs=parallel_jobs)
257+
# TODO: expose queueing_limit?
258+
self.backends[name] = _Backend(get_connection=connection, parallel_jobs=parallel_jobs, queueing_limit=10)
256259

257260
def _get_connection(self, backend_name: str, resilient: bool = True) -> Connection:
258261
"""Get a connection for the backend and optionally make it resilient (adds retry behavior)
@@ -539,34 +542,23 @@ def _job_update_loop(
539542
stats["track_statuses"] += 1
540543

541544
not_started = job_db.get_by_status(statuses=["not_started"], max=200).copy()
542-
queued = job_db.get_by_status(statuses=["queued"], max=200)
543-
544545
if len(not_started) > 0:
545-
# Check number of jobs running at each backend
546+
# Check number of jobs queued/running at each backend
546547
running = job_db.get_by_status(statuses=["queued", "queued_for_start", "running"])
547-
stats["job_db get_by_status"] += 1
548+
queued = running[running["status"] == "queued"]
548549
running_per_backend = running.groupby("backend_name").size().to_dict()
549550
queued_per_backend = queued.groupby("backend_name").size().to_dict()
550-
_log.info(f"Running per backend: {running_per_backend}")
551-
_log.info(f"Queued per backend: {queued_per_backend}")
551+
_log.info(f"{running_per_backend=} {queued_per_backend=}")
552552

553553
total_added = 0
554554
for backend_name in self.backends:
555-
backend_running = running_per_backend.get(backend_name, 0)
556-
backend_queued = queued_per_backend.get(backend_name, 0)
557-
558-
# capacity, check per backend (max 10 queued jobs per user/backend)
559-
backend_capacity = self.backends[backend_name].parallel_jobs
560-
has_capacity = backend_running < backend_capacity
561-
under_queued_limit = backend_queued < QUEUE_LIMIT_PER_BACKEND
562-
563-
if has_capacity and under_queued_limit:
564-
565-
#calcualte the number of jobs we can add, also based on the queue size
566-
available_slots = max(0, backend_capacity - backend_running)
567-
remaining_queue_space = max(0, QUEUE_LIMIT_PER_BACKEND - backend_queued)
568-
to_add = min(available_slots, remaining_queue_space)
569-
555+
to_add = min(
556+
# How much room is there to start/queue a job?
557+
self.backends[backend_name].queueing_limit - queued_per_backend.get(backend_name, 0),
558+
# How much room is there to run a job?
559+
self.backends[backend_name].parallel_jobs - running_per_backend.get(backend_name, 0),
560+
)
561+
if to_add > 0:
570562
for i in not_started.index[total_added : total_added + to_add]:
571563
self._launch_job(start_job, df=not_started, i=i, backend_name=backend_name, stats=stats)
572564
stats["job launch"] += 1

0 commit comments

Comments
 (0)