2020from orquesta .specs import native as native_specs
2121from orquesta import statuses
2222from orquesta .tests .unit import base as test_base
23+ import yaql .language .utils as yaql_utils
2324
2425
2526class WorkflowConductorDataFlowTest (test_base .WorkflowConductorTest ):
2627
27- def _prep_conductor (self , context = None , inputs = None , status = None ):
28- wf_def = """
29- version: 1.0
30-
31- description: A basic sequential workflow.
32-
33- input:
34- - a1
35- - b1: <% ctx().a1 %>
36-
37- vars:
38- - a2: <% ctx().b1 %>
39- - b2: <% ctx().a2 %>
40-
41- output:
42- - a5: <% ctx().b4 %>
43- - b5: <% ctx().a5 %>
44-
45- tasks:
46- task1:
47- action: core.noop
48- next:
49- - when: <% succeeded() %>
50- publish:
51- - a3: <% ctx().b2 %>
52- - b3: <% ctx().a3 %>
53- do: task2
54- task2:
55- action: core.noop
56- next:
57- - when: <% succeeded() %>
58- publish: a4=<% ctx().b3 %> b4=<% ctx().a4 %>
59- do: task3
60- task3:
61- action: core.noop
62- """
63-
28+ wf_def_yaql = """
29+ version: 1.0
30+
31+ description: A basic sequential workflow.
32+
33+ input:
34+ - a1
35+ - b1: <% ctx().a1 %>
36+
37+ vars:
38+ - a2: <% ctx().b1 %>
39+ - b2: <% ctx().a2 %>
40+
41+ output:
42+ - a5: <% ctx().b4 %>
43+ - b5: <% ctx().a5 %>
44+
45+ tasks:
46+ task1:
47+ action: core.noop
48+ next:
49+ - when: <% succeeded() %>
50+ publish:
51+ - a3: <% ctx().b2 %>
52+ - b3: <% ctx().a3 %>
53+ do: task2
54+ task2:
55+ action: core.noop
56+ next:
57+ - when: <% succeeded() %>
58+ publish: a4=<% ctx().b3 %> b4=<% ctx().a4 %>
59+ do: task3
60+ task3:
61+ action: core.noop
62+ """
63+
64+ wf_def_jinja = """
65+ version: 1.0
66+
67+ description: A basic sequential workflow.
68+
69+ input:
70+ - a1
71+ - b1: '{{ ctx("a1") }}'
72+
73+ vars:
74+ - a2: '{{ ctx("b1") }}'
75+ - b2: '{{ ctx("a2") }}'
76+
77+ output:
78+ - a5: '{{ ctx("b4") }}'
79+ - b5: '{{ ctx("a5") }}'
80+
81+ tasks:
82+ task1:
83+ action: core.noop
84+ next:
85+ - when: '{{ succeeded() }}'
86+ publish:
87+ - a3: '{{ ctx("b2") }}'
88+ - b3: '{{ ctx("a3") }}'
89+ do: task2
90+ task2:
91+ action: core.noop
92+ next:
93+ - when: '{{ succeeded() }}'
94+ publish: a4='{{ ctx("b3") }}' b4='{{ ctx("a4") }}'
95+ do: task3
96+ task3:
97+ action: core.noop
98+ """
99+
100+ def _prep_conductor (self , wf_def , context = None , inputs = None , status = None ):
64101 spec = native_specs .WorkflowSpec (wf_def )
65102 self .assertDictEqual (spec .inspect (), {})
66103
@@ -76,33 +113,52 @@ def _prep_conductor(self, context=None, inputs=None, status=None):
76113
77114 return conductor
78115
116+ def _get_combined_value (self , callstack_depth = 0 ):
117+ # This returns dict typed value all Python built-in type values
118+ # which orquesta spec could accept.
119+ if callstack_depth < 2 :
120+ return {
121+ 'null' : None ,
122+ 'integer_positive' : 123 ,
123+ 'integer_negative' : - 123 ,
124+ 'number_positive' : 99.99 ,
125+ 'number_negative' : - 99.99 ,
126+ 'string' : 'xyz' ,
127+ 'boolean_true' : True ,
128+ 'boolean_false' : False ,
129+ 'array' : list (self ._get_combined_value (callstack_depth + 1 ).values ()),
130+ 'object' : self ._get_combined_value (callstack_depth + 1 ),
131+ }
132+ else :
133+ return {}
134+
135+ def _assert_data_flow (self , inputs , expected_output ):
136+ # This assert method checks input value would be handled and published
137+ # as an expected type and value with both YAQL and Jinja expressions.
138+ for wf_def in [self .wf_def_jinja , self .wf_def_yaql ]:
139+ conductor = self ._prep_conductor (wf_def , inputs = inputs , status = statuses .RUNNING )
140+
141+ for i in range (1 , len (conductor .spec .tasks ) + 1 ):
142+ task_name = 'task' + str (i )
143+ forward_statuses = [statuses .RUNNING , statuses .SUCCEEDED ]
144+ self .forward_task_statuses (conductor , task_name , forward_statuses )
145+
146+ # Render workflow output and checkout workflow status and output.
147+ conductor .render_workflow_output ()
148+ self .assertEqual (conductor .get_workflow_status (), statuses .SUCCEEDED )
149+ self .assertDictEqual (conductor .get_workflow_output (), expected_output )
150+
79151 def assert_data_flow (self , input_value ):
80152 inputs = {'a1' : input_value }
81153 expected_output = {'a5' : inputs ['a1' ], 'b5' : inputs ['a1' ]}
82- conductor = self ._prep_conductor (inputs = inputs , status = statuses .RUNNING )
83154
84- for i in range (1 , len (conductor .spec .tasks ) + 1 ):
85- task_name = 'task' + str (i )
86- self .forward_task_statuses (conductor , task_name , [statuses .RUNNING , statuses .SUCCEEDED ])
87-
88- # Render workflow output and checkout workflow status and output.
89- conductor .render_workflow_output ()
90- self .assertEqual (conductor .get_workflow_status (), statuses .SUCCEEDED )
91- self .assertDictEqual (conductor .get_workflow_output (), expected_output )
155+ self ._assert_data_flow (inputs , expected_output )
92156
93157 def assert_unicode_data_flow (self , input_value ):
94158 inputs = {u'a1' : unicode (input_value , 'utf8' ) if six .PY2 else input_value }
95159 expected_output = {u'a5' : inputs ['a1' ], u'b5' : inputs ['a1' ]}
96- conductor = self ._prep_conductor (inputs = inputs , status = statuses .RUNNING )
97-
98- for i in range (1 , len (conductor .spec .tasks ) + 1 ):
99- task_name = 'task' + str (i )
100- self .forward_task_statuses (conductor , task_name , [statuses .RUNNING , statuses .SUCCEEDED ])
101160
102- # Render workflow output and checkout workflow status and output.
103- conductor .render_workflow_output ()
104- self .assertEqual (conductor .get_workflow_status (), statuses .SUCCEEDED )
105- self .assertDictEqual (conductor .get_workflow_output (), expected_output )
161+ self ._assert_data_flow (inputs , expected_output )
106162
107163 def test_data_flow_string (self ):
108164 self .assert_data_flow ('xyz' )
@@ -119,11 +175,20 @@ def test_data_flow_boolean(self):
119175 self .assert_data_flow (True )
120176 self .assert_data_flow (False )
121177
178+ def test_data_flow_none (self ):
179+ self .assert_data_flow (None )
180+
122181 def test_data_flow_dict (self ):
123- self .assert_data_flow ({'x' : 123 , 'y' : 'abc' })
182+ mapping_typed_data = self ._get_combined_value ()
183+
184+ self .assertIsInstance (mapping_typed_data , yaql_utils .MappingType )
185+ self .assert_data_flow (mapping_typed_data )
124186
125187 def test_data_flow_list (self ):
126- self .assert_data_flow ([123 , 'abc' , True ])
188+ sequence_typed_data = list (self ._get_combined_value ().values ())
189+
190+ self .assertIsInstance (sequence_typed_data , yaql_utils .SequenceType )
191+ self .assert_data_flow (sequence_typed_data )
127192
128193 def test_data_flow_unicode (self ):
129194 self .assert_unicode_data_flow ('光合作用' )
0 commit comments