|
1 | 1 | #!/usr/bin/env python3 |
2 | 2 |
|
3 | 3 | import os |
| 4 | +import pdb |
4 | 5 | import sys |
| 6 | + |
5 | 7 | import rclpy |
6 | 8 | import rclpy.time |
7 | | -import ament_index_python |
8 | | -import mocha_core.srv |
9 | 9 | import yaml |
10 | | -import pdb |
| 10 | +from rclpy.node import Node |
| 11 | +import mocha_core.srv |
| 12 | +# Get the database utils module |
| 13 | +from rclpy.executors import MultiThreadedExecutor |
| 14 | +import mocha_core.database_utils as du |
| 15 | +from mocha_core.srv import AddUpdateDB, GetDataHeaderDB, SelectDB |
| 16 | +import mocha_core.database_server as ds |
| 17 | + |
| 18 | + |
| 19 | +class Translator(): |
| 20 | + def __init__( |
| 21 | + self, this_robot, this_robot_id, topic_name, topic_id, msg_type, ros_node |
| 22 | + ): |
| 23 | + |
| 24 | + self.logger = ros_node.get_logger() |
| 25 | + self.ros_node = ros_node |
| 26 | + self.topic_name = topic_name |
| 27 | + self.topic_id = topic_id |
| 28 | + self.this_robot = this_robot |
| 29 | + self.this_robot_id = this_robot_id |
11 | 30 |
|
12 | | -class Translator: |
13 | | - def __init__(self, this_robot, this_robot_id, |
14 | | - topic_name, topic_id, msg_type, node=None): |
15 | | - # Get the database utils module |
16 | | - import mocha_core.database_utils as du |
17 | | - self.__du = du |
18 | | - |
19 | | - # Store or create the ROS2 node |
20 | | - if node is None: |
21 | | - # Initialize ROS2 if not already done |
22 | | - if not rclpy.ok(): |
23 | | - rclpy.init() |
24 | | - self.__node = rclpy.create_node(f'translator_{this_robot}_{topic_id}') |
25 | | - self.__node_owned = True |
26 | | - else: |
27 | | - self.__node = node |
28 | | - self.__node_owned = False |
29 | | - |
30 | | - self.__logger = self.__node.get_logger() |
31 | | - |
32 | | - self.__counter = 0 |
33 | | - self.__topic_name = topic_name |
34 | | - self.__topic_id = topic_id |
35 | | - self.__this_robot = this_robot |
36 | | - self.__this_robot_id = this_robot_id |
37 | | - self.__service_name = "/mocha_core/add_update_db" |
38 | | - |
39 | 31 | # Create service client |
40 | | - self.__add_update_db = self.__node.create_client( |
41 | | - mocha_core.srv.AddUpdateDB, self.__service_name |
42 | | - ) |
| 32 | + self.add_update_db_cli = self.ros_node.create_client(AddUpdateDB, |
| 33 | + "/integrate_database/add_update_db", |
| 34 | + qos_profile=ds.DatabaseServer.QOS_PROFILE) |
| 35 | + wait_counter = 0 |
| 36 | + while not self.add_update_db_cli.wait_for_service(timeout_sec=1.0): |
| 37 | + self.logger.debug("Translator - Waiting for add_update_db") |
| 38 | + wait_counter += 1 |
| 39 | + if wait_counter % 5 == 0: |
| 40 | + self.logger.warn("Translator - Waiting for add_update_db for more than 5 seconds") |
43 | 41 |
|
44 | 42 | # Create subscriber |
45 | | - self.__subscription = self.__node.create_subscription( |
46 | | - msg_type, self.__topic_name, self.translator_cb, 10 |
| 43 | + self.subscription = self.ros_node.create_subscription( |
| 44 | + msg_type, self.topic_name, self.translator_cb, 10 |
47 | 45 | ) |
| 46 | + self.logger.info(f"Translator created for {self.topic_name}") |
48 | 47 |
|
49 | 48 | def translator_cb(self, data): |
50 | 49 | msg = data |
51 | 50 |
|
52 | | - # Wait for service to be available |
53 | | - if not self.__add_update_db.wait_for_service(timeout_sec=5.0): |
54 | | - self.__logger.error(f"Service {self.__service_name} not available") |
55 | | - return |
56 | | - |
57 | | - serialized_msg = self.__du.serialize_ros_msg(msg) |
58 | | - try: |
59 | | - # Get current time |
60 | | - ts = self.__node.get_clock().now().to_msg() |
61 | | - |
62 | | - # Create service request |
63 | | - req = mocha_core.srv.AddUpdateDB.Request() |
64 | | - req.topic_id = self.__topic_id |
65 | | - req.timestamp = ts |
66 | | - req.msg_content = serialized_msg |
67 | | - |
68 | | - # Call service asynchronously |
69 | | - future = self.__add_update_db.call_async(req) |
70 | | - rclpy.spin_until_future_complete(self.__node, future, timeout_sec=2.0) |
71 | | - |
| 51 | + serialized_msg = du.serialize_ros_msg(msg) |
| 52 | + # Get current time |
| 53 | + ts = self.ros_node.get_clock().now().to_msg() |
| 54 | + |
| 55 | + # Create service request |
| 56 | + req = mocha_core.srv.AddUpdateDB.Request() |
| 57 | + req.topic_id = self.topic_id |
| 58 | + req.timestamp = ts |
| 59 | + req.msg_content = serialized_msg |
| 60 | + |
| 61 | + future = self.add_update_db_cli.call_async(req) |
| 62 | + def response_callback(future): |
72 | 63 | if future.done(): |
73 | 64 | answ = future.result() |
74 | 65 | answ_header = answ.new_header |
75 | | - self.__logger.debug(f"{self.__this_robot} - Header insert " + |
76 | | - f"- {self.__topic_name} - {answ_header}") |
77 | | - self.__counter += 1 |
78 | 66 | else: |
79 | | - self.__logger.warning(f"Service call timed out for {self.__topic_name}") |
80 | | - |
81 | | - except Exception as exc: |
82 | | - self.__logger.error(f"Service did not process request: {exc}") |
83 | | - |
84 | | - def shutdown(self): |
85 | | - """Cleanup method for shutting down the translator""" |
86 | | - if self.__node_owned and self.__node is not None: |
87 | | - self.__node.destroy_node() |
88 | | - if rclpy.ok(): |
89 | | - rclpy.shutdown() |
90 | | - |
91 | | - |
92 | | -def create_translator_node(robot_name, robot_configs_path=None, topic_configs_path=None, node=None): |
93 | | - """ |
94 | | - Create and run translator node for a given robot. |
95 | | - Returns the node for cleanup. |
96 | | - """ |
97 | | - # Use provided node or create new one |
98 | | - if node is None: |
99 | | - # Initialize ROS2 if not already done |
100 | | - if not rclpy.ok(): |
101 | | - rclpy.init() |
102 | | - node = rclpy.create_node(f"translator_{robot_name}") |
103 | | - node_owned = True |
104 | | - else: |
105 | | - node_owned = False |
106 | | - |
107 | | - logger = node.get_logger() |
108 | | - |
109 | | - # Get the mocha_core path using ament |
110 | | - try: |
111 | | - from ament_index_python.packages import get_package_share_directory |
112 | | - ddb_path = get_package_share_directory('mocha_core') |
113 | | - except: |
114 | | - # Fallback for development |
115 | | - current_dir = os.path.dirname(os.path.abspath(__file__)) |
116 | | - ddb_path = os.path.join(current_dir, "..") |
117 | | - |
118 | | - # Import database_utils from installed package |
119 | | - import mocha_core.database_utils as du |
120 | | - |
121 | | - # Get the robot_config path and generate the robot_configs object |
122 | | - if robot_configs_path is None: |
123 | | - robot_configs_path = os.path.join(ddb_path, "config", "testConfigs", "robot_configs.yaml") |
124 | | - |
125 | | - with open(robot_configs_path, 'r') as f: |
126 | | - robot_configs = yaml.load(f, Loader=yaml.FullLoader) |
127 | | - if robot_name not in robot_configs.keys(): |
128 | | - logger.error(f"Robot {robot_name} not in robot_configs") |
129 | | - if node_owned: |
130 | | - rclpy.shutdown() |
131 | | - raise ValueError(f"Robot {robot_name} not found in config") |
132 | | - |
133 | | - # Get the topic_configs path and generate the topic_configs object |
134 | | - if topic_configs_path is None: |
135 | | - topic_configs_path = os.path.join(ddb_path, "config", "testConfigs", "topic_configs.yaml") |
136 | | - |
137 | | - with open(topic_configs_path, 'r') as f: |
138 | | - topic_configs = yaml.load(f, Loader=yaml.FullLoader) |
139 | | - this_robot_topics = topic_configs[robot_configs[robot_name]["node-type"]] |
140 | | - |
141 | | - # Get msg_types dict used to publish the topics |
142 | | - msg_types = du.msg_types(robot_configs, topic_configs) |
143 | | - |
144 | | - # Create translators for each topic |
145 | | - translators = [] |
146 | | - for topic_id, topic in enumerate(this_robot_topics): |
147 | | - # Get robot id |
148 | | - robot_id = du.get_robot_id_from_name(robot_configs, robot_name) |
149 | | - obj = msg_types[robot_id][topic_id]["obj"] |
150 | | - translator = Translator(robot_name, |
151 | | - robot_id, |
152 | | - topic["msg_topic"], |
153 | | - topic_id, |
154 | | - obj, |
155 | | - node=node) |
156 | | - translators.append(translator) |
157 | | - |
158 | | - return node |
159 | | - |
160 | | - |
161 | | -if __name__ == "__main__": |
162 | | - # Initialize ROS2 |
163 | | - rclpy.init() |
164 | | - |
165 | | - # Create main node |
166 | | - node = rclpy.create_node("topic_translator") |
167 | | - logger = node.get_logger() |
168 | | - |
169 | | - # Declare parameters with defaults |
170 | | - node.declare_parameter("robot_name", "") |
171 | | - node.declare_parameter("robot_configs", "") |
172 | | - node.declare_parameter("topic_configs", "") |
173 | | - |
174 | | - # Get robot from the parameters |
175 | | - this_robot = node.get_parameter("robot_name").get_parameter_value().string_value |
176 | | - if not this_robot: |
177 | | - logger.error("robot_name parameter is required") |
178 | | - rclpy.shutdown() |
179 | | - sys.exit(1) |
180 | | - |
181 | | - # Get config file paths from parameters |
182 | | - robot_configs_path = node.get_parameter("robot_configs").get_parameter_value().string_value |
183 | | - topic_configs_path = node.get_parameter("topic_configs").get_parameter_value().string_value |
184 | | - |
185 | | - # Use empty strings as None for the function |
186 | | - robot_configs_path = robot_configs_path if robot_configs_path else None |
187 | | - topic_configs_path = topic_configs_path if topic_configs_path else None |
188 | | - |
| 67 | + self.logger.warning(f"{self.this_robot} - translator " + |
| 68 | + "Service call failed for {self.topic_name}" |
| 69 | + ) |
| 70 | + future.add_done_callback(response_callback) |
| 71 | + |
| 72 | +class TranslatorNode(Node): |
| 73 | + def __init__(self, this_robot=None, robot_configs=None, topic_configs=None): |
| 74 | + super().__init__("translator") |
| 75 | + # Declare parameters |
| 76 | + self.declare_parameter("robot_name", "") |
| 77 | + self.declare_parameter("robot_configs", "") |
| 78 | + self.declare_parameter("topic_configs", "") |
| 79 | + self.logger = self.get_logger() |
| 80 | + |
| 81 | + self.this_robot = self.get_parameter("robot_name").get_parameter_value().string_value if this_robot is None else this_robot |
| 82 | + |
| 83 | + if len(self.this_robot) == 0: |
| 84 | + self.logger.error(f"{self.this_robot} - Translator - Empty robot name") |
| 85 | + raise ValueError("Empty robot name") |
| 86 | + |
| 87 | + # Load and check robot configs |
| 88 | + self.robot_configs_file = self.get_parameter("robot_configs").get_parameter_value().string_value if robot_configs is None else robot_configs |
| 89 | + try: |
| 90 | + with open(self.robot_configs_file, "r") as f: |
| 91 | + self.robot_configs = yaml.load(f, Loader=yaml.FullLoader) |
| 92 | + except Exception as e: |
| 93 | + self.logger.error(f"{self.this_robot} - Translator - robot_configs file") |
| 94 | + raise e |
| 95 | + if self.this_robot not in self.robot_configs.keys(): |
| 96 | + self.logger.error(f"{self.this_robot} - Translator - robot_configs file") |
| 97 | + raise ValueError("Robot not in config file") |
| 98 | + |
| 99 | + # Load and check topic configs |
| 100 | + self.topic_configs_file = self.get_parameter("topic_configs").get_parameter_value().string_value if topic_configs is None else topic_configs |
| 101 | + try: |
| 102 | + with open(self.topic_configs_file, "r") as f: |
| 103 | + self.topic_configs = yaml.load(f, Loader=yaml.FullLoader) |
| 104 | + except Exception as e: |
| 105 | + self.logger.error(f"{self.this_robot} - Translator - topics_configs file") |
| 106 | + raise e |
| 107 | + self_type = self.robot_configs[self.this_robot]["node-type"] |
| 108 | + if self_type not in self.topic_configs.keys(): |
| 109 | + self.logger.error(f"{self.this_robot} - Translator - topics_configs file") |
| 110 | + raise ValueError("Node type not in config file") |
| 111 | + this_robot_topics = self.topic_configs[self.robot_configs[self.this_robot]["node-type"]] |
| 112 | + |
| 113 | + # Get msg_types dict used to publish the topics |
| 114 | + self.msg_types = du.msg_types(self.robot_configs, self.topic_configs, |
| 115 | + self) |
| 116 | + |
| 117 | + # Create translators for each topic |
| 118 | + translators = [] |
| 119 | + for topic_id, topic in enumerate(this_robot_topics): |
| 120 | + # Get robot id |
| 121 | + robot_id = du.get_robot_id_from_name(self.robot_configs, self.this_robot) |
| 122 | + obj = self.msg_types[robot_id][topic_id]["obj"] |
| 123 | + translator = Translator( |
| 124 | + self.this_robot, robot_id, topic["msg_topic"], topic_id, obj, self |
| 125 | + ) |
| 126 | + translators.append(translator) |
| 127 | + |
| 128 | +def main(args=None): |
| 129 | + rclpy.init(args=args) |
189 | 130 | try: |
190 | | - # Create translator node and translators |
191 | | - create_translator_node(this_robot, robot_configs_path, topic_configs_path, node) |
192 | | - |
193 | | - # Spin the node |
194 | | - rclpy.spin(node) |
195 | | - except KeyboardInterrupt: |
196 | | - pass |
197 | | - finally: |
198 | | - node.destroy_node() |
| 131 | + translator_node = TranslatorNode() |
| 132 | + except Exception as e: |
| 133 | + print(f"Node initialization failed: {e}") |
199 | 134 | rclpy.shutdown() |
| 135 | + return |
200 | 136 |
|
201 | | -def main(): |
202 | | - rclpy.init(args=None) |
203 | | - node = rclpy.create_node('translator') |
204 | | - |
205 | | - # Declare parameters |
206 | | - node.declare_parameter('robot_name', '') |
207 | | - node.declare_parameter('robot_configs', '') |
208 | | - node.declare_parameter('topic_configs', '') |
209 | | - |
210 | | - logger = node.get_logger() |
211 | | - |
212 | | - # Get robot name from parameters |
213 | | - this_robot = node.get_parameter('robot_name').get_parameter_value().string_value |
214 | | - if not this_robot: |
215 | | - logger.error('robot_name parameter not set') |
216 | | - sys.exit(1) |
217 | | - |
218 | | - # Get config file paths from parameters |
219 | | - robot_configs_path = node.get_parameter("robot_configs").get_parameter_value().string_value |
220 | | - topic_configs_path = node.get_parameter("topic_configs").get_parameter_value().string_value |
221 | | - |
222 | | - # Use empty strings as None for the function |
223 | | - robot_configs_path = robot_configs_path if robot_configs_path else None |
224 | | - topic_configs_path = topic_configs_path if topic_configs_path else None |
| 137 | + # Load mtexecutor |
| 138 | + mtexecutor = MultiThreadedExecutor(num_threads=4) |
| 139 | + mtexecutor.add_node(translator_node) |
225 | 140 |
|
226 | 141 | try: |
227 | | - # Create translator node and translators |
228 | | - create_translator_node(this_robot, robot_configs_path, topic_configs_path, node) |
229 | | - |
230 | | - # Spin the node |
231 | | - rclpy.spin(node) |
| 142 | + mtexecutor.spin() |
232 | 143 | except KeyboardInterrupt: |
233 | | - pass |
| 144 | + print("Keyboard Interrupt") |
| 145 | + except Exception as e: |
| 146 | + print(f"Exception: {e}") |
234 | 147 | finally: |
235 | | - node.destroy_node() |
| 148 | + # Clean up node and ROS2 from main thread (safe) |
| 149 | + translator_node.destroy_node() |
236 | 150 | rclpy.shutdown() |
237 | 151 |
|
238 | 152 | if __name__ == "__main__": |
|
0 commit comments