|
| 1 | +# This file is part of the CoverageControl library |
| 2 | +# |
| 3 | +# Author: Saurav Agarwal |
| 4 | + |
| 5 | +# Repository: https://github.com/KumarRobotics/CoverageControl |
| 6 | +# |
| 7 | +# Copyright (c) 2024, Saurav Agarwal |
| 8 | +# |
| 9 | +# The CoverageControl library is free software: you can redistribute it and/or |
| 10 | +# modify it under the terms of the GNU General Public License as published by |
| 11 | +# the Free Software Foundation, either version 3 of the License, or (at your |
| 12 | +# option) any later version. |
| 13 | +# |
| 14 | +# The CoverageControl library is distributed in the hope that it will be |
| 15 | +# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 16 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General |
| 17 | +# Public License for more details. |
| 18 | +# |
| 19 | +# You should have received a copy of the GNU General Public License along with |
| 20 | +# CoverageControl library. If not, see <https://www.gnu.org/licenses/>. |
| 21 | + |
| 22 | +## @file coverage_env_utils.py |
| 23 | +# @brief Utility functions for coverage environment |
| 24 | + |
| 25 | +import math |
| 26 | +import numpy |
| 27 | +import copy |
| 28 | +# import cv2 |
| 29 | +import torch |
| 30 | +import torchvision |
| 31 | +import torch_geometric |
| 32 | +from torch_geometric.data import Dataset |
| 33 | +from scipy.ndimage import gaussian_filter |
| 34 | +from scipy.spatial import distance_matrix |
| 35 | + |
| 36 | +from ...core import PointVector, DblVector, DblVectorVector |
| 37 | +from ...core import Parameters |
| 38 | +from ...core import CoverageSystem |
| 39 | + |
| 40 | +## @ingroup python_api |
| 41 | +class CoverageEnvUtils: |
| 42 | + """ |
| 43 | + Class for utility functions for coverage environment |
| 44 | + """ |
| 45 | + @staticmethod |
| 46 | + def to_tensor(data: object) -> torch.Tensor: |
| 47 | + """ |
| 48 | + Converts various types of data to torch.Tensor |
| 49 | +
|
| 50 | + Can accept the following types: |
| 51 | + - numpy.ndarray |
| 52 | + - PointVector |
| 53 | + - DblVectorVector |
| 54 | + - DblVector |
| 55 | +
|
| 56 | + Args: |
| 57 | + data: input data |
| 58 | +
|
| 59 | + Returns: |
| 60 | + torch.Tensor: converted data |
| 61 | +
|
| 62 | + Raises: |
| 63 | + ValueError: if data type is not supported |
| 64 | +
|
| 65 | + """ |
| 66 | + if isinstance(data, numpy.ndarray): |
| 67 | + return torch.from_numpy(numpy.copy(data.astype(numpy.float32))) |
| 68 | + elif isinstance(data, PointVector): |
| 69 | + data_tensor = torch.Tensor(len(data), 2) |
| 70 | + for i in range(len(data)): |
| 71 | + data_tensor[i] = CoverageEnvUtils.to_tensor(data[i]) |
| 72 | + return data_tensor |
| 73 | + elif isinstance(data, DblVectorVector): |
| 74 | + data_tensor = torch.Tensor(len(data)) |
| 75 | + for i in range(len(data)): |
| 76 | + data_tensor[i] = CoverageEnvUtils.to_tensor(data[i]) |
| 77 | + return data_tensor |
| 78 | + elif isinstance(data, DblVector): |
| 79 | + data_tensor = torch.Tensor(len(data)) |
| 80 | + for i in range(len(data)): |
| 81 | + data_tensor[i] = float(data[i]) |
| 82 | + return data_tensor |
| 83 | + else: |
| 84 | + raise ValueError('Unknown data type: {}'.format(type(data))) |
| 85 | + |
| 86 | + @staticmethod |
| 87 | + def get_raw_local_maps(env: CoverageSystem, params: Parameters) -> torch.Tensor: |
| 88 | + """ |
| 89 | + Get raw local maps |
| 90 | +
|
| 91 | + Args: |
| 92 | + env: coverage environment |
| 93 | + params: parameters |
| 94 | +
|
| 95 | + Returns: |
| 96 | + torch.Tensor: raw local maps |
| 97 | +
|
| 98 | + """ |
| 99 | + local_maps = torch.zeros((env.GetNumRobots(), params.pLocalMapSize, params.pLocalMapSize)) |
| 100 | + for r_idx in range(env.GetNumRobots()): |
| 101 | + local_maps[r_idx] = CoverageEnvUtils.to_tensor(env.GetRobotLocalMap(r_idx)) |
| 102 | + return local_maps |
| 103 | + |
| 104 | + @staticmethod |
| 105 | + def get_raw_obstacle_maps(env: CoverageSystem, params: Parameters) -> torch.Tensor: |
| 106 | + """ |
| 107 | + Get raw obstacle maps |
| 108 | +
|
| 109 | + Args: |
| 110 | + env: coverage environment |
| 111 | + params: parameters |
| 112 | +
|
| 113 | + Returns: |
| 114 | + torch.Tensor: raw obstacle maps |
| 115 | +
|
| 116 | + """ |
| 117 | + obstacle_maps = torch.zeros((env.GetNumRobots(), params.pLocalMapSize, params.pLocalMapSize)) |
| 118 | + for r_idx in range(env.GetNumRobots()): |
| 119 | + obstacle_maps[r_idx] = CoverageEnvUtils.to_tensor(env.GetRobotObstacleMap(r_idx)) |
| 120 | + return obstacle_maps |
| 121 | + |
| 122 | + @staticmethod |
| 123 | + def get_communication_maps(env: CoverageSystem, params: Parameters, map_size: int) -> torch.Tensor: |
| 124 | + """ |
| 125 | + Generate communication maps from positions |
| 126 | +
|
| 127 | + Communication maps are composed of two channels. |
| 128 | + Each channnel has non-zero values for cells that correspond to the relative positions of the neighbors. |
| 129 | + For the first channel, the value is the x-coordinate of the relative position divided by the communication range. |
| 130 | + Similarly, the y-coordinte is used for the second channel. |
| 131 | +
|
| 132 | + Args: |
| 133 | + env: coverage environment |
| 134 | + params: parameters |
| 135 | + map_size: size of the map |
| 136 | +
|
| 137 | + Returns: |
| 138 | + torch.Tensor: communication maps |
| 139 | + """ |
| 140 | + num_robots = env.GetNumRobots() |
| 141 | + |
| 142 | + comm_maps = torch.zeros((num_robots, 2, map_size, map_size)) |
| 143 | + for r_idx in range(num_robots): |
| 144 | + neighbors_pos = CoverageEnvUtils.to_tensor(env.GetRelativePositonsNeighbors(r_idx)) |
| 145 | + scaled_indices = torch.round(neighbors_pos * map_size / (params.pCommunicationRange * params.pResolution * 2.) + (map_size / 2. - params.pResolution / 2.)) |
| 146 | + # comm_range_mask = relative_dist[r_idx] < params.pCommunicationRange |
| 147 | + # scaled_indices = scaled_relative_pos[r_idx][comm_range_mask] |
| 148 | + indices = torch.transpose(scaled_indices, 1, 0) |
| 149 | + indices = indices.long() |
| 150 | + values = neighbors_pos / params.pCommunicationRange |
| 151 | + # values = values / params.pCommunicationRange |
| 152 | + # values = (values + params.pCommunicationRange) / (2. * params.pCommunicationRange) |
| 153 | + comm_maps[r_idx][0] = torch.sparse_coo_tensor(indices, values[:, 0], torch.Size([map_size, map_size])).to_dense() |
| 154 | + comm_maps[r_idx][1] = torch.sparse_coo_tensor(indices, values[:, 1], torch.Size([map_size, map_size])).to_dense() |
| 155 | + return comm_maps |
| 156 | + # positions = env.GetRobotPositions() |
| 157 | + # robot_positions = CoverageEnvUtils.to_tensor(env.GetRobotPositions()) |
| 158 | + # relative_pos = robot_positions.unsqueeze(0) - robot_positions.unsqueeze(1) |
| 159 | + # scaled_relative_pos = torch.round(relative_pos * map_size / (params.pCommunicationRange * params.pResolution * 2.) + (map_size / 2. - params.pResolution / 2.)) |
| 160 | + # relative_dist = relative_pos.norm(2, 2) |
| 161 | + # diagonal_mask = torch.eye(num_robots).to(torch.bool) |
| 162 | + # relative_dist.masked_fill_(diagonal_mask, params.pCommunicationRange + 1) |
| 163 | + |
| 164 | + @staticmethod |
| 165 | + def resize_maps(maps: torch.Tensor, resized_map_size: int) -> torch.Tensor: |
| 166 | + """ |
| 167 | + Resize maps to a given size |
| 168 | + Uses bilinear interpolation from torchvision.transforms.functional.resize |
| 169 | + Options: antialias=True |
| 170 | +
|
| 171 | + Args: |
| 172 | + maps: input maps |
| 173 | + resized_map_size: size of the resized maps |
| 174 | +
|
| 175 | + Returns: |
| 176 | + torch.Tensor: resized maps |
| 177 | +
|
| 178 | + """ |
| 179 | + shape = maps.shape |
| 180 | + maps = maps.view(-1, maps.shape[-2], maps.shape[-1]) |
| 181 | + maps = torchvision.transforms.functional.resize(maps, (resized_map_size, resized_map_size), interpolation=torchvision.transforms.InterpolationMode.BILINEAR, antialias=True) |
| 182 | + maps = maps.view(shape[:-2] + maps.shape[-2:]) |
| 183 | + return maps |
| 184 | + |
| 185 | + @staticmethod |
| 186 | + def get_maps(env: CoverageSystem, params: Parameters, resized_map_size: int, use_comm_map: bool) -> torch.Tensor: |
| 187 | + """ |
| 188 | + Get maps for the coverage environment |
| 189 | +
|
| 190 | + Args: |
| 191 | + env: coverage environment |
| 192 | + params: parameters |
| 193 | + resized_map_size: size of the resized maps |
| 194 | + use_comm_map: whether to use communication maps |
| 195 | +
|
| 196 | + Returns: |
| 197 | + torch.Tensor: maps |
| 198 | +
|
| 199 | + """ |
| 200 | + |
| 201 | + num_robots = env.GetNumRobots() |
| 202 | + raw_local_maps = CoverageEnvUtils.get_raw_local_maps(env, params) |
| 203 | + resized_local_maps = CoverageEnvUtils.resize_maps(raw_local_maps, resized_map_size) |
| 204 | + raw_obstacle_maps = CoverageEnvUtils.get_raw_obstacle_maps(env, params) |
| 205 | + resized_obstacle_maps = CoverageEnvUtils.resize_maps(raw_obstacle_maps, resized_map_size) |
| 206 | + if use_comm_map: |
| 207 | + comm_maps = CoverageEnvUtils.get_communication_maps(env, params, resized_map_size) |
| 208 | + maps = torch.cat([resized_local_maps.unsqueeze(1), comm_maps, resized_obstacle_maps.unsqueeze(1)], 1) |
| 209 | + else: |
| 210 | + maps = torch.cat([resized_local_maps.unsqueeze(1), resized_obstacle_maps.unsqueeze(1)], 1) |
| 211 | + return maps |
| 212 | + |
| 213 | + @staticmethod |
| 214 | + def get_voronoi_features(env: CoverageSystem) -> torch.Tensor: |
| 215 | + """ |
| 216 | + Get voronoi features |
| 217 | +
|
| 218 | + Args: |
| 219 | + env: coverage environment |
| 220 | +
|
| 221 | + Returns: |
| 222 | + torch.Tensor: voronoi features |
| 223 | + """ |
| 224 | + features = env.GetRobotVoronoiFeatures() |
| 225 | + tensor_features = torch.zeros((len(features), len(features[0]))) |
| 226 | + for r_idx in range(len(features)): |
| 227 | + tensor_features[r_idx] = CoverageEnvUtils.to_tensor(features[r_idx]) |
| 228 | + return tensor_features |
| 229 | + |
| 230 | + @staticmethod |
| 231 | + def get_robot_positions(env: CoverageSystem) -> torch.Tensor: |
| 232 | + """ |
| 233 | + Get robot positions |
| 234 | +
|
| 235 | + Args: |
| 236 | + env: coverage environment |
| 237 | + |
| 238 | + Returns: |
| 239 | + torch.Tensor: robot positions |
| 240 | + """ |
| 241 | + robot_positions = CoverageEnvUtils.to_tensor(env.GetRobotPositions()) |
| 242 | + return robot_positions |
| 243 | + |
| 244 | + @staticmethod |
| 245 | + def get_weights(env: CoverageSystem, params: Parameters) -> torch.Tensor: |
| 246 | + onebyexp = 1. / math.exp(1.) |
| 247 | + robot_positions = CoverageEnvUtils.to_tensor(env.GetRobotPositions()) |
| 248 | + pairwise_distances = torch.cdist(robot_positions, robot_positions, 2) |
| 249 | + edge_weights = torch.exp(-(pairwise_distances.square())/(params.pCommunicationRange * params.pCommunicationRange)) |
| 250 | + edge_weights.masked_fill_(edge_weights < onebyexp, 0) |
| 251 | + edge_weights.fill_diagonal_(0) |
| 252 | + return edge_weights |
| 253 | + |
| 254 | + # Legacy edge weights used in previous research |
| 255 | + # The weights are proportional to the distance |
| 256 | + # Trying to move away from this |
| 257 | + @staticmethod |
| 258 | + def robot_positions_to_edge_weights(robot_positions: PointVector, world_map_size: int, comm_range: float) -> torch.Tensor: |
| 259 | + x = numpy.array(robot_positions) |
| 260 | + S = distance_matrix(x, x) |
| 261 | + S[S > comm_range] = 0 |
| 262 | + C = (world_map_size**2) / (S.shape[0]**2) |
| 263 | + C = 3 / C |
| 264 | + graph_obs = C * S |
| 265 | + return graph_obs |
| 266 | + |
| 267 | + @staticmethod |
| 268 | + def get_torch_geometric_data(env: CoverageSystem, params: Parameters, use_cnn: bool, use_comm_map: bool, map_size: int) -> torch_geometric.data.Data: |
| 269 | + """ |
| 270 | + Get torch geometric data |
| 271 | + In this function, the edge weights are binary |
| 272 | +
|
| 273 | + Args: |
| 274 | + env: coverage environment |
| 275 | + params: parameters |
| 276 | + use_cnn: whether to use CNN |
| 277 | + use_comm_map: whether to use communication maps |
| 278 | + map_size: size of the maps |
| 279 | +
|
| 280 | + Returns: |
| 281 | + torch_geometric.data.Data: torch geometric data |
| 282 | +
|
| 283 | + """ |
| 284 | + if use_cnn: |
| 285 | + features = CoverageEnvUtils.get_maps(env, params, map_size, use_comm_map) |
| 286 | + else: |
| 287 | + features = CoverageEnvUtils.get_voronoi_features(env) |
| 288 | + edge_weights = CoverageEnvUtils.get_weights(env, params).to_sparse().coalesce() |
| 289 | + edge_index = edge_weights.indices().long() |
| 290 | + weights = edge_weights.values().float() |
| 291 | + pos = CoverageEnvUtils.get_robot_positions(env) |
| 292 | + pos = (pos + params.pWorldMapSize/2.0)/params.pWorldMapSize |
| 293 | + data = torch_geometric.data.Data( |
| 294 | + x=features, |
| 295 | + edge_index=edge_index.clone().detach(), |
| 296 | + edge_weight=weights.clone().detach(), |
| 297 | + pos=pos.clone().detach() |
| 298 | + ) |
| 299 | + |
| 300 | + return data |
| 301 | + |
| 302 | + # Legacy maps which gives decent results |
| 303 | + # Trying to move away from this |
| 304 | + # @staticmethod |
| 305 | + # def get_stable_maps(env, params, resized_map_size): |
| 306 | + # robot_positions = CoverageEnvUtils.to_tensor(env.GetRobotPositions()) |
| 307 | + # num_robots = env.GetNumRobots() |
| 308 | + # maps = torch.empty((num_robots, 4, resized_map_size, resized_map_size)) |
| 309 | + # h_vals = torch.linspace(1.0, -1.0, maps.shape[-2]+1) |
| 310 | + # h_vals = (h_vals[1:] + h_vals[:-1])/2 |
| 311 | + # w_vals = torch.linspace(-1.0, 1.0, maps.shape[-1]+1) |
| 312 | + # w_vals = (w_vals[1:] + w_vals[:-1])/2 |
| 313 | + # heatmap_x = torch.stack([h_vals] * maps.shape[-1], dim=1)/100 |
| 314 | + # heatmap_y = torch.stack([w_vals] * maps.shape[-2], dim=0)/100 |
| 315 | + # for r_idx in range(num_robots): |
| 316 | + # local_map = env.GetRobotLocalMap(r_idx) |
| 317 | + # resized_local_map = cv2.resize(local_map, dsize=(resized_map_size, resized_map_size), interpolation=cv2.INTER_AREA) |
| 318 | + # maps[r_idx][0] = torch.tensor(resized_local_map).float() |
| 319 | + |
| 320 | + # comm_map = env.GetCommunicationMap(r_idx) |
| 321 | + # filtered_comm_map = gaussian_filter(comm_map, sigma=(3,3), order=0) |
| 322 | + # resized_comm_map = torch.tensor(cv2.resize(numpy.array(filtered_comm_map), dsize=(resized_map_size, resized_map_size), interpolation=cv2.INTER_AREA)).float() |
| 323 | + # maps[r_idx][1] = resized_comm_map |
| 324 | + |
| 325 | + # maps[r_idx][2] = heatmap_x |
| 326 | + # maps[r_idx][3] = heatmap_y |
| 327 | + |
| 328 | + # return maps |
0 commit comments