Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions src/custom_interfaces/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,39 @@ Update diagram by editing diagrams/src/external_interfaces.puml and the PlantUML
| HelperROT | HelperAISShip |
| HelperSpeed | GPS, HelperAISShip, WindSensor |

## Safety and Error Handling

### DesiredHeading Message - Stop Logic

The `DesiredHeading` message includes a `sail` boolean field that controls whether the boat should proceed or stop:

- **`sail = True`**: Boat should proceed with the specified heading (normal operation)
- **`sail = False`**: Boat should stop/not sail (pathfinding failure or safety condition)

#### When `sail` is set to `False`

The local pathfinding node sets `sail = False` when:

- OMPL path planning fails to find a valid solution
- An exception occurs during path retrieval (e.g., invalid state space)
- No safe path can be computed due to obstacles or constraints

When `sail = False`:

- The `heading.heading` field is set to `0.0`
- The `steering` field is set to `0`
- This signals a stop condition to the boat indicating pathfinding failure.

#### Error Handling Implementation

The pathfinding system uses try-except blocks to gracefully handle OMPL exceptions:

1. **`ompl_path.get_path()`**: Catches exceptions when retrieving waypoints from OMPL solution paths
2. **`local_path.update_if_needed()`**: Returns `(None, waypoint_index)` when path retrieval fails
3. **`node_navigate.desired_heading_callback()`**: Detects `None` heading and publishes `DesiredHeading` with `sail = False`

This ensures the boat receives an explicit stop command rather than crashing when pathfinding fails.

## Boat Simulator Interfaces

ROS messages and services used in our [boat simulator](https://github.com/UBCSailbot/sailbot_workspace/tree/main/src/boat_simulator).
Expand Down
1 change: 1 addition & 0 deletions src/custom_interfaces/msg/DesiredHeading.msg
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
HelperHeading heading
uint8 steering
bool sail
33 changes: 26 additions & 7 deletions src/local_pathfinding/local_pathfinding/local_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,17 @@ def update_if_needed(
)
old_ompl_path = self._ompl_path

heading_new_path, wp_index = self.calculate_desired_heading_and_waypoint_index(
ompl_path.get_path(), 0, gps.lat_lon
)
try:
new_path = ompl_path.get_path()
if new_path is None:
self._logger.error("New OMPL path is None")
return None, local_waypoint_index
heading_new_path, wp_index = self.calculate_desired_heading_and_waypoint_index(
new_path, 0, gps.lat_lon
)
except Exception as e:
self._logger.error(f"Failed to get new OMPL path: {e}")
return None, local_waypoint_index
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should perhaps return None for both.


if received_new_global_waypoint:
self._logger.debug("Updating local path because we have a new global waypoint")
Expand All @@ -227,9 +235,17 @@ def update_if_needed(
self._update(ompl_path)
return heading_new_path, wp_index

heading_old_path, updated_wp_index = self.calculate_desired_heading_and_waypoint_index(
old_ompl_path.get_path(), local_waypoint_index, gps.lat_lon
)
try:
old_path = old_ompl_path.get_path()
if old_path is None:
self._logger.error("Old OMPL path is None")
return None, local_waypoint_index
heading_old_path, updated_wp_index = self.calculate_desired_heading_and_waypoint_index(
old_path, local_waypoint_index, gps.lat_lon
)
except Exception as e:
self._logger.error(f"Failed to get old OMPL path: {e}")
return None, local_waypoint_index
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also you could perhaps abstract this to a function. DRY principle

# check if the current path goes through a collision zone.
# No need to check for new path since it's fresh and ompl doesn't generate path that
# go through a collision zone
Expand Down Expand Up @@ -286,4 +302,7 @@ def update_if_needed(
def _update(self, ompl_path: OMPLPath):

self._ompl_path = ompl_path
self.path = self._ompl_path.get_path()
local_path = self._ompl_path.get_path()
if local_path is None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should update self.path still. You have logged the error but in the next cycle, the reference to self.path would have a valid path which is incorrect behaviour.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.path = local_path runs after the if block, since there is no return statement. So, won't it set the self.path to None and in this would trigger if old_ompl_path is None or self.path is None: check. where it would call self._update(omple_path)

self._logger.error("OMPL path is None, cannot update local path")
self.path = local_path
12 changes: 11 additions & 1 deletion src/local_pathfinding/local_pathfinding/node_navigate.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,17 @@ def desired_heading_callback(self):

desired_heading = self.get_desired_heading()
msg = ci.DesiredHeading()
msg.heading.heading = desired_heading

# If desired_heading is None, pathfinding failed - signal boat to not sail
if desired_heading is None:
self.get_logger().error("Pathfinding failed, signaling boat to stop")
msg.heading.heading = 0.0
msg.steering = 0
msg.sail = False
else:
msg.heading.heading = desired_heading
msg.sail = True

if self.desired_heading is None or desired_heading != self.desired_heading.heading.heading:
self.get_logger().info(f"Updating desired heading to: {msg.heading.heading:.2f}")

Expand Down
30 changes: 17 additions & 13 deletions src/local_pathfinding/local_pathfinding/ompl_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import os
import pickle
from typing import TYPE_CHECKING, Any, Union
from typing import TYPE_CHECKING, Any, Optional, Union

import custom_interfaces.msg as ci
from ompl import base
Expand Down Expand Up @@ -206,7 +206,7 @@ def get_cost(self, waypoint_index: int) -> float:
cost = solution_path.cost(obj)
return cost.value()

def get_path(self) -> ci.Path:
def get_path(self) -> Optional[ci.Path]:
"""Get the collection of waypoints for the boat to follow.

Returns:
Expand All @@ -219,19 +219,23 @@ def get_path(self) -> ci.Path:
self._logger.warning("Trying to get the waypoints of an unsolved OMPLPath")
return ci.Path()

solution_path = self._simple_setup.getSolutionPath()
waypoints = []

for state in solution_path.getStates():
waypoint_XY = cs.XY(state.getX(), state.getY())
waypoint_latlon = cs.xy_to_latlon(self.state.reference_latlon, waypoint_XY)
waypoints.append(
ci.HelperLatLon(
latitude=waypoint_latlon.latitude, longitude=waypoint_latlon.longitude
try:
solution_path = self._simple_setup.getSolutionPath()
waypoints = []

for state in solution_path.getStates():
waypoint_XY = cs.XY(state.getX(), state.getY())
waypoint_latlon = cs.xy_to_latlon(self.state.reference_latlon, waypoint_XY)
waypoints.append(
ci.HelperLatLon(
latitude=waypoint_latlon.latitude, longitude=waypoint_latlon.longitude
)
)
)

return ci.Path(waypoints=waypoints)
return ci.Path(waypoints=waypoints)
except Exception as e:
self._logger.error(f"Exception occurred while getting path from OMPL: {e}")
return None

def update_objectives(self):
"""Update the objectives on the basis of which the path is optimized.
Expand Down