|
| 1 | +#!/usr/bin/env python |
| 2 | + |
| 3 | +import heapq |
| 4 | +import math |
| 5 | + |
| 6 | +from ompl import base as ob |
| 7 | +from ompl import geometric as og |
| 8 | + |
| 9 | + |
| 10 | +class Node: |
| 11 | + """A node class for A* pathfinding""" |
| 12 | + |
| 13 | + def __init__(self, parent=None, position=None): |
| 14 | + self.parent = parent |
| 15 | + self.position = position |
| 16 | + self.g = 0 |
| 17 | + self.h = 0 |
| 18 | + self.f = 0 |
| 19 | + |
| 20 | + def __eq__(self, other): |
| 21 | + return ( |
| 22 | + self.position.getX() == other.position.getX() |
| 23 | + and self.position.getY() == other.position.getY() |
| 24 | + ) |
| 25 | + # return self.position[0] == other.position[0] and self.position[1] == other.position[1] |
| 26 | + |
| 27 | + def __str__(self): |
| 28 | + return ( |
| 29 | + str(self.position.getX()) |
| 30 | + + ", " |
| 31 | + + str(self.position.getY()) |
| 32 | + + ", " |
| 33 | + + str(self.position.getYaw() * 180 / math.pi) |
| 34 | + ) |
| 35 | + |
| 36 | + def __lt__(self, other): |
| 37 | + return self.f < other.f |
| 38 | + |
| 39 | + def __gt__(self, other): |
| 40 | + return self.f > other.f |
| 41 | + |
| 42 | + |
| 43 | +class Astar(ob.Planner): |
| 44 | + def __init__(self, si: ob.SpaceInformation): |
| 45 | + super(Astar, self).__init__(si, "Astar") |
| 46 | + self.states_ = [] |
| 47 | + self.sampler_ = si.allocStateSampler() |
| 48 | + |
| 49 | + def solve(self, ptc: ob.PlannerTerminationCondition): |
| 50 | + pdef = self.getProblemDefinition() # type: ob.ProblemDefinition |
| 51 | + goal = pdef.getGoal() # type: ob.GoalState |
| 52 | + si = self.getSpaceInformation() # type: ob.SpaceInformation |
| 53 | + pi = self.getPlannerInputStates() # type: ob.PlannerInputStates |
| 54 | + st = pi.nextStart() # type: ob.State |
| 55 | + while st: |
| 56 | + self.states_.append(st) |
| 57 | + st = pi.nextStart() |
| 58 | + solution = None |
| 59 | + approxsol = 0 |
| 60 | + # approxdif = 1e6 |
| 61 | + start_state = pdef.getStartState(0) |
| 62 | + goal_state = goal.getState() |
| 63 | + start_node = Node(None, start_state) |
| 64 | + start_node.g = start_state.h = start_node.f = 0 |
| 65 | + end_node = Node(None, goal_state) |
| 66 | + end_node.g = end_node.h = end_node.f = 0 |
| 67 | + |
| 68 | + open_list = [] |
| 69 | + closed_list = [] |
| 70 | + heapq.heapify(open_list) |
| 71 | + adjacent_squares = ( |
| 72 | + (1, 0, 0), |
| 73 | + (1, 1, 45), |
| 74 | + (0, 1, 90), |
| 75 | + (-1, 1, 135), |
| 76 | + (-1, 0, 0), |
| 77 | + (-1, -1, -135), |
| 78 | + (0, -1, -90), |
| 79 | + (1, -1, -45), |
| 80 | + ) |
| 81 | + |
| 82 | + heapq.heappush(open_list, start_node) |
| 83 | + while len(open_list) > 0 and not ptc(): |
| 84 | + current_node = heapq.heappop(open_list) |
| 85 | + if current_node == end_node: # if we hit the goal |
| 86 | + current = current_node |
| 87 | + path = [] |
| 88 | + while current is not None: |
| 89 | + path.append(current.position) |
| 90 | + current = current.parent |
| 91 | + for i in range(1, len(path)): |
| 92 | + self.states_.append(path[len(path) - i - 1]) |
| 93 | + solution = len(self.states_) |
| 94 | + break |
| 95 | + closed_list.append(current_node) |
| 96 | + |
| 97 | + children = [] |
| 98 | + for new_position in adjacent_squares: |
| 99 | + node_position = si.allocState() |
| 100 | + current_node_x = current_node.position.getX() |
| 101 | + current_node_y = current_node.position.getY() |
| 102 | + node_position.setXY( |
| 103 | + current_node_x + new_position[0], current_node_y + new_position[1] |
| 104 | + ) |
| 105 | + node_position.setYaw(new_position[2] * math.pi / 180) |
| 106 | + |
| 107 | + if not si.checkMotion(current_node.position, node_position): |
| 108 | + continue |
| 109 | + if not si.satisfiesBounds(node_position): |
| 110 | + continue |
| 111 | + new_node = Node(current_node, node_position) |
| 112 | + children.append(new_node) |
| 113 | + |
| 114 | + for child in children: |
| 115 | + if child in closed_list: |
| 116 | + continue |
| 117 | + if child.position.getYaw() % (math.pi / 2) == 0: |
| 118 | + child.g = current_node.g + 1 |
| 119 | + else: |
| 120 | + child.g = current_node.g + math.sqrt(2) |
| 121 | + child.h = goal.distanceGoal(child.position) |
| 122 | + child.f = child.g + child.h |
| 123 | + if len([i for i in open_list if child == i and child.g >= i.g]) > 0: |
| 124 | + continue |
| 125 | + heapq.heappush(open_list, child) |
| 126 | + |
| 127 | + solved = False |
| 128 | + approximate = False |
| 129 | + if not solution: |
| 130 | + solution = approxsol |
| 131 | + approximate = False |
| 132 | + if solution: |
| 133 | + path = og.PathGeometric(si) |
| 134 | + for s in self.states_[:solution]: |
| 135 | + path.append(s) |
| 136 | + pdef.addSolutionPath(path) |
| 137 | + solved = True |
| 138 | + return ob.PlannerStatus(solved, approximate) |
| 139 | + |
| 140 | + def clear(self): |
| 141 | + super(Astar, self).clear() |
| 142 | + self.states_ = [] |
0 commit comments