@@ -42,22 +42,24 @@ def pipeline_circuit(*task_cfgs)
4242 # We start with NO #call methods!
4343 class Circuit < Struct . new ( :map , :start_task , :termini , keyword_init : true )
4444 class Processor
45- def self . call ( circuit , ctx , **tmp_ctx )
45+ def self . call ( circuit , ctx , oUTER_TMP___ , use_outer_tmp : false , **tmp_ctx ) # DISCUSS: should we extract or pass-on {:use_outer_tmp}?
4646 map = circuit . map
4747 termini = circuit . termini
4848 task_cfg = circuit . start_task
4949
50- # circuit_ctx = {}
51-
50+ # DISCUSS: tmp == circuit_ctx
51+ tmp = use_outer_tmp ? oUTER_TMP___ : { } # discarded after this circuit is finished. (see oUTER_TMP___) # FIXME: share on demand?
52+ puts "@@@@@ +++++++++ #{ tmp . inspect } "
5253 loop do
5354 id , task , invoker , circuit_options_to_merge = task_cfg
5455
5556 puts "@@@@@ circuit [invoke] #{ id . inspect } #{ circuit_options_to_merge } "
5657 # ctx = ctx.merge(circuit_options_to_merge)
5758
58- ctx , signal = invoker . ( # TODO: redundant with {Pipeline::Processor.call}.
59+ ctx , signal , tmp = invoker . (
5960 task ,
6061 ctx ,
62+ tmp ,
6163
6264 **tmp_ctx , # FIXME: prototyping here.
6365 **circuit_options_to_merge ,
@@ -67,7 +69,7 @@ def self.call(circuit, ctx, **tmp_ctx)
6769 # puts "@@@@@ #{termini.collect { |o| o} } ??? #{signal.object_id} #{signal}"
6870 if termini . include? ( task )
6971 # puts "done with circuit #{task}"
70- return ctx , signal
72+ return ctx , signal , ( use_outer_tmp ? tmp : oUTER_TMP___ ) # FIXME: IS THAT WHAT WE WANT? what if we want to pass in a tmp context into a nested circuit, but don't want it back?
7173 end
7274
7375 if next_task_cfg = next_for ( map , task_cfg , signal )
@@ -99,47 +101,48 @@ class Failure < Success
99101 end
100102
101103 class INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT
102- def self . call ( task , ctx , exec_context :, kwargs : { } , **)
103- exec_context . send ( task , ctx , **ctx . to_h ) # TODO: how to add kwargs for Rescue.
104+ def self . call ( task , ctx , tmp , exec_context :, kwargs : { } , **)
105+ puts "@@@@@ !!!!!!!!!#{ task . inspect } "
106+ exec_context . send ( task , ctx , tmp , **ctx . to_h ) # TODO: how to add kwargs for Rescue.
104107 end
105108 end
106109
107110 class INVOKER___CIRCUIT_INTERFACE
108- def self . call ( task , ctx , **temp_ctx )
109- task . ( ctx , **ctx , **temp_ctx ) # DISCUSS/FIXME: we can also merge the kwargs once for all childs in Processor#call?
111+ def self . call ( task , ctx , tmp , **temp_ctx )
112+ task . ( ctx , tmp , **ctx , **temp_ctx ) # DISCUSS/FIXME: we can also merge the kwargs once for all childs in Processor#call?
110113 end
111114 end
112115
113116 class INVOKER___STEP_INTERFACE
114117 # def self.call(task, ctx, application_ctx:, **)
115- def self . call ( task , ctx , **)
118+ def self . call ( task , ctx , tmp , **)
116119 application_ctx = ctx [ :application_ctx ]
117120
118121 result = task . ( application_ctx , **application_ctx . to_h )
119122 # pp application_ctx
120123
121- return ctx . merge ( value : result ,
124+ return ctx , nil , tmp . merge ( value : result ,
122125 # application_ctx: application_ctx
123- ) , nil # DISCUSS: value. FIXME: redundant to INVOKER___STEP_INTERFACE_ON_EXEC_CONTEXT
126+ ) # DISCUSS: value. FIXME: redundant to INVOKER___STEP_INTERFACE_ON_EXEC_CONTEXT
124127 end
125128 end
126129
127130 class INVOKER___STEP_INTERFACE_ON_EXEC_CONTEXT
128131 # def self.call(task, ctx, exec_context:, application_ctx:, use_application_ctx___: true, **)
129- def self . call ( task , ctx , exec_context :, use_application_ctx___ : true , **)
132+ def self . call ( task , ctx , tmp , exec_context :, use_application_ctx___ : true , **)
130133 application_ctx = ctx [ :application_ctx ]
131134
132- target_ctx = use_application_ctx___ ? application_ctx : ctx # FIXME: not entirely happy with this.
133- puts "@@@@@ #{ task . inspect } "
135+ target_ctx = use_application_ctx___ ? application_ctx : ctx # FIXME: not happy with this AT ALL .
136+ puts " invok @@@@@ #{ task . inspect } "
134137 result = exec_context . send ( task , target_ctx , **target_ctx . to_h )
135138
136- return ctx . merge ( value : result ) , nil # DISCUSS: value
139+ return ctx , nil , tmp . merge ( value : result ) # DISCUSS: value
137140 end
138141 end
139142
140143 it "circuit_options, depth-only" do
141144 def capture_task ( id :)
142- -> ( ctx , exec_context :, lib_exec_context : nil , **) { ctx [ :captured ] << [ id , exec_context , lib_exec_context ] . compact ; return ctx , nil }
145+ -> ( ctx , tmp , exec_context :, lib_exec_context : nil , **) { ctx [ :captured ] << [ id , exec_context , lib_exec_context ] . compact ; return ctx , nil , tmp }
143146 end
144147
145148 model_pipe = pipeline_circuit (
@@ -165,7 +168,7 @@ def capture_task(id:)
165168 )
166169
167170 # As we pass in exec_context: as a kwarg, it's passed to all siblings etc.
168- ctx , signal = Circuit ::Processor . ( create_pipe , { captured : [ ] } , exec_context : "Object" )
171+ ctx , signal = Circuit ::Processor . ( create_pipe , { captured : [ ] } , { } , exec_context : "Object" )
169172 assert_equal ctx [ :captured ] , [ [ 1 , "Object" ] , [ 2 , "Object" ] , [ 3 , "Object" ] , [ 4 , "Object" , "Validate::Input" ] , [ 5 , "Object" , "Validate::Input" ] , [ 6 , "Object" ] ]
170173
171174# FIXME: new test case.
@@ -175,7 +178,7 @@ def capture_task(id:)
175178 [ :Validate , validate_pipe , Circuit ::Processor ] ,
176179 )
177180
178- ctx , signal = Circuit ::Processor . ( create_pipe , { captured : [ ] } , exec_context : "Object" )
181+ ctx , signal = Circuit ::Processor . ( create_pipe , { captured : [ ] } , { } , exec_context : "Object" )
179182 assert_equal ctx [ :captured ] , [ [ 1 , "Model" ] , [ 2 , "Model" ] , [ 3 , "Model" ] , [ 4 , "Object" , "Validate::Input" ] , [ 5 , "Object" , "Validate::Input" ] , [ 6 , "Object" ] ]
180183
181184 end
@@ -192,10 +195,10 @@ def self.call(ctx, value:, **)
192195 end
193196
194197 class Model___Input
195- def self . call ( ctx , **)
198+ def self . call ( ctx , tmp , **)
196199 ctx = Trailblazer . Context ( ctx )
197200
198- return ctx , nil
201+ return ctx , nil , tmp
199202 end
200203 end
201204
@@ -227,32 +230,36 @@ def my_model_output(ctx, model:, **)
227230 end
228231
229232 class IO___
230- def init_aggregate ( ctx , **)
231- ctx [ :aggregate ] = { }
233+ def init_aggregate ( ctx , tmp , **)
234+ tmp [ :aggregate ] = { }
232235
233- return ctx , nil
236+ return ctx , nil , tmp
234237 end
235238
236- def add_value_to_aggregate ( ctx , aggregate :, value :, **)
237- ctx [ :aggregate ] = aggregate . merge ( value )
239+ # def add_value_to_aggregate(ctx, aggregate:, value:, **)
240+ def add_value_to_aggregate ( ctx , tmp , **)
241+ value = tmp . fetch ( :value )
242+ aggregate = tmp . fetch ( :aggregate )
238243
239- return ctx , nil
244+ tmp [ :aggregate ] = aggregate . merge ( value )
245+
246+ return ctx , nil , tmp
240247 end
241248
242- def save_original_application_ctx ( ctx , application_ctx :, **)
243- ctx [ :original_application_ctx ] = application_ctx # the "outer ctx".
249+ def save_original_application_ctx ( ctx , tmp , application_ctx :, **)
250+ tmp [ :original_application_ctx ] = application_ctx # the "outer ctx".
244251
245- return ctx , nil
252+ return ctx , nil , tmp
246253 end
247254
248255 # Reinstate the original working ctx with a new application_ctx
249- def unscope___ ( ctx , application_ctx :, aggregate :, **)
256+ def unscope___ ( ctx , tmp , application_ctx :, aggregate :, **)
250257 original , _ = ctx . decompose
251258 puts " unscope___ discarding keys #{ _ . keys . inspect } "
252259
253260 ctx = original . merge ( application_ctx : Trailblazer ::Context ( aggregate ) ) # FIXME: separate step.
254261
255- return ctx , nil
262+ return ctx , nil , tmp
256263 end
257264
258265 # def create_input_ctx(ctx, aggregate:, **)
@@ -270,6 +277,15 @@ def swap___(ctx, application_ctx:, original_application_ctx:, aggregate:, **)
270277
271278 return ctx , nil
272279 end
280+
281+
282+ def create_application_ctx ( ctx , tmp , **)
283+ aggregate = tmp . fetch ( :aggregate )
284+
285+ ctx [ :application_ctx ] = Trailblazer ::Context ( aggregate ) # DISCUSS: write to {ctx} or use merge?
286+
287+ return ctx , nil , tmp
288+ end
273289 end
274290 Io = IO___ . new
275291
@@ -294,7 +310,7 @@ def self.FIXME_emit_signal_from_ctx___AND_UNSCOPE(ctx, signal:, **) # DISCUSS: d
294310 # [:input, Model___Input], # DISCUSS: can we somehow save these steps?
295311 [ :invoke_instance_method , :my_model_input , INVOKER___STEP_INTERFACE_ON_EXEC_CONTEXT , { exec_context : Create . new } ] ,
296312 # [:compute_binary_signal, ComputeBinarySignal],
297- [ :add_value_to_aggregate , :add_value_to_aggregate , INVOKER___STEP_INTERFACE_ON_EXEC_CONTEXT , { exec_context : Io , use_application_ctx___ : false } ] ,
313+ [ :add_value_to_aggregate , :add_value_to_aggregate , INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT , { exec_context : Io , use_application_ctx___ : false } ] ,
298314 # [:output, Model___Output],
299315 )
300316 # In() => MoreModelInput
@@ -307,21 +323,23 @@ def self.call(ctx, params:, **)
307323 end
308324 more_model_input_pipe = pipeline_circuit (
309325 [ :invoke_callable , MoreModelInput , INVOKER___STEP_INTERFACE ] ,
310- [ :add_value_to_aggregate , :add_value_to_aggregate , INVOKER___STEP_INTERFACE_ON_EXEC_CONTEXT , { exec_context : Io , use_application_ctx___ : false } ] ,
326+ [ :add_value_to_aggregate , :add_value_to_aggregate , INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT , { exec_context : Io , use_application_ctx___ : false } ] ,
311327 )
312328
313329 my_model_output_pipe = pipeline_circuit (
314330 [ :invoke_instance_method , :my_model_output , INVOKER___STEP_INTERFACE_ON_EXEC_CONTEXT , { exec_context : Create . new } ] ,
315- [ :add_value_to_aggregate , :add_value_to_aggregate , INVOKER___STEP_INTERFACE_ON_EXEC_CONTEXT , { exec_context : Io , use_application_ctx___ : false } ] ,
331+ [ :add_value_to_aggregate , :add_value_to_aggregate , INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT , { exec_context : Io , use_application_ctx___ : false } ] ,
316332 )
333+ raise "the original_application_ctx must be available to output, but not to the next real step"
317334
318335 model_input_pipe = pipeline_circuit (
319336 [ :save_original_application_ctx , :save_original_application_ctx , INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT , { exec_context : Io } ] ,
320- [ :scope , Model___Input ] , # scope, so we don't pollute anything.
337+ # [:scope, Model___Input], # scope, so we don't pollute anything.
321338 [ :init_aggregate , :init_aggregate , INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT , { exec_context : Io } ] ,
322- [ :my_model_input , my_model_input_pipe , Circuit ::Processor ] , # user filter.
323- [ :more_model_input , more_model_input_pipe , Circuit ::Processor ] , # user filter.
324- [ :unscope , :unscope___ , INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT , { exec_context : Io } ]
339+ [ :my_model_input , my_model_input_pipe , Circuit ::Processor , { use_outer_tmp : true } ] , # user filter.
340+ [ :more_model_input , more_model_input_pipe , Circuit ::Processor , { use_outer_tmp : true } ] , # user filter.
341+ # [:unscope, :unscope___, INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT, {exec_context: Io}]
342+ [ :create_application_ctx , :create_application_ctx , INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT , { exec_context : Io } ] ,
325343 )
326344
327345 model_output_pipe = pipeline_circuit (
@@ -333,23 +351,27 @@ def self.call(ctx, params:, **)
333351 )
334352
335353# TEST I/O
336- ctx , signal = Circuit ::Processor . ( model_input_pipe ,
354+ ctx , signal , tmp = Circuit ::Processor . ( model_input_pipe ,
337355 {
338356 application_ctx : { params : { slug : 999 } , noise : true } ,
339357 } ,
358+ { } , # tmp
340359 exec_context : create_instance = Create . new ,
360+ use_outer_tmp : true
341361 )
342362
343363 # raise ctx.inspect
344364 assert_equal ctx [ :application_ctx ] . class , Trailblazer ::Context ::Container # our In pipe's creation!
345365 assert_equal ctx [ :application_ctx ] [ :more ] , "{:slug=>999}" # the more_model_input was called.
346- assert_equal ctx [ :original_application_ctx ] . class , Hash # the OG ctx is a Hash.
347- assert_equal CU . inspect ( ctx ) , %({:application_ctx=>#<Trailblazer::Context::Container wrapped_options={:params=>{:id=>999}, :more=>\" {:slug=>999}\" } mutable_options={}>, :original_application_ctx=>{:params=>{:slug=>999}, :noise=>true}})
366+ assert_equal tmp [ :original_application_ctx ] . class , Hash # the OG ctx is a Hash.
367+ pp tmp
368+ assert_equal tmp . keys , [ :original_application_ctx ]
369+ assert_equal CU . inspect ( ctx ) , %({:application_ctx=>#<Trailblazer::Context::Container wrapped_options={:params=>{:id=>999}, :more=>\" {:slug=>999}\" } mutable_options={}>})
348370
349371 # this is what happens in the actual {:model} step.
350372 ctx [ :application_ctx ] [ :model ] = Object
351373
352- ctx , signal = Circuit ::Processor . ( model_output_pipe , ctx )
374+ ctx , signal = Circuit ::Processor . ( model_output_pipe , ctx , tmp , use_outer_tmp : true )
353375
354376 assert_equal ctx . inspect , %()
355377
0 commit comments