Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ def __sub__(self, other):
f"Subtraction not supported for Position and {type(other)}"
)

def __hash__(self):
return hash((self.x, self.y))


class ObstacleArrangement(Enum):
# Random obstacle positions and movements
Expand Down
51 changes: 36 additions & 15 deletions PathPlanning/TimeBasedPathPlanning/SpaceTimeAStar.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import random
from dataclasses import dataclass
from functools import total_ordering

import time

# Seed randomness for reproducibility
RANDOM_SEED = 50
Expand Down Expand Up @@ -48,11 +48,17 @@ def __lt__(self, other: object):
return NotImplementedError(f"Cannot compare Node with object of type: {type(other)}")
return (self.time + self.heuristic) < (other.time + other.heuristic)

"""
Note: cost and heuristic are not included in eq or hash, since they will always be the same
for a given (position, time) pair. Including either cost or heuristic would be redundant.
"""
def __eq__(self, other: object):
if not isinstance(other, Node):
return NotImplementedError(f"Cannot compare Node with object of type: {type(other)}")
return self.position == other.position and self.time == other.time

def __hash__(self):
return hash((self.position, self.time))

class NodePath:
path: list[Node]
Expand Down Expand Up @@ -86,6 +92,8 @@ class SpaceTimeAStar:
grid: Grid
start: Position
goal: Position
# Used to evaluate solutions
expanded_node_count: int = -1

def __init__(self, grid: Grid, start: Position, goal: Position):
self.grid = grid
Expand All @@ -98,7 +106,8 @@ def plan(self, verbose: bool = False) -> NodePath:
open_set, Node(self.start, 0, self.calculate_heuristic(self.start), -1)
)

expanded_set: list[Node] = []
expanded_list: list[Node] = []
expanded_set: set[Node] = set()
while open_set:
expanded_node: Node = heapq.heappop(open_set)
if verbose:
Expand All @@ -110,23 +119,25 @@ def plan(self, verbose: bool = False) -> NodePath:
continue

if expanded_node.position == self.goal:
print(f"Found path to goal after {len(expanded_set)} expansions")
print(f"Found path to goal after {len(expanded_list)} expansions")
path = []
path_walker: Node = expanded_node
while True:
path.append(path_walker)
if path_walker.parent_index == -1:
break
path_walker = expanded_set[path_walker.parent_index]
path_walker = expanded_list[path_walker.parent_index]

# reverse path so it goes start -> goal
path.reverse()
self.expanded_node_count = len(expanded_set)
return NodePath(path)

expanded_idx = len(expanded_set)
expanded_set.append(expanded_node)
expanded_idx = len(expanded_list)
expanded_list.append(expanded_node)
expanded_set.add(expanded_node)

for child in self.generate_successors(expanded_node, expanded_idx, verbose):
for child in self.generate_successors(expanded_node, expanded_idx, verbose, expanded_set):
heapq.heappush(open_set, child)

raise Exception("No path found")
Expand All @@ -135,7 +146,7 @@ def plan(self, verbose: bool = False) -> NodePath:
Generate possible successors of the provided `parent_node`
"""
def generate_successors(
self, parent_node: Node, parent_node_idx: int, verbose: bool
self, parent_node: Node, parent_node_idx: int, verbose: bool, expanded_set: set[Node]
) -> Generator[Node, None, None]:
diffs = [
Position(0, 0),
Expand All @@ -146,13 +157,17 @@ def generate_successors(
]
for diff in diffs:
new_pos = parent_node.position + diff
new_node = Node(
new_pos,
parent_node.time + 1,
self.calculate_heuristic(new_pos),
parent_node_idx,
)

if new_node in expanded_set:
continue

if self.grid.valid_position(new_pos, parent_node.time + 1):
new_node = Node(
new_pos,
parent_node.time + 1,
self.calculate_heuristic(new_pos),
parent_node_idx,
)
if verbose:
print("\tNew successor node: ", new_node)
yield new_node
Expand All @@ -166,9 +181,12 @@ def calculate_heuristic(self, position) -> int:
verbose = False

def main():
start = Position(1, 11)
start = Position(1, 5)
goal = Position(19, 19)
grid_side_length = 21

start_time = time.time()

grid = Grid(
np.array([grid_side_length, grid_side_length]),
num_obstacles=40,
Expand All @@ -179,6 +197,9 @@ def main():
planner = SpaceTimeAStar(grid, start, goal)
path = planner.plan(verbose)

runtime = time.time() - start_time
print(f"Planning took: {runtime:.5f} seconds")

if verbose:
print(f"Path: {path}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,23 @@ Using a time-based cost and heuristic ensures the path found is optimal in terms
The cost is the amount of time it takes to reach a given node, and the heuristic is the minimum amount of time it could take to reach the goal from that node, disregarding all obstacles.
For a simple scenario where the robot can move 1 cell per time step and stop and go as it pleases, the heuristic for time is equivalent to the heuristic for distance.

One optimization that was added in `this PR <https://github.com/AtsushiSakai/PythonRobotics/pull/1183>`__ was to add an expanded set to the algorithm. The algorithm will not expand nodes that are already in that set. This greatly reduces the number of node expansions needed to find a path, since no duplicates are expanded. It also helps to reduce the amount of memory the algorithm uses.

Before::

Found path to goal after 204490 expansions
Planning took: 1.72464 seconds
Memory usage (RSS): 68.19 MB


After::

Found path to goal after 2348 expansions
Planning took: 0.01550 seconds
Memory usage (RSS): 64.85 MB

When starting at (1, 11) in the structured obstacle arrangement (second of the two gifs above).

References:
~~~~~~~~~~~

Expand Down
1 change: 1 addition & 0 deletions tests/test_space_time_astar.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def test_1():
# path should end at the goal
assert path.path[-1].position == goal

assert planner.expanded_node_count < 1000

if __name__ == "__main__":
conftest.run_this_test(__file__)