Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docs/dot/recovery.dot
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
digraph recovery {
graph [fontname="times-roman"];
node [fontname="times-roman"];
edge [fontname="times-roman"];

recovery [label="Recovery", shape=box, style=rounded, fontsize=11];

main [label="Main Behaviour", shape=ellipse, fontsize=10];
rec1 [label="Recovery #1", shape=ellipse, fontsize=10];
rec2 [label="Recovery #2", shape=ellipse, fontsize=10];
recN [label="Recovery #N", shape=ellipse, fontsize=10];

recovery -> main;
recovery -> rec1;
recovery -> rec2;
recovery -> recN;

{rank=same; rec1 rec2 recN}
}
114 changes: 114 additions & 0 deletions py_trees/composites.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,3 +808,117 @@ def validate_policy_configuration(self) -> None:
)
self.logger.error(error_message)
raise RuntimeError(error_message)


class Recovery(Composite):
"""
A Recovery composite that wraps a main behaviour with a sequence of recovery behaviours.

.. graphviz:: dot/recovery.dot

Execution model:

- Tick the main behaviour first.
- If main returns SUCCESS or RUNNING, propagate that.
- If main returns FAILURE:
* Attempt the next recovery behaviour in sequence.
* If recovery RUNNING, propagate RUNNING.
* If recovery completes (SUCCESS or FAILURE), consume it and retry main (if any recoveries remain).
- If all recoveries are exhausted and main still fails, return FAILURE.

Args:
name (:obj:`str`): the composite behaviour name
children ([:class:`~py_trees.behaviour.Behaviour`]): list of children,
where the first is the main behaviour and the rest are recovery behaviours
"""

def __init__(
self,
name: str,
children: typing.Optional[typing.Sequence[behaviour.Behaviour]] = None,
):
super().__init__(name, children)
if not children or len(children) < 1:
raise ValueError("Recovery requires at least a main behaviour")

# Explicit references
self.main: behaviour.Behaviour = children[0]
self.recoveries: typing.List[behaviour.Behaviour] = (
list(children[1:]) if len(children) > 1 else []
)
self.current_recovery_index: int = 0
self.running_main = True

def initialise(self) -> None:
"""Reset to the initial state: run main behaviour and restart recovery behaviours sequence."""
self.current_recovery_index = 0
self.running_main = True

def tick(self) -> typing.Iterator[behaviour.Behaviour]:
"""
Tick over the children.

Yields:
:class:`~py_trees.behaviour.Behaviour`: a reference to itself or one of its children
"""
self.logger.debug("%s.tick()" % self.__class__.__name__)

if not self.children:
self.stop(common.Status.FAILURE)
yield self
return

# First try the main behaviour if we are not in the middle of a recovery
if self.running_main:
for node in self.main.tick():
yield node
if node is self.main:
if node.status in (common.Status.SUCCESS, common.Status.RUNNING):
self.status = node.status
yield self
return
elif node.status == common.Status.FAILURE:
# proceed to next recovery
self.running_main = False

# Try recoveries
while self.current_recovery_index < len(self.recoveries):
recovery = self.recoveries[self.current_recovery_index]
for node in recovery.tick():
yield node
if node is recovery:
if node.status == common.Status.RUNNING:
self.status = common.Status.RUNNING
yield self
return
elif node.status == common.Status.SUCCESS:
self.status = common.Status.RUNNING
# consume this recovery and retry main
recovery.stop(common.Status.INVALID)
self.current_recovery_index += 1
self.main.stop(common.Status.INVALID)
self.running_main = True
yield self
return
elif node.status == common.Status.FAILURE:
# consume this recovery and move to next
recovery.stop(common.Status.INVALID)
self.current_recovery_index += 1
yield self

# No recoveries left → fail
self.status = common.Status.FAILURE
yield self

def stop(self, new_status: common.Status = common.Status.INVALID) -> None:
"""
Ensure that children are appropriately stopped and update status.

Args:
new_status : the composite is transitioning to this new status
"""
for child in self.children:
if child.status != common.Status.INVALID:
child.stop(common.Status.INVALID)
self.current_recovery_index = 0
super().stop(new_status)
66 changes: 66 additions & 0 deletions tests/test_recovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# tests/test_recovery.py

from py_trees.behaviours import StatusQueue
from py_trees.common import Status
from py_trees.composites import Recovery


def test_main_success() -> None:
main = StatusQueue("Main", [Status.SUCCESS], eventually=None)
root = Recovery("Recovery", children=[main])

root.tick_once()
assert main.status == Status.SUCCESS


def test_main_running() -> None:
main = StatusQueue("Main", [Status.RUNNING, Status.SUCCESS], eventually=None)
root = Recovery("Recovery", children=[main])

root.tick_once()
assert root.status == Status.RUNNING

root.tick_once()
assert root.status == Status.SUCCESS


def test_recovery_success_then_retry_main() -> None:
# main fails, recovery1 succeeds, main succeeds when retried
main = StatusQueue("Main", [Status.FAILURE, Status.SUCCESS], eventually=None)
rec1 = StatusQueue("Rec1", [Status.SUCCESS], eventually=None)
root = Recovery("Recovery", children=[main, rec1])

# tick 1: main fails, recovery1 succeeds, composite RUNNING
root.tick_once()
assert root.status == Status.RUNNING

# tick 2: main retried, succeeds
root.tick_once()
assert root.status == Status.SUCCESS


def test_recovery_fails_then_next_succeeds() -> None:
# main fails, rec1 fails, rec2 succeeds, then main succeeds
main = StatusQueue("Main", [Status.FAILURE, Status.SUCCESS], eventually=None)
rec1 = StatusQueue("Rec1", [Status.FAILURE], eventually=None)
rec2 = StatusQueue("Rec2", [Status.SUCCESS], eventually=None)
root = Recovery("Recovery", children=[main, rec1, rec2])

# tick 1: main fails, rec1 fails, composite RUNNING
root.tick_once()
assert root.status == Status.RUNNING

# tick 2: main retried, succeeds
root.tick_once()
assert root.status == Status.SUCCESS


def test_all_recoveries_fail() -> None:
# main fails, all recoveries fail, composite fails
main = StatusQueue("Main", [Status.FAILURE], eventually=None)
rec1 = StatusQueue("Rec1", [Status.FAILURE], eventually=None)
rec2 = StatusQueue("Rec2", [Status.FAILURE], eventually=None)
root = Recovery("Recovery", children=[main, rec1, rec2])

root.tick_once()
assert root.status == Status.FAILURE