Skip to content
27 changes: 23 additions & 4 deletions src/local_pathfinding/local_pathfinding/local_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@

from typing import List, Optional, Tuple

from custom_interfaces.msg import GPS, AISShips, HelperLatLon, Path, WindSensor
from custom_interfaces.msg import (
GPS,
AISShips,
HelperLatLon,
HelperSpeed,
Path,
WindSensor,
)
from rclpy.impl.rcutils_logger import RcutilsLogger

from local_pathfinding.ompl_path import OMPLPath
Expand All @@ -11,10 +18,10 @@
class LocalPathState:
"""Gathers and stores the state of Sailbot.
The attributes' units and conventions can be found in the ROS msgs they are derived from in the
custom_interfaces repository.
custom_interfaces package.

Attributes:
`position` (Tuple[float, float]): Latitude and longitude of Sailbot.
`position` (HelperLatLon): Latitude and longitude of Sailbot.
`speed` (float): Speed of Sailbot.
`heading` (float): Direction that Sailbot is pointing.
`ais_ships` (List[HelperAISShip]): Information about nearby ships.
Expand All @@ -23,6 +30,7 @@ class LocalPathState:
`wind_speed` (float): Wind speed.
`wind_direction` (int): Wind direction.
`planner` (str): Planner to use for the OMPL query.
`reference (HelperLatLon): Lat and lon position of the next global waypoint.
"""

def __init__(
Expand All @@ -35,9 +43,13 @@ def __init__(
):
"""Initializes the state from ROS msgs."""
if gps: # TODO: remove when mock can be run
self.position = (gps.lat_lon.latitude, gps.lat_lon.longitude)
self.position = gps.lat_lon
self.speed = gps.speed.speed
self.heading = gps.heading.heading
else:
self.position = HelperLatLon(latitude=0.0, longitude=0.0)
self.speed = 0.0
self.heading = 0.0

if ais_ships: # TODO: remove when mock can be run
self.ais_ships = [ship for ship in ais_ships.ships]
Expand All @@ -53,6 +65,13 @@ def __init__(
if filtered_wind_sensor: # TODO: remove when mock can be run
self.wind_speed = filtered_wind_sensor.speed.speed
self.wind_direction = filtered_wind_sensor.direction
else:
self.wind_speed = HelperSpeed(speed=0.0)
self.wind_direction = 0

self.reference_latlon = (
self.global_path[-1] if self.global_path else HelperLatLon(latitude=0.0, longitude=0.0)
)

self.planner = planner

Expand Down
71 changes: 35 additions & 36 deletions src/local_pathfinding/local_pathfinding/ompl_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
import pyompl
from custom_interfaces.msg import HelperLatLon
from rclpy.impl.rcutils_logger import RcutilsLogger
from shapely.geometry import MultiPolygon, Point, Polygon, box

import local_pathfinding.coord_systems as cs
from local_pathfinding.coord_systems import XY
from local_pathfinding.objectives import get_sailing_objective

if TYPE_CHECKING:
Expand All @@ -24,35 +26,13 @@
# ou.setLogLevel(ou.LOG_WARN)


class OMPLPathState:
def __init__(self, local_path_state: LocalPathState, logger: RcutilsLogger):
# TODO: derive OMPLPathState attributes from local_path_state
self.heading_direction = 45.0
self.wind_direction = 10.0
self.wind_speed = 1.0

self.state_domain = (-1, 1)
self.state_range = (-1, 1)
self.start_state = (0.5, 0.4)
self.goal_state = (0.5, -0.4)

self.reference_latlon = (
local_path_state.global_path[-1]
if local_path_state.global_path and len(local_path_state.global_path) > 0
else HelperLatLon(latitude=0.0, longitude=0.0)
)

if local_path_state:
self.planner = None
# self.planner = pyompl.RRTstar()


class OMPLPath:
"""Represents the general OMPL Path.

Attributes
_logger (RcutilsLogger): ROS logger of this class.
_simple_setup (og.SimpleSetup): OMPL SimpleSetup object.
_box_buffer (float): buffer around the sailbot position and the goal position in km
solved (bool): True if the path is a solution to the OMPL query, else false.
"""

Expand All @@ -69,6 +49,7 @@ def __init__(
max_runtime (float): Maximum amount of time in seconds to look for a solution path.
local_path_state (LocalPathState): State of Sailbot.
"""
self._box_buffer = 1
self._logger = parent_logger.get_child(name="ompl_path")

self._simple_setup = self._init_simple_setup(local_path_state) # this needs state
Expand Down Expand Up @@ -114,6 +95,13 @@ def get_waypoints(self) -> List[HelperLatLon]:

return waypoints

def create_buffer_around_position(self: OMPLPath, position: XY) -> Polygon:
"""Create a space around the given position. Position is the center of the space and
is a tuple of x and y.
"""
space = Point(position.x, position.y).buffer(self._box_buffer, cap_style=3, join_style=2)
return space

def update_objectives(self):
"""Update the objectives on the basis of which the path is optimized.
Raises:
Expand All @@ -122,21 +110,33 @@ def update_objectives(self):
raise NotImplementedError

def _init_simple_setup(self, local_path_state) -> pyompl.SimpleSetup:
"""Initialize and configure the OMPL SimpleSetup object.

Returns:
og.SimpleSetup: Encapsulates the various objects necessary to solve a geometric or
control query in OMPL.
"""
self.state = OMPLPathState(local_path_state, self._logger)
self.state = local_path_state

# Create buffered spaces and extract their centers
start_position_in_xy = cs.latlon_to_xy(self.state.reference_latlon, self.state.position)
start_box = self.create_buffer_around_position(start_position_in_xy)
start_x = start_position_in_xy.x
start_y = start_position_in_xy.y

if not self.state.global_path:
goal_polygon = self.create_buffer_around_position(cs.XY(0, 0))
goal_x, goal_y = (0.0, 0.0)
else:
goal_position = self.state.global_path[-1]
goal_position_in_xy = cs.latlon_to_xy(self.state.reference_latlon, goal_position)
goal_polygon = self.create_buffer_around_position(goal_position_in_xy)
goal_x, goal_y = goal_position_in_xy

# create an SE2 state space: rotation and translation in a plane
space = pyompl.SE2StateSpace()

# set the bounds of the state space
bounds = pyompl.RealVectorBounds(dim=2)
x_min, x_max = self.state.state_domain
y_min, y_max = self.state.state_range
state_space = box(*MultiPolygon([start_box, goal_polygon]).bounds)
x_min, y_min, x_max, y_max = state_space.bounds

if x_max <= x_min or y_max <= y_min:
raise ValueError(f"Invalid bounds: x=[{x_min}, {x_max}], y=[{y_min}, {y_max}]")
bounds.setLow(0, x_min)
bounds.setLow(1, y_min)
bounds.setHigh(0, x_max)
Expand All @@ -153,11 +153,8 @@ def _init_simple_setup(self, local_path_state) -> pyompl.SimpleSetup:
simple_setup = pyompl.SimpleSetup(space)
simple_setup.setStateValidityChecker(is_state_valid)

# set the goal and start states of the simple setup object
start = pyompl.ScopedState(space)
goal = pyompl.ScopedState(space)
start_x, start_y = self.state.start_state
goal_x, goal_y = self.state.goal_state
start.setXY(start_x, start_y)
goal.setXY(goal_x, goal_y)
"""self._logger.debug(
Expand All @@ -170,6 +167,7 @@ def _init_simple_setup(self, local_path_state) -> pyompl.SimpleSetup:
# Constructs a space information instance for this simple setup
space_information = simple_setup.getSpaceInformation()

# figure this out
self.state.planner = pyompl.RRTstar(space_information)

# set the optimization objective of the simple setup object
Expand All @@ -178,7 +176,8 @@ def _init_simple_setup(self, local_path_state) -> pyompl.SimpleSetup:
objective = get_sailing_objective(
space_information,
simple_setup,
self.state.heading_direction,
# This too
self.state.heading,
self.state.wind_direction,
self.state.wind_speed,
)
Expand Down
41 changes: 20 additions & 21 deletions src/local_pathfinding/test/test_ompl_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import pytest
from custom_interfaces.msg import GPS, AISShips, Path, WindSensor
from rclpy.impl.rcutils_logger import RcutilsLogger
from shapely.geometry import Point

import local_pathfinding.coord_systems as cs
import local_pathfinding.ompl_path as ompl_path
Expand All @@ -20,25 +21,6 @@
)


def test_OMPLPathState():
local_path_state = LocalPathState(
gps=GPS(),
ais_ships=AISShips(),
global_path=Path(),
filtered_wind_sensor=WindSensor(),
planner="rrtstar",
)
state = ompl_path.OMPLPathState(local_path_state, logger=RcutilsLogger())
assert state.state_domain == (-1, 1), "incorrect value for attribute state_domain"
assert state.state_range == (-1, 1), "incorrect value for attribute start_state"
assert state.start_state == pytest.approx(
(0.5, 0.4)
), "incorrect value for attribute start_state"
assert state.goal_state == pytest.approx(
(0.5, -0.4)
), "incorrect value for attribute goal_state"


def test_OMPLPath___init__():
assert PATH.solved

Expand All @@ -50,8 +32,7 @@ def test_OMPLPath_get_cost():

def test_OMPLPath_get_waypoint():
waypoints = PATH.get_waypoints()

waypoint_XY = cs.XY(*PATH.state.start_state)
waypoint_XY = cs.XY(PATH.state.position.latitude, PATH.state.position.longitude)
start_state_latlon = cs.xy_to_latlon(PATH.state.reference_latlon, waypoint_XY)

test_start = waypoints[0]
Expand Down Expand Up @@ -85,3 +66,21 @@ def test_is_state_valid(x: float, y: float, is_valid: bool):
assert ompl_path.is_state_valid(state), "state should be valid"
else:
assert not ompl_path.is_state_valid(state), "state should not be valid"


@pytest.mark.parametrize(
"position,expected_area,expected_bounds",
[
(cs.XY(0.0, 0.0), pytest.approx(4, rel=1e-2), (-1, -1, 1, 1)),
(cs.XY(100.0, 100.0), pytest.approx(4, rel=1e-2), (99, 99, 101, 101)),
(cs.XY(-100.0, -100.0), pytest.approx(4, rel=1e-2), (-101, -101, -99, -99)),
],
)
def test_create_space(position: cs.XY, expected_area, expected_bounds):
"""Test creation of buffered space around positions"""
# Given an OMPLPath instance
space = PATH.create_buffer_around_position(position)

assert space.area == expected_area, "Space area should match buffer size"
assert space.bounds == pytest.approx(expected_bounds, abs=1.0), "Bounds should match expected"
assert space.contains(Point(position.x, position.y)), "Space should contain center point"
Loading