diff --git a/.komment/.gitkeep b/.komment/.gitkeep
new file mode 100644
index 00000000..a2360e65
--- /dev/null
+++ b/.komment/.gitkeep
@@ -0,0 +1 @@
+1715878003485
\ No newline at end of file
diff --git a/agents/g2o_agent/src/g2o_visualizer.py b/agents/g2o_agent/src/g2o_visualizer.py
index 52ffe1d0..4f3884b9 100644
--- a/agents/g2o_agent/src/g2o_visualizer.py
+++ b/agents/g2o_agent/src/g2o_visualizer.py
@@ -2,9 +2,20 @@
 import g2o
 import time
 import numpy as np
+from matplotlib.patches import Polygon, Ellipse, Circle
+import math

 class G2OVisualizer:
     def __init__(self, title):
+        """
+        Set up an interactive Matplotlib window for drawing the g2o graph.
+
+        Args:
+            title (str): plot title, displayed with `set_title()`.
+        """
         self.title = title

         # Configure the interactive Matplotlib window
@@ -22,19 +33,124 @@ def __init__(self, title):
         # self.ax.set_ylim([-3000, 3000])  # e.g., y-axis limits

     def edges_coord(self, edges, dim):
+        """
+        Yield the `dim`-th coordinate of both endpoints of each edge, followed
+        by None as a separator, in the flat format `ax.plot()` expects.
+
+        Args:
+            edges (list): g2o edges whose vertex estimates are read.
+            dim (int): coordinate index to extract (0 for x, 1 for y).
+        """
         for e in edges:
             yield e.vertices()[0].estimate()[dim]
             yield e.vertices()[1].estimate()[dim]
             yield None

-    def update_graph(self, optimizer, ground_truth=None):
+    def update_graph(self, optimizer, ground_truth=None, covariances=(False, None)):
+        """
+        Redraw the graph: plot the SE2 pose vertices with their headings, the
+        XY landmark measurements and, optionally, the ground-truth poses and a
+        3-sigma covariance ellipse around the last pose vertex.
+
+        Args:
+            optimizer: G2OGraph wrapper exposing `optimizer.optimizer` (the
+                underlying g2o.SparseOptimizer) and `optimizer.vertex_count`.
+            ground_truth (list, optional): reference poses to overlay.
+            covariances (tuple): (available, marginals) pair as returned by
+                `compute_marginals()`; when `available` is True, the marginal
+                block of the last vertex is drawn as an ellipse.
+        """
         self.ax.clear()
         # Set the axis limits
-        # self.ax.set_xlim([-3000, 3000])
-        # self.ax.set_ylim([-3000, 3000])
+        self.ax.set_xlim([-4000, 4000])  # e.g., x-axis limits
+        self.ax.set_ylim([-4000, 4000])  # e.g., y-axis limits

         # Get the updated data from the optimizer
-        edges = optimizer.edges()
-        vertices = optimizer.vertices()
+        edges = optimizer.optimizer.edges()
+        vertices = optimizer.optimizer.vertices()

         # edges
         se2_edges = [e for e in edges if type(e) == g2o.EdgeSE2]
@@ -47,10 +163,12 @@ def update_graph(self, optimizer, ground_truth=None):

         # poses of the vertices
         poses = [v.estimate() for v in vertices.values() if type(v) == g2o.VertexSE2]
         measurements = [v.estimate() for v in vertices.values() if type(v) == g2o.VertexPointXY]
+
         for v in poses:
             self.ax.plot(v.translation()[0], v.translation()[1], 'o', color='lightskyblue')
             self.ax.plot([v.translation()[0], v.translation()[0] + 250 * np.sin(-v.rotation().angle())],
                          [v.translation()[1], v.translation()[1] + 250 * np.cos(-v.rotation().angle())],
                          'r-', color='green')
+
         # self.ax.plot([v[0] for v in poses], [v[1] for v in poses], 'o', color='lightskyblue', markersize=10,
         #              label="Poses")
         # Draw arrows for the pose angles
@@ -60,8 +178,150 @@
                      label="Ground truth")
         self.ax.plot([v[0] for v in measurements], [v[1] for v in measurements], '*', color='firebrick', markersize=15,
                      label="Measurements")
-        self.ax.plot(list(self.edges_coord(se2_edges, 0)), list(self.edges_coord(se2_edges, 1)),
-                     color='midnightblue', linewidth=1, label="Pose edges")
+        # self.ax.plot(list(self.edges_coord(se2_edges, 0)), list(self.edges_coord(se2_edges, 1)),
+        #              color='midnightblue', linewidth=1, label="Pose edges")
+
+        if covariances[0]:
+
+            def points_for_cov(cov):
+                """
+                Yield points on the 3-sigma ellipse of a 2x2 covariance
+                matrix: the eigenvalues come from the characteristic
+                polynomial via the p-q formula, the orientation from
+                0.5 * atan2(2b, a - d).
+                """
+                a = cov[0, 0]
+                b = cov[0, 1]
+                d = cov[1, 1]
+
+                # Eigenvalues from determinant and trace (p-q formula)
+                D = a * d - b * b  # determinant of the matrix
+                T = a + d  # trace of the matrix
+                h = math.sqrt(0.25 * (T * T) - D)
+                lambda1 = 0.5 * T + h
+                lambda2 = 0.5 * T - h
+
+                theta = 0.5 * math.atan2(2.0 * b, a - d)
+                rotation_matrix = np.array(
+                    [[np.cos(theta), np.sin(theta)], [-np.sin(theta), np.cos(theta)]]
+                )
+                majorAxis = 3.0 * math.sqrt(lambda1)
+                minorAxis = 3.0 * math.sqrt(lambda2)
+                for alpha in np.linspace(0, math.tau, 32):
+                    yield np.matmul(
+                        rotation_matrix,
+                        [
+                            majorAxis * math.cos(alpha),
+                            minorAxis * math.sin(alpha),
+                        ],
+                    )
+
+            cov_points_x = []
+            cov_points_y = []
+            # Get the last optimizer vertex and its marginal covariance block
+            v = optimizer.optimizer.vertices()[optimizer.vertex_count - 1]
+            matrix = covariances[1].block(v.hessian_index(), v.hessian_index())
+            vertex_offset = v.estimate().to_vector()
+            for p in points_for_cov(matrix):
+                cov_points_x.append(vertex_offset[0] + p[0])
+                cov_points_y.append(vertex_offset[1] + p[1])
+            cov_points_x.append(None)
+            cov_points_y.append(None)
+            # Draw covariance ellipse
+            self.ax.plot(cov_points_x, cov_points_y,
+                         color='black', linewidth=1, label="Covariance")
+
+        # Alternative kept for reference: draw the ellipse from the
+        # eigen-decomposition with a matplotlib Ellipse patch.
+        # if covariances[0]:
+        #     def draw_covariance_ellipse(x, y, cov):
+        #         """ Draw an ellipse based on the covariance matrix. """
+        #         lambda_, v = np.linalg.eig(cov)
+        #         lambda_ = np.sqrt(lambda_)
+        #         ellipse = Ellipse(xy=(x, y), width=lambda_[0] * 2, height=lambda_[1] * 2,
+        #                           angle=np.degrees(np.arccos(v[0, 0])), edgecolor='green', facecolor='none')
+        #         self.ax.add_patch(ellipse)
+        #     v = optimizer.optimizer.vertices()[optimizer.vertex_count - 1]
+        #     matrix = covariances[1].block(v.hessian_index(), v.hessian_index())
+        #     vertex_offset = v.estimate().to_vector()
+        #     draw_covariance_ellipse(vertex_offset[0], vertex_offset[1], matrix)

         self.ax.legend()
         self.fig.canvas.draw()
         self.fig.canvas.flush_events()
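[Editor's note] The hand-rolled eigen-decomposition in points_for_cov is the kind of code worth cross-checking. A minimal sketch, assuming only NumPy (the covariance matrix below is made up), that verifies the p-q-formula eigenvalues against np.linalg.eigvalsh:

    import math
    import numpy as np

    cov = np.array([[400.0, 120.0],
                    [120.0, 90.0]])              # hypothetical 2x2 marginal block

    # Closed-form eigenvalues of a symmetric 2x2 matrix via the p-q formula
    T = cov[0, 0] + cov[1, 1]                     # trace
    D = cov[0, 0] * cov[1, 1] - cov[0, 1] ** 2    # determinant
    h = math.sqrt(0.25 * T * T - D)
    lambda1, lambda2 = 0.5 * T + h, 0.5 * T - h

    # eigvalsh returns eigenvalues in ascending order
    assert np.allclose(np.linalg.eigvalsh(cov), [lambda2, lambda1])

    # The 3-sigma ellipse semi-axes used by the visualizer
    major, minor = 3.0 * math.sqrt(lambda1), 3.0 * math.sqrt(lambda2)
    print(major, minor)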
diff --git a/agents/g2o_agent/src/specificworker.py b/agents/g2o_agent/src/specificworker.py
index 8430ae35..4d5294da 100644
--- a/agents/g2o_agent/src/specificworker.py
+++ b/agents/g2o_agent/src/specificworker.py
@@ -30,6 +30,10 @@
 import interfaces as ifaces
 from collections import deque
 import time
+import matplotlib.pyplot as plt
+from mpl_toolkits.mplot3d import Axes3D
+import threading

 sys.path.append('/opt/robocomp/lib')
 console = Console(highlight=False)
@@ -45,6 +49,27 @@ class SpecificWorker(GenericWorker):

     def __init__(self, proxy_map, startup_check=False):
+        """
+        Initialize the worker: set the compute period, create the DSR graph
+        APIs, the odometry queue and the g2o graph, and connect the DSR
+        signals that keep them synchronized.
+
+        Args:
+            proxy_map (dict): RoboComp proxies indexed by interface name.
+            startup_check (bool): if True, run `startup_check()` instead of
+                starting the compute timer.
+        """
         super(SpecificWorker, self).__init__(proxy_map)
         self.Period = 50
@@ -58,21 +83,27 @@ def __init__(self, proxy_map, startup_check=False):
             self.startup_check()
         else:
             self.rt_api = rt_api(self.g)
+            self.inner_api = inner_api(self.g)
             self.odometry_node_id = None
             self.odometry_queue = deque(maxlen=15)
             self.last_odometry = None

             # Initialize g2o graph with visualizer
             self.g2o = G2OGraph(verbose=False)
-            self.visualizer = G2OVisualizer("G2O Graph")
-
-            self.testing_pose= [0, 0, 0]
-
-            self.room_initialized = True if self.initialize_g2o_graph() else False
-            time.sleep(1)
+            # self.visualizer = G2OVisualizer("G2O Graph")
+
+            self.odometry_noise_std_dev = 1  # Standard deviation for odometry noise
+            self.odometry_noise_angle_std_dev = 1  # Standard deviation for odometry angular noise
+            self.measurement_noise_std_dev = 1  # Standard deviation for measurement noise
+            # self.room_initialized = True if self.initialize_g2o_graph() else False
+            # time.sleep(2)
+            self.room_initialized = False
             self.iterations = 0
             self.timer.timeout.connect(self.compute)
             self.timer.start(self.Period)
+            self.init_graph = False

             try:
                 signals.connect(self.g, signals.UPDATE_NODE_ATTR, self.update_node_att)
@@ -95,6 +126,13 @@ def setParams(self, params):

     @QtCore.Slot()
     def compute(self):
+        """
+        One localization step: integrate the odometry accumulated since the
+        last call, add an odometry edge and the valid measured corners as
+        landmarks to the g2o graph, optimize, and write the optimized robot
+        pose back to the DSR graph as an RT edge.
+        """
         if self.room_initialized:
             # Get robot odometry
             if self.odometry_queue:
                 # print("Robot odometry:", robot_odometry)
                 time_1 = time.time()
                 adv_displacement, side_displacement, ang_displacement = self.get_displacement(robot_odometry)
-                print("Time elapsed get_displacement:", time.time() - time_1)
+                # print("Time elapsed get_displacement:", time.time() - time_1)
                 if len(self.g2o.pose_vertex_ids) == self.g2o.queue_max_len:
                     self.g2o.remove_first_vertex()
-                self.g2o.add_odometry(adv_displacement + np.random.normal(0, 0.0003),
-                                      side_displacement + np.random.normal(0, 0.0002),
-                                      ang_displacement + np.random.normal(0, 0.002), 1.0 * np.eye(3))
+                # Build the information matrix for the odometry edge
+                odom_information = np.array([[1, 0.0, 0.0],
+                                             [0.0, 1, 0.0],
+                                             [0.0, 0.0, 0.1]])
+                # print("Odometry information:", odom_information)
+
+                # self.g2o.add_odometry(0.0,
+                #                       0.0,
+                #                       0.0, 0.2 * np.eye(3))
+                self.g2o.add_odometry(adv_displacement,
+                                      side_displacement,
+                                      ang_displacement, odom_information)
+
+                landmark_information = np.array([[1/self.measurement_noise_std_dev, 0.0],
+                                                 [0.0, 1/self.measurement_noise_std_dev]])
+                # print("Landmark information:", landmark_information)
+
                 for i in range(4):
-                    corner_node = self.g.get_node("corner_"+str(i+1)+"_measured")
+                    print("Corner", i, "................")
+                    corner_node = self.g.get_node("corner_"+str(i)+"_measured")
                     is_corner_valid = corner_node.attrs["valid"].value
                     if is_corner_valid:
                         corner_edge = self.rt_api.get_edge_RT(robot_node, corner_node.id)
                         corner_edge_mat = self.rt_api.get_edge_RT_as_rtmat(corner_edge, robot_odometry[3])[0:3, 3]
                         # print("Vertex count ", self.g2o.vertex_count)
-                        self.g2o.add_landmark(corner_edge_mat[0], corner_edge_mat[1], 1.0 * np.eye(2), pose_id=self.g2o.vertex_count-1, landmark_id=int(corner_node.name[7]))
+                        # self.g2o.add_landmark(corner_edge_mat[0], corner_edge_mat[1], 1 * np.eye(2), pose_id=self.g2o.vertex_count-1, landmark_id=int(corner_node.name[7]))
+                        self.g2o.add_landmark(corner_edge_mat[0], corner_edge_mat[1], 0.05 * np.eye(2), pose_id=self.g2o.vertex_count-1, landmark_id=int(corner_node.name[7])+1)
+                        # self.g2o.add_landmark(self.add_noise(corner_edge_mat[0], self.measurement_noise_std_dev), self.add_noise(corner_edge_mat[1], self.measurement_noise_std_dev), landmark_information, pose_id=self.g2o.vertex_count-1, landmark_id=int(corner_node.name[7]))
+                        print("Landmark added:", corner_edge_mat[0], corner_edge_mat[1], "Landmark id:", int(corner_node.name[7]), "Pose id:", self.g2o.vertex_count-1)
                 time_2 = time.time()
-                self.g2o.optimizer.save("test_pre.g2o")
                 chi_value = self.g2o.optimize(iterations=100, verbose=False)
-                print("Time elapsed optimize:", time.time() - time_2)
-                print("Chi value:", chi_value)
-                # if chi_value < 1000:
+                # print("Time elapsed optimize:", time.time() - time_2)
+                # print("Chi value:", chi_value)
                 time_3 = time.time()
-                self.visualizer.update_graph(self.g2o.optimizer)
-                print("Time elapsed update_graph:", time.time() - time_3)
+
+                # print("Time elapsed update_graph:", time.time() - time_3)
                 last_vertex = self.g2o.optimizer.vertices()[self.g2o.vertex_count - 1]
                 opt_translation = last_vertex.estimate().translation()
                 opt_orientation = last_vertex.estimate().rotation().angle()
+                # Wrap opt_orientation into [-pi, pi]
+                if opt_orientation > np.pi:
+                    opt_orientation -= 2 * np.pi
+                elif opt_orientation < -np.pi:
+                    opt_orientation += 2 * np.pi
+
                 print("Optimized translation:", opt_translation, "Optimized orientation:", opt_orientation)
-                # cov_matrix = self.get_covariance_matrix(last_vertex)
-                # if cov_matrix is not None:
+                cov_matrix = self.get_covariance_matrix(last_vertex)
+                # print("Covariance matrix:", cov_matrix)
+                # self.visualizer.update_graph(self.g2o, None, cov_matrix)
                 # rt_robot_edge = Edge(room_node.id, robot_node.id, "RT", self.agent_id)
                 # rt_robot_edge.attrs['rt_translation'] = [opt_translation[0], opt_translation[1], .0]
                 # rt_robot_edge.attrs['rt_rotation_euler_xyz'] = [.0, .0, opt_orientation]
                 # # rt_robot_edge.attrs['rt_se2_covariance'] = cov_matrix
                 # self.g.insert_or_assign_edge(rt_robot_edge)
+                self.rt_api.insert_or_assign_edge_RT(room_node, robot_node.id,
+                                                     [opt_translation[0], opt_translation[1], 0],
+                                                     [0, 0, opt_orientation])
-                self.g2o.optimizer.save("test_post.g2o")
-                self.iterations += 1
-                if self.iterations == 100:
-                    exit(0)
+                # self.g2o.optimizer.save("test_post.g2o")
+                # self.iterations += 1
+                # if self.iterations == 100:
                 self.last_odometry = robot_odometry  # Save last odometry
-                print("Time elapsed compute:", time.time() - init_time)
+                # print("Time elapsed compute:", time.time() - init_time)
+        elif self.init_graph:
+            self.room_initialized = True
+            self.init_graph = False
+            self.initialize_g2o_graph()
+
+    def add_noise(self, value, std_dev):
+        # print("Value", value, "Noise", np.random.normal(0, std_dev))
+        """
+        Return `value` plus zero-mean Gaussian noise with standard deviation
+        `std_dev`.
+        """
+        return value + np.random.normal(0, std_dev)
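[Editor's note] The branch-based wrap in compute() above only handles angles within one turn of [-pi, pi]; a sketch of a wrap that is correct for any input, using the atan2 identity (the helper name is mine, not part of the agent):

    import numpy as np

    def wrap_to_pi(angle: float) -> float:
        # atan2(sin, cos) folds any angle into (-pi, pi] in one step
        return float(np.arctan2(np.sin(angle), np.cos(angle)))

    assert abs(wrap_to_pi(5 * np.pi / 2) - np.pi / 2) < 1e-9
    assert abs(wrap_to_pi(3 * np.pi / 2) - (-np.pi / 2)) < 1e-9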
     def initialize_g2o_graph(self):
+        """
+        Read the robot pose from the DSR graph, insert it into g2o as a fixed
+        initial vertex, and add the four nominal room corners (paired with
+        their measured positions) as landmarks.
+
+        Returns:
+            bool: True if the graph was initialized, False if the room or
+            robot node is missing.
+        """
         # get robot pose in room
         odom_node = self.g.get_node("Shadow")
         self.odometry_node_id = odom_node.id
         room_node = self.g.get_node("room")
         robot_node = self.g.get_node("Shadow")
         # Check if room and robot nodes exist
         if room_node is None or robot_node is None:
-            print("Room or robot node does not exist. g2o graph cannot be initialized", style='red')
+            print("Room or robot node does not exist. g2o graph cannot be initialized")
             return False
         robot_edge_rt = self.rt_api.get_edge_RT(room_node, robot_node.id)
         print("Robot edge RT", robot_edge_rt.attrs)
@@ -176,23 +270,68 @@ def initialize_g2o_graph(self):

         # Order corner nodes by id
         corner_list = []
+        corner_list_measured = []
         for i in range(4):
-            print("corner_"+str(i+1)+"_measured")
-            corner_list.append(self.g.get_node("corner_"+str(i+1)+"_measured"))
-        # corner_nodes = sorted(corner_nodes, key=lambda x: x.name[7])
-
-        for corner_node in corner_list:
-            # if "measured" in corner_node.name:
-            #     print(corner_node.name)
-            corner_edge_rt = self.rt_api.get_edge_RT(odom_node, corner_node.id)
-            corner_tx, corner_ty, _ = corner_edge_rt.attrs['rt_translation'].value
-            print("Init corners", corner_tx, corner_ty)
+            corner_list.append(self.g.get_node("corner_"+str(i)))
+            corner_list_measured.append(self.g.get_node("corner_"+str(i)+"_measured"))
+
+        for i in range(4):
+            corner_edge_measured_rt = self.rt_api.get_edge_RT(odom_node, corner_list_measured[i].id)
+            corner_measured_tx, corner_measured_ty, _ = corner_edge_measured_rt.attrs['rt_translation'].value
+            corner_edge_rt = self.inner_api.transform(room_node.name, corner_list[i].name)
+            corner_tx, corner_ty, _ = corner_edge_rt
+            print("Nominal corners", corner_tx, corner_ty)
+            print("Measured corners", corner_measured_tx, corner_measured_ty)
+            landmark_information = np.array([[0.05, 0.0],
+                                             [0.0, 0.05]])
+            # print("Landmark information:", landmark_information)
             if corner_tx != 0.0 or corner_ty != 0.0:
-                self.g2o.add_landmark(corner_tx, corner_ty, 1.0 * np.eye(2), pose_id=0)
-        self.visualizer.update_graph(self.g2o.optimizer)
+                # self.g2o.add_landmark(corner_tx, corner_ty, 0.1 * np.eye(2), pose_id=0)
+                self.g2o.add_nominal_corner(corner_edge_rt,
+                                            corner_edge_measured_rt.attrs['rt_translation'].value,
+                                            landmark_information, pose_id=0)
+
+        # self.visualizer.update_graph(self.g2o)
+        # self.g2o.optimizer.save("init_graph.g2o")
+
         return True

     def get_displacement(self, odometry):
+        """
+        Integrate the velocities queued since the last processed odometry
+        sample to obtain the robot displacement.
+
+        Args:
+            odometry (tuple): latest (adv, side, rot, timestamp) sample; its
+                timestamp locates the integration window in
+                `self.odometry_queue`.
+
+        Returns:
+            tuple: (side, advance, angular) displacement accumulated between
+            the last processed sample and the newest one; each term is the
+            queued velocity multiplied by the gap between consecutive
+            timestamps.
+        """
         desplazamiento_avance = 0
         desplazamiento_lateral = 0
         desplazamiento_angular = 0
         try:
             indice = next(index for index, (_, _, _, timestamp) in enumerate(self.odometry_queue) if timestamp == self.last_odometry[3])
         except StopIteration:
             self.last_odometry = odometry
-            return
+            return desplazamiento_lateral, desplazamiento_avance, desplazamiento_angular
         # print("Index range", indice, len(self.odometry_queue) - 1)
         # Accumulate the velocities between the previous timestamp and the current one
         for i in range(indice, len(self.odometry_queue)-1):
             # print("Gap between current and previous timestamps:", self.odometry_queue[i + 1][3] - self.odometry_queue[i][3])
-            desplazamiento_avance += self.odometry_queue[i + 1][0] * (self.odometry_queue[i + 1][3] - self.odometry_queue[i][3])
-            desplazamiento_lateral += self.odometry_queue[i + 1][1] * (self.odometry_queue[i + 1][3] - self.odometry_queue[i][3])
-            desplazamiento_angular += self.odometry_queue[i + 1][2] * (self.odometry_queue[i + 1][3] - self.odometry_queue[i][3]) / 1000
-        return desplazamiento_lateral, desplazamiento_avance, -desplazamiento_angular
+            desplazamiento_avance += self.odometry_queue[i + 1][0] * (self.odometry_queue[i + 1][3] - self.odometry_queue[i][3]) * 0.8
+            desplazamiento_lateral += self.odometry_queue[i + 1][1] * (self.odometry_queue[i + 1][3] - self.odometry_queue[i][3]) * 0.8
+            desplazamiento_angular -= self.odometry_queue[i + 1][2] * (self.odometry_queue[i + 1][3] - self.odometry_queue[i][3]) * 0.8 / 1000
+        print("Total advance displacement:", desplazamiento_avance)
+        print("Total lateral displacement:", desplazamiento_lateral)
+        print("Total angular displacement:", desplazamiento_angular)
+        return desplazamiento_lateral, desplazamiento_avance, desplazamiento_angular

     def get_covariance_matrix(self, vertex):
+        """
+        Compute the marginal covariance of a vertex with
+        `optimizer.compute_marginals()`.
+
+        Args:
+            vertex: g2o vertex whose marginal covariance block is requested.
+
+        Returns:
+            tuple: (success flag, marginals object); the marginals object is
+            None when the computation fails.
+        """
         cov_vertices = [(vertex.hessian_index(), vertex.hessian_index())]
         covariances, covariances_result = self.g2o.optimizer.compute_marginals(cov_vertices)
         if covariances_result:
             print("Covariance computed")
             matrix = covariances.block(vertex.hessian_index(), vertex.hessian_index())
             upper_triangle = np.triu(matrix)  # k=0 keeps the diagonal
-            return np.concatenate([row[row != 0] for row in upper_triangle])
+            print("Covariance matrix", upper_triangle)
+            return (covariances_result, covariances)
         else:
             print("Covariance not computed")
-            return None
+            return (covariances_result, None)
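[Editor's note] get_displacement() above integrates with a rectangle rule: each queued velocity sample is multiplied by the gap to the previous timestamp. A standalone sketch with made-up numbers (timestamps in ms; only the angular term converts to seconds, matching the worker's convention):

    from collections import deque

    # (advance_speed, side_speed, rot_speed, timestamp_ms) samples
    queue = deque([(100.0, 0.0, 0.1, 0),
                   (100.0, 0.0, 0.1, 50),
                   (100.0, 0.0, 0.1, 100)])

    def integrate(queue, last_ts):
        adv = side = ang = 0.0
        start = next(i for i, (_, _, _, ts) in enumerate(queue) if ts == last_ts)
        for i in range(start, len(queue) - 1):
            dt_ms = queue[i + 1][3] - queue[i][3]
            adv += queue[i + 1][0] * dt_ms
            side += queue[i + 1][1] * dt_ms
            ang += queue[i + 1][2] * dt_ms / 1000.0  # rad/s * s
        return side, adv, ang

    print(integrate(queue, 0))  # (0.0, 10000.0, 0.01)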
+    def visualize_g2o_realtime(self, optimizer):
+        """
+        Repeatedly reload a .g2o file and plot its vertices and edge
+        measurements in a 3D Matplotlib figure, refreshing every 0.1 s.
+
+        Args:
+            optimizer: g2o optimizer used to load and traverse the graph file.
+        """
+        plt.ion()
+        fig = plt.figure()
+        ax = fig.add_subplot(111, projection='3d')
+
+        while True:
+            optimizer.load("archivo.g2o")  # Replace "archivo.g2o" with your .g2o file
+            positions = []
+            for vertex_id in range(optimizer.vertices().size()):
+                vertex = optimizer.vertex(vertex_id)
+                position = vertex.estimate()
+                positions.append(position)
+            positions = np.array(positions)
+
+            ax.clear()
+            ax.scatter(positions[:, 0], positions[:, 1], positions[:, 2], c='b', marker='o', label='Vertices')
+
+            edges = optimizer.edges()
+            for edge_id in range(edges.size()):
+                edge = edges[edge_id]
+                measurement = edge.measurement()
+                ax.plot(measurement[:, 0], measurement[:, 1], measurement[:, 2], c='r')
+
+            ax.set_xlabel('X')
+            ax.set_ylabel('Y')
+            ax.set_zlabel('Z')
+            ax.legend()
+
+            plt.draw()
+            plt.pause(0.1)  # Pause so the visualization can refresh

     def startup_check(self):
         QTimer.singleShot(200, QApplication.instance().quit)
@@ -234,31 +444,108 @@

     def update_node_att(self, id: int, attribute_names: [str]):
         # pass
+        # check if room node is created
+        """
+        DSR callback for node attribute updates: flag g2o initialization when
+        the room node becomes valid, and queue an odometry sample whenever the
+        odometry node updates.
+
+        Args:
+            id (int): id of the updated node.
+            attribute_names ([str]): names of the attributes that changed.
+        """
+        room_node = self.g.get_node("room")
+        if room_node is not None and not self.init_graph:
+            if id == room_node.id and "valid" in attribute_names:
+                self.init_graph = True
+                print("INIT GRAPH")
+
         if id == self.odometry_node_id:
             odom_node = self.g.get_node("Shadow")
             odom_attrs = odom_node.attrs
-            self.odometry_queue.append((odom_attrs["robot_ref_adv_speed"].value, odom_attrs["robot_ref_side_speed"].value, -odom_attrs["robot_ref_rot_speed"].value, int(time.time()*1000)))
+            self.odometry_queue.append((odom_attrs["robot_current_advance_speed"].value, odom_attrs["robot_current_side_speed"].value, -odom_attrs["robot_current_angular_speed"].value, int(time.time()*1000)))
             # self.odometry_queue.append((odom_attrs["robot_ref_adv_speed"].value, odom_attrs["robot_ref_side_speed"].value, odom_attrs["robot_ref_rot_speed"].value, odom_attrs["timestamp_alivetime"].value))

     def update_node(self, id: int, type: str):
         # console.print(f"UPDATE NODE: {id} {type}", style='green')
+        # if type == "corner":
+        #     # check if there are 4 corners and room node
+        #     corner_nodes = self.g.get_nodes_by_type("corner")
+        #     room_node = self.g.get_node("room")
+        #     if len(corner_nodes) == 8 and room_node:
+        #         if not self.room_initialized:
+        #             self.room_initialized = True if self.initialize_g2o_graph() else False
+        #             self.init_graph = True
+        """
+        DSR callback for node updates; currently unused.
+        """
+        pass
+
+    def delete_node(self, id: int):
+        """
+        DSR callback for node deletions: intended to reset the optimizer when
+        the room node disappears.
+        """
+        # NOTE: `type` is not a parameter of this callback, so the check
+        # below never fires as written.
         if type == "room":
-            if not self.room_initialized:
-                self.room_initialized = True if self.initialize_g2o_graph() else False
+            # TODO: reset graph and wait until new room appears
+            self.room_initialized = False

-    def delete_node(self, id: int):
-        pass
         # console.print(f"DELETE NODE:: {id} ", style='green')

     def update_edge(self, fr: int, to: int, type: str):
+        """
+        DSR callback for edge updates; currently unused.
+        """
         pass
         # console.print(f"UPDATE EDGE: {fr} to {type}", type, style='green')

     def update_edge_att(self, fr: int, to: int, type: str, attribute_names: [str]):
+        """
+        DSR callback for edge attribute updates; currently unused.
+        """
         pass
         # console.print(f"UPDATE EDGE ATT: {fr} to {type} {attribute_names}", style='green')

     def delete_edge(self, fr: int, to: int, type: str):
+        """
+        DSR callback for edge deletions; currently unused.
+        """
         pass
         # console.print(f"DELETE EDGE: {fr} to {type} {type}", style='green')
diff --git a/components/g2o_cpp/circle_gen.py b/components/g2o_cpp/circle_gen.py
index 27c1c4c9..80c67d2d 100644
--- a/components/g2o_cpp/circle_gen.py
+++ b/components/g2o_cpp/circle_gen.py
@@ -11,6 +11,9 @@ def add_noise(value, std_dev):

 def main():
+    """
+    Generate a noisy circular trajectory of poses.
+    """
     num_steps = 50  # Number of steps in the circle
     angle_step = 360 / num_steps  # Angle step in degrees
     radius = 3.0  # Radius of the circle
diff --git a/components/g2o_cpp/draw_circle.py b/components/g2o_cpp/draw_circle.py
index 051cf84b..c8199c01 100644
--- a/components/g2o_cpp/draw_circle.py
+++ b/components/g2o_cpp/draw_circle.py
@@ -3,6 +3,18 @@ import numpy as np

 def parse_g2o_file(filename):
+    """
+    Parse a .g2o text file into dictionaries of poses, landmarks and edges
+    keyed by id.
+
+    Args:
+        filename (str): path of the .g2o file to read.
+
+    Returns:
+        tuple: (poses, landmarks, edges) dictionaries.
+    """
     poses = {}
     landmarks = {}
     edges = {}
@@ -31,6 +43,20 @@

 def read_covariances_from_file(filename):
+    """
+    Read per-vertex covariance matrices from a text file listing each vertex
+    id followed by its matrix rows.
+
+    Args:
+        filename (str): path of the covariance file.
+
+    Returns:
+        dict: covariance matrix per vertex id.
+    """
     covariances = {}
     try:
         with open(filename, 'r') as file:
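[Editor's note] The .g2o files these helpers read are plain line-oriented text. Assuming the standard SE2 records (VERTEX_SE2 id x y theta, VERTEX_XY id x y, EDGE_SE2 i j dx dy dtheta plus information entries), which is my reading of what parse_g2o_file consumes, a minimal parser sketch is:

    def parse_g2o_minimal(path):
        poses, landmarks, edges = {}, {}, {}
        with open(path) as f:
            for line in f:
                tokens = line.split()
                if not tokens:
                    continue
                if tokens[0] == "VERTEX_SE2":    # id x y theta
                    poses[int(tokens[1])] = tuple(map(float, tokens[2:5]))
                elif tokens[0] == "VERTEX_XY":   # id x y
                    landmarks[int(tokens[1])] = tuple(map(float, tokens[2:4]))
                elif tokens[0] == "EDGE_SE2":    # from_id to_id dx dy dtheta + info
                    edges.setdefault(int(tokens[1]), []).append(int(tokens[2]))
        return poses, landmarks, edges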
@@ -64,13 +90,69 @@
 def plot_graph(poses, landmarks, edges, ax, title, covariances=None, original=True):
+    """
+    Draw a parsed g2o graph on a Matplotlib axis: landmarks as red dots,
+    poses as blue circles with heading lines, and edges as green lines;
+    optionally draw covariance ellipses around the landmarks of the
+    optimized graph.
+
+    Args:
+        poses (dict): robot poses as (x, y, theta) tuples keyed by vertex id.
+        landmarks (dict): landmark (x, y) positions keyed by vertex id.
+        edges (dict): connected vertex ids per edge.
+        ax: Matplotlib axis to draw on.
+        title (str): title displayed at the top of the plot.
+        covariances (dict, optional): covariance matrix per landmark id;
+            ellipses are drawn when given and `original` is False.
+        original (bool): whether these poses come from the original
+            (pre-optimization) graph.
+    """
     # Plot landmarks
     for landmark in landmarks.values():
         ax.plot(landmark[0], landmark[1], 'ro')  # 'ro' for red circle

     # Plot poses with circles and orientation lines
     for pose in poses.values():
         it = 4
-        x, y, theta = pose
+        y, x, theta = pose
+        y = -y
         # Create a circle at the robot's position
         circle = Circle((x, y), radius=20, color='blue', fill=True)
         ax.add_patch(circle)
@@ -91,7 +173,8 @@
         it += 1

     for pose in poses.values():
-        x, y, theta = pose
+        y, x, theta = pose
+        y = -y
         # Create a circle at the robot's position
         # circle = Circle((x, y), radius=20, color='blue', fill=True)
         # ax.add_patch(circle)
@@ -170,6 +253,12 @@ def draw_covariance_ellipse(x, y, cov, ax):

 def main():
     # Filenames for the original and optimized graphs
+    """
+    Parse the original and optimized .g2o files, read the covariances file,
+    and plot both graphs side by side with Matplotlib.
+    """
     original_filename = 'trajectory.g2o'  # Update this to your original .g2o file path
     optimized_filename = 'optimized_trajectory.g2o'  # Update this to your optimized .g2o file path
diff --git a/insect/environment_object_perception/src/specificworker.py b/insect/environment_object_perception/src/specificworker.py
index 0a3c3b0f..c3cecc37 100644
--- a/insect/environment_object_perception/src/specificworker.py
+++ b/insect/environment_object_perception/src/specificworker.py
@@ -163,6 +163,119 @@ class SpecificWorker(GenericWorker):

     def __init__(self, proxy_map, params, startup_check=False):
+        """
+        Initialize the worker: create the DSR graph and connect its signals,
+        set up the camera-reading and inference threads, load the YOLOv8
+        segmentation and JointBDOE orientation models, create the BYTETracker,
+        and start the compute timer.
+
+        Args:
+            proxy_map (dict): RoboComp proxies indexed by interface name.
+            params (dict): configuration parameters (see `setParams`).
+            startup_check (bool): if True, run `startup_check()` instead of
+                starting the worker threads.
+        """
         super(SpecificWorker, self).__init__(proxy_map)

         # YOU MUST SET AN UNIQUE ID FOR THIS AGENT IN YOUR DEPLOYMENT. "_CHANGE_THIS_ID_" for a valid unique integer
@@ -234,7 +347,7 @@
         self.tracker = BYTETracker(frame_rate=5, buffer_=1000)
         # self.tracker_back = BYTETracker(frame_rate=5)
         self.objects = ifaces.RoboCompVisualElementsPub.TData()
-        self.ui.pushButton.clicked.connect(self.reset_tracks)
+        # self.ui.pushButton.clicked.connect(self.reset_tracks)
         self.reset = False

         self.refresh_sleep_time = 5
@@ -275,6 +388,20 @@ def __del__(self):
         """Destructor"""

     def setParams(self, params):
+        """
+        Read configuration parameters and set the corresponding instance
+        variables (detected classes, display and depth flags, model names).
+
+        Args:
+            params (dict): configuration values read from the config file.
+
+        Returns:
+            bool: True once the parameters have been processed.
+        """
         try:
             self.classes = [0]
             # self.yolo_model = params["yolo_model"]
@@ -388,7 +515,7 @@ def compute(self):
             front_objects = self.to_visualelements_interface(tracks, alive_time, front_roi)

             # Fuse front_objects and back_objects and equal it to self.objects_write
-            self.visualelementspub_proxy.setVisualObjects(front_objects)
+            # self.visualelementspub_proxy.setVisualObjects(front_objects)

             # If display is enabled, show the tracking results on the image
             if self.display:
@@ -410,6 +537,47 @@ def reset_tracks(self):

     def calcular_matriz_diferencial(self, transformacion1, transformacion2):
+        """
+        Return the differential transform between two 4x4 homogeneous
+        matrices, inv(transformacion1) @ transformacion2, with the
+        translation part scaled by 1000.
+
+        Args:
+            transformacion1 (np.ndarray): first 4x4 transformation matrix.
+            transformacion2 (np.ndarray): second 4x4 transformation matrix.
+
+        Returns:
+            np.ndarray: the differential 4x4 transformation matrix.
+        """
         # Compute the inverse of the first transformation matrix
         inversa_transformacion1 = np.linalg.inv(transformacion1)

         # Compute the differential matrix by multiplying the inverse of the first
@@ -421,6 +589,13 @@

     ###################### MODELS LOADING METHODS ######################
     def load_v8_model(self):
+        """
+        Look for an exported yolov8*-seg TensorRT engine in the working
+        directory and load it; if none is found, ask which model size to use
+        and download and convert it.
+        """
         pattern = os.path.join(os.getcwd(), "yolov8?-seg.engine")
         matching_files = glob.glob(pattern)
         model_path = matching_files[0] if matching_files else None
@@ -440,11 +615,26 @@
             self.download_and_convert_v8_model("yolov8" + model_name + "-seg")

     def download_and_convert_v8_model(self, model_name):
+        """
+        Download the named YOLO model, export it as a TensorRT engine on GPU
+        0, and load the exported engine.
+
+        Args:
+            model_name (str): YOLO model name, e.g. "yolov8m-seg".
+        """
         # Download model
         self.v8_model = YOLO(model_name)
         # Export the model to TRT
         self.v8_model.export(format='engine', device='0')
         self.v8_model = YOLO(model_name + '.engine')

     def load_jointbdoe_model(self):
+        """
+        Load the JointBDOE orientation model on GPU 0 and read the stride
+        and configuration data needed for inference.
+        """
         try:
             self.device = select_device("0", batch_size=1)
             self.model = attempt_load(
@@ -515,6 +705,43 @@ def get_rgb_thread(self, camera_name: str, event: Event):
             return

     def convert_image_to_tensor(self, image):
+        """
+        Letterbox the image to 640 px, convert it from HWC/BGR to CHW/RGB,
+        move it to the inference device and normalize pixel values to [0, 1].
+
+        Args:
+            image (np.ndarray): input BGR image.
+
+        Returns:
+            torch.Tensor: normalized image tensor ready for the model.
+        """
         img_ori = letterbox(image, 640, stride=self.stride, auto=True)[0]
         img_ori = img_ori.transpose((2, 0, 1))[::-1]  # HWC to CHW, BGR to RGB
         img_ori = np.ascontiguousarray(img_ori)
@@ -526,6 +753,16 @@
         return img_ori
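[Editor's note] The HWC-to-CHW, BGR-to-RGB shuffle in convert_image_to_tensor is compact but easy to misread; the same steps on a dummy frame, minus the letterbox resize (NumPy and PyTorch only):

    import numpy as np
    import torch

    bgr = np.random.randint(0, 256, (480, 640, 3), dtype=np.uint8)  # dummy frame

    chw = bgr.transpose((2, 0, 1))[::-1]   # HWC -> CHW, then reverse channel axis: BGR -> RGB
    chw = np.ascontiguousarray(chw)        # negative-stride views must be copied for torch
    tensor = torch.from_numpy(chw).float() / 255.0  # normalize to [0, 1]

    print(tensor.shape)  # torch.Size([3, 480, 640])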
     def inference_thread(self, event: Event):
+        """
+        Consume frames from the RGB queue, run YOLOv8 and JointBDOE inference
+        on each one, and append the results to the inference queue until the
+        event is set.
+
+        Args:
+            event (Event): stop flag shared with the main thread.
+        """
         while not event.is_set():
             if self.rgb_read_queue:
                 start = time.time()
@@ -553,6 +790,34 @@ def get_mask_with_modified_background(self, mask, image):

     def inference_over_image(self, img0, img_ori):
+        """
+        Run both models on the image: JointBDOE for person orientation and
+        YOLOv8 for segmentation of the configured classes.
+
+        Args:
+            img0 (np.ndarray): original image, fed to the YOLOv8 model.
+            img_ori (torch.Tensor): preprocessed tensor for JointBDOE.
+
+        Returns:
+            tuple: (YOLOv8 results, orientation bboxes, orientations).
+        """
         # Make inference with both models
         orientation_bboxes, orientations = self.get_orientation_data(img_ori, img0)
         out_v8 = self.v8_model.predict(img0, show_conf=True, classes=_DETECTED_OBJECTS)
         return out_v8, orientation_bboxes, orientations
@@ -596,6 +861,35 @@ def to_visualelements_interface(self, tracks, image_timestamp, roi):
         return ifaces.RoboCompVisualElementsPub.TData(timestampimage=image_timestamp, timestampgenerated=int(time.time() * 1000), period=self.Period, objects=objects)

     def mask_to_TImage(self, mask, roi):
+        """
+        Wrap a mask image and its region of interest in a
+        RoboCompCamera360RGB.TImage.
+
+        Args:
+            mask (np.ndarray): mask image of shape (height, width, channels).
+            roi: region of interest the mask was cropped from.
+
+        Returns:
+            ifaces.RoboCompCamera360RGB.TImage: serialized image plus ROI.
+        """
         y, x, _ = mask.shape
         return ifaces.RoboCompCamera360RGB.TImage(image=mask.tobytes(), height=y, width=x, roi=roi)
@@ -646,6 +940,45 @@ def get_center(self, bbox):
         return center

     def get_pose_data(self, result):
+        """
+        Extract per-person pose data from a detection result.
+
+        Args:
+            result: model output exposing `keypoints`, `boxes` and `conf`.
+
+        Returns:
+            tuple: (bounding boxes, confidences, skeletons, zeroed
+            orientation placeholders), one entry per detected person.
+        """
+        """
         y, x, _ = mask.shape
         return ifaces.RoboCompCamera360RGB.TImage(image=mask.tobytes(), height=y, width=x, roi=roi)
@@ -646,6 +940,45 @@ def get_center(self, bbox):
         return center

     def get_pose_data(self, result):
+        """
+        Extracts per-person pose data from a detection result: bounding boxes,
+        confidence scores, and skeleton keypoints.
+
+        Args:
+            result: detection result whose ``keypoints`` hold the (x, y)
+                keypoint coordinates per person, whose ``boxes`` hold one
+                (x1, y1, x2, y2) bounding box per person, and whose ``conf``
+                holds one confidence score per person.
+
+        Returns:
+            tuple: lists of bounding boxes, confidence scores, and skeletons,
+            plus a list of zeros (placeholder orientations) of the same length.
+        """
         pose_bboxes = []
         pose_confidences = []
         skeletons = []
@@ -662,6 +995,52 @@ def get_pose_data(self, result):
         return pose_bboxes, pose_confidences, skeletons, [0] * len(boxes)

     def get_segmentator_data(self, results, color_image, depth_image):
+        """
+        Collects the segmentation results into two dictionaries, ``people``
+        and ``objects``. Both map the keys "bboxes", "poses", "confidences",
+        "masks", "classes", "orientations", and "hashes" to per-detection
+        lists, filled from the model output and from the color and depth
+        images of the current ROI.
+
+        Args:
+            results: detection and segmentation results produced by the model.
+            color_image (np.ndarray): color image of the ROI, shape
+                (roi_ysize, roi_xsize, 3); used to compute per-detection masks
+                and color hashes.
+            depth_image (np.ndarray): depth image of the ROI, used to compute
+                the 3D position of each detection from its mask.
+
+        Returns:
+            tuple: the ``people`` and ``objects`` dictionaries described above.
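+
+        Example (illustrative sketch; the inputs come from the segmentation
+        model and the camera, so execution is skipped here):
+
+            >>> people, objects = comp.get_segmentator_data(results, color, depth)  # doctest: +SKIP
+            >>> len(people["bboxes"]) == len(people["masks"])                        # doctest: +SKIP
+            True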
+        """
         people = {"bboxes": [], "poses": [], "confidences": [], "masks": [], "classes": [], "orientations": [], "hashes": []}
         objects = {"bboxes": [], "poses": [], "confidences": [], "masks": [], "classes": [], "orientations": [],
@@ -713,6 +1092,70 @@ def get_mask_distance(self, mask, depth_image):
         # Get bbox center point
         # Get depth image shape and calculate bbox center
+        """
+        Estimates the person's distance from the depth image using the
+        person's mask: mask points within a radius of the bbox center (the
+        radius is derived from the image width) are kept as valid points, a
+        histogram of their distances is built, and the median distance within
+        the dominant histogram interval is returned.
+
+        Args:
+            mask (np.ndarray): binary mask aligned with the depth image;
+                pixels equal to 1 belong to the person.
+            depth_image (np.ndarray): depth image of shape (height, width,
+                channels) for the current ROI.
+
+        Returns:
+            the median distance of the valid mask points, in the units of the
+            depth image.
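+
+        Example (illustrative sketch with synthetic data; ``comp`` is a
+        hypothetical instance of this worker, so execution is skipped):
+
+            >>> import numpy as np
+            >>> depth = np.full((480, 640, 3), 2000.0)   # flat wall 2 m away
+            >>> mask = np.zeros((480, 640, 3), dtype=np.uint8)
+            >>> mask[200:280, 280:360, :] = 1            # blob near the center
+            >>> comp.get_mask_distance(mask, depth)      # doctest: +SKIP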
+        """
         depth_image_shape = depth_image.shape
         bbox_center = [depth_image_shape[1] // 2, depth_image_shape[0] // 2]
         segmentation_points = np.argwhere(np.all(mask == 1, axis=-1))[:, [1, 0]]
@@ -748,6 +1191,39 @@ def get_mask_distance(self, mask, depth_image):

     # GPT VERSION: white pixels deleted
     def get_color_histogram(self, color):
         # Convert the color image to HSV
+        """
+        Converts an RGB image to HSV, masks out white and gray pixels (zero
+        saturation or maximal value), computes the histogram of the remaining
+        HSV pixels with ``cv2.calcHist``, and returns it normalized.
+
+        Args:
+            color (np.ndarray): RGB image of shape (height, width, 3) with
+                dtype ``np.uint8``.
+
+        Returns:
+            np.ndarray: the normalized histogram, whose bins sum to 1.
+        """
         color_hsv = cv2.cvtColor(color, cv2.COLOR_RGB2HSV)

         # Create a mask to exclude pixels with Saturation = 0 and Value = 256
@@ -759,11 +1235,76 @@ def get_color_histogram(self, color):
         return self.normalize_histogram(hist)

     def normalize_histogram(self, hist):
+        """
+        Normalizes a histogram by dividing each bin by the total pixel count,
+        yielding a probability distribution whose bins sum to 1.
+
+        Args:
+            hist (np.ndarray): histogram of pixel counts.
+
+        Returns:
+            np.ndarray: the normalized histogram.
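+
+        Example (minimal sketch; the bin values are made up):
+
+            >>> import numpy as np
+            >>> hist = np.array([2.0, 6.0, 2.0])
+            >>> hist / np.sum(hist)   # the computation this method performs
+            array([0.2, 0.6, 0.2])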
+        """
         total_pixels = np.sum(hist)
         normalized_hist = hist / total_pixels
         return normalized_hist

     def get_orientation_data(self, processed_image, original_image):
+        """
+        Runs the JointBDOE model on the preprocessed image with augmentation
+        enabled, applies non-maximum suppression to the raw output, and scales
+        the resulting bounding boxes back to the original image's coordinate
+        space.
+
+        Args:
+            processed_image (torch.Tensor): frame preprocessed by
+                ``convert_image_to_tensor``; its last two dimensions give the
+                inference resolution.
+            original_image (np.ndarray): original frame, whose shape is used
+                to rescale the predictions to native coordinates.
+
+        Returns:
+            tuple: the orientation bounding boxes (integer numpy array in
+            native image coordinates) and the estimated orientations.
+        """
         out_ori = self.model(processed_image, augment=True, scales=[self.roi_xsize / 640])[0]
         out = non_max_suppression(out_ori, 0.3, 0.5, num_angles=self.data['num_angles'])
         orientation_bboxes = scale_coords(processed_image.shape[2:], out[0][:, :4], original_image.shape[:2]).cpu().numpy().astype(int)  # native-space pred
@@ -811,6 +1352,21 @@ def set_roi_dimensions(self, objects):

     ###############################################################
     def show_fps(self, alive_time, period):
+        """
+        Reports timing statistics roughly once per second: computes the
+        current period from the number of frames processed since the last
+        report and prints it together with the given alive time and period.
+
+        Args:
+            alive_time (int): time the current iteration spent processing.
+            period (int): configured iteration period, shown alongside the
+                measured one.
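+
+        Example (sketch; ``comp`` is a hypothetical instance, called once per
+        compute cycle, so execution is skipped):
+
+            >>> start = time.time()
+            >>> # ... one full processing cycle ...
+            >>> comp.show_fps(int((time.time() - start) * 1000), comp.Period)  # doctest: +SKIP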
+ + """ if time.time() - self.last_time > 1: self.last_time = time.time() cur_period = int(1000./self.cont) @@ -858,6 +1414,13 @@ def display_data_tracks(self, img, elements): #Optimizado ############################################################################################ def startup_check(self): + """ + Runs tests to verify the functionality of various components of a robot + vision system, including TImage, TDepth, TRGBD, and TGroundTruth classes + from ifaces.RoboCompCameraRGBDSimple and ifaces.RoboCompHumanCameraBody + modules, as well as KeyPoint, Person, and PeopleData classes. + + """ print(f"Testing RoboCompCameraRGBDSimple.TImage from ifaces.RoboCompCameraRGBDSimple") test = ifaces.RoboCompCameraRGBDSimple.TImage() print(f"Testing RoboCompCameraRGBDSimple.TDepth from ifaces.RoboCompCameraRGBDSimple") @@ -889,6 +1452,30 @@ def VisualElements_getVisualObjects(self, objects): # SUBSCRIPTION to setTrack method from SegmentatorTrackingPub interface # def SegmentatorTrackingPub_setTrack(self, track): + """ + Sets chosen track ID and stores it in tracker. It also retrieves target + ROI coordinates based on given track ID. + + Args: + track (`Track` object.): 3D tracking object to be associated with the + Segmentator, and when provided, sets the chosen track for the + Segmentator's tracker. + + - `id`: An integer value representing the unique identifier of + the track to be followed. + - `chosen_track`: A reference to a track object that has been + chosen by the user. + - `target_roi_xcenter`: The x-coordinate of the target region + of interest (ROI) center. + - `target_roi_ycenter`: The y-coordinate of the target ROI center. + - `target_roi_xsize`: The x-size of the target ROI. + - `target_roi_ysize`: The y-size of the target ROI. + + The function then processes the `track` object based on its + properties, setting the `tracker.set_chosen_track()` and updating + internal variables accordingly. + + """ self.tracker.set_chosen_track(track.id) if track.id == -1: self.tracked_element = None @@ -907,6 +1494,19 @@ def SegmentatorTrackingPub_setTrack(self, track): return def update_plot(self,frame): + """ + Updates a plot by clearing it and re-drawing it with a subset of data from + a list of IDs. + + Args: + frame (`object`.): 2D plot that is to be updated with new data. + + - `frame`: A Pandas DataFrame object containing time series data + with two columns - `xs` (with values of type `float`) representing + the time series and `ys` (with values of type `float`) representing + the corresponding values. + + """ pass # self.ax.clear() # Limpia el plot actual @@ -967,6 +1567,17 @@ def delete_node(self, id: int): def update_edge(self, fr: int, to: int, type: str): + """ + Prints a message and updates an edge in a graph based on its label. + + Args: + fr (int): 0-based index of the first node in the range of nodes being + updated. + to (int): type of edge being updated in the code. + type (str): edge being updated, and it is used to identify the specific + edge being modified in the code. + + """ console.print(f"UPDATE EDGE: {fr} to {type}", type, style='green') def update_edge_att(self, fr: int, to: int, type: str, attribute_names: [str]):