1010# 7. try saving memory by providing often-used Pipes, e.g. for IO?
1111# 8. how would we change the "circuit options" from a step? ===> change :start_task
1212# 9. does invoker.call need kwargs?
13- # 10. BUG: when all tasks are the same proc and the last is the terminus, only the first is run.
13+ # 10. BUG: when all tasks are the same proc and the last is the terminus, only the first is run. ===> use ids, we got them, anyway.
14+ # 11. should circuit_options be a positional arg?
1415
1516class RunnerInvokerTest < Minitest ::Spec
1617 # Helper for those who don't like or have a DSL :D
@@ -46,6 +47,8 @@ def self.call(circuit, ctx, **tmp_ctx)
4647 termini = circuit . termini
4748 task_cfg = circuit . start_task
4849
50+ # circuit_ctx = {}
51+
4952 loop do
5053 id , task , invoker , circuit_options_to_merge = task_cfg
5154
@@ -55,12 +58,9 @@ def self.call(circuit, ctx, **tmp_ctx)
5558 ctx , signal = invoker . ( # TODO: redundant with {Pipeline::Processor.call}.
5659 task ,
5760 ctx ,
58- **ctx , # DISCUSS: do we need kwargs here?
5961
6062 **tmp_ctx , # FIXME: prototyping here.
61-
6263 **circuit_options_to_merge ,
63-
6464 )
6565
6666 # Stop execution of the circuit when we hit a terminus.
@@ -106,12 +106,15 @@ def self.call(task, ctx, exec_context:, kwargs: {}, **)
106106
107107 class INVOKER___CIRCUIT_INTERFACE
108108 def self . call ( task , ctx , **temp_ctx )
109- task . ( ctx , **temp_ctx . to_h )
109+ task . ( ctx , **ctx , ** temp_ctx ) # DISCUSS/FIXME: we can also merge the kwargs once for all childs in Processor#call?
110110 end
111111 end
112112
113113 class INVOKER___STEP_INTERFACE
114- def self . call ( task , ctx , application_ctx :, **)
114+ # def self.call(task, ctx, application_ctx:, **)
115+ def self . call ( task , ctx , **)
116+ application_ctx = ctx [ :application_ctx ]
117+
115118 result = task . ( application_ctx , **application_ctx . to_h )
116119 # pp application_ctx
117120
@@ -122,9 +125,12 @@ def self.call(task, ctx, application_ctx:, **)
122125 end
123126
124127 class INVOKER___STEP_INTERFACE_ON_EXEC_CONTEXT
125- def self . call ( task , ctx , exec_context :, application_ctx :, use_application_ctx___ : true , **)
126- target_ctx = use_application_ctx___ ? application_ctx : ctx # FIXME: not entirely happy with this.
128+ # def self.call(task, ctx, exec_context:, application_ctx:, use_application_ctx___: true, **)
129+ def self . call ( task , ctx , exec_context :, use_application_ctx___ : true , **)
130+ application_ctx = ctx [ :application_ctx ]
127131
132+ target_ctx = use_application_ctx___ ? application_ctx : ctx # FIXME: not entirely happy with this.
133+ puts "@@@@@ #{ task . inspect } "
128134 result = exec_context . send ( task , target_ctx , **target_ctx . to_h )
129135
130136 return ctx . merge ( value : result ) , nil # DISCUSS: value
@@ -196,6 +202,7 @@ def self.call(ctx, **)
196202 class Model___Output
197203 def self . call ( ctx , **)
198204 ctx , _ = ctx . decompose
205+ puts " unscope___ discarding keys #{ _ . keys . inspect } [Model___Output]"
199206 return ctx , nil
200207 end
201208 end
@@ -241,6 +248,7 @@ def save_original_application_ctx(ctx, application_ctx:, **)
241248 # Reinstate the original working ctx with a new application_ctx
242249 def unscope___ ( ctx , application_ctx :, aggregate :, **)
243250 original , _ = ctx . decompose
251+ puts " unscope___ discarding keys #{ _ . keys . inspect } "
244252
245253 ctx = original . merge ( application_ctx : Trailblazer ::Context ( aggregate ) ) # FIXME: separate step.
246254
@@ -254,7 +262,9 @@ def unscope___(ctx, application_ctx:, aggregate:, **)
254262 def swap___ ( ctx , application_ctx :, original_application_ctx :, aggregate :, **)
255263 new_application_ctx = original_application_ctx . merge ( aggregate ) # DISCUSS: how to write on outer ctx?
256264
257- original , _ = ctx . decompose
265+
266+ before_output , _ = ctx . decompose # FIXME: couldn't we get the original_application_ctx from here?
267+ raise before_output . inspect
258268
259269 ctx = original . merge ( application_ctx : new_application_ctx )
260270
@@ -265,6 +275,16 @@ def swap___(ctx, application_ctx:, original_application_ctx:, aggregate:, **)
265275
266276 module TaskWrap
267277 def self . emit_signal_from_ctx ( ctx , signal :, **) # DISCUSS: do we really want this?
278+ puts "@@@@@ ||||#{ signal . inspect } "
279+ return ctx , signal # FIXME: thiiiiiiiiiiiiiiiiiis neeeeds to be the last tw step.
280+ end
281+
282+ def self . FIXME_emit_signal_from_ctx___AND_UNSCOPE ( ctx , signal :, **) # DISCUSS: do we really want this?
283+ puts "<<<<<<<<< signaL: #{ signal . inspect } "
284+ ctx , _ = ctx . decompose
285+ puts " FIXME_emit_signal_from_ctx___AND_UNSCOPE discarding keys #{ _ . keys . inspect } "
286+ puts " new ctx: #{ ctx } "
287+
268288 return ctx , signal # FIXME: thiiiiiiiiiiiiiiiiiis neeeeds to be the last tw step.
269289 end
270290 end
@@ -305,26 +325,41 @@ def self.call(ctx, params:, **)
305325 )
306326
307327 model_output_pipe = pipeline_circuit (
328+ [ :nil , -> ( ctx , **) { raise ctx . inspect } ] ,
308329 [ :scope , Model___Input ] , # scope so we don't pollute
309330 [ :init_aggregate , :init_aggregate , INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT , { exec_context : Io } ] ,
310331 [ :my_model_output , my_model_output_pipe , Circuit ::Processor ] , # user filter.
311332 [ :swap___ , :swap___ , INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT , { exec_context : Io } ] ,
312333 )
313334
314- # ctx, signal = Circuit::Processor.(model_input_pipe, {
315- # application_ctx: {params: {slug: 999}, noise: true},
316- # exec_context: create_instance = Create.new,
317- # })
335+ # TEST I/O
336+ ctx , signal = Circuit ::Processor . ( model_input_pipe ,
337+ {
338+ application_ctx : { params : { slug : 999 } , noise : true } ,
339+ } ,
340+ exec_context : create_instance = Create . new ,
341+ )
342+
343+ # raise ctx.inspect
344+ assert_equal ctx [ :application_ctx ] . class , Trailblazer ::Context ::Container # our In pipe's creation!
345+ assert_equal ctx [ :application_ctx ] [ :more ] , "{:slug=>999}" # the more_model_input was called.
346+ assert_equal ctx [ :original_application_ctx ] . class , Hash # the OG ctx is a Hash.
347+ assert_equal CU . inspect ( ctx ) , %({:application_ctx=>#<Trailblazer::Context::Container wrapped_options={:params=>{:id=>999}, :more=>\" {:slug=>999}\" } mutable_options={}>, :original_application_ctx=>{:params=>{:slug=>999}, :noise=>true}})
318348
319- # application_ctx = ctx[:application_ctx].merge(model: Object)
349+ # this is what happens in the actual {:model} step.
350+ ctx [ :application_ctx ] [ :model ] = Object
320351
321- # ctx, signal = Circuit::Processor.(model_output_pipe, ctx.merge(application_ctx: application_ctx) )
352+ ctx , signal = Circuit ::Processor . ( model_output_pipe , ctx )
322353
323- # assert_equal ctx.inspect, %({:application_ctx=>{:params=>{:id=>"999"}, :more=>"{:id=>999}"}, :exec_context=>#{create_instance}, noise} )
354+ assert_equal ctx . inspect , %()
324355
325356
326357
327358
359+
360+ # raise "should we merge ctx and temp_ctx in Processor, or do that in the invoker?
361+ #{ } how to handle signal?"
362+
328363 class Validate
329364 def run_checks ( ctx , params :, model :, **)
330365 if params [ :song ]
@@ -351,30 +386,33 @@ def self.call(ctx, model:, **)
351386
352387 model_pipe = pipeline_circuit (
353388 # [:input, Model___Input], # DISCUSS: can we somehow save these steps?
354- [ :input , model_input_pipe , Circuit ::Processor ] ,
389+ [ :input , model_input_pipe , Circuit ::Processor ] , # change {:application_ctx}.
390+
391+ [ :scope , Model___Input ] , # we don't want {:signal} later!
355392 [ :invoke_instance_method , :model , INVOKER___STEP_INTERFACE_ON_EXEC_CONTEXT , { exec_context : Create . new } ] ,
356393 [ :compute_binary_signal , ComputeBinarySignal ] ,
357394 # [:output, Model___Output], # DISCUSS: can we somehow save these steps?
358395 [ :output , model_output_pipe , Circuit ::Processor ] ,
359- [ :emit_signal_from_ctx , :emit_signal_from_ctx , INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT , { exec_context : TaskWrap } ] ,
396+ # [:asdf, ->(ctx, signal:, **) { pp ctx; raise "why is there a signal?" }],
397+ [ :emit_signal_from_ctx , :FIXME_emit_signal_from_ctx___AND_UNSCOPE , INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT , { exec_context : TaskWrap } ] ,
398+ # [:unscope, Model___Output],
360399 )
361400 # pp model_pipe
362401
363402 run_checks_pipe = pipeline_circuit (
364- [ nil , -> ( ctx , **circuit_options ) { puts "@@@@@ #{ circuit_options . inspect } " ; raise } ] ,
365403 [ :input , Model___Input ] ,
366404 [ :invoke_instance_method , :run_checks , INVOKER___STEP_INTERFACE_ON_EXEC_CONTEXT ] , # FIXME: we're currenly assuming that exec_context is passed down.
367405 [ :compute_binary_signal , ComputeBinarySignal ] ,
368- [ :output , Model___Output ] ,
369- [ :emit_signal_from_ctx , :emit_signal_from_ctx , INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT , { exec_context : TaskWrap } ] , # FIXME: {exec_context} currently bleeds into {title_length_ok_pipe}.
406+ # [:output, Model___Output],
407+ [ :emit_signal_from_ctx , :FIXME_emit_signal_from_ctx___AND_UNSCOPE , INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT , { exec_context : TaskWrap } ] , # FIXME: {exec_context} currently bleeds into {title_length_ok_pipe}.
370408 )
371409
372410 title_length_ok_pipe = pipeline_circuit (
373411 [ :input , Model___Input ] ,
374412 [ :invoke_instance_method , :title_length_ok? , INVOKER___STEP_INTERFACE_ON_EXEC_CONTEXT ] ,
375413 [ :compute_binary_signal , ComputeBinarySignal ] ,
376- [ :output , Model___Output ] ,
377- [ :emit_signal_from_ctx , :emit_signal_from_ctx , INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT , { exec_context : TaskWrap } ] ,
414+ # [:output, Model___Output],
415+ [ :emit_signal_from_ctx , :FIXME_emit_signal_from_ctx___AND_UNSCOPE , INVOKER___CIRCUIT_INTERFACE_ON_EXEC_CONTEXT , { exec_context : TaskWrap } ] ,
378416 )
379417
380418 run_checks = [ :run_checks , run_checks_pipe , Circuit ::Processor , { } ]
0 commit comments