diff --git a/local_pathfinding/ompl_path.py b/local_pathfinding/ompl_path.py index 984e755..a652169 100644 --- a/local_pathfinding/ompl_path.py +++ b/local_pathfinding/ompl_path.py @@ -14,6 +14,8 @@ from ompl import util as ou from rclpy.impl.rcutils_logger import RcutilsLogger +from .planners import A_star + if TYPE_CHECKING: from local_pathfinding.local_path import LocalPathState @@ -143,7 +145,7 @@ def _init_simple_setup(self) -> og.SimpleSetup: # set the planner of the simple setup object # TODO: implement and add planner here - # simple_setup.setPlanner(planner) + simple_setup.setPlanner(A_star.Astar(simple_setup.getSpaceInformation())) return simple_setup diff --git a/local_pathfinding/planners/A_star.py b/local_pathfinding/planners/A_star.py new file mode 100644 index 0000000..9c8f76b --- /dev/null +++ b/local_pathfinding/planners/A_star.py @@ -0,0 +1,191 @@ +#!/usr/bin/env python + +import heapq +import math + +from ompl import base as ob +from ompl import geometric as og + + +class Node: + """A node class for A* pathfinding""" + + def __init__(self, parent=None, position=None): + """Initialize the Node class. + + Args: + parent (Node, optional): Parent Node. Defaults to None. + position (ob.State, optional): Position of the Node (x, y). Defaults to None. + """ + self.parent = parent + self.position = position + self.g = 0 + self.h = 0 + self.f = 0 + + def __eq__(self, other) -> bool: + """Check if two nodes are equal. + + Args: + other (Node): Node to compare to. + + Returns: + bool: True if equal, False otherwise. + """ + return ( + self.position.getX() == other.position.getX() + and self.position.getY() == other.position.getY() + ) + # return self.position[0] == other.position[0] and self.position[1] == other.position[1] + + def __str__(self) -> str: + """Return a string representation of the Node. + + Returns: + str: String representation of the Node. + """ + return ( + str(self.position.getX()) + + ", " + + str(self.position.getY()) + + ", " + + str(self.position.getYaw() * 180 / math.pi) + ) + + def __lt__(self, other) -> bool: + """Check if the current node is less than another node. + + Args: + other (Node): Node to compare to. + + Returns: + bool: True if less than, False otherwise. + """ + return self.f < other.f + + def __gt__(self, other) -> bool: + """Check if the current node is greater than another node. + + Args: + other (Node): Node to compare to. + + Returns: + bool: True if greater than, False otherwise. + """ + return self.f > other.f + + +class Astar(ob.Planner): + def __init__(self, si: ob.SpaceInformation): + """Initialize the Astar class. + + Args: + si (ob.SpaceInformation): Space information. + """ + super(Astar, self).__init__(si, "Astar") + self.states_ = [] + self.sampler_ = si.allocStateSampler() + + def solve(self, ptc: ob.PlannerTerminationCondition) -> ob.PlannerStatus: + """Solve the Astar problem. + + Args: + ptc (ob.PlannerTerminationCondition): Planner termination condition. + + Returns: + ob.PlannerStatus: Planner status. + """ + pdef = self.getProblemDefinition() # type: ob.ProblemDefinition + goal = pdef.getGoal() # type: ob.GoalState + si = self.getSpaceInformation() # type: ob.SpaceInformation + pi = self.getPlannerInputStates() # type: ob.PlannerInputStates + st = pi.nextStart() # type: ob.State + while st: + self.states_.append(st) + st = pi.nextStart() + solution = None + approxsol = 0 + # approxdif = 1e6 + start_state = pdef.getStartState(0) + goal_state = goal.getState() + start_node = Node(None, start_state) + start_node.g = start_state.h = start_node.f = 0 + end_node = Node(None, goal_state) + end_node.g = end_node.h = end_node.f = 0 + + open_list = [] + closed_list = [] + heapq.heapify(open_list) + adjacent_squares = ( + (1, 0, 0), + (1, 1, 45), + (0, 1, 90), + (-1, 1, 135), + (-1, 0, 0), + (-1, -1, -135), + (0, -1, -90), + (1, -1, -45), + ) + + heapq.heappush(open_list, start_node) + while len(open_list) > 0 and not ptc(): + current_node = heapq.heappop(open_list) + if current_node == end_node: # if we hit the goal + current = current_node + path = [] + while current is not None: + path.append(current.position) + current = current.parent + for i in range(1, len(path)): + self.states_.append(path[len(path) - i - 1]) + solution = len(self.states_) + break + closed_list.append(current_node) + + children = [] + for new_position in adjacent_squares: + node_position = si.allocState() + current_node_x = current_node.position.getX() + current_node_y = current_node.position.getY() + node_position.setXY( + current_node_x + new_position[0], current_node_y + new_position[1] + ) + node_position.setYaw(new_position[2] * math.pi / 180) + + if not si.checkMotion(current_node.position, node_position): + continue + if not si.satisfiesBounds(node_position): + continue + new_node = Node(current_node, node_position) + children.append(new_node) + + for child in children: + if child in closed_list: + continue + if child.position.getYaw() % (math.pi / 2) == 0: + child.g = current_node.g + 1 + else: + child.g = current_node.g + math.sqrt(2) + child.h = goal.distanceGoal(child.position) + child.f = child.g + child.h + if len([i for i in open_list if child == i and child.g >= i.g]) > 0: + continue + heapq.heappush(open_list, child) + + solved = False + approximate = False + if not solution: + solution = approxsol + approximate = False + if solution: + path = og.PathGeometric(si) + for s in self.states_[:solution]: + path.append(s) + pdef.addSolutionPath(path) + solved = True + return ob.PlannerStatus(solved, approximate) + + def clear(self): + """Clear the Astar problem.""" + super(Astar, self).clear() + self.states_ = [] diff --git a/local_pathfinding/planners/__init__.py b/local_pathfinding/planners/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/local_pathfinding/planners/potential_field.py b/local_pathfinding/planners/potential_field.py new file mode 100644 index 0000000..527ba1a --- /dev/null +++ b/local_pathfinding/planners/potential_field.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python + +from ompl import base as ob +from ompl import geometric as og + + +class PotentialFieldPlanner(ob.Planner): + """Potential Field Planner class.""" + + def __init__(self, si): + """Initialize the Potential Field Planner class. + + Args: + si (ob.SpaceInformation): OMPL Space Information object. + """ + super(PotentialFieldPlanner, self).__init(si, "PotentialFieldPlanner") + self.states_ = [] + + def solve(self, ptc) -> ob.PlannerStatus: + """Solve the potential field problem. + + Args: + ptc (ob.PlannerTerminationCondition): Planner termination condition. + + Returns: + ob.PlannerStatus: Planner status. + """ + # Get the problem definition and space information + pdef = self.getProblemDefinition() + si = self.getSpaceInformation() + goal = pdef.getGoal() + + # Get the initial and goal states + start = pdef.getStartState(0) + goal_state = goal.getState() + + current_state = start + + # Iterate and navigate the sailboat using the potential field + while not ptc(): + # Calculate the gradient of the potential field at the current state + gradient = calculate_gradient(current_state, goal_state) + + # Update the sailboat's position and heading based on the gradient + current_state = navigate_sailboat(current_state, gradient) + + # Add the current state to the list of states + self.states_.append(current_state) + + # Check if the sailboat has reached the goal + if si.distance(current_state, goal_state) < si.getGoal().getThreshold(): + break + + # Create a path from the list of states + path = og.PathGeometric(si) + for state in self.states_: + path.append(state) + + # Add the path to the problem definition + pdef.addSolutionPath(path) + + # Return the planner status + return ob.PlannerStatus(pdef.hasSolution(), False) + + def clear(self): + """Clear the potential field planner.""" + super(PotentialFieldPlanner, self).clear() + self.states_ = [] + + +def calculate_gradient(current_state, goal_state) -> tuple: + """Calculate the gradient of the potential field at the current state. + + Args: + current_state (ob.State): Current state. + goal_state (ob.State): Goal state. + + Returns: + tuple: Gradient vector as a tuple of floats. + """ + # Calculate the gradient of the potential field at the current state + # This function should return the gradient vector as a tuple of floats + + # Replace this with your implementation + return (0.0, 0.0) + + +def navigate_sailboat(current_state, gradient) -> ob.State: + """Update the sailboat's position and heading based on the gradient. + + Args: + current_state (ob.State): Current state. + gradient (tuple): Gradient vector as a tuple of floats. + """ + # Update the sailboat's position and heading based on the gradient + # This function should adjust the boat's sail configuration and heading + # to align with the gradient direction + + # Replace this with your implementation + return current_state