Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions inorbit_edge_executor/behavior_tree.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from .datatypes import MissionStepSetData
from .datatypes import MissionStepWait
from .datatypes import MissionStepWaitUntil
from .datatypes import MissionStepIf
from .datatypes import Target
from .exceptions import TaskPausedException
from .inorbit import ACTION_CANCEL_NAV_ID
Expand Down Expand Up @@ -1115,6 +1116,9 @@ def visit_wait_until(self, step: MissionStepWaitUntil):
context=self.context, expression=step.expression, target=step.target, label=step.label
)

def visit_if(self, step: MissionStepIf):
raise NotImplementedError("visit_if not implemented")


# List of accepted node types (classes). With register_accepted_node_types(),
# this defines how to build nodes from their type fields (strings)
Expand Down
65 changes: 56 additions & 9 deletions inorbit_edge_executor/datatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class MissionStepTypes(Enum):
WAIT_UNTIL = "waitUntil"
NAMED_WAYPOINT = "namedWaypoint"
POSE_WAYPOINT = "poseWaypoint"
IF = "if"


class Robot(BaseModel):
Expand Down Expand Up @@ -182,21 +183,67 @@ def accept(self, visitor):
return visitor.visit_run_action(self)


class MissionStepIf(MissionStep):
"""
Mission step for conditional execution based on an expression.
"""

class IfArgs(BaseModel):
model_config = ConfigDict(extra="forbid")
expression: str
target: Target = Field(default=None)
then: "StepsList" = Field(alias="then")
else_: Optional["StepsList"] = Field(alias="else", default=None)

if_step: IfArgs = Field(alias="if")

def _get_expression(self):
return self.if_step.expression

expression = property(fget=_get_expression)

def _get_target(self):
return self.if_step.target

target = property(fget=_get_target)

def _get_then(self):
return self.if_step.then

then = property(fget=_get_then)

def _get_else(self):
return self.if_step.else_

else_ = property(fget=_get_else)

def accept(self, visitor):
return visitor.visit_if(self)

def get_type(self):
return MissionStepTypes.IF.value


# Type alias for steps list that includes all step types including MissionStepIf
StepsList = List[
Union[
MissionStepSetData,
MissionStepPoseWaypoint,
MissionStepRunAction,
MissionStepWait,
MissionStepWaitUntil,
MissionStepIf,
]
]


class MissionDefinition(BaseModel):
"""
Mission Definition. Corresponds to the 'spec' schema of MissionDefinition kind in Config APIs
"""

label: str = ""
steps: List[
Union[
MissionStepSetData,
MissionStepPoseWaypoint,
MissionStepRunAction,
MissionStepWait,
MissionStepWaitUntil,
]
]
steps: StepsList
selector: Any = Field(
default=None
) # Accepted from API just to complete schema in struct mode (and ignore the field)
Expand Down
184 changes: 184 additions & 0 deletions tests/test_if_step.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
import pytest
from inorbit_edge_executor.datatypes import (
MissionStepIf,
MissionStepSetData,
MissionStepWait,
MissionStepTypes,
MissionDefinition,
)


def test_mission_step_if_basic():
"""Test basic MissionStepIf creation with if branch only."""
step = MissionStepIf(
**{
"if": {
"expression": "getValue('battery') > 50",
"then": [{"data": {"key": "value"}}],
}
}
)
assert step.expression == "getValue('battery') > 50"
assert step.target is None
assert len(step.then) == 1
assert isinstance(step.then[0], MissionStepSetData)
assert step.else_ is None
assert step.get_type() == MissionStepTypes.IF.value


def test_mission_step_if_with_else():
"""Test MissionStepIf creation with both if and else branches."""
step = MissionStepIf(
**{
"if": {
"expression": "getValue('battery') > 50",
"then": [{"data": {"key": "if_value"}}],
"else": [{"data": {"key": "else_value"}}],
}
}
)
assert step.expression == "getValue('battery') > 50"
assert len(step.then) == 1
assert isinstance(step.then[0], MissionStepSetData)
assert step.then[0].data["key"] == "if_value"
assert len(step.else_) == 1
assert isinstance(step.else_[0], MissionStepSetData)
assert step.else_[0].data["key"] == "else_value"


def test_mission_step_if_with_target():
"""Test MissionStepIf creation with target."""
step = MissionStepIf(
**{
"if": {
"expression": "getValue('battery') > 50",
"target": {"robotId": "robot456"},
"then": [{"timeoutSecs": 10}],
}
}
)
assert step.expression == "getValue('battery') > 50"
assert step.target is not None
assert step.target.robot_id == "robot456"
assert len(step.then) == 1
assert isinstance(step.then[0], MissionStepWait)


def test_mission_step_if_nested():
"""Test MissionStepIf with nested if steps."""
step = MissionStepIf(
**{
"if": {
"expression": "getValue('battery') > 50",
"then": [
{
"if": {
"expression": "getValue('status') == 'ready'",
"then": [{"data": {"nested": True}}],
}
}
],
}
}
)
assert step.expression == "getValue('battery') > 50"
assert len(step.then) == 1
assert isinstance(step.then[0], MissionStepIf)
nested_if = step.then[0]
assert nested_if.expression == "getValue('status') == 'ready'"
assert len(nested_if.then) == 1
assert isinstance(nested_if.then[0], MissionStepSetData)


def test_mission_step_if_with_label_and_timeout():
"""Test MissionStepIf with label and timeout."""
step = MissionStepIf(
label="Check battery",
timeoutSecs=30.0,
**{
"if": {
"expression": "getValue('battery') > 50",
"then": [{"data": {"key": "value"}}],
}
},
)
assert step.label == "Check battery"
assert step.timeout_secs == 30.0
assert step.expression == "getValue('battery') > 50"


def test_mission_step_if_accept_visitor():
"""Test MissionStepIf accept method for visitor pattern."""
step = MissionStepIf(
**{
"if": {
"expression": "getValue('battery') > 50",
"then": [{"data": {"key": "value"}}],
}
}
)

class MockVisitor:
def visit_if(self, step):
return "visited_if"

visitor = MockVisitor()
result = step.accept(visitor)
assert result == "visited_if"


def test_mission_definition_with_if_step():
"""Test MissionDefinition can contain MissionStepIf."""
definition = MissionDefinition(
label="Test mission",
steps=[
{
"if": {
"expression": "getValue('battery') > 50",
"then": [{"data": {"key": "value"}}],
"else": [{"timeoutSecs": 5}],
}
}
],
)
assert len(definition.steps) == 1
assert isinstance(definition.steps[0], MissionStepIf)
assert definition.steps[0].expression == "getValue('battery') > 50"


def test_mission_step_if_multiple_steps_in_branches():
"""Test MissionStepIf with multiple steps in if and else branches."""
step = MissionStepIf(
**{
"if": {
"expression": "getValue('battery') > 50",
"then": [
{"data": {"step": "if1"}},
{"data": {"step": "if2"}},
{"timeoutSecs": 10},
],
"else": [
{"data": {"step": "else1"}},
{"timeoutSecs": 5},
],
}
}
)
assert len(step.then) == 3
assert isinstance(step.then[0], MissionStepSetData)
assert isinstance(step.then[1], MissionStepSetData)
assert isinstance(step.then[2], MissionStepWait)
assert len(step.else_) == 2
assert isinstance(step.else_[0], MissionStepSetData)
assert isinstance(step.else_[1], MissionStepWait)


def test_mission_step_if_validation():
"""Test that MissionStepIf validates required fields."""
# Should fail without expression
with pytest.raises(Exception):
MissionStepIf(**{"if": {"then": [{"data": {"key": "value"}}]}})

# Should fail without if branch
with pytest.raises(Exception):
MissionStepIf(**{"if": {"expression": "getValue('battery') > 50"}})