Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 202 additions & 4 deletions src/local_pathfinding/local_pathfinding/local_path.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
"""The path to the next global waypoint, represented by the `LocalPath` class."""

import logging
from typing import List, Optional, Tuple

from custom_interfaces.msg import GPS, AISShips, HelperLatLon, Path, WindSensor
from rclpy.impl.rcutils_logger import RcutilsLogger
from shapely.geometry import LineString, Point

import local_pathfinding.coord_systems as cs
from local_pathfinding.obstacles import Boat
from local_pathfinding.ompl_path import OMPLPath


Expand Down Expand Up @@ -69,7 +73,12 @@ class LocalPath:

def __init__(self, parent_logger: RcutilsLogger):
"""Initializes the LocalPath class."""
self._logger = parent_logger.get_child(name="local_path")
# TODO: Remove these to make logger right
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use the ros logging? Pls remove the TODO comment either way

if parent_logger and hasattr(parent_logger, "get_child"):
self._logger = parent_logger.get_child(name="local_path")
else:
# Use the logger as-is or create a default logger if None is provided
self._logger = parent_logger or logging.getLogger("local_path")
self._ompl_path: Optional[OMPLPath] = None
self.waypoints: Optional[List[Tuple[float, float]]] = None

Expand All @@ -89,16 +98,205 @@ def update_if_needed(
`global_path` (Path): Path to the destination.
`filtered_wind_sensor` (WindSensor): Wind data.
"""
if self.check_wind_valid(filtered_wind_sensor=filtered_wind_sensor):
if not self.old_path_valid(gps, ais_ships, global_path):
ompl_path = self._solve(gps, ais_ships, global_path, filtered_wind_sensor, planner)
if ompl_path.solved:
self._logger.info("Updating local path")
self._update(ompl_path)
else:
self._logger.info("Continuing on old local path")

def _solve(self, gps, ais_ships, global_path, filtered_wind_sensor, planner):
state = LocalPathState(gps, ais_ships, global_path, filtered_wind_sensor, planner)
ompl_path = OMPLPath(
parent_logger=self._logger,
max_runtime=1.0,
local_path_state=state,
)
if ompl_path.solved:
self._logger.info("Updating local path")
self._update(ompl_path)
return ompl_path

def _update(self, ompl_path: OMPLPath):
self._ompl_path = ompl_path
self.waypoints = self._ompl_path.get_waypoints()

def check_wind_valid(self, filtered_wind_sensor: WindSensor) -> bool:
"""Checks if the wind speed is too low for sailing.

Args:
`filtered_wind_sensor` (WindSensor): Filtered wind sensor data.

Returns:
bool: False if wind speed is too low, True otherwise.
"""
if filtered_wind_sensor is None:
self._logger.warning("Filtered wind sensor is None. Not computing OMPL Path")
return False

if filtered_wind_sensor.speed.speed < 5:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets change this to a constant instead of just 5. Something like LOW_WIND_THRESH_KMPH

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add a LOW_WIND_SPEED constant at the top of the file.

# TODO Example threshold of 5.0 for low speed. Change accordingly
self._logger.warning(
f"Wind speed too low: {filtered_wind_sensor.speed.speed}. Not computing OMPL Path"
)
return False
return True

def old_path_valid(self, gps: GPS, ais_ships: AISShips, global_path: Path) -> bool:
"""Checks if the old path is still valid based on multiple conditions.

Args:
`gps` (GPS): GPS data.
`ais_ships` (AISShips): AIS ships data.
`global_path` (Path): Path to the destination.
`filtered_wind_sensor` (WindSensor): Wind data.

Returns:
bool: False if old path is invalid, True if old path is still valid.
"""
if self.waypoints is None:
return False
else:
# TODO Buffer is set to 2.0 km. Change accordingly
if (
self.old_path_in_collision_zone(gps, ais_ships)
or self.sailbot_drifted_from_old_path(gps, self.waypoints, buffer=2.0)
or self.global_path_changed(global_path, self.waypoints, buffer=2.0)
):
return False
return True

def old_path_in_collision_zone(self, gps: GPS, ais_ships: AISShips) -> bool:
"""
Checks if the old path is in a collision zone based on the provided AISShips data.

This method creates a collision zone for each AIS ship using the Boat class and checks if
the SailBot's old path, represented as a series of waypoints, intersects or is within any
of the collision zones

Args:
gps (GPS): GPS message containing the SailBot's current position, speed, and heading.
ais_ships (AISShips): AISShips message containing a list of detected ships.

Returns:
bool: True if the old path is in a collision zone False otherwise.
"""
self.sailbot_position_latlon = gps.lat_lon
self.sailbot_speed = gps.speed.speed
self.reference = gps.lat_lon
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use the LocalPathState class attributes rather than redefining them again here.


reference_latlon = HelperLatLon(
latitude=gps.lat_lon.latitude, longitude=gps.lat_lon.longitude
)

if len(self.waypoints) < 2: # type: ignore
single_waypoint = self.waypoints[0] # type: ignore
projected_point = cs.latlon_to_xy(
reference_latlon,
HelperLatLon(latitude=single_waypoint[0], longitude=single_waypoint[1]),
)
old_path_geom = Point(projected_point)
else:
projected_waypoints = [
cs.latlon_to_xy(reference_latlon, HelperLatLon(latitude=lat, longitude=lon))
for lat, lon in self.waypoints # type: ignore
]
old_path_geom = LineString(projected_waypoints)

for ais_ship in ais_ships.ships:
boat = Boat(
reference=reference_latlon,
sailbot_position=gps.lat_lon,
sailbot_speed=gps.speed.speed,
ais_ship=ais_ship,
)
boat.update_boat_collision_zone()

if old_path_geom.intersects(boat.collision_zone) or old_path_geom.within(
boat.collision_zone
):
self._logger.debug(
f"Old path is in collision zone with AIS ship ID: {ais_ship.id}"
)
return True

self._logger.debug("Old path is not in collision zone with any AIS ships.")
return False

def global_path_changed(
self,
global_path: Path,
waypoints: List[Tuple[float, float]],
buffer: float = 2.0,
) -> bool:
"""Checks if the global path has changed since the last update.

Args:
`global_path` (Path): Global path to the destination.
'waypoints' : Old path waypoints
`buffer` (float): Buffer in km (default is 2.0 km).

Returns:
bool: True if the global path has changed, False otherwise.
"""
# Get the last waypoint from self.waypoints
last_local_waypoint = HelperLatLon(latitude=waypoints[-1][0], longitude=waypoints[-1][1])

# Check if the last local waypoint is within the buffer of any global path waypoint
for global_waypoint in global_path.waypoints:
global_latlon = HelperLatLon(
latitude=global_waypoint.latitude, longitude=global_waypoint.longitude
)

_, _, distance_m = cs.GEODESIC.inv(
last_local_waypoint.longitude,
last_local_waypoint.latitude,
global_latlon.longitude,
global_latlon.latitude,
)
if distance_m <= buffer * 1000: # km coverted to m
return False

self._logger.warning(
f"None of the waypoints in global path within {buffer} km with local goal state"
)
return True

def sailbot_drifted_from_old_path(
self, gps: GPS, waypoints: List[Tuple[float, float]], buffer: float = 2.0
) -> bool:
"""Checks if the Sailbot has drifted buffer distance away from the old path.

Args:
'gps' (GPS): GPS data.
'waypoints' : Old path waypoints
'buffer' (float): Distance from the path in 'km'

Returns:
bool: True if Sailbot has drifted significantly, False otherwise.
"""

reference_latlon = HelperLatLon(latitude=waypoints[0][0], longitude=waypoints[0][1])

projected_waypoints = [
cs.latlon_to_xy(reference_latlon, HelperLatLon(latitude=lat, longitude=lon))
for lat, lon in waypoints
]

path_line = LineString([(pt.x, pt.y) for pt in projected_waypoints])

path_polygon = path_line.buffer(buffer)

projected_gps = cs.latlon_to_xy(
reference_latlon,
HelperLatLon(latitude=gps.lat_lon.latitude, longitude=gps.lat_lon.longitude),
)

gps_point = Point(projected_gps.x, projected_gps.y)

# Check if the Sailbot is outside the buffered path
if not path_polygon.contains(gps_point):
self._logger.warning(f"Sailbot has drifted from the old path: GPS {gps.lat_lon}")
return True

self._logger.debug("Sailbot is within the path polygon.")
return False
8 changes: 5 additions & 3 deletions src/local_pathfinding/local_pathfinding/node_navigate.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""The main node of the local_pathfinding package, represented by the `Sailbot` class."""

import custom_interfaces.msg as ci
import rclpy
from rclpy.node import Node

import custom_interfaces.msg as ci
from local_pathfinding.local_path import LocalPath


Expand Down Expand Up @@ -150,8 +150,10 @@ def desired_heading_callback(self):

def lpath_data_callback(self):
"""Get and publish the local path."""

current_local_path = ci.Path(waypoints=self.local_path.waypoints)
if self.local_path.waypoints is None:
current_local_path = ci.Path(waypoints=list())
else:
current_local_path = ci.Path(waypoints=self.local_path.waypoints)

msg = ci.LPathData(local_path=current_local_path)

Expand Down
Loading
Loading