Skip to content

Commit 9dae43b

Browse files
committed
Each now stops when iteration emits a failing signal.
1 parent c80703a commit 9dae43b

File tree

2 files changed

+22
-0
lines changed

2 files changed

+22
-0
lines changed

lib/trailblazer/macro/each.rb

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
module Trailblazer
22
module Macro
33
# TODO: explain termini routing, Inject usage for "block activity", Out() => []
4+
# BLOG: Each() is a perfect example of how versatile the TRB mechanics are
45

6+
# @api private The internals here are considered private and might change at some point.
57
def self.Each(block_activity=nil, dataset_from: nil, item_key: :item, id: Macro.id_for(block_activity, macro: :Each, hint: dataset_from), collect: false, **dsl_options_for_iterated, &block)
68
# TODO: 2.5. fix
79
iterated_activity, outputs_from_block_activity = Trailblazer::Macro.block_activity_for(block_activity, &block)
@@ -29,6 +31,11 @@ def self.Each(block_activity=nil, dataset_from: nil, item_key: :item, id: Macro.
2931

3032
each_activity = Trailblazer::Activity::Railway(termini: termini_from_block_activity) do
3133

34+
# TODO: make publicly configurable.
35+
@state.update!(:fields) do |fields|
36+
fields.merge(failing_semantics: [:failure, :fail_fast])
37+
end
38+
3239
step Subprocess(iterated_activity, strict: true),
3340
id: "ITERATED FIXME",
3441
Inject(:index, filter_builder: my_filter_builder) => my_lowlevel_inject_filter,
@@ -42,6 +49,8 @@ def self.call((ctx, flow_options), runner:, **circuit_options)
4249
# We don't really need to override/replace {circuit} as we only want to change the way it's run.
4350
iterated_railway = to_h[:circuit].to_h[:map].keys[1] # DISCUSS: maybe find by id?
4451

52+
failing_semantics = @state.get(:fields).fetch(:failing_semantics)
53+
4554
dataset = ctx.fetch(:dataset)
4655
signal = nil
4756

@@ -54,6 +63,9 @@ def self.call((ctx, flow_options), runner:, **circuit_options)
5463

5564
# we "inject" item_key and index via Runner.(..., item_key => ..) and then the input filter grabs that.
5665
signal, (ctx, flow_options) = runner.(iterated_railway, [ctx, flow_options], runner: runner, **circuit_options, activity: self, **each_options_for_iterated)
66+
67+
# Break the loop if {iterated_activity} emits a failure signal.
68+
break if failing_semantics.include?(signal.to_h[:semantic]) # TODO: use generic check from older macro
5769
end
5870

5971
return signal, [ctx, flow_options]

test/docs/each_test.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ def composers_for_each(ctx, model:, **)
241241
end # F
242242

243243
it "failure in Each" do
244+
# last iteration fails:
244245
assert_invoke F::Song::Activity::Cover, params: {id: 2},
245246
expected_ctx_variables: {
246247
model: B::Song.find_by(id: 2),
@@ -249,6 +250,15 @@ def composers_for_each(ctx, model:, **)
249250
seq: "[]",
250251
terminus: :failure
251252

253+
# first iteration fails:
254+
assert_invoke F::Song::Activity::Cover, params: {id: 999},
255+
expected_ctx_variables: {
256+
model: B::Song.find_by(id: 999),
257+
collected_from_each: [nil],
258+
},
259+
seq: "[]",
260+
terminus: :failure
261+
252262
Trailblazer::Developer.wtf?(F::Song::Activity::Cover, {params: {id: 2}, seq: []})
253263
end
254264

0 commit comments

Comments
 (0)