Skip to content

Commit e8dac6b

Browse files
committed
Improve workflow monitor loop times
1 parent e95a559 commit e8dac6b

File tree

1 file changed

+23
-3
lines changed

1 file changed

+23
-3
lines changed

lib/galaxy/workflow/scheduling_manager.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import os
2+
from datetime import datetime
23
from functools import partial
3-
from typing import Optional
4+
from typing import (
5+
Dict,
6+
Optional,
7+
)
48

59
from sqlalchemy.orm import Session
610

@@ -296,6 +300,7 @@ def __init__(self, app: MinimalManagerApp, workflow_scheduling_manager):
296300
name="WorkflowRequestMonitor.monitor_thread", target=self.__monitor, config=app.config
297301
)
298302
self.invocation_grabber = None
303+
self.update_time_tracking_dict: Dict[int, datetime] = {}
299304
self_handler_tags = set(self.app.job_config.self_handler_tags)
300305
self_handler_tags.add(self.workflow_scheduling_manager.default_handler_id)
301306
handler_assignment_method = InvocationGrabber.get_grabbable_handler_assignment_method(
@@ -310,6 +315,16 @@ def __init__(self, app: MinimalManagerApp, workflow_scheduling_manager):
310315
handler_tags=self_handler_tags,
311316
)
312317

318+
def ready_to_schedule_more(self, invocation: model.WorkflowInvocation):
319+
# Improve reactivity of scheduling using the history update_time as a heuristic.
320+
# If there wasn't a change in the history we're unlikely to be able to make more progress.
321+
if invocation.id not in self.update_time_tracking_dict:
322+
return True
323+
else:
324+
last_schedule_time = self.update_time_tracking_dict[invocation.id]
325+
last_history_update_time = invocation.history.update_time
326+
return last_history_update_time > last_schedule_time
327+
313328
def __monitor(self):
314329
to_monitor = self.workflow_scheduling_manager.active_workflow_schedulers
315330
while self.monitor_running:
@@ -393,9 +408,11 @@ def __attempt_schedule(self, invocation_id, workflow_scheduler):
393408
workflow_invocation.cancel_invocation_steps()
394409
workflow_invocation.mark_cancelled()
395410
session.commit()
411+
self.update_time_tracking_dict.pop(invocation_id, None)
396412
return False
397413

398414
if not workflow_invocation or not workflow_invocation.active:
415+
self.update_time_tracking_dict.pop(invocation_id, None)
399416
return False
400417

401418
# This ensures we're only ever working on the 'first' active
@@ -405,9 +422,12 @@ def __attempt_schedule(self, invocation_id, workflow_scheduler):
405422
for i in workflow_invocation.history.workflow_invocations:
406423
if i.active and i.id < workflow_invocation.id:
407424
return False
408-
workflow_scheduler.schedule(workflow_invocation)
409-
log.debug("Workflow invocation [%s] scheduled", workflow_invocation.id)
425+
if self.ready_to_schedule_more(workflow_invocation):
426+
self.update_time_tracking_dict[invocation_id] = datetime.now()
427+
workflow_scheduler.schedule(workflow_invocation)
428+
log.debug("Workflow invocation [%s] scheduled", invocation_id)
410429
except Exception:
430+
self.update_time_tracking_dict.pop(invocation_id, None)
411431
# TODO: eventually fail this - or fail it right away?
412432
log.exception("Exception raised while attempting to schedule workflow request.")
413433
return False

0 commit comments

Comments
 (0)