Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 57 additions & 20 deletions zag/engines/action_engine/completer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# under the License.

import abc
import itertools
import weakref

from oslo_utils import reflection
Expand All @@ -23,6 +24,7 @@

from zag.engines.action_engine import compiler as co
from zag.engines.action_engine import executor as ex
from zag.engines.action_engine import traversal as tr
from zag import logging
from zag import retry as retry_atom
from zag import states as st
Expand All @@ -36,8 +38,9 @@ class Strategy(object):

strategy = None

def __init__(self, runtime):
def __init__(self, runtime, atom=None):
self._runtime = runtime
self._atom = atom

@abc.abstractmethod
def apply(self):
Expand All @@ -57,14 +60,10 @@ class RevertAndRetry(Strategy):

strategy = retry_atom.RETRY

def __init__(self, runtime, retry):
super(RevertAndRetry, self).__init__(runtime)
self._retry = retry

def apply(self):
tweaked = self._runtime.reset_atoms([self._retry], state=None,
tweaked = self._runtime.reset_atoms([self._atom], state=None,
intention=st.RETRY)
tweaked.extend(self._runtime.reset_subgraph(self._retry, state=None,
tweaked.extend(self._runtime.reset_subgraph(self._atom, state=None,
intention=st.REVERT))
return tweaked

Expand All @@ -74,9 +73,6 @@ class RevertAll(Strategy):

strategy = retry_atom.REVERT_ALL

def __init__(self, runtime):
super(RevertAll, self).__init__(runtime)

def apply(self):
return self._runtime.reset_atoms(
self._runtime.iterate_nodes(co.ATOMS),
Expand All @@ -88,10 +84,6 @@ class Revert(Strategy):

strategy = retry_atom.REVERT

def __init__(self, runtime, atom):
super(Revert, self).__init__(runtime)
self._atom = atom

def apply(self):
tweaked = self._runtime.reset_atoms([self._atom], state=None,
intention=st.REVERT)
Expand All @@ -100,6 +92,46 @@ def apply(self):
return tweaked


class Abort(Strategy):
    """Marks the atom and everything reachable after it as ``IGNORE``.

    Used when a retry controller answers a failure with the ``ABORT``
    decision. Both the state and the intention of every affected atom are
    set to ``IGNORE``.
    """

    strategy = retry_atom.ABORT

    def apply(self):
        """Reset the atom and its forward-reachable atoms to ``IGNORE``.

        :returns: whatever ``runtime.reset_atoms`` returns for the atoms
                  it was handed.
        """
        graph = self._runtime.compilation.execution_graph
        # NOTE(review): no ``through_flows``/``through_retries`` limits are
        # passed here, so the traversal presumably crosses nested flow and
        # retry boundaries -- confirm against ``depth_first_iterate``.
        downstream = tr.depth_first_iterate(graph, self._atom,
                                            tr.Direction.FORWARD)
        # The atom itself is not assumed to be yielded by the traversal,
        # so it is chained in front explicitly.
        targets = itertools.chain([self._atom], downstream)
        return self._runtime.reset_atoms(targets, state=st.IGNORE,
                                         intention=st.IGNORE)


class Ignore(Strategy):
    """Marks the atom and its *associated* atoms as ``IGNORE``.

    Used when a retry controller answers a failure with the ``IGNORE``
    decision. Both the state and the intention of every affected atom are
    set to ``IGNORE``.
    """

    strategy = retry_atom.IGNORE

    def apply(self):
        """Reset the atom and its associated successors to ``IGNORE``.

        :returns: whatever ``runtime.reset_atoms`` returns for the atoms
                  it was handed.
        """
        graph = self._runtime.compilation.execution_graph
        # NOTE(review): ``through_flows=False`` / ``through_retries=False``
        # presumably keep the traversal from crossing nested flow and retry
        # boundaries, limiting it to the atom's own subflow -- confirm
        # against ``depth_first_iterate``.
        associated = tr.depth_first_iterate(graph, self._atom,
                                            tr.Direction.FORWARD,
                                            through_flows=False,
                                            through_retries=False)
        # The atom itself is not assumed to be yielded by the traversal,
        # so it is chained in front explicitly.
        targets = itertools.chain([self._atom], associated)
        return self._runtime.reset_atoms(targets, state=st.IGNORE,
                                         intention=st.IGNORE)


class Completer(object):
"""Completes atoms using actions to complete them."""

Expand Down Expand Up @@ -176,9 +208,7 @@ def _determine_resolution(self, atom, failure):
# Ask retry controller what to do in case of failure.
handler = self._runtime.fetch_action(retry)
strategy = handler.on_failure(retry, atom, failure)
if strategy == retry_atom.RETRY:
return RevertAndRetry(self._runtime, retry)
elif strategy == retry_atom.REVERT:
if strategy == retry_atom.REVERT:
# Ask parent retry and figure out what to do...
parent_resolver = self._determine_resolution(retry, failure)
# In the future, this will be the only behavior. REVERT
Expand All @@ -192,9 +222,16 @@ def _determine_resolution(self, atom, failure):
if parent_resolver is not self._undefined_resolver:
if parent_resolver.strategy != retry_atom.REVERT:
return parent_resolver
return Revert(self._runtime, retry)
elif strategy == retry_atom.REVERT_ALL:
return RevertAll(self._runtime)

# find the strategy subclass that matches the strategy and use it
strategy_cls = None
for subclass in Strategy.__subclasses__():
if subclass.strategy == strategy:
strategy_cls = subclass
break

if strategy_cls:
return strategy_cls(self._runtime, retry)
else:
raise ValueError("Unknown atom failure resolution"
" action/strategy '%s'" % strategy)
Expand Down
2 changes: 1 addition & 1 deletion zag/engines/action_engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ def run_iter(self, timeout=None):
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception("Engine execution has failed, something"
" bad must of happened (last"
" bad must have happened (last"
" %s machine transitions were %s)",
last_transitions.maxlen,
list(last_transitions))
Expand Down
2 changes: 2 additions & 0 deletions zag/engines/action_engine/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ def iter_next_atoms(self, atom=None):
return self._browse_atoms_for_execute(atom=atom)
else:
return iter([])
elif state == st.IGNORE and intention == st.IGNORE:
return self._browse_atoms_for_execute(atom=atom)
elif state == st.REVERTED:
return self._browse_atoms_for_revert(atom=atom)
elif state == st.FAILURE:
Expand Down
44 changes: 43 additions & 1 deletion zag/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,35 @@ class Decision(misc.StrEnum):
retry strategy associated with it.
"""

#: Retries the surrounding/associated subflow again.
RETRY = "RETRY"
"""Retries the surrounding/associated subflow.

This strategy will revert tasks within the associated subflow, then
re-run all the tasks again.
"""

ABORT = "ABORT"
"""Ends the entire flow immediately without reverting anything.

This strategy will just end the flow without marking anything as failed.
If you want a task to be able to short-circuit a flow without triggering
any error handling, this is what you want.
"""

IGNORE = "IGNORE"
"""Ends the subflow immediately and continues on to the remaining flow.

This strategy will abandon the associated subflow but continue processing
the outer flow as if the subflow had succeeded.
"""


# Retain these aliases for a number of releases...
REVERT = Decision.REVERT
REVERT_ALL = Decision.REVERT_ALL
RETRY = Decision.RETRY
ABORT = Decision.ABORT
IGNORE = Decision.IGNORE

# Constants passed into revert/execute kwargs.
#
Expand Down Expand Up @@ -379,3 +401,23 @@ def on_failure(self, values, history, *args, **kwargs):

def execute(self, values, history, *args, **kwargs):
return self._get_next_value(values, history)


class AlwaysAbort(Retry):
    """Retry controller that answers every failure with ``ABORT``."""

    def on_failure(self, *args, **kwargs):
        """Return the ``ABORT`` decision, ignoring all arguments."""
        return ABORT

    def execute(self, *args, **kwargs):
        """Do nothing and produce no value (``None``)."""
        return None


class AlwaysIgnore(Retry):
    """Retry controller that answers every failure with ``IGNORE``."""

    def on_failure(self, *args, **kwargs):
        """Return the ``IGNORE`` decision, ignoring all arguments."""
        return IGNORE

    def execute(self, *args, **kwargs):
        """Do nothing and produce no value (``None``)."""
        return None
57 changes: 57 additions & 0 deletions zag/tests/unit/test_retries.py
Original file line number Diff line number Diff line change
Expand Up @@ -1188,6 +1188,63 @@ def test_nested_provides_graph_retried_correctly(self):
self.assertItemsEqual(expected, capturer.values[4:])
self.assertEqual(st.SUCCESS, engine.storage.get_flow_state())

def test_abort(self):
    """A failure under ``AlwaysAbort`` ignores the rest of the whole flow.

    The failing subflow (retry, task-2, task-3) and the trailing outer
    task-4 are all expected to end up ``IGNORE`` while the outer flow
    still finishes with ``SUCCESS``.
    """
    abort_retry = retry.AlwaysAbort('r1')
    outer_flow = lf.Flow('flow-1').add(
        utils.ProgressingTask('task-1'),
        lf.Flow('flow-2', abort_retry).add(
            utils.FailingTask('task-2'),
            utils.ProgressingTask('task-3'),
        ),
        utils.ProgressingTask('task-4'),
    )
    engine = self._make_engine(outer_flow)
    with utils.CaptureListener(engine) as capturer:
        engine.run()

    expected_events = [
        'flow-1.f RUNNING',
        'task-1.t RUNNING',
        'task-1.t SUCCESS(5)',
        'r1.r RUNNING',
        'r1.r SUCCESS(None)',
        'task-2.t RUNNING',
        'task-2.t FAILURE(Failure: RuntimeError: Woot!)',
        # Everything from the retry onwards is ignored, including the
        # outer task that follows the subflow.
        'r1.r IGNORE',
        'task-2.t IGNORE',
        'task-3.t IGNORE',
        'task-4.t IGNORE',
        'flow-1.f SUCCESS',
    ]
    self.assertEqual(expected_events, capturer.values)

def test_ignore(self):
    """A failure under ``AlwaysIgnore`` only abandons the inner subflow.

    The failing subflow (retry, task-2, task-3) is expected to end up
    ``IGNORE`` while the trailing outer task-4 still runs and the outer
    flow finishes with ``SUCCESS``.
    """
    ignore_retry = retry.AlwaysIgnore('r1')
    outer_flow = lf.Flow('flow-1').add(
        utils.ProgressingTask('task-1'),
        lf.Flow('flow-2', ignore_retry).add(
            utils.FailingTask('task-2'),
            utils.ProgressingTask('task-3'),
        ),
        utils.ProgressingTask('task-4'),
    )
    engine = self._make_engine(outer_flow)
    with utils.CaptureListener(engine) as capturer:
        engine.run()

    expected_events = [
        'flow-1.f RUNNING',
        'task-1.t RUNNING',
        'task-1.t SUCCESS(5)',
        'r1.r RUNNING',
        'r1.r SUCCESS(None)',
        'task-2.t RUNNING',
        'task-2.t FAILURE(Failure: RuntimeError: Woot!)',
        # Only the subflow guarded by the retry is ignored ...
        'r1.r IGNORE',
        'task-2.t IGNORE',
        'task-3.t IGNORE',
        # ... and the outer flow carries on with its remaining task.
        'task-4.t RUNNING',
        'task-4.t SUCCESS(5)',
        'flow-1.f SUCCESS',
    ]
    self.assertEqual(expected_events, capturer.values)


class RetryParallelExecutionTest(utils.EngineTestBase):
# FIXME(harlowja): fix this class so that it doesn't use events or uses
Expand Down