Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
bb7ac62
added unused timer to check for timeuots
raghumanimehta Feb 17, 2026
3c1799c
timer logic added
raghumanimehta Feb 21, 2026
b2ea7bf
tested path generation timeout logic
raghumanimehta Mar 2, 2026
df18ee5
Merge branch 'main' into raghumanimehta/807_conditions_to_switch_path…
raghumanimehta Mar 2, 2026
9980e93
big changes
raghumanimehta Mar 2, 2026
fa946ae
tests fixed
raghumanimehta Mar 2, 2026
84835d9
linting fix
raghumanimehta Mar 2, 2026
666ddc7
renamed function
raghumanimehta Mar 2, 2026
a6e536f
added a comment
raghumanimehta Mar 2, 2026
48094f0
added second comment
raghumanimehta Mar 2, 2026
a50c57a
test fixed
raghumanimehta Mar 2, 2026
24f08bc
docs update
raghumanimehta Mar 5, 2026
d292bc2
value error -> index error in calculate_desired_heading_and_wp_index
raghumanimehta Mar 5, 2026
bf0ad2d
Path in collision zone comment update
raghumanimehta Mar 5, 2026
67d902b
readibility addressed
raghumanimehta Mar 5, 2026
df3f06f
remved asserts
raghumanimehta Mar 5, 2026
ea320f8
added comment to remember to handle the error thrown
raghumanimehta Mar 5, 2026
29e5a9a
Merge branch 'main' into raghumanimehta/807_conditions_to_switch_path…
SPDonaghy Mar 5, 2026
a64bda7
removed MUST comment
raghumanimehta Mar 7, 2026
84d4bc5
updated docs
raghumanimehta Mar 7, 2026
e23c750
updated the local waypoint index saga
raghumanimehta Mar 7, 2026
7eaec33
Merge branch 'main' into raghumanimehta/807_conditions_to_switch_path…
raghumanimehta Mar 7, 2026
5db7d00
simplified logging
raghumanimehta Mar 7, 2026
c7b0dbf
docs linter fix
raghumanimehta Mar 8, 2026
96e6aba
comments
raghumanimehta Mar 17, 2026
90bd316
fixed in_collision_zone
raghumanimehta Mar 17, 2026
43c81d6
removed redundant code
raghumanimehta Mar 17, 2026
845c49d
Merge branch 'main' into raghumanimehta/807_conditions_to_switch_path…
raghumanimehta Mar 17, 2026
dd1886a
Merge branch 'main' into raghumanimehta/807_conditions_to_switch_path…
raghumanimehta Mar 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/custom_interfaces/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,29 @@ Update diagram by editing diagrams/src/external_interfaces.puml and the PlantUML
| HelperROT | HelperAISShip |
| HelperSpeed | GPS, HelperAISShip, WindSensor |

## Safety and Error Handling

### DesiredHeading Message - Stop Logic

The `DesiredHeading` message includes a `sail` boolean field that controls whether the boat should proceed or stop:

- **`sail = True`**: Boat should proceed with the specified heading (normal operation)
- **`sail = False`**: Boat should stop/not sail (pathfinding failure or safety condition)

#### When `sail` is set to `False`

The local pathfinding node sets `sail = False` when:

- OMPL path planning fails to find a valid solution
- An exception occurs during path retrieval (e.g., invalid state space)
- No safe path can be computed due to obstacles or constraints

When `sail = False`:

- The `heading.heading` field is set to `0.0`
- The `steering` field is set to `0`
- This signals a stop condition to the boat indicating pathfinding failure.

## Boat Simulator Interfaces

ROS messages and services used in our [boat simulator](https://github.com/UBCSailbot/sailbot_workspace/tree/main/src/boat_simulator).
Expand Down
219 changes: 128 additions & 91 deletions src/local_pathfinding/local_pathfinding/local_path.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
"""The path to the next global waypoint, represented by the LocalPath class."""

import math
from datetime import datetime, timedelta
from typing import List, Optional

import custom_interfaces.msg as ci
from rclpy.impl.rcutils_logger import RcutilsLogger
from shapely.geometry import LineString, MultiPolygon

import custom_interfaces.msg as ci
import local_pathfinding.coord_systems as cs
import local_pathfinding.wind_coord_systems as wcs
import local_pathfinding.obstacles as ob
import local_pathfinding.wind_coord_systems as wcs
from local_pathfinding.ompl_path import OMPLPath

WIND_SPEED_CHANGE_THRESH_PROP = 0.3
WIND_DIRECTION_CHANGE_THRESH_DEG = 10
LOCAL_WAYPOINT_REACHED_THRESH_KM = 0.5
HEADING_WEIGHT = 0.6
COST_WEIGHT = 0.4
PATH_TTL_SEC = timedelta(seconds=600)


class LocalPathState:
Expand All @@ -38,6 +40,7 @@ class LocalPathState:
planner (str): Planner to use for the OMPL query.
reference (ci.HelperLatLon): Lat and lon position of the next global waypoint.
obstacles (List[Obstacle]): All obstacles in the state space
path_generated_time (datetime): Time when the path was generated
"""

def __init__(
Expand All @@ -62,6 +65,7 @@ def __init__(

# obstacles are initialized by OMPLPath right before solving
self.obstacles: List[ob.Obstacle] = []
self.path_generated_time = datetime.now()

def update_state(
self, gps: ci.GPS, ais_ships: ci.AISShips, filtered_wind_sensor: ci.WindSensor
Expand Down Expand Up @@ -99,8 +103,9 @@ class LocalPath:
Attributes:
_logger (RcutilsLogger): ROS logger.
_ompl_path (Optional[OMPLPath]): Raw representation of the path from OMPL.
_prev_lp_wp_index (int): index of the local waypoint that Polaris has already traversed in
the path array
_target_lp_wp_index (int): 0-based array index of the local waypoint Polaris is
currently heading toward. This is initialized to 1 because OMPL path index 0 is the
start state near the boat, and index 1 is the first target waypoint.
path (Path): Collection of coordinates that form the local path to the next
global waypoint.
state (LocalPathState): the current local path state.
Expand All @@ -112,68 +117,86 @@ def __init__(self, parent_logger: RcutilsLogger):
self.path: Optional[ci.Path] = None
self.state: Optional[LocalPathState] = None

def is_path_expired(self) -> bool:
"""Check if the current path has exceeded the PATH_TTL timeout.

Returns:
bool: True if the path has expired, False otherwise.
"""
if self.state is None:
self._logger.debug("Path is expired, since the state is None")
return True
is_expired = datetime.now() >= (self.state.path_generated_time + PATH_TTL_SEC)
if is_expired:
self._logger.debug("Path is expired")
return is_expired

@staticmethod
def calculate_desired_heading_and_wp_index(
path: ci.Path, waypoint_index: int, boat_lat_lon: ci.HelperLatLon
path: ci.Path, target_lp_wp_index: int, boat_lat_lon: ci.HelperLatLon
):
"""Calculates the desired heading using GEODESIC. Updates the waypoint index (i.e. change
the waypoint) if the boat is close enough to the current waypoint.
the waypoint) if the boat is close enough to the current target waypoint.

Args:
path (ci.Path): Array of waypoints
prev_lp_wp_index (int): index of the local waypoint that Polaris has already traversed
in the path array
(i.e. the waypoint sailbot traversed, sailbot is heading towards waypoint_index + 1)
target_lp_wp_index (int): 0-based array index of the local waypoint Polaris is
currently heading toward. This should start at index 1 because index 0 is the
OMPL start waypoint near the boat.
boat_lat_lon (ci.HelperLatLon): boat coordinates

Returns:
_type_: _description_
tuple[float, int]: Desired heading in degrees and the updated local
waypoint index (target_lp_wp_index).
Raises:
ValueError: if the path is None
IndexError: If index out of bounds or path is None
"""
waypoint = path.waypoints[waypoint_index]
if path is None:
raise ValueError("Path is None")
if target_lp_wp_index < 1 or target_lp_wp_index >= len(path.waypoints):
raise IndexError("target_lp_wp_index must be in [1, len(path.waypoints) - 1]")

waypoint = path.waypoints[target_lp_wp_index]
desired_heading, _, distance_to_waypoint_m = cs.GEODESIC.inv(
boat_lat_lon.longitude, boat_lat_lon.latitude, waypoint.longitude, waypoint.latitude
)

if cs.meters_to_km(distance_to_waypoint_m) < LOCAL_WAYPOINT_REACHED_THRESH_KM:
# If we reached the current local waypoint, aim for the next one
waypoint_index += 1
waypoint = path.waypoints[waypoint_index]
# If we reached the target local waypoint, update to the next target waypoint
target_lp_wp_index += 1

if target_lp_wp_index >= len(path.waypoints):
raise IndexError("waypoint idx > len(path.waypoints). Must generate new path")

waypoint = path.waypoints[target_lp_wp_index]
desired_heading, _, distance_to_waypoint_m = cs.GEODESIC.inv(
boat_lat_lon.longitude,
boat_lat_lon.latitude,
waypoint.longitude,
waypoint.latitude,
)

return cs.bound_to_180(desired_heading), waypoint_index
return cs.bound_to_180(desired_heading), target_lp_wp_index

@staticmethod
def in_collision_zone(
local_wp_index: int,
reference_latlon: ci.HelperLatLon,
path: ci.Path,
obstacles: List[ob.Obstacle],
):
def in_collision_zone(self):
"""
Checks if the path is in a collision zone or not.
Checks if the path intersects a collision zone.

Args:
local_wp_index (int): Index of the current local waypoint in the path.
reference_latlon (ci.HelperLatLon): lat lon of the next global waypoint
path (ci.Path): Collection of waypoints forming the local path.
obstacles (List[Obstacle]): List of obstacles in the state space.
Uses the current `self.path`, `self.state.obstacles`, and
`self._target_lp_wp_index` to test path segments that have not yet been traversed.

Returns:
boolean: True if the path intersects a collision zone, False otherwise.
bool: True if the path intersects a collision zone, False otherwise.
"""
xy_path = list(
map(lambda lat_lon: (cs.latlon_to_xy(reference_latlon, lat_lon)), path.waypoints)
)
for i in range(local_wp_index, len(xy_path) - 1):
xy_path = [cs.latlon_to_xy(self.state.reference_latlon, wp) for wp in self.path.waypoints]
segment_start_index = max(self._target_lp_wp_index - 1, 0)
for i in range(segment_start_index, len(xy_path) - 1):
p1, p2 = xy_path[i], xy_path[i + 1]
segment = LineString([(p1.x, p1.y), (p2.x, p2.y)])
for o in obstacles:
for o in self.state.obstacles:
if segment.crosses(o.collision_zone) or segment.touches(o.collision_zone):
self._logger.debug("Path intersects with collision zone")
return True
return False

Expand Down Expand Up @@ -212,15 +235,45 @@ def is_significant_wind_change(
speed_change_ratio = abs(prev_tw_speed_kmph - current_tw_speed_kmph) / prev_tw_speed_kmph
dir_change = abs(cs.bound_to_180(current_tw_dir_deg - prev_tw_dir_deg))

return (speed_change_ratio >= WIND_SPEED_CHANGE_THRESH_PROP or
dir_change >= WIND_DIRECTION_CHANGE_THRESH_DEG)
return (
speed_change_ratio >= WIND_SPEED_CHANGE_THRESH_PROP
or dir_change >= WIND_DIRECTION_CHANGE_THRESH_DEG
)

def must_change_path(self, received_new_global_waypoint: bool) -> tuple[bool, str]:
"""Check if the path must be changed.

Returns:
tuple[bool, str]: (should_change_path, reason_description)
"""
if received_new_global_waypoint:
return True, "Received new global waypoint"
if self._ompl_path is None:
return True, "OMPL path is None"
if self.state is None:
return True, "State is None"
if self.path is None:
return True, "Path is None"
if self.in_collision_zone():
return True, "Path intersects collision zone"
if self.is_path_expired():
return True, "Path has expired (TTL exceeded)"
if self._target_lp_wp_index < 1:
return True, f"Target waypoint index too low: {self._target_lp_wp_index}"
if self._target_lp_wp_index >= len(self.path.waypoints):
return (
True,
f"Target waypoint index out of bounds: {self._target_lp_wp_index} >= {len(self.path.waypoints)}", # noqa
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should is_significant_wind_change() be added here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually, yes. This work was done before Jeremy had made the PR for that function. We will add it here in a separate PR.
Think of this PR as a fix of a bug with the waypoint latlon and implementation of a part of the reworked conditions to switch paths.


return False, "Path is valid, no change needed"

def update_if_needed(
self,
gps: ci.GPS,
ais_ships: ci.AISShips,
global_path: ci.Path,
prev_lp_wp_index: int,
target_lp_wp_index: int,
received_new_global_waypoint: bool,
target_global_waypoint: ci.HelperLatLon,
filtered_wind_sensor: ci.WindSensor,
Expand All @@ -242,9 +295,9 @@ def update_if_needed(
gps (ci.GPS): Current GPS position and heading data.
ais_ships (ci.AISShips): AIS data for nearby ships (obstacles).
global_path (ci.Path): The global path plan to the destination.
prev_lp_wp_index (int): Current index in the local waypoint list.
This is the index that the boat last traversed. The boat is heading towards
the index following prev_lp_wp_index
target_lp_wp_index (int): 0-based array index of the local waypoint Polaris is
currently heading toward. This starts at index 1 because OMPL index 0 is the
start state near the boat.
received_new_global_waypoint (bool): Flag indicating if a new global
waypoint was received.
target_global_waypoint (ci.HelperLatLon): Target waypoint from global path.
Expand All @@ -259,39 +312,9 @@ def update_if_needed(
- Updated waypoint index
The method decides whether to return the heading for new path or old path
"""
self._prev_lp_wp_index = prev_lp_wp_index
self._target_lp_wp_index = target_lp_wp_index
old_ompl_path = self._ompl_path

if (
(received_new_global_waypoint or old_ompl_path is None)
or (self.path is None)
or (self.state is None)
):
new_state = LocalPathState(
gps, ais_ships, global_path, target_global_waypoint, filtered_wind_sensor, planner
)
new_ompl_path = OMPLPath(
parent_logger=self._logger,
local_path_state=new_state,
land_multi_polygon=land_multi_polygon,
)
heading_new_path, wp_index = self.calculate_desired_heading_and_wp_index(
new_ompl_path.get_path(), 0, gps.lat_lon
)
if received_new_global_waypoint:
self._logger.debug("Updating local path because we have a new global waypoint")
else:
self._logger.debug("old path is None")
self.state = new_state
self._update(new_ompl_path)
return heading_new_path, wp_index
else:
self.state.update_state(gps, ais_ships, filtered_wind_sensor)

# create a new state with most up-to-date environment information. This is done to compare
# a new OMPL path based on this state with the OMPL path that we have been using. Since the
# paths are based on environment snapshots of when the path was created, this is important
# to ensure that we are not on a bad path.
new_state = LocalPathState(
gps, ais_ships, global_path, target_global_waypoint, filtered_wind_sensor, planner
)
Expand All @@ -300,30 +323,44 @@ def update_if_needed(
local_path_state=new_state,
land_multi_polygon=land_multi_polygon,
)
# TODO: handle the error thrown here in reworked update_if_needed
try:
heading_new_path, new_target_lp_wp_index = self.calculate_desired_heading_and_wp_index(
new_ompl_path.get_path(), 1, gps.lat_lon
)
except (ValueError, IndexError):
# TODO: handle this after merging Jai's PR that sets sail to False
heading_new_path, new_target_lp_wp_index = None, -1

heading_new_path, wp_index = self.calculate_desired_heading_and_wp_index(
new_ompl_path.get_path(), 0, gps.lat_lon
)
heading_old_path, updated_wp_index = self.calculate_desired_heading_and_wp_index(
old_ompl_path.get_path(), prev_lp_wp_index, gps.lat_lon
)

if self.in_collision_zone(
prev_lp_wp_index, self.state.reference_latlon, self.path, self.state.obstacles
):
self._logger.debug("old path is in collision zone")
must_change, reason = self.must_change_path(received_new_global_waypoint)
if must_change:
self._logger.debug(f"Updating local path: {reason}")
self.state = new_state
self._update(new_ompl_path)
return heading_new_path, wp_index
return heading_new_path, new_target_lp_wp_index
else:
self.state.update_state(gps, ais_ships, filtered_wind_sensor) # type: ignore

heading_diff_old_path = cs.calculate_heading_diff(self.state.heading, heading_old_path)
heading_diff_new_path = cs.calculate_heading_diff(self.state.heading, heading_new_path)
try:
heading_old_path, old_target_lp_wp_index = self.calculate_desired_heading_and_wp_index(
self._ompl_path.get_path(), target_lp_wp_index, gps.lat_lon # type: ignore
)
except (ValueError, IndexError):
return heading_new_path, new_target_lp_wp_index

old_prev_lp_wp_index = max(updated_wp_index - 1, 0)
new_prev_lp_wp_index = max(wp_index - 1, 0)
heading_diff_old_path = cs.calculate_heading_diff(
self.state.heading, # type: ignore
heading_old_path,
)
heading_diff_new_path = cs.calculate_heading_diff(
self.state.heading, # type: ignore
heading_new_path,
)

old_cost = old_ompl_path.get_remaining_cost(old_prev_lp_wp_index, gps.lat_lon)
new_cost = new_ompl_path.get_remaining_cost(new_prev_lp_wp_index, gps.lat_lon)
old_cost = old_ompl_path.get_remaining_cost( # type: ignore
old_target_lp_wp_index, gps.lat_lon
)
new_cost = new_ompl_path.get_remaining_cost(new_target_lp_wp_index, gps.lat_lon)
max_cost = max(old_cost, new_cost, 1)
old_cost_normalized = old_cost / max_cost
new_cost_normalized = new_cost / max_cost
Expand Down Expand Up @@ -353,10 +390,10 @@ def update_if_needed(
self._logger.debug("New path is cheaper, updating local path ")
self.state = new_state
self._update(new_ompl_path)
return heading_new_path, wp_index
return heading_new_path, new_target_lp_wp_index
else:
self._logger.debug("old path is cheaper, continuing on the same path")
return heading_old_path, updated_wp_index
return heading_old_path, old_target_lp_wp_index

def _update(self, ompl_path: OMPLPath):

Expand Down
Loading