7
7
import matplotlib .pyplot as plt
8
8
from enum import Enum
9
9
from dataclasses import dataclass
10
-
11
- @dataclass (order = True )
12
- class Position :
13
- x : int
14
- y : int
15
-
16
- def as_ndarray (self ) -> np .ndarray :
17
- return np .array ([self .x , self .y ])
18
-
19
- def __add__ (self , other ):
20
- if isinstance (other , Position ):
21
- return Position (self .x + other .x , self .y + other .y )
22
- raise NotImplementedError (
23
- f"Addition not supported for Position and { type (other )} "
24
- )
25
-
26
- def __sub__ (self , other ):
27
- if isinstance (other , Position ):
28
- return Position (self .x - other .x , self .y - other .y )
29
- raise NotImplementedError (
30
- f"Subtraction not supported for Position and { type (other )} "
31
- )
32
-
33
- def __hash__ (self ):
34
- return hash ((self .x , self .y ))
10
+ from PathPlanning .TimeBasedPathPlanning .Node import NodePath , Position
35
11
36
12
@dataclass
37
13
class Interval :
@@ -43,6 +19,8 @@ class ObstacleArrangement(Enum):
43
19
RANDOM = 0
44
20
# Obstacles start in a line in y at center of grid and move side-to-side in x
45
21
ARRANGEMENT1 = 1
22
+ # Static obstacle arrangement
23
+ NARROW_CORRIDOR = 2
46
24
47
25
"""
48
26
Generates a 2d numpy array with lists for elements.
@@ -87,6 +65,8 @@ def __init__(
87
65
self .obstacle_paths = self .generate_dynamic_obstacles (num_obstacles )
88
66
elif obstacle_arrangement == ObstacleArrangement .ARRANGEMENT1 :
89
67
self .obstacle_paths = self .obstacle_arrangement_1 (num_obstacles )
68
+ elif obstacle_arrangement == ObstacleArrangement .NARROW_CORRIDOR :
69
+ self .obstacle_paths = self .generate_narrow_corridor_obstacles (num_obstacles )
90
70
91
71
for i , path in enumerate (self .obstacle_paths ):
92
72
obs_idx = i + 1 # avoid using 0 - that indicates free space in the grid
@@ -184,6 +164,26 @@ def obstacle_arrangement_1(self, obs_count: int) -> list[list[Position]]:
184
164
obstacle_paths .append (path )
185
165
186
166
return obstacle_paths
167
+
168
+ def generate_narrow_corridor_obstacles (self , obs_count : int ) -> list [list [Position ]]:
169
+ obstacle_paths = []
170
+
171
+ for y in range (0 , self .grid_size [1 ]):
172
+ if y > obs_count :
173
+ break
174
+
175
+ if y == self .grid_size [1 ] // 2 :
176
+ # Skip the middle row
177
+ continue
178
+
179
+ obstacle_path = []
180
+ x = self .grid_size [0 ] // 2 # middle of the grid
181
+ for t in range (0 , self .time_limit - 1 ):
182
+ obstacle_path .append (Position (x , y ))
183
+
184
+ obstacle_paths .append (obstacle_path )
185
+
186
+ return obstacle_paths
187
187
188
188
"""
189
189
Check if the given position is valid at time t
@@ -196,11 +196,11 @@ def obstacle_arrangement_1(self, obs_count: int) -> list[list[Position]]:
196
196
bool: True if position/time combination is valid, False otherwise
197
197
"""
198
198
def valid_position (self , position : Position , t : int ) -> bool :
199
- # Check if new position is in grid
199
+ # Check if position is in grid
200
200
if not self .inside_grid_bounds (position ):
201
201
return False
202
202
203
- # Check if new position is not occupied at time t
203
+ # Check if position is not occupied at time t
204
204
return self .reservation_matrix [position .x , position .y , t ] == 0
205
205
206
206
"""
@@ -289,9 +289,48 @@ def get_safe_intervals_at_cell(self, cell: Position) -> list[Interval]:
289
289
# both the time step when it is entering the cell, and the time step when it is leaving the cell.
290
290
intervals = [interval for interval in intervals if interval .start_time != interval .end_time ]
291
291
return intervals
292
+
293
+ """
294
+ Reserve an agent's path in the grid. Raises an exception if the agent's index is 0, or if a position is
295
+ already reserved by a different agent.
296
+ """
297
+ def reserve_path (self , node_path : NodePath , agent_index : int ):
298
+ if agent_index == 0 :
299
+ raise Exception ("Agent index cannot be 0" )
300
+
301
+ for i , node in enumerate (node_path .path ):
302
+ reservation_finish_time = node .time + 1
303
+ if i < len (node_path .path ) - 1 :
304
+ reservation_finish_time = node_path .path [i + 1 ].time
292
305
293
- show_animation = True
306
+ self .reserve_position (node .position , agent_index , Interval (node .time , reservation_finish_time ))
307
+
308
+ """
309
+ Reserve a position for the provided agent during the provided time interval.
310
+ Raises an exception if the agent's index is 0, or if the position is already reserved by a different agent during the interval.
311
+ """
312
+ def reserve_position (self , position : Position , agent_index : int , interval : Interval ):
313
+ if agent_index == 0 :
314
+ raise Exception ("Agent index cannot be 0" )
315
+
316
+ for t in range (interval .start_time , interval .end_time + 1 ):
317
+ current_reserver = self .reservation_matrix [position .x , position .y , t ]
318
+ if current_reserver not in [0 , agent_index ]:
319
+ raise Exception (
320
+ f"Agent { agent_index } tried to reserve a position already reserved by another agent: { position } at time { t } , reserved by { current_reserver } "
321
+ )
322
+ self .reservation_matrix [position .x , position .y , t ] = agent_index
323
+
324
+ """
325
+ Clears the initial reservation for an agent by clearing reservations at its start position with its index for
326
+ from time 0 to the time limit.
327
+ """
328
+ def clear_initial_reservation (self , position : Position , agent_index : int ):
329
+ for t in range (self .time_limit ):
330
+ if self .reservation_matrix [position .x , position .y , t ] == agent_index :
331
+ self .reservation_matrix [position .x , position .y , t ] = 0
294
332
333
+ show_animation = True
295
334
296
335
def main ():
297
336
grid = Grid (
0 commit comments