11"""
22Safe interval path planner
3+ This script implements a safe-interval path planner for a 2d grid with dynamic obstacles. It is faster than
4+ SpaceTime A* because it reduces the number of redundant node expansions by pre-computing regions of adjacent
5+ time steps that are safe ("safe intervals") at each position. This allows the algorithm to skip expanding nodes
6+ that are in intervals that have already been visited earlier.
37
4- TODO: populate docstring
8+ Reference: https://www.cs.cmu.edu/~maxim/files/sipp_icra11.pdf
59"""
610
711import numpy as np
1115 Interval ,
1216 ObstacleArrangement ,
1317 Position ,
18+ empty_2d_array_of_lists ,
1419)
1520import heapq
1621import random
2530
2631@dataclass ()
2732# Note: Total_ordering is used instead of adding `order=True` to the @dataclass decorator because
28- # this class needs to override the __lt__ and __eq__ methods to ignore parent_index. Parent
29- # index is just used to track the path found by the algorithm, and has no effect on the quality
30- # of a node.
33+ # this class needs to override the __lt__ and __eq__ methods to ignore parent_index. The Parent
34+ # index and interval member variables are just used to track the path found by the algorithm,
35+ # and has no effect on the quality of a node.
3136@total_ordering
3237class Node :
3338 position : Position
@@ -43,16 +48,16 @@ class Node:
4348 def __lt__ (self , other : object ):
4449 if not isinstance (other , Node ):
4550 return NotImplementedError (f"Cannot compare Node with object of type: { type (other )} " )
46- # TODO: assumption that these two carry all the info needed for intervals. I think that makes sense but should think about it
4751 return (self .time + self .heuristic ) < (other .time + other .heuristic )
4852
4953 """
50- TODO - note about interval being included here
54+ Equality only cares about position and time. Heuristic and interval will always be the same for a given
55+ (position, time) pairing, so they are not considered in equality.
5156 """
5257 def __eq__ (self , other : object ):
5358 if not isinstance (other , Node ):
5459 return NotImplementedError (f"Cannot compare Node with object of type: { type (other )} " )
55- return self .position == other .position and self .time == other .time and self . interval == other . interval
60+ return self .position == other .position and self .time == other .time
5661
5762@dataclass
5863class EntryTimeAndInterval :
@@ -103,6 +108,12 @@ def __init__(self, grid: Grid, start: Position, goal: Position):
103108 self .start = start
104109 self .goal = goal
105110
111+ """
112+ Generate a plan given the loaded problem statement. Raises an exception if it fails to find a path.
113+
114+ Arguments:
115+ verbose (bool): set to True to print debug information
116+ """
106117 def plan (self , verbose : bool = False ) -> NodePath :
107118
108119 safe_intervals = self .grid .get_safe_intervals ()
@@ -114,11 +125,7 @@ def plan(self, verbose: bool = False) -> NodePath:
114125 )
115126
116127 expanded_list : list [Node ] = []
117- # TODO: copy pasta from Grid file
118- # 2d np array of lists of (entry time, interval tuples)
119- # TODO: use a dataclass for the tuple
120- visited_intervals = np .empty ((self .grid .grid_size [0 ], self .grid .grid_size [1 ]), dtype = object )
121- visited_intervals [:] = [[[] for _ in range (visited_intervals .shape [1 ])] for _ in range (visited_intervals .shape [0 ])]
128+ visited_intervals = empty_2d_array_of_lists (self .grid .grid_size [0 ], self .grid .grid_size [1 ])
122129 while open_set :
123130 expanded_node : Node = heapq .heappop (open_set )
124131 if verbose :
@@ -154,9 +161,8 @@ def plan(self, verbose: bool = False) -> NodePath:
154161 raise Exception ("No path found" )
155162
156163 """
157- Generate possible successors of the provided `parent_node`
164+ Generate list of possible successors of the provided `parent_node` that are worth expanding
158165 """
159- # TODO: is intervals being passed by ref? (i think so?)
160166 def generate_successors (
161167 self , parent_node : Node , parent_node_idx : int , intervals : np .ndarray , visited_intervals : np .ndarray
162168 ) -> list [Node ]:
@@ -177,16 +183,16 @@ def generate_successors(
177183
178184 new_cell_intervals : list [Interval ] = intervals [new_pos .x , new_pos .y ]
179185 for interval in new_cell_intervals :
180- # if interval ends before current starts, skip
181- if interval .end_time < current_interval .start_time :
182- continue
183-
184186 # if interval starts after current ends, break
185- # TODO: assumption here that intervals are sorted (they should be)
187+ # assumption: intervals are sorted by start time, so all future intervals will hit this condition as well
186188 if interval .start_time > current_interval .end_time :
187189 break
188190
189- # if we have already expanded a node in this interval with a <= starting time, continue
191+ # if interval ends before current starts, skip
192+ if interval .end_time < current_interval .start_time :
193+ continue
194+
195+ # if we have already expanded a node in this interval with a <= starting time, skip
190196 better_node_expanded = False
191197 for visited in visited_intervals [new_pos .x , new_pos .y ]:
192198 if interval == visited .interval and visited .entry_time <= parent_node .time + 1 :
@@ -195,14 +201,14 @@ def generate_successors(
195201 if better_node_expanded :
196202 continue
197203
198- # We know there is some overlap . Generate successor at the earliest possible time the
204+ # We know there is a node worth expanding . Generate successor at the earliest possible time the
199205 # new interval can be entered
200206 for possible_t in range (max (parent_node .time + 1 , interval .start_time ), min (current_interval .end_time , interval .end_time )):
201207 if self .grid .valid_position (new_pos , possible_t ):
202208 new_nodes .append (Node (
203209 new_pos ,
204- # entry is max of interval start and parent node start time (get there as soon as possible)
205- max (parent_node .time + 1 , interval . start_time ),
210+ # entry is max of interval start and parent node time + 1 (get there as soon as possible)
211+ max (interval . start_time , parent_node .time + 1 ),
206212 self .calculate_heuristic (new_pos ),
207213 parent_node_idx ,
208214 interval ,
@@ -212,11 +218,18 @@ def generate_successors(
212218
213219 return new_nodes
214220
221+ """
222+ Calculate the heuristic for a given position - Manhattan distance to the goal
223+ """
215224 def calculate_heuristic (self , position ) -> int :
216225 diff = self .goal - position
217226 return abs (diff .x ) + abs (diff .y )
218227
219228
229+ """
230+ Adds a new entry to the visited intervals array. If the entry is already present, the entry time is updated if the new
231+ entry time is better. Otherwise, the entry is added to `visited_intervals` at the position of `expanded_node`.
232+ """
220233def add_entry_to_visited_intervals_array (entry_time_and_interval : EntryTimeAndInterval , visited_intervals : np .ndarray , expanded_node : Node ):
221234 # if entry is present, update entry time if better
222235 for existing_entry_and_interval in visited_intervals [expanded_node .position .x , expanded_node .position .y ]:
0 commit comments