diff --git a/PathPlanning/TimeBasedPathPlanning/GridWithDynamicObstacles.py b/PathPlanning/TimeBasedPathPlanning/GridWithDynamicObstacles.py new file mode 100644 index 0000000000..7b0190d023 --- /dev/null +++ b/PathPlanning/TimeBasedPathPlanning/GridWithDynamicObstacles.py @@ -0,0 +1,273 @@ +""" +This file implements a grid with a 3d reservation matrix with dimensions for x, y, and time. There +is also infrastructure to generate dynamic obstacles that move around the grid. The obstacles' paths +are stored in the reservation matrix on creation. +""" +import numpy as np +import matplotlib.pyplot as plt +from enum import Enum +from dataclasses import dataclass + +@dataclass(order=True) +class Position: + x: int + y: int + + def as_ndarray(self) -> np.ndarray: + return np.array([self.x, self.y]) + + def __add__(self, other): + if isinstance(other, Position): + return Position(self.x + other.x, self.y + other.y) + raise NotImplementedError( + f"Addition not supported for Position and {type(other)}" + ) + + def __sub__(self, other): + if isinstance(other, Position): + return Position(self.x - other.x, self.y - other.y) + raise NotImplementedError( + f"Subtraction not supported for Position and {type(other)}" + ) + + +class ObstacleArrangement(Enum): + # Random obstacle positions and movements + RANDOM = 0 + # Obstacles start in a line in y at center of grid and move side-to-side in x + ARRANGEMENT1 = 1 + + +class Grid: + # Set in constructor + grid_size: np.ndarray + reservation_matrix: np.ndarray + obstacle_paths: list[list[Position]] = [] + # Obstacles will never occupy these points. Useful to avoid impossible scenarios + obstacle_avoid_points: list[Position] = [] + + # Number of time steps in the simulation + time_limit: int + + # Logging control + verbose = False + + def __init__( + self, + grid_size: np.ndarray, + num_obstacles: int = 40, + obstacle_avoid_points: list[Position] = [], + obstacle_arrangement: ObstacleArrangement = ObstacleArrangement.RANDOM, + time_limit: int = 100, + ): + self.obstacle_avoid_points = obstacle_avoid_points + self.time_limit = time_limit + self.grid_size = grid_size + self.reservation_matrix = np.zeros((grid_size[0], grid_size[1], self.time_limit)) + + if num_obstacles > self.grid_size[0] * self.grid_size[1]: + raise Exception("Number of obstacles is greater than grid size!") + + if obstacle_arrangement == ObstacleArrangement.RANDOM: + self.obstacle_paths = self.generate_dynamic_obstacles(num_obstacles) + elif obstacle_arrangement == ObstacleArrangement.ARRANGEMENT1: + self.obstacle_paths = self.obstacle_arrangement_1(num_obstacles) + + for i, path in enumerate(self.obstacle_paths): + obs_idx = i + 1 # avoid using 0 - that indicates free space in the grid + for t, position in enumerate(path): + # Reserve old & new position at this time step + if t > 0: + self.reservation_matrix[path[t - 1].x, path[t - 1].y, t] = obs_idx + self.reservation_matrix[position.x, position.y, t] = obs_idx + + """ + Generate dynamic obstacles that move around the grid. Initial positions and movements are random + """ + def generate_dynamic_obstacles(self, obs_count: int) -> list[list[Position]]: + obstacle_paths = [] + for _ in (0, obs_count): + # Sample until a free starting space is found + initial_position = self.sample_random_position() + while not self.valid_obstacle_position(initial_position, 0): + initial_position = self.sample_random_position() + + positions = [initial_position] + if self.verbose: + print("Obstacle initial position: ", initial_position) + + # Encourage obstacles to mostly stay in place - too much movement leads to chaotic planning scenarios + # that are not fun to watch + weights = [0.05, 0.05, 0.05, 0.05, 0.8] + diffs = [ + Position(0, 1), + Position(0, -1), + Position(1, 0), + Position(-1, 0), + Position(0, 0), + ] + + for t in range(1, self.time_limit - 1): + sampled_indices = np.random.choice( + len(diffs), size=5, replace=False, p=weights + ) + rand_diffs = [diffs[i] for i in sampled_indices] + + valid_position = None + for diff in rand_diffs: + new_position = positions[-1] + diff + + if not self.valid_obstacle_position(new_position, t): + continue + + valid_position = new_position + break + + # Impossible situation for obstacle - stay in place + # -> this can happen if the oaths of other obstacles this one + if valid_position is None: + valid_position = positions[-1] + + positions.append(valid_position) + + obstacle_paths.append(positions) + + return obstacle_paths + + """ + Generate a line of obstacles in y at the center of the grid that move side-to-side in x + Bottom half start moving right, top half start moving left. If `obs_count` is less than the length of + the grid, only the first `obs_count` obstacles will be generated. + """ + def obstacle_arrangement_1(self, obs_count: int) -> list[list[Position]]: + obstacle_paths = [] + half_grid_x = self.grid_size[0] // 2 + half_grid_y = self.grid_size[1] // 2 + + for y_idx in range(0, min(obs_count, self.grid_size[1])): + moving_right = y_idx < half_grid_y + position = Position(half_grid_x, y_idx) + path = [position] + + for t in range(1, self.time_limit - 1): + # sit in place every other time step + if t % 2 == 0: + path.append(position) + continue + + # first check if we should switch direction (at edge of grid) + if (moving_right and position.x == self.grid_size[0] - 1) or ( + not moving_right and position.x == 0 + ): + moving_right = not moving_right + # step in direction + position = Position( + position.x + (1 if moving_right else -1), position.y + ) + path.append(position) + + obstacle_paths.append(path) + + return obstacle_paths + + """ + Check if the given position is valid at time t + + input: + position (Position): (x, y) position + t (int): time step + + output: + bool: True if position/time combination is valid, False otherwise + """ + def valid_position(self, position: Position, t: int) -> bool: + # Check if new position is in grid + if not self.inside_grid_bounds(position): + return False + + # Check if new position is not occupied at time t + return self.reservation_matrix[position.x, position.y, t] == 0 + + """ + Returns True if the given position is valid at time t and is not in the set of obstacle_avoid_points + """ + def valid_obstacle_position(self, position: Position, t: int) -> bool: + return ( + self.valid_position(position, t) + and position not in self.obstacle_avoid_points + ) + + """ + Returns True if the given position is within the grid's boundaries + """ + def inside_grid_bounds(self, position: Position) -> bool: + return ( + position.x >= 0 + and position.x < self.grid_size[0] + and position.y >= 0 + and position.y < self.grid_size[1] + ) + + """ + Sample a random position that is within the grid's boundaries + + output: + Position: (x, y) position + """ + def sample_random_position(self) -> Position: + return Position( + np.random.randint(0, self.grid_size[0]), + np.random.randint(0, self.grid_size[1]), + ) + + """ + Returns a tuple of (x_positions, y_positions) of the obstacles at time t + """ + def get_obstacle_positions_at_time(self, t: int) -> tuple[list[int], list[int]]: + x_positions = [] + y_positions = [] + for obs_path in self.obstacle_paths: + x_positions.append(obs_path[t].x) + y_positions.append(obs_path[t].y) + return (x_positions, y_positions) + + +show_animation = True + + +def main(): + grid = Grid( + np.array([11, 11]), + num_obstacles=10, + obstacle_arrangement=ObstacleArrangement.ARRANGEMENT1, + ) + + if not show_animation: + return + + fig = plt.figure(figsize=(8, 7)) + ax = fig.add_subplot( + autoscale_on=False, + xlim=(0, grid.grid_size[0] - 1), + ylim=(0, grid.grid_size[1] - 1), + ) + ax.set_aspect("equal") + ax.grid() + ax.set_xticks(np.arange(0, 11, 1)) + ax.set_yticks(np.arange(0, 11, 1)) + (obs_points,) = ax.plot([], [], "ro", ms=15) + + # for stopping simulation with the esc key. + plt.gcf().canvas.mpl_connect( + "key_release_event", lambda event: [exit(0) if event.key == "escape" else None] + ) + + for i in range(0, grid.time_limit - 1): + obs_positions = grid.get_obstacle_positions_at_time(i) + obs_points.set_data(obs_positions[0], obs_positions[1]) + plt.pause(0.2) + plt.show() + + +if __name__ == "__main__": + main() diff --git a/PathPlanning/TimeBasedPathPlanning/SpaceTimeAStar.py b/PathPlanning/TimeBasedPathPlanning/SpaceTimeAStar.py new file mode 100644 index 0000000000..3b3613d695 --- /dev/null +++ b/PathPlanning/TimeBasedPathPlanning/SpaceTimeAStar.py @@ -0,0 +1,220 @@ +""" +Space-time A* Algorithm + This script demonstrates the Space-time A* algorithm for path planning in a grid world with moving obstacles. + This algorithm is different from normal 2D A* in one key way - the cost (often notated as g(n)) is + the number of time steps it took to get to a given node, instead of the number of cells it has + traversed. This ensures the path is time-optimal, while respescting any dynamic obstacles in the environment. + + Reference: https://www.davidsilver.uk/wp-content/uploads/2020/03/coop-path-AIWisdom.pdf +""" + +import numpy as np +import matplotlib.pyplot as plt +from PathPlanning.TimeBasedPathPlanning.GridWithDynamicObstacles import ( + Grid, + ObstacleArrangement, + Position, +) +import heapq +from collections.abc import Generator +import random +from dataclasses import dataclass +from functools import total_ordering + + +# Seed randomness for reproducibility +RANDOM_SEED = 50 +random.seed(RANDOM_SEED) +np.random.seed(RANDOM_SEED) + +@dataclass() +# Note: Total_ordering is used instead of adding `order=True` to the @dataclass decorator because +# this class needs to override the __lt__ and __eq__ methods to ignore parent_index. Parent +# index is just used to track the path found by the algorithm, and has no effect on the quality +# of a node. +@total_ordering +class Node: + position: Position + time: int + heuristic: int + parent_index: int + + """ + This is what is used to drive node expansion. The node with the lowest value is expanded next. + This comparison prioritizes the node with the lowest cost-to-come (self.time) + cost-to-go (self.heuristic) + """ + def __lt__(self, other: object): + if not isinstance(other, Node): + return NotImplementedError(f"Cannot compare Node with object of type: {type(other)}") + return (self.time + self.heuristic) < (other.time + other.heuristic) + + def __eq__(self, other: object): + if not isinstance(other, Node): + return NotImplementedError(f"Cannot compare Node with object of type: {type(other)}") + return self.position == other.position and self.time == other.time + + +class NodePath: + path: list[Node] + positions_at_time: dict[int, Position] = {} + + def __init__(self, path: list[Node]): + self.path = path + for node in path: + self.positions_at_time[node.time] = node.position + + """ + Get the position of the path at a given time + """ + def get_position(self, time: int) -> Position | None: + return self.positions_at_time.get(time) + + """ + Time stamp of the last node in the path + """ + def goal_reached_time(self) -> int: + return self.path[-1].time + + def __repr__(self): + repr_string = "" + for i, node in enumerate(self.path): + repr_string += f"{i}: {node}\n" + return repr_string + + +class SpaceTimeAStar: + grid: Grid + start: Position + goal: Position + + def __init__(self, grid: Grid, start: Position, goal: Position): + self.grid = grid + self.start = start + self.goal = goal + + def plan(self, verbose: bool = False) -> NodePath: + open_set: list[Node] = [] + heapq.heappush( + open_set, Node(self.start, 0, self.calculate_heuristic(self.start), -1) + ) + + expanded_set: list[Node] = [] + while open_set: + expanded_node: Node = heapq.heappop(open_set) + if verbose: + print("Expanded node:", expanded_node) + + if expanded_node.time + 1 >= self.grid.time_limit: + if verbose: + print(f"\tSkipping node that is past time limit: {expanded_node}") + continue + + if expanded_node.position == self.goal: + print(f"Found path to goal after {len(expanded_set)} expansions") + path = [] + path_walker: Node = expanded_node + while True: + path.append(path_walker) + if path_walker.parent_index == -1: + break + path_walker = expanded_set[path_walker.parent_index] + + # reverse path so it goes start -> goal + path.reverse() + return NodePath(path) + + expanded_idx = len(expanded_set) + expanded_set.append(expanded_node) + + for child in self.generate_successors(expanded_node, expanded_idx, verbose): + heapq.heappush(open_set, child) + + raise Exception("No path found") + + """ + Generate possible successors of the provided `parent_node` + """ + def generate_successors( + self, parent_node: Node, parent_node_idx: int, verbose: bool + ) -> Generator[Node, None, None]: + diffs = [ + Position(0, 0), + Position(1, 0), + Position(-1, 0), + Position(0, 1), + Position(0, -1), + ] + for diff in diffs: + new_pos = parent_node.position + diff + if self.grid.valid_position(new_pos, parent_node.time + 1): + new_node = Node( + new_pos, + parent_node.time + 1, + self.calculate_heuristic(new_pos), + parent_node_idx, + ) + if verbose: + print("\tNew successor node: ", new_node) + yield new_node + + def calculate_heuristic(self, position) -> int: + diff = self.goal - position + return abs(diff.x) + abs(diff.y) + + +show_animation = True +verbose = False + +def main(): + start = Position(1, 11) + goal = Position(19, 19) + grid_side_length = 21 + grid = Grid( + np.array([grid_side_length, grid_side_length]), + num_obstacles=40, + obstacle_avoid_points=[start, goal], + obstacle_arrangement=ObstacleArrangement.ARRANGEMENT1, + ) + + planner = SpaceTimeAStar(grid, start, goal) + path = planner.plan(verbose) + + if verbose: + print(f"Path: {path}") + + if not show_animation: + return + + fig = plt.figure(figsize=(10, 7)) + ax = fig.add_subplot( + autoscale_on=False, + xlim=(0, grid.grid_size[0] - 1), + ylim=(0, grid.grid_size[1] - 1), + ) + ax.set_aspect("equal") + ax.grid() + ax.set_xticks(np.arange(0, grid_side_length, 1)) + ax.set_yticks(np.arange(0, grid_side_length, 1)) + + (start_and_goal,) = ax.plot([], [], "mD", ms=15, label="Start and Goal") + start_and_goal.set_data([start.x, goal.x], [start.y, goal.y]) + (obs_points,) = ax.plot([], [], "ro", ms=15, label="Obstacles") + (path_points,) = ax.plot([], [], "bo", ms=10, label="Path Found") + ax.legend(bbox_to_anchor=(1.05, 1)) + + # for stopping simulation with the esc key. + plt.gcf().canvas.mpl_connect( + "key_release_event", lambda event: [exit(0) if event.key == "escape" else None] + ) + + for i in range(0, path.goal_reached_time()): + obs_positions = grid.get_obstacle_positions_at_time(i) + obs_points.set_data(obs_positions[0], obs_positions[1]) + path_position = path.get_position(i) + path_points.set_data([path_position.x], [path_position.y]) + plt.pause(0.2) + plt.show() + + +if __name__ == "__main__": + main() diff --git a/PathPlanning/TimeBasedPathPlanning/__init__.py b/PathPlanning/TimeBasedPathPlanning/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/docs/modules/5_path_planning/path_planning_main.rst b/docs/modules/5_path_planning/path_planning_main.rst index a0f9c30a3d..0c84a19c22 100644 --- a/docs/modules/5_path_planning/path_planning_main.rst +++ b/docs/modules/5_path_planning/path_planning_main.rst @@ -12,6 +12,7 @@ Path planning is the ability of a robot to search feasible and efficient path to dynamic_window_approach/dynamic_window_approach bugplanner/bugplanner grid_base_search/grid_base_search + time_based_grid_search/time_based_grid_search model_predictive_trajectory_generator/model_predictive_trajectory_generator state_lattice_planner/state_lattice_planner prm_planner/prm_planner diff --git a/docs/modules/5_path_planning/time_based_grid_search/time_based_grid_search_main.rst b/docs/modules/5_path_planning/time_based_grid_search/time_based_grid_search_main.rst new file mode 100644 index 0000000000..0c26badec7 --- /dev/null +++ b/docs/modules/5_path_planning/time_based_grid_search/time_based_grid_search_main.rst @@ -0,0 +1,22 @@ +Time based grid search +---------------------- + +Space-time astar +~~~~~~~~~~~~~~~~~~~~~~ + +This is an extension of A* algorithm that supports planning around dynamic obstacles. + +.. image:: https://github.com/AtsushiSakai/PythonRoboticsGifs/raw/master/PathPlanning/TimeBasedPathPlanning/SpaceTimeAStar/path_animation.gif + +.. image:: https://github.com/AtsushiSakai/PythonRoboticsGifs/raw/master/PathPlanning/TimeBasedPathPlanning/SpaceTimeAStar/path_animation2.gif + +The key difference of this algorithm compared to vanilla A* is that the cost and heuristic are now time-based instead of distance-based. +Using a time-based cost and heuristic ensures the path found is optimal in terms of time to reach the goal. + +The cost is the amount of time it takes to reach a given node, and the heuristic is the minimum amount of time it could take to reach the goal from that node, disregarding all obstacles. +For a simple scenario where the robot can move 1 cell per time step and stop and go as it pleases, the heuristic for time is equivalent to the heuristic for distance. + +References: +~~~~~~~~~~~ + +- `Cooperative Pathfinding `__ diff --git a/tests/test_space_time_astar.py b/tests/test_space_time_astar.py new file mode 100644 index 0000000000..5290738eb4 --- /dev/null +++ b/tests/test_space_time_astar.py @@ -0,0 +1,33 @@ +from PathPlanning.TimeBasedPathPlanning.GridWithDynamicObstacles import ( + Grid, + ObstacleArrangement, + Position, +) +from PathPlanning.TimeBasedPathPlanning import SpaceTimeAStar as m +import numpy as np +import conftest + + +def test_1(): + start = Position(1, 11) + goal = Position(19, 19) + grid_side_length = 21 + grid = Grid( + np.array([grid_side_length, grid_side_length]), + obstacle_arrangement=ObstacleArrangement.ARRANGEMENT1, + ) + + m.show_animation = False + planner = m.SpaceTimeAStar(grid, start, goal) + + path = planner.plan(False) + + # path should have 28 entries + assert len(path.path) == 31 + + # path should end at the goal + assert path.path[-1].position == goal + + +if __name__ == "__main__": + conftest.run_this_test(__file__)