Skip to content

Commit 410de4e

Browse files
committed
Add execution plan chaining
This commit enables execution plans to be chained. Assuming there is an execution plan EP1, another execution plan EP2 can be chained onto EP1. When chained, EP2 will stay in scheduled state until EP1 goes to stopped state. An execution plan can be chained onto multiple prerequisite execution plans, in which case it will be run once all the prerequisite execution plans are stopped and have not failed. If the prerequisite execution plan ends with stopped-error, the chained execution plan(s) will fail. If the prerequisite execution plan is halted, the chained execution plan(s) will be run. It builds on mechanisms which were already present. When an execution plan is chained, it behaves in the same way as if it was scheduled for future execution. A record is created in dynflow_delayed_table and once the conditions for it to execute are right, it is dispatched by the delayed executor. Because of this, there might be small delay between when the prerequisites finishs and the chained plan is started.
1 parent 3dea623 commit 410de4e

File tree

14 files changed

+404
-16
lines changed

14 files changed

+404
-16
lines changed
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
#!/usr/bin/env ruby
2+
# frozen_string_literal: true
3+
4+
require_relative 'example_helper'
5+
6+
class DelayedAction < Dynflow::Action
7+
def plan(should_fail = false)
8+
plan_self :should_fail => should_fail
9+
end
10+
11+
def run
12+
sleep 5
13+
raise "Controlled failure" if input[:should_fail]
14+
end
15+
16+
def rescue_strategy
17+
Dynflow::Action::Rescue::Fail
18+
end
19+
end
20+
21+
if $PROGRAM_NAME == __FILE__
22+
world = ExampleHelper.create_world do |config|
23+
config.auto_rescue = true
24+
end
25+
world.action_logger.level = 1
26+
world.logger.level = 0
27+
28+
plan1 = world.trigger(DelayedAction)
29+
plan2 = world.chain(plan1.execution_plan_id, DelayedAction)
30+
plan3 = world.chain(plan2.execution_plan_id, DelayedAction)
31+
plan4 = world.chain(plan2.execution_plan_id, DelayedAction)
32+
33+
plan5 = world.trigger(DelayedAction, true)
34+
plan6 = world.chain(plan5.execution_plan_id, DelayedAction)
35+
36+
puts <<-MSG.gsub(/^.*\|/, '')
37+
|
38+
| Execution Plan Chaining example
39+
| ========================
40+
|
41+
| This example shows the execution plan chaining functionality of Dynflow, which allows execution plans to wait until another execution plan finishes.
42+
|
43+
| Execution plans:
44+
| #{plan1.id} runs immediately and should run successfully.
45+
| #{plan2.id} is delayed and should run once #{plan1.id} finishes.
46+
| #{plan3.id} and #{plan4.id} are delayed and should run once #{plan2.id} finishes.
47+
|
48+
| #{plan5.id} runs immediately and is expected to fail.
49+
| #{plan6.id} should not run at all as its prerequisite failed.
50+
|
51+
| Visit #{ExampleHelper::DYNFLOW_URL} to see their status.
52+
|
53+
MSG
54+
55+
ExampleHelper.run_web_console(world)
56+
end

lib/dynflow/debug/telemetry/persistence.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ module Persistence
1919
:load_execution_plan,
2020
:save_execution_plan,
2121
:find_old_execution_plans,
22-
:find_past_delayed_plans,
22+
:find_ready_delayed_plans,
2323
:delete_delayed_plans,
2424
:save_delayed_plan,
2525
:set_delayed_plan_frozen,

lib/dynflow/delayed_executors/abstract_core.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def time
3232

3333
def delayed_execution_plans(time)
3434
with_error_handling([]) do
35-
world.persistence.find_past_delayed_plans(time)
35+
world.persistence.find_ready_delayed_plans(time)
3636
end
3737
end
3838

lib/dynflow/delayed_plan.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ def timeout
3131
error("Execution plan could not be started before set time (#{@start_before})", 'timeout')
3232
end
3333

34+
def failed_dependencies(uuids)
35+
bullets = uuids.map { |u| "- #{u}" }.join("\n")
36+
msg = "Execution plan could not be started because some of its prerequisite execution plans failed:\n#{bullets}"
37+
error(msg, 'failed-dependency')
38+
end
39+
3440
def error(message, history_entry = nil)
3541
execution_plan.root_plan_step.state = :error
3642
execution_plan.root_plan_step.error = ::Dynflow::ExecutionPlan::Steps::Error.new(message)

lib/dynflow/director.rb

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,15 @@ def execute
114114
plan = world.persistence.load_delayed_plan(execution_plan_id)
115115
return if plan.nil? || plan.execution_plan.state != :scheduled
116116

117-
if !plan.start_before.nil? && plan.start_before < Time.now.utc()
117+
if plan.start_before.nil?
118+
blocker_ids = world.persistence.find_execution_plan_dependencies(execution_plan_id)
119+
statuses = world.persistence.find_execution_plan_statuses({ filters: { uuid: blocker_ids } })
120+
failed = statuses.select { |_uuid, status| status[:state] == 'stopped' && status[:result] == 'error' }
121+
if failed.any?
122+
plan.failed_dependencies(failed.keys)
123+
return
124+
end
125+
elsif plan.start_before < Time.now.utc()
118126
plan.timeout
119127
return
120128
end

lib/dynflow/persistence.rb

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,16 @@ def find_old_execution_plans(age)
101101
end
102102
end
103103

104-
def find_past_delayed_plans(time)
105-
adapter.find_past_delayed_plans(time).map do |plan|
104+
def find_execution_plan_dependencies(execution_plan_id)
105+
adapter.find_execution_plan_dependencies(execution_plan_id)
106+
end
107+
108+
def find_blocked_execution_plans(execution_plan_id)
109+
adapter.find_blocked_execution_plans(execution_plan_id)
110+
end
111+
112+
def find_ready_delayed_plans(time)
113+
adapter.find_ready_delayed_plans(time).map do |plan|
106114
DelayedPlan.new_from_hash(@world, plan)
107115
end
108116
end
@@ -163,5 +171,9 @@ def prune_envelopes(receiver_ids)
163171
def prune_undeliverable_envelopes
164172
adapter.prune_undeliverable_envelopes
165173
end
174+
175+
def chain_execution_plan(first, second)
176+
adapter.chain_execution_plan(first, second)
177+
end
166178
end
167179
end

lib/dynflow/persistence_adapters/abstract.rb

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,15 @@ def save_execution_plan(execution_plan_id, value)
7272
raise NotImplementedError
7373
end
7474

75-
def find_past_delayed_plans(options = {})
75+
def find_execution_plan_dependencies(execution_plan_id)
76+
raise NotImplementedError
77+
end
78+
79+
def find_blocked_execution_plans(execution_plan_id)
80+
raise NotImplementedError
81+
end
82+
83+
def find_ready_delayed_plans(options = {})
7684
raise NotImplementedError
7785
end
7886

lib/dynflow/persistence_adapters/sequel.rb

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ class action_class execution_plan_uuid queue),
3939
envelope: %w(receiver_id),
4040
coordinator_record: %w(id owner_id class),
4141
delayed: %w(execution_plan_uuid start_at start_before args_serializer frozen),
42-
output_chunk: %w(execution_plan_uuid action_id kind timestamp) }
42+
output_chunk: %w(execution_plan_uuid action_id kind timestamp),
43+
execution_plan_dependency: %w(execution_plan_uuid blocked_by_uuid) }
4344

4445
SERIALIZABLE_COLUMNS = { action: %w(input output),
4546
delayed: %w(serialized_args),
@@ -153,12 +154,31 @@ def find_old_execution_plans(age)
153154
records.map { |plan| execution_plan_column_map(load_data plan, table_name) }
154155
end
155156

156-
def find_past_delayed_plans(time)
157+
def find_execution_plan_dependencies(execution_plan_id)
158+
table(:execution_plan_dependency)
159+
.where(execution_plan_uuid: execution_plan_id)
160+
.select_map(:blocked_by_uuid)
161+
end
162+
163+
def find_blocked_execution_plans(execution_plan_id)
164+
table(:execution_plan_dependency)
165+
.where(blocked_by_uuid: execution_plan_id)
166+
.select_map(:execution_plan_uuid)
167+
end
168+
169+
def find_ready_delayed_plans(time)
157170
table_name = :delayed
171+
# Subquery to find delayed plans that have at least one non-stopped dependency
172+
plans_with_unfinished_deps = table(:execution_plan_dependency)
173+
.join(TABLES[:execution_plan], uuid: :blocked_by_uuid)
174+
.where(::Sequel.~(state: 'stopped'))
175+
.select(:execution_plan_uuid)
176+
158177
records = with_retry do
159178
table(table_name)
160-
.where(::Sequel.lit('start_at <= ? OR (start_before IS NOT NULL AND start_before <= ?)', time, time))
179+
.where(::Sequel.lit('start_at IS NULL OR (start_at <= ? OR (start_before IS NOT NULL AND start_before <= ?))', time, time))
161180
.where(:frozen => false)
181+
.exclude(execution_plan_uuid: plans_with_unfinished_deps)
162182
.order_by(:start_at)
163183
.all
164184
end
@@ -175,6 +195,10 @@ def save_delayed_plan(execution_plan_id, value)
175195
save :delayed, { execution_plan_uuid: execution_plan_id }, value, with_data: false
176196
end
177197

198+
def chain_execution_plan(first, second)
199+
save :execution_plan_dependency, {}, { execution_plan_uuid: second, blocked_by_uuid: first }, with_data: false
200+
end
201+
178202
def load_step(execution_plan_id, step_id)
179203
load :step, execution_plan_uuid: execution_plan_id, id: step_id
180204
end
@@ -319,7 +343,8 @@ def abort_if_pending_migrations!
319343
envelope: :dynflow_envelopes,
320344
coordinator_record: :dynflow_coordinator_records,
321345
delayed: :dynflow_delayed_plans,
322-
output_chunk: :dynflow_output_chunks }
346+
output_chunk: :dynflow_output_chunks,
347+
execution_plan_dependency: :dynflow_execution_plan_dependencies }
323348

324349
def table(which)
325350
db[TABLES.fetch(which)]
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# frozen_string_literal: true
2+
3+
Sequel.migration do
4+
up do
5+
type = database_type
6+
create_table(:dynflow_execution_plan_dependencies) do
7+
column_properties = if type.to_s.include?('postgres')
8+
{ type: :uuid }
9+
else
10+
{ type: String, size: 36, fixed: true, null: false }
11+
end
12+
foreign_key :execution_plan_uuid, :dynflow_execution_plans, on_delete: :cascade, **column_properties
13+
foreign_key :blocked_by_uuid, :dynflow_execution_plans, on_delete: :cascade, **column_properties
14+
index :blocked_by_uuid
15+
index :execution_plan_uuid
16+
end
17+
end
18+
19+
down do
20+
drop_table(:dynflow_execution_plan_dependencies)
21+
end
22+
end

lib/dynflow/world.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,16 @@ def delay_with_options(action_class:, args:, delay_options:, id: nil, caller_act
202202
Scheduled[execution_plan.id]
203203
end
204204

205+
def chain(plan_uuids, action_class, *args)
206+
plan_uuids = [plan_uuids] unless plan_uuids.is_a? Array
207+
result = delay_with_options(action_class: action_class, args: args, delay_options: { frozen: true })
208+
plan_uuids.each do |plan_uuid|
209+
persistence.chain_execution_plan(plan_uuid, result.execution_plan_id)
210+
end
211+
persistence.set_delayed_plan_frozen(result.execution_plan_id, false)
212+
result
213+
end
214+
205215
def plan_elsewhere(action_class, *args)
206216
execution_plan = ExecutionPlan.new(self, nil)
207217
execution_plan.delay(nil, action_class, {}, *args)

0 commit comments

Comments
 (0)