Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 171 additions & 0 deletions PathPlanning/TimeBasedPathPlanning/SpaceTimeAStar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from moving_obstacles import Grid, Position
import heapq
from typing import Generator
import random
from __future__ import annotations

# Seed randomness for reproducibility
RANDOM_SEED = 50
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)

class Node:

Check warning

Code scanning / CodeQL

Incomplete ordering Warning

Class Node implements
__lt__
, but does not implement __le__ or __gt__ or __ge__.
position: Position
time: int
heuristic: int
parent_index: int

def __init__(self, position: Position, time: int, heuristic: int, parent_index: int):
self.position = position
self.time = time
self.heuristic = heuristic
self.parent_index = parent_index

def __lt__(self, other: Node):
return (self.time + self.heuristic) < (other.time + other.heuristic)

def __repr__(self):
return f"Node(position={self.position}, time={self.time}, heuristic={self.heuristic}, parent_index={self.parent_index})"

class NodePath:
path: list[Node]

def __init__(self, path: list[Node]):
self.path = path

def get_position(self, time: int) -> Position:
# TODO: this is inefficient
for i in range(0, len(self.path) - 2):
if self.path[i + 1].time > time:
print(f"position @ {i} is {self.path[i].position}")
return self.path[i].position

if len(self.path) > 0:
return self.path[-1].position

return None

def goal_reached_time(self) -> int:
return self.path[-1].time

def __repr__(self):
repr_string = ""
for (i, node) in enumerate(self.path):
repr_string += f"{i}: {node}\n"
return repr_string

class TimeBasedAStar:
grid: Grid
start: Position
goal: Position

def __init__(self, grid: Grid, start: Position, goal: Position):
self.grid = grid
self.start = start
self.goal = goal

def plan(self, verbose: bool = False) -> NodePath:
open_set = []
heapq.heappush(open_set, Node(self.start, 0, self.calculate_heuristic(self.start), -1))

# TODO: is vec good here?
expanded_set = []
while open_set:
expanded_node: Node = heapq.heappop(open_set)
if verbose:
print("Expanded node:", expanded_node)

if expanded_node.time + 1 >= self.grid.time_limit:
if verbose:
print(f"\tSkipping node that is past time limit: {expanded_node}")
continue

if expanded_node.position == self.goal:
print(f"Found path to goal after {len(expanded_set)} expansions")
path = []
path_walker: Node = expanded_node
while path_walker.parent_index != -1:
path.append(path_walker)
path_walker = expanded_set[path_walker.parent_index]
# TODO: fix hack around bad while condiiotn
path.append(path_walker)

# reverse path so it goes start -> goal
path.reverse()
return NodePath(path)

expanded_idx = len(expanded_set)
expanded_set.append(expanded_node)

for child in self.generate_successors(expanded_node, expanded_idx, verbose):
heapq.heappush(open_set, child)

raise Exception("No path found")

def generate_successors(self, parent_node: Node, parent_node_idx: int, verbose: bool) -> Generator[Node, None, None]:
diffs = [Position(0, 1), Position(0, -1), Position(1, 0), Position(-1, 0), Position(0, 0)]
for diff in diffs:
new_pos = parent_node.position + diff
if self.grid.valid_position(new_pos, parent_node.time+1):
new_node = Node(new_pos, parent_node.time+1, self.calculate_heuristic(new_pos), parent_node_idx)
if verbose:
print("\tNew successor node: ", new_node)
yield new_node

def calculate_heuristic(self, position) -> int:
diff = self.goal - position
return abs(diff.x) + abs(diff.y)

import imageio.v2 as imageio
show_animation = True
def main():
start = Position(1, 1)
goal = Position(19, 19)
grid_side_length = 21
grid = Grid(np.array([grid_side_length, grid_side_length]), num_obstacles=40, obstacle_avoid_points=[start, goal])

planner = TimeBasedAStar(grid, start, goal)
verbose = False
path = planner.plan(verbose)

if verbose:
print(f"Path: {path}")

Check warning

Code scanning / CodeQL

Unreachable code Warning

This statement is unreachable.

if not show_animation:
return

fig = plt.figure(figsize=(10, 7))
ax = fig.add_subplot(autoscale_on=False, xlim=(0, grid.grid_size[0]-1), ylim=(0, grid.grid_size[1]-1))
ax.set_aspect('equal')
ax.grid()
ax.set_xticks(np.arange(0, grid_side_length, 1))
ax.set_yticks(np.arange(0, grid_side_length, 1))

start_and_goal, = ax.plot([], [], 'mD', ms=15, label="Start and Goal")
start_and_goal.set_data([start.x, goal.x], [start.y, goal.y])
obs_points, = ax.plot([], [], 'ro', ms=15, label="Obstacles")
path_points, = ax.plot([], [], 'bo', ms=10, label="Path Found")
ax.legend(bbox_to_anchor=(1.05, 1))

# for stopping simulation with the esc key.
plt.gcf().canvas.mpl_connect('key_release_event',
lambda event: [exit(
0) if event.key == 'escape' else None])

frames = []
for i in range(0, path.goal_reached_time()):
obs_positions = grid.get_obstacle_positions_at_time(i)
obs_points.set_data(obs_positions[0], obs_positions[1])
path_position = path.get_position(i)
path_points.set_data([path_position.x], [path_position.y])
plt.pause(0.2)
plt.savefig(f"frame_{i:03d}.png") # Save each frame as an image
frames.append(imageio.imread(f"frame_{i:03d}.png"))
imageio.mimsave("path_animation.gif", frames, fps=5) # Convert images to GIF
plt.show()

if __name__ == '__main__':
main()
224 changes: 224 additions & 0 deletions PathPlanning/TimeBasedPathPlanning/moving_obstacles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
import numpy as np
import matplotlib.pyplot as plt
from enum import Enum
class Position:
x: int
y: int

def __init__(self, x: int, y: int):
self.x = x
self.y = y

def as_ndarray(self) -> np.ndarray[int, int]:
return np.array([self.x, self.y])

def __add__(self, other):
if isinstance(other, Position):
return Position(self.x + other.x, self.y + other.y)
raise NotImplementedError(f"Addition not supported for Position and {type(other)}")

def __sub__(self, other):
if isinstance(other, Position):
return Position(self.x - other.x, self.y - other.y)
raise NotImplementedError(f"Subtraction not supported for Position and {type(other)}")

def __eq__(self, other):
if isinstance(other, Position):
return self.x == other.x and self.y == other.y
return False

def __repr__(self):
return f"Position({self.x}, {self.y})"

class ObstacleArrangement(Enum):
# Random obstacle positions and movements
RANDOM = 0
# Obstacles start in a line in y at center of grid and move side-to-side in x
ARRANGEMENT1 = 1

class Grid():

# Set in constructor
grid_size = None
grid = None
obstacle_paths: list[list[Position]] = []
# Obstacles will never occupy these points. Useful to avoid impossible scenarios
obstacle_avoid_points = []

# Problem definition
# Number of time steps in the simulation
time_limit: int

# Logging control
verbose = False

def __init__(self, grid_size: np.ndarray[int, int], num_obstacles: int = 2, obstacle_avoid_points: list[Position] = [], obstacle_arrangement: ObstacleArrangement = ObstacleArrangement.RANDOM, time_limit: int = 100):
num_obstacles
self.obstacle_avoid_points = obstacle_avoid_points
self.time_limit = time_limit
self.grid_size = grid_size
self.grid = np.zeros((grid_size[0], grid_size[1], self.time_limit))

if num_obstacles > self.grid_size[0] * self.grid_size[1]:
raise Exception("Number of obstacles is greater than grid size!")

if obstacle_arrangement == ObstacleArrangement.RANDOM:
self.obstacle_paths = self.generate_dynamic_obstacles(num_obstacles)
elif obstacle_arrangement == ObstacleArrangement.ARRANGEMENT1:
self.obstacle_paths = self.obstacle_arrangement_1(num_obstacles)

for (i, path) in enumerate(self.obstacle_paths):
obs_idx = i + 1 # avoid using 0 - that indicates free space
for (t, position) in enumerate(path):
# Reserve old & new position at this time step
if t > 0:
self.grid[path[t-1].x, path[t-1].y, t] = obs_idx
self.grid[position.x, position.y, t] = obs_idx

def generate_dynamic_obstacles(self, obs_count: int) -> list[list[Position]]:
obstacle_paths = []
for _obs_idx in (0, obs_count):
# Sample until a free starting space is found
initial_position = self.sample_random_position()
while not self.valid_obstacle_position(initial_position, 0):
initial_position = self.sample_random_position()

positions = [initial_position]
if self.verbose:
print("Obstacle initial position: ", initial_position)

# Encourage obstacles to mostly stay in place - too much movement leads to chaotic planning scenarios
# that are not fun to watch
weights = [0.05, 0.05, 0.05, 0.05, 0.8]
diffs = [Position(0, 1), Position(0, -1), Position(1, 0), Position(-1, 0), Position(0, 0)]

for t in range(1, self.time_limit-1):
sampled_indices = np.random.choice(len(diffs), size=5, replace=False, p=weights)
rand_diffs = [diffs[i] for i in sampled_indices]

valid_position = None
for diff in rand_diffs:
new_position = positions[-1] + diff

if not self.valid_obstacle_position(new_position, t):
continue

valid_position = new_position
break

# Impossible situation for obstacle - stay in place
# -> this can happen if the oaths of other obstacles this one
if valid_position is None:
valid_position = positions[-1]

positions.append(valid_position)

obstacle_paths.append(positions)

return obstacle_paths

def obstacle_arrangement_1(self, obs_count: int) -> list[list[Position]]:
# bottom half of y values start left -> right
# top half of y values start right -> left
obstacle_paths = []
half_grid_x = self.grid_size[0] // 2
half_grid_y = self.grid_size[1] // 2

for y_idx in range(0, min(obs_count, self.grid_size[1] - 1)):
moving_right = y_idx < half_grid_y
position = Position(half_grid_x, y_idx)
path = [position]

for _t in range(1, self.time_limit-1):
# first check if we should switch direction (at edge of grid)
if (moving_right and position.x == self.grid_size[0] - 1) or (not moving_right and position.x == 0):
moving_right = not moving_right
# step in direction
position = Position(position.x + (1 if moving_right else -1), position.y)
path.append(position)

obstacle_paths.append(path)

return obstacle_paths

"""
Check if the given position is valid at time t

input:
position (np.ndarray[int, int]): (x, y) position
t (int): time step

output:
bool: True if position/time combination is valid, False otherwise
"""
def valid_position(self, position: Position, t: int) -> bool:

# Check if new position is in grid
if not self.inside_grid_bounds(position):
return False

# Check if new position is not occupied at time t
return self.grid[position.x, position.y, t] == 0

"""
Returns True if the given position is valid at time t and is not in the set of obstacle_avoid_points
"""
def valid_obstacle_position(self, position: Position, t: int) -> bool:
return self.valid_position(position, t) and position not in self.obstacle_avoid_points

"""
Returns True if the given position is within the grid's boundaries
"""
def inside_grid_bounds(self, position: Position) -> bool:
return position.x >= 0 and position.x < self.grid_size[0] and position.y >= 0 and position.y < self.grid_size[1]

"""
Sample a random position that is within the grid's boundaries

output:
np.ndarray[int, int]: (x, y) position
"""
def sample_random_position(self) -> Position:
return Position(np.random.randint(0, self.grid_size[0]), np.random.randint(0, self.grid_size[1]))

"""
Returns a tuple of (x_positions, y_positions) of the obstacles at time t
"""
def get_obstacle_positions_at_time(self, t: int) -> tuple[list[int], list[int]]:

x_positions = []
y_positions = []
for obs_path in self.obstacle_paths:
x_positions.append(obs_path[t].x)
y_positions.append(obs_path[t].y)
return (x_positions, y_positions)

show_animation = True

def main():
grid = Grid(np.array([11, 11]), num_obstacles=10, obstacle_arrangement=ObstacleArrangement.ARRANGEMENT1)

if not show_animation:
return

fig = plt.figure(figsize=(8, 7))
ax = fig.add_subplot(autoscale_on=False, xlim=(0, grid.grid_size[0]-1), ylim=(0, grid.grid_size[1]-1))
ax.set_aspect('equal')
ax.grid()
ax.set_xticks(np.arange(0, 11, 1))
ax.set_yticks(np.arange(0, 11, 1))
obs_points, = ax.plot([], [], 'ro', ms=15)

# for stopping simulation with the esc key.
plt.gcf().canvas.mpl_connect('key_release_event',
lambda event: [exit(
0) if event.key == 'escape' else None])

for i in range(0, grid.time_limit - 1):
obs_positions = grid.get_obstacle_positions_at_time(i)
obs_points.set_data(obs_positions[0], obs_positions[1])
plt.pause(0.2)
plt.show()

if __name__ == '__main__':
main()
Loading