Skip to content

Commit 7dca476

Browse files
authored
Merge pull request #213 from StackStorm/fix-item-failure-flow
Fix workflow state when one or more items failed in a with items task
2 parents 142d7c4 + a315f99 commit 7dca476

File tree

4 files changed

+708
-465
lines changed

4 files changed

+708
-465
lines changed

CHANGELOG.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ Fixed
99

1010
* Warn users when there is a loop and no start task identified. (bug fix)
1111
* Lock global variables during initialization to make them thread safe. (bug fix)
12+
* Workflow stuck in running if one or more items failed in a with items task. (bug fix)
1213

1314
1.1.1
1415
-----

orquesta/conducting.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,16 @@ def deserialize(cls, data):
6767
instance.contexts = json_util.deepcopy(data.get("contexts", list()))
6868
instance.routes = json_util.deepcopy(data.get("routes", list()))
6969
instance.sequence = json_util.deepcopy(data.get("sequence", list()))
70-
instance.staged = json_util.deepcopy(data.get("staged", dict()))
70+
instance.staged = json_util.deepcopy(data.get("staged", list()))
7171
instance.status = data.get("status", statuses.UNSET)
7272
instance.tasks = json_util.deepcopy(data.get("tasks", dict()))
7373
instance.reruns = json_util.deepcopy(data.get("reruns", list()))
7474

7575
return instance
7676

77+
def has_task(self, task_id, route):
78+
return constants.TASK_STATE_ROUTE_FORMAT % (task_id, str(route)) in self.tasks
79+
7780
def get_task(self, task_id, route):
7881
return self.sequence[self.tasks[constants.TASK_STATE_ROUTE_FORMAT % (task_id, str(route))]]
7982

@@ -174,7 +177,10 @@ def get_unreachable_barriers(self):
174177
return unreachable_barriers
175178

176179
def get_staged_tasks(self, filtered=True):
177-
return list(filter(lambda x: x["ready"] is True, self.staged)) if filtered else self.staged
180+
if not filtered:
181+
return self.staged
182+
183+
return [x for x in self.staged if x["ready"] and not x.get("completed", False)]
178184

179185
@property
180186
def has_staged_tasks(self):
@@ -200,10 +206,7 @@ def add_staged_task(self, task_id, route, ctxs=None, prev=None, ready=True, retr
200206
return entry
201207

202208
def get_staged_task(self, task_id, route):
203-
def query(x):
204-
return x["id"] == task_id and x["route"] == route
205-
206-
staged_tasks = list(filter(query, self.staged))
209+
staged_tasks = [x for x in self.staged if x["id"] == task_id and x["route"] == route]
207210

208211
return staged_tasks[0] if staged_tasks else None
209212

@@ -909,10 +912,13 @@ def update_task_state(self, task_id, route, event):
909912

910913
# Get task result and set current context if task is completed.
911914
if new_task_status in statuses.COMPLETED_STATUSES:
912-
# Remove task from staging if exists but keep entry
915+
# Remove task from staging if exists but keep and flag entry
913916
# if task has items and failed for manual rerun.
914917
if not (task_spec.has_items() and new_task_status in statuses.ABENDED_STATUSES):
915918
self.workflow_state.remove_staged_task(task_id, route)
919+
else:
920+
staged_task = self.workflow_state.get_staged_task(task_id, route)
921+
staged_task["completed"] = True
916922

917923
# Format task result depending on the type of task.
918924
task_result = self.make_task_result(task_spec, event)
@@ -1015,6 +1021,7 @@ def update_task_state(self, task_id, route, event):
10151021

10161022
# Clear list of items for with items task.
10171023
staged_next_task.pop("items", None)
1024+
staged_next_task.pop("completed", None)
10181025
else:
10191026
# Otherwise create a new entry in staging for the next task.
10201027
staged_next_task = self.workflow_state.add_staged_task(
@@ -1171,6 +1178,12 @@ def _request_task_rerun(self, task_id, route, reset_items=False):
11711178
task.pop("term", None)
11721179
task.pop("ignore", None)
11731180

1181+
# Reset staged task for the rerunnable candidate.
1182+
staged_task = self.workflow_state.get_staged_task(task_id, route)
1183+
1184+
if staged_task:
1185+
staged_task.pop("completed", None)
1186+
11741187
# Reset the list of errors for the task.
11751188
for e in [e for e in self.errors if e.get("task_id", None) == task_id]:
11761189
self.errors.remove(e)

0 commit comments

Comments
 (0)