Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 169 additions & 0 deletions PathPlanning/TimeBasedPathPlanning/TimeBaseAStar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from moving_obstacles import Grid, Position
import heapq
from typing import Generator
import random

# Seed randomness for reproducibility
RANDOM_SEED = 42
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)

class Node:
position: Position
time: int
heuristic: int
parent_index: int

def __init__(self, position: Position, time: int, heuristic: int, parent_index: int):
self.position = position
self.time = time
self.heuristic = heuristic
self.parent_index = parent_index

def __lt__(self, other):
return (self.time + self.heuristic) < (other.time + other.heuristic)

def __repr__(self):
return f"Node(position={self.position}, time={self.time}, heuristic={self.heuristic}, parent_index={self.parent_index})"

class NodePath:
path: list[Node]

def __init__(self, path: list[Node]):
self.path = path

def get_position(self, time: int) -> Position:
# TODO: this is inefficient
for i in range(0, len(self.path) - 2):
if self.path[i + 1].time > time:
print(f"position @ {i} is {self.path[i].position}")
return self.path[i].position

if len(self.path) > 0:
return self.path[-1].position

return None

def goal_reached_time(self) -> int:
return self.path[-1].time

def __repr__(self):
repr_string = ""
for (i, node) in enumerate(self.path):
repr_string += f"{i}: {node}\n"
return repr_string

class TimeBasedAStar:
grid: Grid
start: Position
goal: Position

def __init__(self, grid: Grid, start: Position, goal: Position):
self.grid = grid
self.start = start
self.goal = goal

def plan(self, verbose: bool = False) -> NodePath:
open_set = []
heapq.heappush(open_set, Node(self.start, 0, self.calculate_heuristic(self.start), -1))

# TODO: is vec good here?
expanded_set = []
while open_set:
expanded_node: Node = heapq.heappop(open_set)
if verbose:
print("Expanded node:", expanded_node)

if expanded_node.time + 1 >= self.grid.time_limit:
if verbose:
print(f"\tSkipping node that is past time limit: {expanded_node}")
continue

if expanded_node.position == self.goal:
print(f"Found path to goal after {len(expanded_set)} expansions")
path = []
path_walker: Node = expanded_node
while path_walker.parent_index != -1:
path.append(path_walker)
path_walker = expanded_set[path_walker.parent_index]
# TODO: fix hack around bad while condiiotn
path.append(path_walker)

# reverse path so it goes start -> goal
path.reverse()
return NodePath(path)

expanded_idx = len(expanded_set)
expanded_set.append(expanded_node)

for child in self.generate_successors(expanded_node, expanded_idx, verbose):
heapq.heappush(open_set, child)

raise Exception("No path found")

def generate_successors(self, parent_node: Node, parent_node_idx: int, verbose: bool) -> Generator[Node, None, None]:
diffs = [Position(0, 1), Position(0, -1), Position(1, 0), Position(-1, 0), Position(0, 0)]
for diff in diffs:
new_pos = parent_node.position + diff
if self.grid.valid_position(new_pos, parent_node.time+1):
new_node = Node(new_pos, parent_node.time+1, self.calculate_heuristic(new_pos), parent_node_idx)
if verbose:
print("\tNew successor node: ", new_node)
yield new_node

def calculate_heuristic(self, position) -> int:
diff = self.goal - position
return abs(diff.x) + abs(diff.y)

show_animation = True
def main():
start = Position(1, 1)
goal = Position(19, 19)
grid_side_length = 21
grid = Grid(np.array([grid_side_length, grid_side_length]), num_obstacles=115, obstacle_avoid_points=[start, goal])

planner = TimeBasedAStar(grid, start, goal)
verbose = False
path = planner.plan(verbose)

if verbose:
print(f"Path: {path}")

if not show_animation:
return

fig = plt.figure(figsize=(10, 7))
ax = fig.add_subplot(autoscale_on=False, xlim=(0, grid.grid_size[0]-1), ylim=(0, grid.grid_size[1]-1))
ax.set_aspect('equal')
ax.grid()
ax.set_xticks(np.arange(0, grid_side_length, 1))
ax.set_yticks(np.arange(0, grid_side_length, 1))

start_and_goal, = ax.plot([], [], 'mD', ms=15, label="Start and Goal")
start_and_goal.set_data([start.x, goal.x], [start.y, goal.y])
obs_points, = ax.plot([], [], 'ro', ms=15, label="Obstacles")
path_points, = ax.plot([], [], 'bo', ms=10, label="Path Found")
ax.legend(bbox_to_anchor=(1.05, 1))

def get_frame(i):
obs_x_points = []
obs_y_points = []
for obs_path in grid.obstacle_paths:
obs_pos = obs_path[i]
obs_x_points.append(obs_pos.x)
obs_y_points.append(obs_pos.y)
obs_points.set_data(obs_x_points, obs_y_points)

path_position = path.get_position(i)
path_points.set_data([path_position.x], [path_position.y])
return start_and_goal, obs_points, path_points

_ani = animation.FuncAnimation(
fig, get_frame, path.goal_reached_time(), interval=500, blit=True, repeat=False)
plt.show()

if __name__ == '__main__':
main()
186 changes: 186 additions & 0 deletions PathPlanning/TimeBasedPathPlanning/moving_obstacles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import numpy as np
import random
import matplotlib.pyplot as plt
import matplotlib.animation as animation

class Position:
x: int
y: int

def __init__(self, x: int, y: int):
self.x = x
self.y = y

def as_ndarray(self) -> np.ndarray[int, int]:
return np.array([self.x, self.y])

def __add__(self, other):
if isinstance(other, Position):
return Position(self.x + other.x, self.y + other.y)
raise NotImplementedError(f"Addition not supported for Position and {type(other)}")

def __sub__(self, other):
if isinstance(other, Position):
return Position(self.x - other.x, self.y - other.y)
raise NotImplementedError(f"Subtraction not supported for Position and {type(other)}")

def __eq__(self, other):
if isinstance(other, Position):
return self.x == other.x and self.y == other.y
return False

def __repr__(self):
return f"Position({self.x}, {self.y})"

class Grid():

# Set in constructor
grid_size = None
grid = None
obstacle_paths = []
# Obstacles will never occupy these points. Useful to avoid impossible scenarios
obstacle_avoid_points = []

# Problem definition
time_limit = 100
num_obstacles: int

# Logging control
verbose = False

def __init__(self, grid_size: np.ndarray[int, int], num_obstacles: int = 2, obstacle_avoid_points: list[Position] = []):
self.num_obstacles = num_obstacles
self.obstacle_avoid_points = obstacle_avoid_points
self.grid_size = grid_size
self.grid = np.zeros((grid_size[0], grid_size[1], self.time_limit))

if self.num_obstacles > self.grid_size[0] * self.grid_size[1]:
raise Exception("Number of obstacles is greater than grid size!")

for i in range(self.num_obstacles):
self.obstacle_paths.append(self.generate_dynamic_obstacle(i+1))

"""
Generate a dynamic obstacle following a random trajectory, and reserve its path in `self.grid`

input:
obs_idx (int): index of the obstacle. Used to reserve its path in `self.grid`

output:
list[np.ndarray[int, int]]: list of positions of the obstacle at each time step
"""
def generate_dynamic_obstacle(self, obs_idx: int) -> list[Position]:

# Sample until a free starting space is found
initial_position = self.sample_random_position()
while not self.valid_obstacle_position(initial_position, 0):
initial_position = self.sample_random_position()

positions = [initial_position]
if self.verbose:
print("Obstacle initial position: ", initial_position)

# Encourage obstacles to mostly stay in place - too much movement leads to chaotic planning scenarios
# that are not fun to watch
weights = [0.05, 0.05, 0.05, 0.05, 0.8]
diffs = [Position(0, 1), Position(0, -1), Position(1, 0), Position(-1, 0), Position(0, 0)]

for t in range(1, self.time_limit-1):
sampled_indices = np.random.choice(len(diffs), size=5, replace=False, p=weights)
rand_diffs = [diffs[i] for i in sampled_indices]
# rand_diffs = random.sample(diffs, k=len(diffs))

valid_position = None
for diff in rand_diffs:
new_position = positions[-1] + diff

if not self.valid_obstacle_position(new_position, t):
continue

valid_position = new_position
break

# Impossible situation for obstacle - stay in place
# -> this can happen if another obstacle's path traps this one
if valid_position is None:
valid_position = positions[-1]

# Reserve old & new position at this time step
self.grid[positions[-1].x, positions[-1].y, t] = obs_idx
self.grid[valid_position.x, valid_position.y, t] = obs_idx
positions.append(valid_position)

return positions

"""
Check if the given position is valid at time t

input:
position (np.ndarray[int, int]): (x, y) position
t (int): time step

output:
bool: True if position/time combination is valid, False otherwise
"""
def valid_position(self, position: Position, t: int) -> bool:

# Check if new position is in grid
if not self.inside_grid_bounds(position):
return False

# Check if new position is not occupied at time t
return self.grid[position.x, position.y, t] == 0

"""
Returns True if the given position is valid at time t and is not in the set of obstacle_avoid_points
"""
def valid_obstacle_position(self, position: Position, t: int) -> bool:
return self.valid_position(position, t) and position not in self.obstacle_avoid_points

"""
Returns True if the given position is within the grid's boundaries
"""
def inside_grid_bounds(self, position: Position) -> bool:
return position.x >= 0 and position.x < self.grid_size[0] and position.y >= 0 and position.y < self.grid_size[1]

"""
Sample a random position that is within the grid's boundaries

output:
np.ndarray[int, int]: (x, y) position
"""
def sample_random_position(self) -> Position:
return Position(np.random.randint(0, self.grid_size[0]), np.random.randint(0, self.grid_size[1]))

show_animation = True

def main():
grid = Grid(np.array([11, 11]))

if not show_animation:
return

fig = plt.figure(figsize=(8, 7))
ax = fig.add_subplot(autoscale_on=False, xlim=(0, grid.grid_size[0]-1), ylim=(0, grid.grid_size[1]-1))
ax.set_aspect('equal')
ax.grid()
ax.set_xticks(np.arange(0, 11, 1))
ax.set_yticks(np.arange(0, 11, 1))
points, = ax.plot([], [], 'ro', ms=15)

def get_frame(i):
obs_x_points = []
obs_y_points = []
for obs_path in grid.obstacle_paths:
obs_pos = obs_path[i]
obs_x_points.append(obs_pos.x)
obs_y_points.append(obs_pos.y)
points.set_data(obs_x_points, obs_y_points)
return points,

_ani = animation.FuncAnimation(
fig, get_frame, grid.time_limit-1, interval=500, blit=True, repeat=False)
plt.show()

if __name__ == '__main__':
main()
Loading