Track scheduling dependencies for workflow invocations #22128
mvdbeek wants to merge 1 commit into galaxyproject:dev
Conversation
Replace fragile proxy signals in `ready_to_schedule_more()` with precise dependency tracking. When scheduling is delayed, record the concrete dependency (job ID, HDA ID, HDCA ID, or invocation step ID) and check whether it has been satisfied on the next iteration.

- Add `SchedulingDependency` and `DependencyType` to `modules.py`; attach dependency info to `DelayedWorkflowEvaluation` where possible.
- Collect dependencies on `WorkflowProgress.scheduling_dependencies` and expose them on the `WorkflowInvocation` via `_scheduling_dependencies`.
- Propagate scheduling dependencies from subworkflow invokers to the parent progress so the scheduling manager can check them.
- In `scheduling_manager.py`, maintain `dependency_tracking_dict` alongside `update_time_tracking_dict`; use batch queries for each dependency type (Job terminal states, HDA ready states, HDCA populated, step action set).
- Keep `history.update_time` and `step.update_time` as fallbacks for signals not captured by explicit dependencies (e.g. `PartialJobExecution`).
- Fix `Job.set_final_state()` to update `workflow_invocation_step` via the `implicit_collection_jobs` link, not just the direct `job_id`.
- Fix `maximum_workflow_invocation_duration` enforcement for paused workflows: always allow scheduling when the duration is exceeded so `invoke()` can fail the invocation.
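The shape of the dependency record can be sketched roughly as below. This is a simplified illustration, not the actual code: the real `SchedulingDependency`/`DependencyType` definitions live in `modules.py`, and the field names and constructor signatures here are assumptions.

```python
from dataclasses import dataclass
from enum import Enum


class DependencyType(Enum):
    # One variant per concrete thing an invocation can wait on.
    JOB = "job"
    HDA = "hda"
    HDCA = "hdca"
    WORKFLOW_INVOCATION_STEP = "workflow_invocation_step"


@dataclass(frozen=True)
class SchedulingDependency:
    dependency_type: DependencyType
    id: int


class DelayedWorkflowEvaluation(Exception):
    """Raised when a step cannot be scheduled yet; optionally carries the
    concrete dependency blocking progress, so the scheduling manager can
    check exactly that object on the next iteration."""

    def __init__(self, why=None, dependency=None):
        super().__init__(why)
        self.dependency = dependency


# A step delayed on job 42 records exactly which job it is waiting for:
delay = DelayedWorkflowEvaluation(
    why="job 42 not terminal",
    dependency=SchedulingDependency(DependencyType.JOB, 42),
)
```

The point of the frozen dataclass is that dependencies are hashable, so the manager can keep them in sets keyed by type and deduplicate across steps.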
| Delay scenario | Dependency tracked | Fallback signal |
|---|---|---|
| Job not finished | `JOB <id>` | `step.update_time` (via `Job.set_final_state`) |
| HDA not ready (pending) | `HDA <id>` | `history.update_time` |
| HDCA not populated | `HDCA <id>` | `history.update_time` |
| Pause step waiting for review | `WORKFLOW_INVOCATION_STEP <id>` | `step.update_time` (action set via API) |
| Dependent step not yet invoked | *(none — transient)* | next iteration re-evaluates |
| Dependent step not yet scheduled | *(none — transient)* | next iteration re-evaluates |
| Dependent step outputs delayed | *(inherits from root cause)* | inherits from root cause |
| Tool inputs not ready | *(none — rare special tools)* | `history.update_time` |
| PartialJobExecution | *(none — creates new HDAs)* | `history.update_time` |
| Workflow output not yet created | *(none — transient)* | next iteration re-evaluates |
| `maximum_workflow_invocation_duration` exceeded | *(duration check in `ready_to_schedule_more`)* | — |
| Process restart | *(in-memory dicts empty)* | first iteration always schedules |
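Per iteration, the manager can then resolve all recorded dependencies with one batch query per type and only re-evaluate invocations whose blockers have cleared. A minimal sketch under assumed shapes — the `(kind, id)` tuples and the set arguments are illustrative stand-ins for the batch-query results, and the real check in `scheduling_manager.py` also consults the `update_time` fallbacks from the table above:

```python
def dependencies_satisfied(dependencies, terminal_job_ids, ready_hda_ids,
                           populated_hdca_ids, acted_step_ids):
    """Return True when every recorded (kind, id) dependency is satisfied,
    i.e. the invocation is worth re-evaluating this iteration."""
    satisfied_ids = {
        "job": terminal_job_ids,      # jobs that reached a terminal state
        "hda": ready_hda_ids,         # HDAs in a ready state
        "hdca": populated_hdca_ids,   # HDCAs whose collections are populated
        "step": acted_step_ids,       # invocation steps with an action set
    }
    return all(dep_id in satisfied_ids[kind] for kind, dep_id in dependencies)


deps = [("job", 42), ("hda", 7)]
print(dependencies_satisfied(deps, {42}, set(), set(), set()))  # False: HDA 7 pending
print(dependencies_satisfied(deps, {42}, {7}, set(), set()))    # True: all cleared
```

Because each type maps to a single set built from one batch query, the per-iteration cost stays at four queries regardless of how many delayed invocations are tracked.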
That is so fucking rad! Like wow... I love this! I would never trust myself to do this bookkeeping - I do trust Claude. I would be tempted to say we should track the time we add something to that dependency dict and just wipe it clean every thirty minutes, so we try to recompute the dependencies in case something is tracked wrong or doesn't get updated (maybe a race condition of some kind) in a way that checking the actual state would catch - but it would be super defensive without cause. This is just stellar.
dannon left a comment
Worth keeping a very long backfill timer (say 3600s) as a last-resort safety net, or are the fallbacks sufficient here?
I would like to avoid that and instead find all the corner cases. We did have a 5-minute fallback already in the previous incarnation, and it was needed and painful. That's actually what prompted this.
Makes sense, definitely better to fix the corner cases than mask them. My only concern is catching them when they happen -- if an invocation stalls because a dependency wasn't tracked, it'd be silent right now. Worth adding a log warning or even a Sentry event if an invocation hasn't made progress within some threshold? Not to act on it automatically, just so it's visible.
I like that, will add it. |