Skip to content

Commit 1e2d962

Browse files
authored
Merge pull request #200 from StackStorm/fix-task-retry
Fix remediated task with retry
2 parents afdb704 + 36a3475 commit 1e2d962

File tree

3 files changed

+100
-1
lines changed

3 files changed

+100
-1
lines changed

CHANGELOG.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
Changelog
22
=========
33

4+
In Development
5+
--------------
6+
7+
Fixed
8+
~~~~~
9+
10+
* Fix task retry so that the transition on error is no longer executed while the task is still being retried. (bug fix)
11+
412
1.1.0
513
-----
614

orquesta/conducting.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -926,7 +926,7 @@ def update_task_state(self, task_id, route, event):
926926
# the condition if a retry for the task is required.
927927
if (self.get_workflow_status() in statuses.ACTIVE_STATUSES and
928928
self._evaluate_task_retry(task_state_entry, current_ctx)):
929-
self.update_task_state(task_id, route, events.TaskRetryEvent())
929+
return self.update_task_state(task_id, route, events.TaskRetryEvent())
930930

931931
# Evaluate task transitions if task is completed and status change is not processed.
932932
if new_task_status in statuses.COMPLETED_STATUSES and new_task_status != old_task_status:

orquesta/tests/unit/conducting/test_task_retry.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,97 @@ def test_retries_exhausted(self):
274274
actual_task_sequence = [item['id'] for item in conductor.workflow_state.sequence]
275275
self.assertListEqual(expected_task_sequence, actual_task_sequence)
276276

277+
def test_retries_exhausted_and_task_remediated(self):
278+
wf_def = """
279+
version: 1.0
280+
281+
tasks:
282+
task1:
283+
action: core.echo message="$RANDOM"
284+
retry:
285+
count: 3
286+
next:
287+
- when: <% succeeded() %>
288+
do: task2
289+
- when: <% failed() %>
290+
do: task3
291+
task2:
292+
action: core.noop
293+
task3:
294+
action: core.echo message="BOOM!"
295+
next:
296+
- do: fail
297+
"""
298+
299+
expected_tk1_action_spec = {'action': 'core.echo', 'input': {'message': '$RANDOM'}}
300+
expected_tk3_action_spec = {'action': 'core.echo', 'input': {'message': 'BOOM!'}}
301+
302+
spec = native_specs.WorkflowSpec(wf_def)
303+
self.assertDictEqual(spec.inspect(), {})
304+
305+
conductor = conducting.WorkflowConductor(spec)
306+
conductor.request_workflow_status(statuses.RUNNING)
307+
308+
# Failed execution for task1.
309+
next_tasks = conductor.get_next_tasks()
310+
self.assertEqual(len(next_tasks), 1)
311+
self.assertEqual(next_tasks[0]['id'], 'task1')
312+
self.assertDictEqual(next_tasks[0]['actions'][0], expected_tk1_action_spec)
313+
self.forward_task_statuses(conductor, 'task1', [statuses.RUNNING, statuses.FAILED])
314+
315+
# Failed retry #1 for task1.
316+
tk1_state = conductor.get_task_state_entry('task1', 0)
317+
self.assertEqual(tk1_state['status'], statuses.RETRYING)
318+
self.assertEqual(tk1_state['retry']['count'], 3)
319+
self.assertEqual(tk1_state['retry']['tally'], 1)
320+
next_tasks = conductor.get_next_tasks()
321+
self.assertEqual(len(next_tasks), 1)
322+
self.assertEqual(next_tasks[0]['id'], 'task1')
323+
self.assertDictEqual(next_tasks[0]['actions'][0], expected_tk1_action_spec)
324+
self.forward_task_statuses(conductor, 'task1', [statuses.RUNNING, statuses.FAILED])
325+
326+
# Failed retry #2 for task1.
327+
tk1_state = conductor.get_task_state_entry('task1', 0)
328+
self.assertEqual(tk1_state['status'], statuses.RETRYING)
329+
self.assertEqual(tk1_state['retry']['tally'], 2)
330+
next_tasks = conductor.get_next_tasks()
331+
self.assertEqual(len(next_tasks), 1)
332+
self.assertEqual(next_tasks[0]['id'], 'task1')
333+
self.assertDictEqual(next_tasks[0]['actions'][0], expected_tk1_action_spec)
334+
self.forward_task_statuses(conductor, 'task1', [statuses.RUNNING, statuses.FAILED])
335+
336+
# Failed retry #3 for task1.
337+
tk1_state = conductor.get_task_state_entry('task1', 0)
338+
self.assertEqual(tk1_state['status'], statuses.RETRYING)
339+
self.assertEqual(tk1_state['retry']['tally'], 3)
340+
next_tasks = conductor.get_next_tasks()
341+
self.assertEqual(len(next_tasks), 1)
342+
self.assertEqual(next_tasks[0]['id'], 'task1')
343+
self.assertDictEqual(next_tasks[0]['actions'][0], expected_tk1_action_spec)
344+
self.forward_task_statuses(conductor, 'task1', [statuses.RUNNING, statuses.FAILED])
345+
346+
# Assert task1 failed and the workflow execution progresses to task3.
347+
tk1_state = conductor.get_task_state_entry('task1', 0)
348+
self.assertEqual(tk1_state['status'], statuses.FAILED)
349+
self.assertEqual(tk1_state['retry']['tally'], 3)
350+
next_tasks = conductor.get_next_tasks()
351+
self.assertEqual(len(next_tasks), 1)
352+
self.assertEqual(next_tasks[0]['id'], 'task3')
353+
self.assertDictEqual(next_tasks[0]['actions'][0], expected_tk3_action_spec)
354+
355+
# Successful execution for task3.
356+
self.forward_task_statuses(conductor, 'task3', [statuses.RUNNING, statuses.SUCCEEDED])
357+
tk3_state = conductor.get_task_state_entry('task3', 0)
358+
self.assertEqual(tk3_state['status'], statuses.SUCCEEDED)
359+
360+
# Assert workflow failed (via the explicit "fail" transition defined under task3).
361+
self.assertEqual(conductor.get_workflow_status(), statuses.FAILED)
362+
363+
# Assert there is only a single task1 and a single task3 in the task sequence.
364+
expected_task_sequence = ['task1', 'task3', 'fail']
365+
actual_task_sequence = [item['id'] for item in conductor.workflow_state.sequence]
366+
self.assertListEqual(expected_task_sequence, actual_task_sequence)
367+
277368
def test_retry_delay_with_task_delay_defined(self):
278369
wf_def = """
279370
version: 1.0

0 commit comments

Comments
 (0)