Skip to content

Commit b15ae2c

Browse files
authored
Merge pull request #20522 from mvdbeek/improve_workflow_scheduling_iteration_loop
[25.0] Improve workflow monitor loop times
2 parents 1e2737a + f251d84 commit b15ae2c

File tree

2 files changed

+48
-3
lines changed

2 files changed

+48
-3
lines changed

lib/galaxy/model/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9000,6 +9000,13 @@ class WorkflowInvocation(Base, UsesCreateAndUpdateTime, Dictifiable, Serializabl
90009000
states = InvocationState
90019001
non_terminal_states = [states.NEW, states.READY]
90029002

9003+
def get_last_workflow_invocation_step_update_time(self) -> Optional[datetime]:
9004+
session = required_object_session(self)
9005+
stmt = select(func.max(WorkflowInvocationStep.update_time)).where(
9006+
WorkflowInvocationStep.workflow_invocation_id == self.id
9007+
)
9008+
return session.execute(stmt).scalar_one_or_none()
9009+
90039010
def create_subworkflow_invocation_for_step(self, step):
90049011
assert step.type == "subworkflow"
90059012
subworkflow_invocation = WorkflowInvocation()

lib/galaxy/workflow/scheduling_manager.py

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
import os
2+
from datetime import (
3+
datetime,
4+
timedelta,
5+
)
26
from functools import partial
3-
from typing import Optional
7+
from typing import (
8+
Dict,
9+
Optional,
10+
)
411

512
from sqlalchemy.orm import Session
613

@@ -30,6 +37,7 @@
3037

3138
DEFAULT_SCHEDULER_ID = "default" # well actually this should be called DEFAULT_DEFAULT_SCHEDULER_ID...
3239
DEFAULT_SCHEDULER_PLUGIN_TYPE = "core"
40+
DEFAULT_SCHEDULER_BACKFILL_SECONDS = int(os.getenv("GALAXY_SCHEDULER_BACKFILL_SECONDS", 300))
3341

3442
EXCEPTION_MESSAGE_SHUTDOWN = "Exception raised while attempting to shutdown workflow scheduler."
3543
EXCEPTION_MESSAGE_NO_SCHEDULERS = "Failed to defined workflow schedulers - no workflow schedulers defined."
@@ -296,6 +304,8 @@ def __init__(self, app: MinimalManagerApp, workflow_scheduling_manager):
296304
name="WorkflowRequestMonitor.monitor_thread", target=self.__monitor, config=app.config
297305
)
298306
self.invocation_grabber = None
307+
self.update_time_tracking_dict: Dict[int, datetime] = {}
308+
self.timedelta = timedelta(seconds=DEFAULT_SCHEDULER_BACKFILL_SECONDS)
299309
self_handler_tags = set(self.app.job_config.self_handler_tags)
300310
self_handler_tags.add(self.workflow_scheduling_manager.default_handler_id)
301311
handler_assignment_method = InvocationGrabber.get_grabbable_handler_assignment_method(
@@ -310,6 +320,29 @@ def __init__(self, app: MinimalManagerApp, workflow_scheduling_manager):
310320
handler_tags=self_handler_tags,
311321
)
312322

323+
def ready_to_schedule_more(self, invocation: model.WorkflowInvocation):
324+
# Improve reactivity of scheduling using the history update_time as a heuristic.
325+
# If there wasn't a change in the history we're unlikely to be able to make more progress.
326+
if invocation.id not in self.update_time_tracking_dict:
327+
return True
328+
else:
329+
last_schedule_time = self.update_time_tracking_dict[invocation.id]
330+
last_history_update_time = invocation.history.update_time
331+
do_schedule = last_history_update_time > last_schedule_time
332+
if not do_schedule and (
333+
invocation_step_update_time := invocation.get_last_workflow_invocation_step_update_time()
334+
):
335+
do_schedule = invocation_step_update_time > last_schedule_time
336+
if not do_schedule and (datetime.now() - last_schedule_time) > self.timedelta:
337+
# If we haven't scheduled in a while, schedule anyway.
338+
log.debug(
339+
"Scheduling workflow invocation [%s] after %s seconds without scheduling.",
340+
invocation.id,
341+
(datetime.now() - last_schedule_time).total_seconds(),
342+
)
343+
do_schedule = True
344+
return do_schedule
345+
313346
def __monitor(self):
314347
to_monitor = self.workflow_scheduling_manager.active_workflow_schedulers
315348
while self.monitor_running:
@@ -393,9 +426,11 @@ def __attempt_schedule(self, invocation_id, workflow_scheduler):
393426
workflow_invocation.cancel_invocation_steps()
394427
workflow_invocation.mark_cancelled()
395428
session.commit()
429+
self.update_time_tracking_dict.pop(invocation_id, None)
396430
return False
397431

398432
if not workflow_invocation or not workflow_invocation.active:
433+
self.update_time_tracking_dict.pop(invocation_id, None)
399434
return False
400435

401436
# This ensures we're only ever working on the 'first' active
@@ -405,9 +440,12 @@ def __attempt_schedule(self, invocation_id, workflow_scheduler):
405440
for i in workflow_invocation.history.workflow_invocations:
406441
if i.active and i.id < workflow_invocation.id:
407442
return False
408-
workflow_scheduler.schedule(workflow_invocation)
409-
log.debug("Workflow invocation [%s] scheduled", workflow_invocation.id)
443+
if self.ready_to_schedule_more(workflow_invocation):
444+
self.update_time_tracking_dict[invocation_id] = datetime.now()
445+
workflow_scheduler.schedule(workflow_invocation)
446+
log.debug("Workflow invocation [%s] scheduled", invocation_id)
410447
except Exception:
448+
self.update_time_tracking_dict.pop(invocation_id, None)
411449
# TODO: eventually fail this - or fail it right away?
412450
log.exception("Exception raised while attempting to schedule workflow request.")
413451
return False

0 commit comments

Comments
 (0)