-
Notifications
You must be signed in to change notification settings - Fork 3
Raghumanimehta/807-partial:Timeout implementation + cleanup of update_if_needed, LocalPath and NodeNavigate #839
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 20 commits
bb7ac62
3c1799c
b2ea7bf
df18ee5
9980e93
fa946ae
84835d9
666ddc7
a6e536f
48094f0
a50c57a
24f08bc
d292bc2
bf0ad2d
67d902b
df3f06f
ea320f8
29e5a9a
a64bda7
84d4bc5
e23c750
7eaec33
5db7d00
c7b0dbf
96e6aba
90bd316
43c81d6
845c49d
dd1886a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,22 +1,24 @@ | ||
| """The path to the next global waypoint, represented by the LocalPath class.""" | ||
|
|
||
| import math | ||
| from datetime import datetime, timedelta | ||
| from typing import List, Optional | ||
|
|
||
| import custom_interfaces.msg as ci | ||
| from rclpy.impl.rcutils_logger import RcutilsLogger | ||
| from shapely.geometry import LineString, MultiPolygon | ||
|
|
||
| import custom_interfaces.msg as ci | ||
| import local_pathfinding.coord_systems as cs | ||
| import local_pathfinding.wind_coord_systems as wcs | ||
| import local_pathfinding.obstacles as ob | ||
| import local_pathfinding.wind_coord_systems as wcs | ||
| from local_pathfinding.ompl_path import OMPLPath | ||
|
|
||
| WIND_SPEED_CHANGE_THRESH_PROP = 0.3 | ||
| WIND_DIRECTION_CHANGE_THRESH_DEG = 10 | ||
| LOCAL_WAYPOINT_REACHED_THRESH_KM = 0.5 | ||
| HEADING_WEIGHT = 0.6 | ||
| COST_WEIGHT = 0.4 | ||
| PATH_TTL_SEC = timedelta(seconds=600) | ||
|
|
||
|
|
||
| class LocalPathState: | ||
|
|
@@ -38,6 +40,7 @@ class LocalPathState: | |
| planner (str): Planner to use for the OMPL query. | ||
| reference (ci.HelperLatLon): Lat and lon position of the next global waypoint. | ||
| obstacles (List[Obstacle]): All obstacles in the state space | ||
| path_generated_time (datetime): Time when the path was generated | ||
| """ | ||
|
|
||
| def __init__( | ||
|
|
@@ -62,6 +65,7 @@ def __init__( | |
|
|
||
| # obstacles are initialized by OMPLPath right before solving | ||
| self.obstacles: List[ob.Obstacle] = [] | ||
| self.path_generated_time = datetime.now() | ||
|
|
||
| def update_state( | ||
| self, gps: ci.GPS, ais_ships: ci.AISShips, filtered_wind_sensor: ci.WindSensor | ||
|
|
@@ -99,8 +103,8 @@ class LocalPath: | |
| Attributes: | ||
| _logger (RcutilsLogger): ROS logger. | ||
| _ompl_path (Optional[OMPLPath]): Raw representation of the path from OMPL. | ||
| _prev_lp_wp_index (int): index of the local waypoint that Polaris has already traversed in | ||
| the path array | ||
| _prev_lp_wp_index (int): Index of the local waypoint that Polaris has already traversed. | ||
| Polaris heads toward the waypoint at index `_prev_lp_wp_index + 1`. | ||
| path (Path): Collection of coordinates that form the local path to the next | ||
| global waypoint. | ||
| state (LocalPathState): the current local path state. | ||
|
|
@@ -112,12 +116,26 @@ def __init__(self, parent_logger: RcutilsLogger): | |
| self.path: Optional[ci.Path] = None | ||
| self.state: Optional[LocalPathState] = None | ||
|
|
||
| def is_path_expired(self) -> bool: | ||
| """Check if the current path has exceeded the PATH_TTL timeout. | ||
|
|
||
| Returns: | ||
| bool: True if the path has expired, False otherwise. | ||
| """ | ||
| if self.state is None: | ||
| self._logger.debug("Path is expired, since the state is None") | ||
| return True | ||
| is_expired = datetime.now() >= (self.state.path_generated_time + PATH_TTL_SEC) | ||
| if is_expired: | ||
| self._logger.debug("Path is expired") | ||
| return is_expired | ||
|
|
||
| @staticmethod | ||
| def calculate_desired_heading_and_wp_index( | ||
| path: ci.Path, waypoint_index: int, boat_lat_lon: ci.HelperLatLon | ||
| path: ci.Path, prev_lp_wp_index: int, boat_lat_lon: ci.HelperLatLon | ||
| ): | ||
| """Calculates the desired heading using GEODESIC. Updates the waypoint index (i.e. change | ||
| the waypoint) if the boat is close enough to the current waypoint. | ||
| the waypoint) if the boat is close enough to the next waypoint. | ||
|
|
||
| Args: | ||
| path (ci.Path): Array of waypoints | ||
|
|
@@ -127,53 +145,54 @@ def calculate_desired_heading_and_wp_index( | |
| boat_lat_lon (ci.HelperLatLon): boat coordinates | ||
|
|
||
| Returns: | ||
| _type_: _description_ | ||
| tuple[float, int]: Desired heading in degrees and the updated local | ||
| waypoint index (prev_lp_wp_index). | ||
| Raises: | ||
| ValueError: if the path is None | ||
| IndexError: If index out of bounds or path is None | ||
| """ | ||
| waypoint = path.waypoints[waypoint_index] | ||
| if path is None: | ||
| raise ValueError("Path is None") | ||
|
|
||
| waypoint = path.waypoints[prev_lp_wp_index] | ||
| desired_heading, _, distance_to_waypoint_m = cs.GEODESIC.inv( | ||
| boat_lat_lon.longitude, boat_lat_lon.latitude, waypoint.longitude, waypoint.latitude | ||
| ) | ||
|
|
||
| if cs.meters_to_km(distance_to_waypoint_m) < LOCAL_WAYPOINT_REACHED_THRESH_KM: | ||
| # If we reached the current local waypoint, aim for the next one | ||
| waypoint_index += 1 | ||
| waypoint = path.waypoints[waypoint_index] | ||
| # If we reached the next local waypoint, update prev_lp_wp_index to the next waypoint | ||
| prev_lp_wp_index += 1 | ||
|
|
||
| if prev_lp_wp_index > len(path.waypoints): | ||
| raise IndexError("waypoint idx > len(path.waypoints). Must generate new path") | ||
|
|
||
| waypoint = path.waypoints[prev_lp_wp_index] | ||
| desired_heading, _, distance_to_waypoint_m = cs.GEODESIC.inv( | ||
| boat_lat_lon.longitude, | ||
| boat_lat_lon.latitude, | ||
| waypoint.longitude, | ||
| waypoint.latitude, | ||
| ) | ||
|
|
||
| return cs.bound_to_180(desired_heading), waypoint_index | ||
| return cs.bound_to_180(desired_heading), prev_lp_wp_index | ||
|
|
||
| @staticmethod | ||
| def in_collision_zone( | ||
| local_wp_index: int, | ||
| reference_latlon: ci.HelperLatLon, | ||
| path: ci.Path, | ||
| obstacles: List[ob.Obstacle], | ||
| ): | ||
| def in_collision_zone(self): | ||
| """ | ||
| Checks if the path is in a collision zone or not. | ||
| Checks if the path intersects a collision zone. | ||
|
|
||
| Args: | ||
| local_wp_index (int): Index of the current local waypoint in the path. | ||
| reference_latlon (ci.HelperLatLon): lat lon of the next global waypoint | ||
| path (ci.Path): Collection of waypoints forming the local path. | ||
| obstacles (List[Obstacle]): List of obstacles in the state space. | ||
| Uses the current `self.path`, `self.state.obstacles`, and | ||
| `self._prev_lp_wp_index` to test path segments that have not yet been traversed. | ||
|
|
||
| Returns: | ||
| boolean: True if the path intersects a collision zone, False otherwise. | ||
| bool: True if the path intersects a collision zone, False otherwise. | ||
| """ | ||
| xy_path = list( | ||
| map(lambda lat_lon: (cs.latlon_to_xy(reference_latlon, lat_lon)), path.waypoints) | ||
| ) | ||
| for i in range(local_wp_index, len(xy_path) - 1): | ||
| xy_path = [cs.latlon_to_xy(self.reference_latlon, wp) for wp in self.path.waypoints] | ||
raghumanimehta marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| for i in range(self._prev_lp_wp_index, len(xy_path) - 1): | ||
| p1, p2 = xy_path[i], xy_path[i + 1] | ||
| segment = LineString([(p1.x, p1.y), (p2.x, p2.y)]) | ||
| for o in obstacles: | ||
| for o in self.state.obstacles: | ||
| if segment.crosses(o.collision_zone) or segment.touches(o.collision_zone): | ||
| self._logger.debug("Path intersects with collision zone") | ||
| return True | ||
| return False | ||
|
|
||
|
|
@@ -212,8 +231,20 @@ def is_significant_wind_change( | |
| speed_change_ratio = abs(prev_tw_speed_kmph - current_tw_speed_kmph) / prev_tw_speed_kmph | ||
| dir_change = abs(cs.bound_to_180(current_tw_dir_deg - prev_tw_dir_deg)) | ||
|
|
||
| return (speed_change_ratio >= WIND_SPEED_CHANGE_THRESH_PROP or | ||
| dir_change >= WIND_DIRECTION_CHANGE_THRESH_DEG) | ||
| return ( | ||
| speed_change_ratio >= WIND_SPEED_CHANGE_THRESH_PROP | ||
| or dir_change >= WIND_DIRECTION_CHANGE_THRESH_DEG | ||
| ) | ||
|
|
||
| def must_change_path(self): | ||
| return ( | ||
| (self._ompl_path is None) | ||
| or (self.state is None) | ||
| or (self.path is None) | ||
| or (self.in_collision_zone()) | ||
| or (self.is_path_expired()) | ||
| or (self._prev_lp_wp_index > len(self.path.waypoints)) | ||
|
||
| ) | ||
|
|
||
| def update_if_needed( | ||
| self, | ||
|
|
@@ -242,9 +273,8 @@ def update_if_needed( | |
| gps (ci.GPS): Current GPS position and heading data. | ||
| ais_ships (ci.AISShips): AIS data for nearby ships (obstacles). | ||
| global_path (ci.Path): The global path plan to the destination. | ||
| prev_lp_wp_index (int): Current index in the local waypoint list. | ||
| This is the index that the boat last traversed. The boat is heading towards | ||
| the index following prev_lp_wp_index | ||
| prev_lp_wp_index (int): Index of the last traversed local waypoint. | ||
| The boat is heading towards the waypoint at index `prev_lp_wp_index + 1`. | ||
| received_new_global_waypoint (bool): Flag indicating if a new global | ||
| waypoint was received. | ||
| target_global_waypoint (ci.HelperLatLon): Target waypoint from global path. | ||
|
|
@@ -262,36 +292,6 @@ def update_if_needed( | |
| self._prev_lp_wp_index = prev_lp_wp_index | ||
| old_ompl_path = self._ompl_path | ||
|
|
||
| if ( | ||
| (received_new_global_waypoint or old_ompl_path is None) | ||
| or (self.path is None) | ||
| or (self.state is None) | ||
| ): | ||
| new_state = LocalPathState( | ||
| gps, ais_ships, global_path, target_global_waypoint, filtered_wind_sensor, planner | ||
| ) | ||
| new_ompl_path = OMPLPath( | ||
| parent_logger=self._logger, | ||
| local_path_state=new_state, | ||
| land_multi_polygon=land_multi_polygon, | ||
| ) | ||
| heading_new_path, wp_index = self.calculate_desired_heading_and_wp_index( | ||
| new_ompl_path.get_path(), 0, gps.lat_lon | ||
| ) | ||
| if received_new_global_waypoint: | ||
| self._logger.debug("Updating local path because we have a new global waypoint") | ||
| else: | ||
| self._logger.debug("old path is None") | ||
| self.state = new_state | ||
| self._update(new_ompl_path) | ||
| return heading_new_path, wp_index | ||
| else: | ||
| self.state.update_state(gps, ais_ships, filtered_wind_sensor) | ||
|
|
||
| # create a new state with most up-to-date environment information. This is done to compare | ||
| # a new OMPL path based on this state with the OMPL path that we have been using. Since the | ||
| # paths are based on environment snapshots of when the path was created, this is important | ||
| # to ensure that we are not on a bad path. | ||
| new_state = LocalPathState( | ||
| gps, ais_ships, global_path, target_global_waypoint, filtered_wind_sensor, planner | ||
| ) | ||
|
|
@@ -300,27 +300,43 @@ def update_if_needed( | |
| local_path_state=new_state, | ||
| land_multi_polygon=land_multi_polygon, | ||
| ) | ||
|
|
||
| heading_new_path, wp_index = self.calculate_desired_heading_and_wp_index( | ||
| # TODO: handle the error thrown here in reworked update_if_needed | ||
| heading_new_path, new_prev_lp_wp_index = self.calculate_desired_heading_and_wp_index( | ||
| new_ompl_path.get_path(), 0, gps.lat_lon | ||
| ) | ||
| heading_old_path, updated_wp_index = self.calculate_desired_heading_and_wp_index( | ||
| old_ompl_path.get_path(), prev_lp_wp_index, gps.lat_lon | ||
| ) | ||
|
|
||
| if self.in_collision_zone( | ||
| prev_lp_wp_index, self.state.reference_latlon, self.path, self.state.obstacles | ||
| ): | ||
| self._logger.debug("old path is in collision zone") | ||
| if self.must_change_path() or received_new_global_waypoint: | ||
| if received_new_global_waypoint: | ||
| self._logger.debug("Updating local path because we have a new global waypoint") | ||
| else: | ||
| self._logger.debug("old path is None") | ||
| self.state = new_state | ||
| self._update(new_ompl_path) | ||
| return heading_new_path, wp_index | ||
| return heading_new_path, new_prev_lp_wp_index | ||
| else: | ||
| self.state.update_state(gps, ais_ships, filtered_wind_sensor) # type: ignore | ||
|
|
||
| heading_diff_old_path = cs.calculate_heading_diff(self.state.heading, heading_old_path) | ||
| heading_diff_new_path = cs.calculate_heading_diff(self.state.heading, heading_new_path) | ||
| try: | ||
| heading_old_path, old_prev_lp_wp_index = self.calculate_desired_heading_and_wp_index( | ||
| self._ompl_path.get_path(), prev_lp_wp_index, gps.lat_lon # type: ignore | ||
| ) | ||
| except ValueError: | ||
| return heading_new_path, new_prev_lp_wp_index | ||
|
|
||
| old_prev_lp_wp_index = max(updated_wp_index - 1, 0) | ||
| new_prev_lp_wp_index = max(wp_index - 1, 0) | ||
| if self.is_path_expired(): | ||
|
||
| self._logger.debug("Updating local path because PATH_TTL has expired") | ||
| self._update(new_ompl_path) | ||
| return heading_new_path, new_prev_lp_wp_index | ||
|
|
||
| assert old_ompl_path is not None | ||
|
|
||
| heading_diff_old_path = cs.calculate_heading_diff( | ||
| self.state.heading, # type: ignore | ||
| heading_old_path, | ||
| ) | ||
| heading_diff_new_path = cs.calculate_heading_diff( | ||
| self.state.heading, # type: ignore | ||
| heading_new_path, | ||
| ) | ||
|
|
||
| old_cost = old_ompl_path.get_remaining_cost(old_prev_lp_wp_index, gps.lat_lon) | ||
| new_cost = new_ompl_path.get_remaining_cost(new_prev_lp_wp_index, gps.lat_lon) | ||
|
|
@@ -353,10 +369,10 @@ def update_if_needed( | |
| self._logger.debug("New path is cheaper, updating local path ") | ||
| self.state = new_state | ||
| self._update(new_ompl_path) | ||
| return heading_new_path, wp_index | ||
| return heading_new_path, new_prev_lp_wp_index | ||
| else: | ||
| self._logger.debug("old path is cheaper, continuing on the same path") | ||
| return heading_old_path, updated_wp_index | ||
| return heading_old_path, old_prev_lp_wp_index | ||
|
|
||
| def _update(self, ompl_path: OMPLPath): | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.