Skip to content

Commit edfdf49

Browse files
committed
after switching the circuit to symbol IDs it's actually faster than TRB 2.1! 🍻
1 parent 284ad6e commit edfdf49

File tree

1 file changed

+66
-44
lines changed

1 file changed

+66
-44
lines changed

test/RUNNER_INVOKER_test.rb

Lines changed: 66 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
# 9. does invoker.call need kwargs?
1313
# 10. BUG: when all tasks are the same proc and the last is the terminus, only the first is run. ===> use ids, we got them, anyway.
1414
# 11. should circuit_options be a positional arg?
15-
# 12. don't repeat Io.new as context, use automatic passing
15+
# 12. don't repeat Io.new as context, use automatic passing [done]
16+
# 13. termini IDs in map when using nesting
1617

1718
class RunnerInvokerTest < Minitest::Spec
1819
# Helper for those who don't like or have a DSL :D
@@ -23,25 +24,30 @@ def pipeline_circuit(*task_cfgs)
2324
]
2425
end
2526

26-
circuit = task_cfgs.collect.with_index do |task_cfg, i|
27-
signal = task_cfg[-1]
27+
map = task_cfgs.collect.with_index do |(id, _, _, _, signal), i|
28+
next_task = task_cfgs[i + 1]
2829

2930
[
30-
task_cfg,
31-
{signal => task_cfgs[i + 1]} # FIXME: don't link last task!
31+
id,
32+
{signal => next_task ? next_task[0] : nil} # FIXME: don't link last task at all!
3233
]
3334

3435
end.to_h
3536

37+
config = task_cfgs.collect do |(id, *args)|
38+
[id, [id, *args]]
39+
end.to_h
40+
3641
Circuit.new(
37-
map: circuit,
38-
start_task: task_cfgs[0],
39-
termini: [task_cfgs[-1][1]]
42+
map: map,
43+
start_task: config.keys[0],
44+
termini: [config.keys[-1]],
45+
config: config,
4046
)
4147
end
4248

4349
# We start with NO #call methods!
44-
class Circuit < Struct.new(:map, :start_task, :termini, keyword_init: true)
50+
class Circuit < Struct.new(:map, :start_task, :termini, :config, keyword_init: true)
4551
class Processor
4652
module Trailblazer
4753
class Context < Struct.new(:shadowed, :mutable)
@@ -88,14 +94,20 @@ def self.Context(shadowed)
8894
def self.call(circuit, ctx, scope: false, emit_to_outer_ctx: nil, emit_signal: false, **tmp_ctx) # DISCUSS: should we extract or pass-on {:use_outer_tmp}?
8995
map = circuit.map
9096
termini = circuit.termini
91-
task_cfg = circuit.start_task
97+
start_task_id = circuit.start_task
98+
config = circuit.config
99+
100+
task_cfg = config[start_task_id]
92101

93102
# DISCUSS: tmp == circuit_ctx
94103
ctx = Trailblazer.Context(ctx) if scope # discarded after this circuit is finished. (see oUTER_TMP___) # FIXME: share on demand?
95104
# FIXME: should this be done on the outside?
96105
loop do
97106
id, task, invoker, circuit_options_to_merge = task_cfg
98107

108+
#
109+
#
110+
#
99111
# puts "@@@@@ circuit [invoke] #{id.inspect} #{circuit_options_to_merge}"
100112
# ctx = ctx.merge(circuit_options_to_merge)
101113

@@ -109,8 +121,8 @@ def self.call(circuit, ctx, scope: false, emit_to_outer_ctx: nil, emit_signal: f
109121
)
110122

111123
# Stop execution of the circuit when we hit a terminus.
112-
# puts "@@@@@ #{termini.collect { |o| o} } ??? #{signal.object_id} #{signal}"
113-
if termini.include?(task)
124+
# puts "@@@@@ #{termini.collect { |o| o} } ??? #{id.inspect}"
125+
if termini.include?(id)
114126
# puts "done with circuit #{task}"
115127
if emit_to_outer_ctx
116128
outer_ctx, mutable = ctx.decompose
@@ -135,8 +147,8 @@ def self.call(circuit, ctx, scope: false, emit_to_outer_ctx: nil, emit_signal: f
135147
return ctx, signal # FIXME: IS THAT WHAT WE WANT? what if we want to pass in a tmp context into a nested circuit, but don't want it back?
136148
end
137149

138-
if next_task_cfg = next_for(map, task_cfg, signal)
139-
task_cfg = next_task_cfg
150+
if next_task_id = next_for(map, id, signal)
151+
task_cfg = config[next_task_id]
140152
# puts "@@@@@ =========> #{next_task_cfg.inspect}"
141153
else
142154
raise signal.inspect
@@ -145,8 +157,8 @@ def self.call(circuit, ctx, scope: false, emit_to_outer_ctx: nil, emit_signal: f
145157
end
146158
end
147159

148-
def self.next_for(map, last_task_cfg, signal)
149-
outputs = map[last_task_cfg]
160+
def self.next_for(map, last_task_id, signal)
161+
outputs = map[last_task_id]
150162
outputs[signal]
151163
end
152164
end
@@ -356,12 +368,12 @@ def create_application_ctx(ctx, aggregate:, **)
356368
# In() => :my_model_input
357369
my_model_input_pipe = pipeline_circuit(
358370
[:invoke_instance_method, :my_model_input, INVOKER___STEP_INTERFACE_ON_EXEC_CONTEXT, {exec_context: Create.new}],
359-
[:add_value_to_aggregate, :add_value_to_aggregate, INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT, {exec_context: Io, use_application_ctx___: false}],
371+
[:add_value_to_aggregate, :add_value_to_aggregate, INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT, {use_application_ctx___: false}],
360372
)
361373

362374
more_model_input_pipe = pipeline_circuit(
363375
[:invoke_callable, Create::MoreModelInput, INVOKER___STEP_INTERFACE],
364-
[:add_value_to_aggregate, :add_value_to_aggregate, INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT, {exec_context: Io, use_application_ctx___: false}],
376+
[:add_value_to_aggregate, :add_value_to_aggregate, INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT, {use_application_ctx___: false}],
365377
)
366378

367379
my_model_output_pipe = pipeline_circuit(
@@ -370,18 +382,20 @@ def create_application_ctx(ctx, aggregate:, **)
370382
)
371383
# raise "the original_application_ctx must be available to output, but not to the next real step"
372384

385+
# !!! requires: {exec_context: Io}
373386
model_input_pipe = pipeline_circuit(
374-
[:save_original_application_ctx, :save_original_application_ctx, INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT, {exec_context: Io}],
375-
[:init_aggregate, :init_aggregate, INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT, {exec_context: Io}],
387+
[:save_original_application_ctx, :save_original_application_ctx, INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT, {}],
388+
[:init_aggregate, :init_aggregate, INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT, {}],
376389
[:my_model_input, my_model_input_pipe, Circuit::Processor, {scope: true, emit_to_outer_ctx: [:aggregate]}], # user filter.
377390
[:more_model_input, more_model_input_pipe, Circuit::Processor, {scope: true, emit_to_outer_ctx: [:aggregate]}], # user filter.
378-
[:create_application_ctx, :create_application_ctx, INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT, {exec_context: Io}],
391+
[:create_application_ctx, :create_application_ctx, INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT, {}],
379392
)
380393

394+
# !!! requires: {exec_context: Io}
381395
model_output_pipe = pipeline_circuit(
382-
[:init_aggregate, :init_aggregate, INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT, {exec_context: Io}],
396+
[:init_aggregate, :init_aggregate, INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT, {}],
383397
[:my_model_output, my_model_output_pipe, Circuit::Processor], # user filter.
384-
[:swap___, :swap___, INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT, {exec_context: Io}],
398+
[:swap___, :swap___, INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT, {}],
385399
)
386400

387401

@@ -390,11 +404,11 @@ def create_application_ctx(ctx, aggregate:, **)
390404
#{ } how to handle signal?"
391405

392406
model_pipe = pipeline_circuit(
393-
[:input, model_input_pipe, Circuit::Processor, {scope: true, emit_to_outer_ctx: [:application_ctx, :original_application_ctx].freeze}], # change {:application_ctx}.
407+
[:input, model_input_pipe, Circuit::Processor, {exec_context: Io, scope: true, emit_to_outer_ctx: [:application_ctx, :original_application_ctx].freeze}], # change {:application_ctx}.
394408

395409
[:invoke_instance_method, :model, INVOKER___STEP_INTERFACE_ON_EXEC_CONTEXT, {exec_context: Create.new}],
396410
[:compute_binary_signal, ComputeBinarySignal],
397-
[:output, model_output_pipe, Circuit::Processor, {scope: true, emit_to_outer_ctx: [:application_ctx].freeze}],
411+
[:output, model_output_pipe, Circuit::Processor, {exec_context: Io, scope: true, emit_to_outer_ctx: [:application_ctx].freeze}],
398412
)
399413

400414
run_checks_pipe = pipeline_circuit(
@@ -407,40 +421,44 @@ def create_application_ctx(ctx, aggregate:, **)
407421
[:compute_binary_signal, ComputeBinarySignal],
408422
)
409423

410-
run_checks = [:run_checks, run_checks_pipe, Circuit::Processor, {scope: true, emit_to_outer_ctx: [:application_ctx], emit_signal: true}]
411-
title_length_ok = [:title_length_ok?, title_length_ok_pipe, Circuit::Processor, {scope: true, emit_to_outer_ctx: [:application_ctx], emit_signal: true}]
412-
validate_success_terminus = [:validate_success_terminus, FIXME_SUCCESS = Circuit::Terminus::Success.new(semantic: :success), INVOKER___CIRCUIT_INTERFACE, {}]
413-
validate_failure_terminus = [:validate_failure_terminus, FIXME_FAILURE = Circuit::Terminus::Failure.new(semantic: :failure), INVOKER___CIRCUIT_INTERFACE, {}]
424+
validate_config = {
425+
run_checks: [:run_checks, run_checks_pipe, Circuit::Processor, {scope: true, emit_to_outer_ctx: [:application_ctx], emit_signal: true}],
426+
title_length_ok?: [:title_length_ok?, title_length_ok_pipe, Circuit::Processor, {scope: true, emit_to_outer_ctx: [:application_ctx], emit_signal: true}],
427+
validate_success_terminus: [:validate_success_terminus, Circuit::Terminus::Success.new(semantic: :success), INVOKER___CIRCUIT_INTERFACE, {}],
428+
validate_failure_terminus: [:validate_failure_terminus, Circuit::Terminus::Failure.new(semantic: :failure), INVOKER___CIRCUIT_INTERFACE, {}],
429+
}
414430

415431
validate_circuit = {
416-
run_checks => {Trailblazer::Activity::Right => title_length_ok, Trailblazer::Activity::Left => validate_failure_terminus},
417-
title_length_ok => {Trailblazer::Activity::Right => validate_success_terminus, Trailblazer::Activity::Left => validate_failure_terminus},
432+
:run_checks => {Trailblazer::Activity::Right => :title_length_ok?, Trailblazer::Activity::Left => :validate_failure_terminus},
433+
:title_length_ok? => {Trailblazer::Activity::Right => :validate_success_terminus, Trailblazer::Activity::Left => :validate_failure_terminus},
418434
# FIXME_SUCCESS => {},
419435
# FIXME_FAILURE => {},
420436
}
421-
validate_circuit = Circuit.new(map: validate_circuit, termini: [FIXME_SUCCESS, FIXME_FAILURE], start_task: run_checks)
437+
validate_circuit = Circuit.new(map: validate_circuit, termini: [:validate_success_terminus, :validate_failure_terminus], start_task: :run_checks, config: validate_config)
422438

423439
save_pipe = pipeline_circuit(
424440
[:invoke_callable, Save, INVOKER___STEP_INTERFACE],
425441
[:compute_binary_signal, ComputeBinarySignal],
426442
)
427443

428-
model = [:Model, model_pipe, Circuit::Processor, {exec_context: Create.new.freeze, scope: true, emit_to_outer_ctx: [:application_ctx], emit_signal: true},] # TODO: circuit_options should be set outside of Create, in the canonical invoke.
429-
validate = [:Validate, validate_circuit, Circuit::Processor, {exec_context: Validate.new.freeze, scope: true, emit_to_outer_ctx: [:application_ctx]},] # TODO: always emit :application_ctx?
430-
save = [:Save, save_pipe, Circuit::Processor, {scope: true, emit_to_outer_ctx: [:application_ctx], emit_signal: true}] # check that we don't have circuit_options anymore here?
444+
create_config = {
445+
Model: [:Model, model_pipe, Circuit::Processor, {exec_context: Create.new.freeze, scope: true, emit_to_outer_ctx: [:application_ctx], emit_signal: true},], # TODO: circuit_options should be set outside of Create, in the canonical invoke.
446+
Validate: [:Validate, validate_circuit, Circuit::Processor, {exec_context: Validate.new.freeze, scope: true, emit_to_outer_ctx: [:application_ctx]},], # TODO: always emit :application_ctx?
447+
Save: [:Save, save_pipe, Circuit::Processor, {scope: true, emit_to_outer_ctx: [:application_ctx], emit_signal: true}], # check that we don't have circuit_options anymore here?
448+
create_success_terminus: [:create_success_terminus, Circuit::Terminus::Success.new(semantic: :success), INVOKER___CIRCUIT_INTERFACE, {}],
449+
create_failure_terminus: [:create_failure_terminus, Circuit::Terminus::Failure.new(semantic: :failure), INVOKER___CIRCUIT_INTERFACE, {}]
450+
}
431451

432-
create_success_terminus = [:create_success_terminus, CREATE_FIXME_SUCCESS = Circuit::Terminus::Success.new(semantic: :success), INVOKER___CIRCUIT_INTERFACE, {}]
433-
create_failure_terminus = [:create_failure_terminus, CREATE_FIXME_FAILURE = Circuit::Terminus::Failure.new(semantic: :failure), INVOKER___CIRCUIT_INTERFACE, {}]
434452

435453

436454

437455
create_circuit = {
438-
model => {Trailblazer::Activity::Right => validate, Trailblazer::Activity::Left => create_failure_terminus}, # DISCUSS: reuse termini?
439-
validate => {FIXME_SUCCESS => save, FIXME_FAILURE => create_failure_terminus},
440-
save => {Trailblazer::Activity::Right => create_success_terminus, Trailblazer::Activity::Left => create_failure_terminus},
456+
:Model => {Trailblazer::Activity::Right => :Validate, Trailblazer::Activity::Left => :create_failure_terminus}, # DISCUSS: reuse termini?
457+
:Validate => {validate_config[:validate_success_terminus][1] => :Save, validate_config[:validate_failure_terminus][1] => :create_failure_terminus},
458+
:Save => {Trailblazer::Activity::Right => :create_success_terminus, Trailblazer::Activity::Left => :create_failure_terminus},
441459
}
442460

443-
create_circuit = Circuit.new(map: create_circuit, termini: [CREATE_FIXME_SUCCESS, CREATE_FIXME_FAILURE], start_task: model)
461+
create_circuit = Circuit.new(map: create_circuit, termini: [:create_success_terminus, :create_failure_terminus], start_task: :Model, config: create_config)
444462

445463

446464

@@ -453,7 +471,8 @@ def create_application_ctx(ctx, aggregate:, **)
453471
application_ctx: {params: {song: {}}, noise: true, slug: "0x666"},
454472
},
455473
# {}, # tmp
456-
exec_context: create_instance = Create.new,
474+
# exec_context: create_instance = Create.new,
475+
exec_context: Io,
457476
scope: true,
458477
emit_to_outer_ctx: [:application_ctx, :original_application_ctx].freeze
459478
)
@@ -478,6 +497,7 @@ def create_application_ctx(ctx, aggregate:, **)
478497
ctx, signal = Circuit::Processor.(model_output_pipe, ctx,
479498
scope: true,
480499
emit_to_outer_ctx: [:application_ctx],
500+
exec_context: Io,
481501
)
482502

483503
# FIXME!!!!!!!!!!!!!!!!!!!!!! original_application_ctx shooouldn't contain {model}?
@@ -509,13 +529,13 @@ def create_application_ctx(ctx, aggregate:, **)
509529

510530
assert_equal ctx[:application_ctx], {:params=>{:song=>nil}, slug: "0x666", :model=>"Object / {:more=>\"0x666\"}", :errors=>["Object / {:more=>\"0x666\"}", :song]}
511531
assert_equal ctx.keys, [:application_ctx]
512-
assert_equal signal, CREATE_FIXME_FAILURE
532+
assert_equal signal, create_config[:create_failure_terminus][1]
513533

514534
# success:
515535
ctx, signal = Circuit::Processor.(create_circuit, {application_ctx: _ctx = {params: {song: {title: "Uwe"}, id: 1}, slug: "0x666"}})
516536

517537
assert_equal ctx[:application_ctx], {:params=>{:song=>{title: "Uwe"}, id: 1}, slug: "0x666", :model=>"Object 1 / {:more=>\"0x666\"}", :save=>"Object 1 / {:more=>\"0x666\"}"}
518-
assert_equal signal, CREATE_FIXME_SUCCESS
538+
assert_equal signal, create_config[:create_success_terminus][1]
519539
assert_equal ctx.keys, [:application_ctx]
520540

521541
def call_me(create_circuit)
@@ -532,6 +552,8 @@ def call_me(create_circuit)
532552

533553
# 1.
534554
# 5.648k vs 19.834k how is that so slow?
555+
# 2. circuit map now is based on ID symbols and not [id, task, invoker, ...] which was obviously very slow to compute the key every time
556+
# 21.847k that is a lot faster!
535557

536558
# save_pipe = [
537559
# a = [:input, Model___Input, INVOKER___CIRCUIT_INTERFACE, {}],

0 commit comments

Comments
 (0)