Skip to content

Commit e32d561

Browse files
authored
Merge pull request #156 from StackStorm/fix-resuming-transition-error
Fix task transition error on completing task in pending status
2 parents 0e33378 + af36356 commit e32d561

File tree

3 files changed

+187
-0
lines changed

3 files changed

+187
-0
lines changed

orquesta/machines.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@
146146
events.WORKFLOW_RESUMING_WORKFLOW_COMPLETED: statuses.SUCCEEDED,
147147
events.WORKFLOW_CANCELING_WORKFLOW_DORMANT: statuses.CANCELED,
148148
events.WORKFLOW_CANCELED_WORKFLOW_DORMANT: statuses.CANCELED,
149+
events.WORKFLOW_FAILED: statuses.FAILED,
149150
events.TASK_RUNNING: statuses.RUNNING,
150151
events.TASK_RESUMING: statuses.RUNNING
151152
},

orquesta/tests/unit/conducting/test_workflow_conductor_pause_and_resume.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,3 +124,30 @@ def test_pause_and_resume_from_branches(self):
124124
# Complete task3.
125125
self.forward_task_statuses(conductor, 'task3', [statuses.RUNNING, statuses.SUCCEEDED])
126126
self.assertEqual(conductor.get_workflow_status(), statuses.SUCCEEDED)
127+
128+
def test_pause_and_failed_with_task_transition_error(self):
129+
wf_def = """
130+
version: 1.0
131+
description: A basic sequential workflow.
132+
tasks:
133+
task1:
134+
action: core.noop
135+
next:
136+
- when: <% result().foobar %>
137+
do: task2
138+
task2:
139+
action: core.noop
140+
"""
141+
142+
spec = native_specs.WorkflowSpec(wf_def)
143+
conductor = conducting.WorkflowConductor(spec)
144+
conductor.request_workflow_status(statuses.RUNNING)
145+
146+
# Run task1.
147+
self.forward_task_statuses(conductor, 'task1', [statuses.RUNNING])
148+
self.assertEqual(conductor.get_workflow_status(), statuses.RUNNING)
149+
150+
# Complete task1 and assert the workflow execution fails
151+
# due to the expression error in the task transition.
152+
self.forward_task_statuses(conductor, 'task1', [statuses.SUCCEEDED])
153+
self.assertEqual(conductor.get_workflow_status(), statuses.FAILED)
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
# Licensed under the Apache License, Version 2.0 (the "License");
2+
# you may not use this file except in compliance with the License.
3+
# You may obtain a copy of the License at
4+
#
5+
# http://www.apache.org/licenses/LICENSE-2.0
6+
#
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
from orquesta import conducting
14+
from orquesta.specs import native as native_specs
15+
from orquesta import statuses
16+
from orquesta.tests.unit import base as test_base
17+
18+
19+
class WorkflowConductorPausePendingResumeTest(test_base.WorkflowConductorTest):
20+
21+
def test_pause_and_resume_from_workflow(self):
22+
wf_def = """
23+
version: 1.0
24+
description: A basic branching workflow.
25+
tasks:
26+
# branch 1
27+
task1:
28+
action: core.ask
29+
next:
30+
- when: <% succeeded() %>
31+
do: task3
32+
# branch 2
33+
task2:
34+
action: core.noop
35+
next:
36+
- when: <% succeeded() %>
37+
do: task3
38+
# adjoining branch
39+
task3:
40+
join: all
41+
action: core.noop
42+
"""
43+
44+
spec = native_specs.WorkflowSpec(wf_def)
45+
conductor = conducting.WorkflowConductor(spec)
46+
conductor.request_workflow_status(statuses.RUNNING)
47+
48+
# Run task1 and task2.
49+
self.forward_task_statuses(conductor, 'task1', [statuses.RUNNING])
50+
self.forward_task_statuses(conductor, 'task2', [statuses.RUNNING])
51+
self.assertEqual(conductor.get_workflow_status(), statuses.RUNNING)
52+
53+
# Pause the workflow.
54+
conductor.request_workflow_status(statuses.PAUSING)
55+
56+
# Put task1 to pending state and assert workflow is pausing.
57+
self.forward_task_statuses(conductor, 'task1', [statuses.PENDING])
58+
self.assertEqual(conductor.get_workflow_status(), statuses.PAUSING)
59+
60+
# Complete task1 only. The workflow should still be pausing
61+
# because task2 is still running.
62+
self.forward_task_statuses(conductor, 'task1', [statuses.SUCCEEDED])
63+
self.assertEqual(conductor.get_workflow_status(), statuses.PAUSING)
64+
65+
# Complete task2. When task2 completes, the workflow should be paused
66+
# because there is no task in active status.
67+
self.forward_task_statuses(conductor, 'task2', [statuses.SUCCEEDED])
68+
self.assertEqual(conductor.get_workflow_status(), statuses.PAUSED)
69+
70+
# Resume the workflow, task3 should be staged, and complete task3.
71+
conductor.request_workflow_status(statuses.RESUMING)
72+
self.assert_next_task(conductor, 'task3', {})
73+
self.forward_task_statuses(conductor, 'task3', [statuses.RUNNING, statuses.SUCCEEDED])
74+
self.assertEqual(conductor.get_workflow_status(), statuses.SUCCEEDED)
75+
76+
def test_pause_and_resume_from_branches(self):
77+
wf_def = """
78+
version: 1.0
79+
description: A basic branching workflow.
80+
tasks:
81+
# branch 1
82+
task1:
83+
action: core.ask
84+
next:
85+
- when: <% succeeded() %>
86+
do: task3
87+
# branch 2
88+
task2:
89+
action: core.noop
90+
next:
91+
- when: <% succeeded() %>
92+
do: task3
93+
# adjoining branch
94+
task3:
95+
join: all
96+
action: core.noop
97+
"""
98+
99+
spec = native_specs.WorkflowSpec(wf_def)
100+
conductor = conducting.WorkflowConductor(spec)
101+
conductor.request_workflow_status(statuses.RUNNING)
102+
103+
# Run task1 and task2.
104+
self.forward_task_statuses(conductor, 'task1', [statuses.RUNNING])
105+
self.forward_task_statuses(conductor, 'task2', [statuses.RUNNING])
106+
self.assertEqual(conductor.get_workflow_status(), statuses.RUNNING)
107+
108+
# Pause task1 and task2.
109+
self.forward_task_statuses(conductor, 'task1', [statuses.PENDING])
110+
self.forward_task_statuses(conductor, 'task2', [statuses.PAUSED])
111+
self.assertEqual(conductor.get_workflow_status(), statuses.PAUSED)
112+
113+
# Resume and complete task1 only. Once task1 completes, the workflow
114+
# should pause again because there is no active task.
115+
self.forward_task_statuses(conductor, 'task1', [statuses.SUCCEEDED])
116+
self.assertEqual(conductor.get_workflow_status(), statuses.PAUSED)
117+
118+
# Resume and complete task2. When task2 completes, the workflow
119+
# should stay running because task3 is now staged and ready.
120+
self.forward_task_statuses(conductor, 'task2', [statuses.RUNNING])
121+
self.assertEqual(conductor.get_workflow_status(), statuses.RUNNING)
122+
self.forward_task_statuses(conductor, 'task2', [statuses.SUCCEEDED])
123+
self.assertEqual(conductor.get_workflow_status(), statuses.RUNNING)
124+
self.assert_next_task(conductor, 'task3', {})
125+
126+
# Complete task3.
127+
self.forward_task_statuses(conductor, 'task3', [statuses.RUNNING, statuses.SUCCEEDED])
128+
self.assertEqual(conductor.get_workflow_status(), statuses.SUCCEEDED)
129+
130+
def test_pending_and_failed_with_task_transition_error(self):
131+
wf_def = """
132+
version: 1.0
133+
description: A basic sequential workflow.
134+
tasks:
135+
task1:
136+
action: core.ask
137+
next:
138+
- when: <% result().response.foobar %>
139+
do: task2
140+
task2:
141+
action: core.noop
142+
"""
143+
144+
spec = native_specs.WorkflowSpec(wf_def)
145+
conductor = conducting.WorkflowConductor(spec)
146+
conductor.request_workflow_status(statuses.RUNNING)
147+
148+
# Run task1.
149+
self.forward_task_statuses(conductor, 'task1', [statuses.RUNNING])
150+
self.assertEqual(conductor.get_workflow_status(), statuses.RUNNING)
151+
152+
# Put task1 to pending state and assert workflow is paused.
153+
self.forward_task_statuses(conductor, 'task1', [statuses.PENDING])
154+
self.assertEqual(conductor.get_workflow_status(), statuses.PAUSED)
155+
156+
# Complete task1 and assert workflow execution fails
157+
# due to the expression error in task transition.
158+
self.forward_task_statuses(conductor, 'task1', [statuses.SUCCEEDED])
159+
self.assertEqual(conductor.get_workflow_status(), statuses.FAILED)

0 commit comments

Comments
 (0)