Skip to content

Commit 5f88530

Browse files
committed
cleaning up Grid + adding new obstacle arrangement
1 parent f6ce576 commit 5f88530

File tree

2 files changed

+123
-83
lines changed

2 files changed

+123
-83
lines changed

PathPlanning/TimeBasedPathPlanning/TimeBaseAStar.py renamed to PathPlanning/TimeBasedPathPlanning/SpaceTimeAStar.py

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@
55
import heapq
66
from typing import Generator
77
import random
8+
from __future__ import annotations
89

910
# Seed randomness for reproducibility
10-
RANDOM_SEED = 42
11+
RANDOM_SEED = 50
1112
random.seed(RANDOM_SEED)
1213
np.random.seed(RANDOM_SEED)
1314

@@ -23,7 +24,7 @@ def __init__(self, position: Position, time: int, heuristic: int, parent_index:
2324
self.heuristic = heuristic
2425
self.parent_index = parent_index
2526

26-
def __lt__(self, other):
27+
def __lt__(self, other: Node):
2728
return (self.time + self.heuristic) < (other.time + other.heuristic)
2829

2930
def __repr__(self):
@@ -118,12 +119,13 @@ def calculate_heuristic(self, position) -> int:
118119
diff = self.goal - position
119120
return abs(diff.x) + abs(diff.y)
120121

122+
import imageio.v2 as imageio
121123
show_animation = True
122124
def main():
123125
start = Position(1, 1)
124126
goal = Position(19, 19)
125127
grid_side_length = 21
126-
grid = Grid(np.array([grid_side_length, grid_side_length]), num_obstacles=115, obstacle_avoid_points=[start, goal])
128+
grid = Grid(np.array([grid_side_length, grid_side_length]), num_obstacles=40, obstacle_avoid_points=[start, goal])
127129

128130
planner = TimeBasedAStar(grid, start, goal)
129131
verbose = False
@@ -148,21 +150,21 @@ def main():
148150
path_points, = ax.plot([], [], 'bo', ms=10, label="Path Found")
149151
ax.legend(bbox_to_anchor=(1.05, 1))
150152

151-
def get_frame(i):
152-
obs_x_points = []
153-
obs_y_points = []
154-
for obs_path in grid.obstacle_paths:
155-
obs_pos = obs_path[i]
156-
obs_x_points.append(obs_pos.x)
157-
obs_y_points.append(obs_pos.y)
158-
obs_points.set_data(obs_x_points, obs_y_points)
153+
# for stopping simulation with the esc key.
154+
plt.gcf().canvas.mpl_connect('key_release_event',
155+
lambda event: [exit(
156+
0) if event.key == 'escape' else None])
159157

158+
frames = []
159+
for i in range(0, path.goal_reached_time()):
160+
obs_positions = grid.get_obstacle_positions_at_time(i)
161+
obs_points.set_data(obs_positions[0], obs_positions[1])
160162
path_position = path.get_position(i)
161163
path_points.set_data([path_position.x], [path_position.y])
162-
return start_and_goal, obs_points, path_points
163-
164-
_ani = animation.FuncAnimation(
165-
fig, get_frame, path.goal_reached_time(), interval=500, blit=True, repeat=False)
164+
plt.pause(0.2)
165+
plt.savefig(f"frame_{i:03d}.png") # Save each frame as an image
166+
frames.append(imageio.imread(f"frame_{i:03d}.png"))
167+
imageio.mimsave("path_animation.gif", frames, fps=5) # Convert images to GIF
166168
plt.show()
167169

168170
if __name__ == '__main__':

PathPlanning/TimeBasedPathPlanning/moving_obstacles.py

Lines changed: 106 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
import numpy as np
2-
import random
32
import matplotlib.pyplot as plt
4-
import matplotlib.animation as animation
5-
3+
from enum import Enum
64
class Position:
75
x: int
86
y: int
@@ -32,85 +30,116 @@ def __eq__(self, other):
3230
def __repr__(self):
3331
return f"Position({self.x}, {self.y})"
3432

33+
class ObstacleArrangement(Enum):
34+
# Random obstacle positions and movements
35+
RANDOM = 0
36+
# Obstacles start in a line in y at center of grid and move side-to-side in x
37+
ARRANGEMENT1 = 1
38+
3539
class Grid():
3640

3741
# Set in constructor
3842
grid_size = None
3943
grid = None
40-
obstacle_paths = []
44+
obstacle_paths: list[list[Position]] = []
4145
# Obstacles will never occupy these points. Useful to avoid impossible scenarios
4246
obstacle_avoid_points = []
4347

4448
# Problem definition
45-
time_limit = 100
46-
num_obstacles: int
49+
# Number of time steps in the simulation
50+
time_limit: int
4751

4852
# Logging control
4953
verbose = False
5054

51-
def __init__(self, grid_size: np.ndarray[int, int], num_obstacles: int = 2, obstacle_avoid_points: list[Position] = []):
52-
self.num_obstacles = num_obstacles
55+
def __init__(self, grid_size: np.ndarray[int, int], num_obstacles: int = 2, obstacle_avoid_points: list[Position] = [], obstacle_arrangement: ObstacleArrangement = ObstacleArrangement.RANDOM, time_limit: int = 100):
56+
num_obstacles
5357
self.obstacle_avoid_points = obstacle_avoid_points
58+
self.time_limit = time_limit
5459
self.grid_size = grid_size
5560
self.grid = np.zeros((grid_size[0], grid_size[1], self.time_limit))
5661

57-
if self.num_obstacles > self.grid_size[0] * self.grid_size[1]:
62+
if num_obstacles > self.grid_size[0] * self.grid_size[1]:
5863
raise Exception("Number of obstacles is greater than grid size!")
5964

60-
for i in range(self.num_obstacles):
61-
self.obstacle_paths.append(self.generate_dynamic_obstacle(i+1))
62-
63-
"""
64-
Generate a dynamic obstacle following a random trajectory, and reserve its path in `self.grid`
65-
66-
input:
67-
obs_idx (int): index of the obstacle. Used to reserve its path in `self.grid`
68-
69-
output:
70-
list[np.ndarray[int, int]]: list of positions of the obstacle at each time step
71-
"""
72-
def generate_dynamic_obstacle(self, obs_idx: int) -> list[Position]:
73-
74-
# Sample until a free starting space is found
75-
initial_position = self.sample_random_position()
76-
while not self.valid_obstacle_position(initial_position, 0):
65+
if obstacle_arrangement == ObstacleArrangement.RANDOM:
66+
self.obstacle_paths = self.generate_dynamic_obstacles(num_obstacles)
67+
elif obstacle_arrangement == ObstacleArrangement.ARRANGEMENT1:
68+
self.obstacle_paths = self.obstacle_arrangement_1(num_obstacles)
69+
70+
for (i, path) in enumerate(self.obstacle_paths):
71+
obs_idx = i + 1 # avoid using 0 - that indicates free space
72+
for (t, position) in enumerate(path):
73+
# Reserve old & new position at this time step
74+
if t > 0:
75+
self.grid[path[t-1].x, path[t-1].y, t] = obs_idx
76+
self.grid[position.x, position.y, t] = obs_idx
77+
78+
def generate_dynamic_obstacles(self, obs_count: int) -> list[list[Position]]:
79+
obstacle_paths = []
80+
for _obs_idx in (0, obs_count):
81+
# Sample until a free starting space is found
7782
initial_position = self.sample_random_position()
83+
while not self.valid_obstacle_position(initial_position, 0):
84+
initial_position = self.sample_random_position()
85+
86+
positions = [initial_position]
87+
if self.verbose:
88+
print("Obstacle initial position: ", initial_position)
7889

79-
positions = [initial_position]
80-
if self.verbose:
81-
print("Obstacle initial position: ", initial_position)
90+
# Encourage obstacles to mostly stay in place - too much movement leads to chaotic planning scenarios
91+
# that are not fun to watch
92+
weights = [0.05, 0.05, 0.05, 0.05, 0.8]
93+
diffs = [Position(0, 1), Position(0, -1), Position(1, 0), Position(-1, 0), Position(0, 0)]
8294

83-
# Encourage obstacles to mostly stay in place - too much movement leads to chaotic planning scenarios
84-
# that are not fun to watch
85-
weights = [0.05, 0.05, 0.05, 0.05, 0.8]
86-
diffs = [Position(0, 1), Position(0, -1), Position(1, 0), Position(-1, 0), Position(0, 0)]
95+
for t in range(1, self.time_limit-1):
96+
sampled_indices = np.random.choice(len(diffs), size=5, replace=False, p=weights)
97+
rand_diffs = [diffs[i] for i in sampled_indices]
8798

88-
for t in range(1, self.time_limit-1):
89-
sampled_indices = np.random.choice(len(diffs), size=5, replace=False, p=weights)
90-
rand_diffs = [diffs[i] for i in sampled_indices]
91-
# rand_diffs = random.sample(diffs, k=len(diffs))
99+
valid_position = None
100+
for diff in rand_diffs:
101+
new_position = positions[-1] + diff
92102

93-
valid_position = None
94-
for diff in rand_diffs:
95-
new_position = positions[-1] + diff
103+
if not self.valid_obstacle_position(new_position, t):
104+
continue
96105

97-
if not self.valid_obstacle_position(new_position, t):
98-
continue
106+
valid_position = new_position
107+
break
99108

100-
valid_position = new_position
101-
break
109+
# Impossible situation for obstacle - stay in place
110+
# -> this can happen if the oaths of other obstacles this one
111+
if valid_position is None:
112+
valid_position = positions[-1]
102113

103-
# Impossible situation for obstacle - stay in place
104-
# -> this can happen if another obstacle's path traps this one
105-
if valid_position is None:
106-
valid_position = positions[-1]
114+
positions.append(valid_position)
115+
116+
obstacle_paths.append(positions)
107117

108-
# Reserve old & new position at this time step
109-
self.grid[positions[-1].x, positions[-1].y, t] = obs_idx
110-
self.grid[valid_position.x, valid_position.y, t] = obs_idx
111-
positions.append(valid_position)
118+
return obstacle_paths
112119

113-
return positions
120+
def obstacle_arrangement_1(self, obs_count: int) -> list[list[Position]]:
121+
# bottom half of y values start left -> right
122+
# top half of y values start right -> left
123+
obstacle_paths = []
124+
half_grid_x = self.grid_size[0] // 2
125+
half_grid_y = self.grid_size[1] // 2
126+
127+
for y_idx in range(0, min(obs_count, self.grid_size[1] - 1)):
128+
moving_right = y_idx < half_grid_y
129+
position = Position(half_grid_x, y_idx)
130+
path = [position]
131+
132+
for _t in range(1, self.time_limit-1):
133+
# first check if we should switch direction (at edge of grid)
134+
if (moving_right and position.x == self.grid_size[0] - 1) or (not moving_right and position.x == 0):
135+
moving_right = not moving_right
136+
# step in direction
137+
position = Position(position.x + (1 if moving_right else -1), position.y)
138+
path.append(position)
139+
140+
obstacle_paths.append(path)
141+
142+
return obstacle_paths
114143

115144
"""
116145
Check if the given position is valid at time t
@@ -151,11 +180,23 @@ def inside_grid_bounds(self, position: Position) -> bool:
151180
"""
152181
def sample_random_position(self) -> Position:
153182
return Position(np.random.randint(0, self.grid_size[0]), np.random.randint(0, self.grid_size[1]))
183+
184+
"""
185+
Returns a tuple of (x_positions, y_positions) of the obstacles at time t
186+
"""
187+
def get_obstacle_positions_at_time(self, t: int) -> tuple[list[int], list[int]]:
188+
189+
x_positions = []
190+
y_positions = []
191+
for obs_path in self.obstacle_paths:
192+
x_positions.append(obs_path[t].x)
193+
y_positions.append(obs_path[t].y)
194+
return (x_positions, y_positions)
154195

155196
show_animation = True
156197

157198
def main():
158-
grid = Grid(np.array([11, 11]))
199+
grid = Grid(np.array([11, 11]), num_obstacles=10, obstacle_arrangement=ObstacleArrangement.ARRANGEMENT1)
159200

160201
if not show_animation:
161202
return
@@ -166,20 +207,17 @@ def main():
166207
ax.grid()
167208
ax.set_xticks(np.arange(0, 11, 1))
168209
ax.set_yticks(np.arange(0, 11, 1))
169-
points, = ax.plot([], [], 'ro', ms=15)
170-
171-
def get_frame(i):
172-
obs_x_points = []
173-
obs_y_points = []
174-
for obs_path in grid.obstacle_paths:
175-
obs_pos = obs_path[i]
176-
obs_x_points.append(obs_pos.x)
177-
obs_y_points.append(obs_pos.y)
178-
points.set_data(obs_x_points, obs_y_points)
179-
return points,
180-
181-
_ani = animation.FuncAnimation(
182-
fig, get_frame, grid.time_limit-1, interval=500, blit=True, repeat=False)
210+
obs_points, = ax.plot([], [], 'ro', ms=15)
211+
212+
# for stopping simulation with the esc key.
213+
plt.gcf().canvas.mpl_connect('key_release_event',
214+
lambda event: [exit(
215+
0) if event.key == 'escape' else None])
216+
217+
for i in range(0, grid.time_limit - 1):
218+
obs_positions = grid.get_obstacle_positions_at_time(i)
219+
obs_points.set_data(obs_positions[0], obs_positions[1])
220+
plt.pause(0.2)
183221
plt.show()
184222

185223
if __name__ == '__main__':

0 commit comments

Comments
 (0)