Skip to content
11 changes: 10 additions & 1 deletion src/local_pathfinding/local_pathfinding/local_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ class LocalPathState:
`wind_speed` (float): Wind speed.
`wind_direction` (int): Wind direction.
`planner` (str): Planner to use for the OMPL query.
`reference_latlon` (Tuple[float, float]): The last tuple from the global_path array (
added for convenience in the OMPLPath class after removing the OMPLPathState class
)
"""

def __init__(
Expand All @@ -31,7 +34,7 @@ def __init__(
ais_ships: AISShips,
global_path: Path,
filtered_wind_sensor: WindSensor,
planner: str,
planner: str
):
"""Initializes the state from ROS msgs."""
if gps: # TODO: remove when mock can be run
Expand All @@ -54,6 +57,12 @@ def __init__(
self.wind_speed = filtered_wind_sensor.speed.speed
self.wind_direction = filtered_wind_sensor.direction

self.reference_latlon = (
self.global_path[-1]
if self.global_path and len(self.global_path) > 0
else HelperLatLon(latitude=0.0, longitude=0.0)
)

self.planner = planner


Expand Down
72 changes: 38 additions & 34 deletions src/local_pathfinding/local_pathfinding/ompl_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
from custom_interfaces.msg import HelperLatLon
from rclpy.impl.rcutils_logger import RcutilsLogger

# Fix the import
from shapely.geometry import MultiPolygon, Point, Polygon, box

import local_pathfinding.coord_systems as cs
from local_pathfinding.objectives import get_sailing_objective

Expand All @@ -24,29 +27,6 @@
# ou.setLogLevel(ou.LOG_WARN)


class OMPLPathState:
def __init__(self, local_path_state: LocalPathState, logger: RcutilsLogger):
# TODO: derive OMPLPathState attributes from local_path_state
self.heading_direction = 45.0
self.wind_direction = 10.0
self.wind_speed = 1.0

self.state_domain = (-1, 1)
self.state_range = (-1, 1)
self.start_state = (0.5, 0.4)
self.goal_state = (0.5, -0.4)

self.reference_latlon = (
local_path_state.global_path[-1]
if local_path_state.global_path and len(local_path_state.global_path) > 0
else HelperLatLon(latitude=0.0, longitude=0.0)
)

if local_path_state:
self.planner = None
# self.planner = pyompl.RRTstar()


class OMPLPath:
"""Represents the general OMPL Path.

Expand All @@ -68,7 +48,11 @@ def __init__(
parent_logger (RcutilsLogger): Logger of the parent class.
max_runtime (float): Maximum amount of time in seconds to look for a solution path.
local_path_state (LocalPathState): State of Sailbot.

Fields not mentioned in Args:
box_buffer (float): buffer around the sailbot position and the goal position in km
"""
self.box_buffer = 1
self._logger = parent_logger.get_child(name="ompl_path")

self._simple_setup = self._init_simple_setup(local_path_state) # this needs state
Expand Down Expand Up @@ -114,6 +98,13 @@ def get_waypoints(self) -> List[HelperLatLon]:

return waypoints

def create_space(self, position) -> Polygon:
""" Create a space around the given position. Position is the center of the space and
is a tuple of x and y.
"""
space = Point(position[0], position[1]).buffer(self.box_buffer, cap_style=3, join_style=2)
return space

def update_objectives(self):
"""Update the objectives on the basis of which the path is optimized.
Raises:
Expand All @@ -122,21 +113,32 @@ def update_objectives(self):
raise NotImplementedError

def _init_simple_setup(self, local_path_state) -> pyompl.SimpleSetup:
"""Initialize and configure the OMPL SimpleSetup object.
self.state = local_path_state

Returns:
og.SimpleSetup: Encapsulates the various objects necessary to solve a geometric or
control query in OMPL.
"""
self.state = OMPLPathState(local_path_state, self._logger)
# Create buffered spaces and extract their centers
state_domain = self.create_space(self.state.position)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here we might be confusing what domain is (OMPL has its own different definition). You might rename this variable to something more clear that communicates its only a box around position.

start_x, start_y = self.state.position # Use original position for coordinates

if not self.state.global_path:
goal_polygon = self.create_space([0, 0])
goal_x, goal_y = (0, 0)
else:
goal_position = self.state.global_path[-1]
goal_polygon = self.create_space(goal_position)
goal_x, goal_y = goal_position

# create an SE2 state space: rotation and translation in a plane
space = pyompl.SE2StateSpace()

# set the bounds of the state space
bounds = pyompl.RealVectorBounds(dim=2)
x_min, x_max = self.state.state_domain
y_min, y_max = self.state.state_range
big_polygon = box(*MultiPolygon([state_domain, goal_polygon]).bounds)
x_min, y_min, x_max, y_max = big_polygon.bounds
# x_min, x_max = state_domain
# y_min, y_max = state_range

if x_max <= x_min or y_max <= y_min:
raise ValueError(f"Invalid bounds: x=[{x_min}, {x_max}], y=[{y_min}, {y_max}]")
bounds.setLow(0, x_min)
bounds.setLow(1, y_min)
bounds.setHigh(0, x_max)
Expand All @@ -156,8 +158,8 @@ def _init_simple_setup(self, local_path_state) -> pyompl.SimpleSetup:
# set the goal and start states of the simple setup object
start = pyompl.ScopedState(space)
goal = pyompl.ScopedState(space)
start_x, start_y = self.state.start_state
goal_x, goal_y = self.state.goal_state
# start_x, start_y = start_state
# goal_x, goal_y = goal_state
start.setXY(start_x, start_y)
goal.setXY(goal_x, goal_y)
"""self._logger.debug(
Expand All @@ -170,6 +172,7 @@ def _init_simple_setup(self, local_path_state) -> pyompl.SimpleSetup:
# Constructs a space information instance for this simple setup
space_information = simple_setup.getSpaceInformation()

# figure this out
self.state.planner = pyompl.RRTstar(space_information)

# set the optimization objective of the simple setup object
Expand All @@ -178,7 +181,8 @@ def _init_simple_setup(self, local_path_state) -> pyompl.SimpleSetup:
objective = get_sailing_objective(
space_information,
simple_setup,
self.state.heading_direction,
# This too
self.state.heading,
self.state.wind_direction,
self.state.wind_speed,
)
Expand Down
41 changes: 20 additions & 21 deletions src/local_pathfinding/test/test_ompl_path.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from shapely.geometry import Point
import pyompl
import pytest
from custom_interfaces.msg import GPS, AISShips, Path, WindSensor
Expand All @@ -20,25 +21,6 @@
)


def test_OMPLPathState():
local_path_state = LocalPathState(
gps=GPS(),
ais_ships=AISShips(),
global_path=Path(),
filtered_wind_sensor=WindSensor(),
planner="rrtstar",
)
state = ompl_path.OMPLPathState(local_path_state, logger=RcutilsLogger())
assert state.state_domain == (-1, 1), "incorrect value for attribute state_domain"
assert state.state_range == (-1, 1), "incorrect value for attribute start_state"
assert state.start_state == pytest.approx(
(0.5, 0.4)
), "incorrect value for attribute start_state"
assert state.goal_state == pytest.approx(
(0.5, -0.4)
), "incorrect value for attribute goal_state"


def test_OMPLPath___init__():
assert PATH.solved

Expand All @@ -50,8 +32,7 @@ def test_OMPLPath_get_cost():

def test_OMPLPath_get_waypoint():
waypoints = PATH.get_waypoints()

waypoint_XY = cs.XY(*PATH.state.start_state)
waypoint_XY = cs.XY(PATH.state.position[0], PATH.state.position[1])
start_state_latlon = cs.xy_to_latlon(PATH.state.reference_latlon, waypoint_XY)

test_start = waypoints[0]
Expand Down Expand Up @@ -85,3 +66,21 @@ def test_is_state_valid(x: float, y: float, is_valid: bool):
assert ompl_path.is_state_valid(state), "state should be valid"
else:
assert not ompl_path.is_state_valid(state), "state should not be valid"


@pytest.mark.parametrize(
"position,expected_area,expected_bounds",
[
((0, 0), pytest.approx(4, rel=1e-2), (-1, -1, 1, 1)),
((100, 100), pytest.approx(4, rel=1e-2), (99, 99, 101, 101)),
((-100, -100), pytest.approx(4, rel=1e-2), (-101, -101, -99, -99)),
],
)
def test_create_space(position, expected_area, expected_bounds):
"""Test creation of buffered space around positions"""
# Given an OMPLPath instance
space = PATH.create_space(position)

assert space.area == expected_area, "Space area should match buffer size"
assert space.bounds == pytest.approx(expected_bounds, abs=1.0), "Bounds should match expected"
assert space.contains(Point(position[0], position[1])), "Space should contain center point"
Loading