Skip to content

Commit af36356

Browse files
committed
Fix task transition error on completing task in pending status
A workflow goes into paused status when a task goes into pending status. If the task transition has error, the workflow does not go into failed status when the task is completed. This patch add an entry in the state machine for workflow to go from paused to failed status.
1 parent 0e33378 commit af36356

File tree

3 files changed

+187
-0
lines changed

3 files changed

+187
-0
lines changed

orquesta/machines.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@
146146
events.WORKFLOW_RESUMING_WORKFLOW_COMPLETED: statuses.SUCCEEDED,
147147
events.WORKFLOW_CANCELING_WORKFLOW_DORMANT: statuses.CANCELED,
148148
events.WORKFLOW_CANCELED_WORKFLOW_DORMANT: statuses.CANCELED,
149+
events.WORKFLOW_FAILED: statuses.FAILED,
149150
events.TASK_RUNNING: statuses.RUNNING,
150151
events.TASK_RESUMING: statuses.RUNNING
151152
},

orquesta/tests/unit/conducting/test_workflow_conductor_pause_and_resume.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,3 +124,30 @@ def test_pause_and_resume_from_branches(self):
124124
# Complete task3.
125125
self.forward_task_statuses(conductor, 'task3', [statuses.RUNNING, statuses.SUCCEEDED])
126126
self.assertEqual(conductor.get_workflow_status(), statuses.SUCCEEDED)
127+
128+
def test_pause_and_failed_with_task_transition_error(self):
129+
wf_def = """
130+
version: 1.0
131+
description: A basic sequential workflow.
132+
tasks:
133+
task1:
134+
action: core.noop
135+
next:
136+
- when: <% result().foobar %>
137+
do: task2
138+
task2:
139+
action: core.noop
140+
"""
141+
142+
spec = native_specs.WorkflowSpec(wf_def)
143+
conductor = conducting.WorkflowConductor(spec)
144+
conductor.request_workflow_status(statuses.RUNNING)
145+
146+
# Run task1.
147+
self.forward_task_statuses(conductor, 'task1', [statuses.RUNNING])
148+
self.assertEqual(conductor.get_workflow_status(), statuses.RUNNING)
149+
150+
# Complete task1 and assert the workflow execution fails
151+
# due to the expression error in the task transition.
152+
self.forward_task_statuses(conductor, 'task1', [statuses.SUCCEEDED])
153+
self.assertEqual(conductor.get_workflow_status(), statuses.FAILED)
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
# Licensed under the Apache License, Version 2.0 (the "License");
2+
# you may not use this file except in compliance with the License.
3+
# You may obtain a copy of the License at
4+
#
5+
# http://www.apache.org/licenses/LICENSE-2.0
6+
#
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
from orquesta import conducting
14+
from orquesta.specs import native as native_specs
15+
from orquesta import statuses
16+
from orquesta.tests.unit import base as test_base
17+
18+
19+
class WorkflowConductorPausePendingResumeTest(test_base.WorkflowConductorTest):
20+
21+
def test_pause_and_resume_from_workflow(self):
22+
wf_def = """
23+
version: 1.0
24+
description: A basic branching workflow.
25+
tasks:
26+
# branch 1
27+
task1:
28+
action: core.ask
29+
next:
30+
- when: <% succeeded() %>
31+
do: task3
32+
# branch 2
33+
task2:
34+
action: core.noop
35+
next:
36+
- when: <% succeeded() %>
37+
do: task3
38+
# adjoining branch
39+
task3:
40+
join: all
41+
action: core.noop
42+
"""
43+
44+
spec = native_specs.WorkflowSpec(wf_def)
45+
conductor = conducting.WorkflowConductor(spec)
46+
conductor.request_workflow_status(statuses.RUNNING)
47+
48+
# Run task1 and task2.
49+
self.forward_task_statuses(conductor, 'task1', [statuses.RUNNING])
50+
self.forward_task_statuses(conductor, 'task2', [statuses.RUNNING])
51+
self.assertEqual(conductor.get_workflow_status(), statuses.RUNNING)
52+
53+
# Pause the workflow.
54+
conductor.request_workflow_status(statuses.PAUSING)
55+
56+
# Put task1 to pending state and assert workflow is pausing.
57+
self.forward_task_statuses(conductor, 'task1', [statuses.PENDING])
58+
self.assertEqual(conductor.get_workflow_status(), statuses.PAUSING)
59+
60+
# Complete task1 only. The workflow should still be pausing
61+
# because task2 is still running.
62+
self.forward_task_statuses(conductor, 'task1', [statuses.SUCCEEDED])
63+
self.assertEqual(conductor.get_workflow_status(), statuses.PAUSING)
64+
65+
# Complete task2. When task2 completes, the workflow should be paused
66+
# because there is no task in active status.
67+
self.forward_task_statuses(conductor, 'task2', [statuses.SUCCEEDED])
68+
self.assertEqual(conductor.get_workflow_status(), statuses.PAUSED)
69+
70+
# Resume the workflow, task3 should be staged, and complete task3.
71+
conductor.request_workflow_status(statuses.RESUMING)
72+
self.assert_next_task(conductor, 'task3', {})
73+
self.forward_task_statuses(conductor, 'task3', [statuses.RUNNING, statuses.SUCCEEDED])
74+
self.assertEqual(conductor.get_workflow_status(), statuses.SUCCEEDED)
75+
76+
def test_pause_and_resume_from_branches(self):
77+
wf_def = """
78+
version: 1.0
79+
description: A basic branching workflow.
80+
tasks:
81+
# branch 1
82+
task1:
83+
action: core.ask
84+
next:
85+
- when: <% succeeded() %>
86+
do: task3
87+
# branch 2
88+
task2:
89+
action: core.noop
90+
next:
91+
- when: <% succeeded() %>
92+
do: task3
93+
# adjoining branch
94+
task3:
95+
join: all
96+
action: core.noop
97+
"""
98+
99+
spec = native_specs.WorkflowSpec(wf_def)
100+
conductor = conducting.WorkflowConductor(spec)
101+
conductor.request_workflow_status(statuses.RUNNING)
102+
103+
# Run task1 and task2.
104+
self.forward_task_statuses(conductor, 'task1', [statuses.RUNNING])
105+
self.forward_task_statuses(conductor, 'task2', [statuses.RUNNING])
106+
self.assertEqual(conductor.get_workflow_status(), statuses.RUNNING)
107+
108+
# Pause task1 and task2.
109+
self.forward_task_statuses(conductor, 'task1', [statuses.PENDING])
110+
self.forward_task_statuses(conductor, 'task2', [statuses.PAUSED])
111+
self.assertEqual(conductor.get_workflow_status(), statuses.PAUSED)
112+
113+
# Resume and complete task1 only. Once task1 completes, the workflow
114+
# should pause again because there is no active task.
115+
self.forward_task_statuses(conductor, 'task1', [statuses.SUCCEEDED])
116+
self.assertEqual(conductor.get_workflow_status(), statuses.PAUSED)
117+
118+
# Resume and complete task2. When task2 completes, the workflow
119+
# should stay running because task3 is now staged and ready.
120+
self.forward_task_statuses(conductor, 'task2', [statuses.RUNNING])
121+
self.assertEqual(conductor.get_workflow_status(), statuses.RUNNING)
122+
self.forward_task_statuses(conductor, 'task2', [statuses.SUCCEEDED])
123+
self.assertEqual(conductor.get_workflow_status(), statuses.RUNNING)
124+
self.assert_next_task(conductor, 'task3', {})
125+
126+
# Complete task3.
127+
self.forward_task_statuses(conductor, 'task3', [statuses.RUNNING, statuses.SUCCEEDED])
128+
self.assertEqual(conductor.get_workflow_status(), statuses.SUCCEEDED)
129+
130+
def test_pending_and_failed_with_task_transition_error(self):
131+
wf_def = """
132+
version: 1.0
133+
description: A basic sequential workflow.
134+
tasks:
135+
task1:
136+
action: core.ask
137+
next:
138+
- when: <% result().response.foobar %>
139+
do: task2
140+
task2:
141+
action: core.noop
142+
"""
143+
144+
spec = native_specs.WorkflowSpec(wf_def)
145+
conductor = conducting.WorkflowConductor(spec)
146+
conductor.request_workflow_status(statuses.RUNNING)
147+
148+
# Run task1.
149+
self.forward_task_statuses(conductor, 'task1', [statuses.RUNNING])
150+
self.assertEqual(conductor.get_workflow_status(), statuses.RUNNING)
151+
152+
# Put task1 to pending state and assert workflow is paused.
153+
self.forward_task_statuses(conductor, 'task1', [statuses.PENDING])
154+
self.assertEqual(conductor.get_workflow_status(), statuses.PAUSED)
155+
156+
# Complete task1 and assert workflow execution fails
157+
# due to the expression error in task transition.
158+
self.forward_task_statuses(conductor, 'task1', [statuses.SUCCEEDED])
159+
self.assertEqual(conductor.get_workflow_status(), statuses.FAILED)

0 commit comments

Comments
 (0)