Skip to content

Commit 5fca798

Browse files
committed
Integrate database mostly finished, only small deadlock...
1 parent e399c9a commit 5fca798

File tree

1 file changed

+116
-70
lines changed

1 file changed

+116
-70
lines changed

mocha_core/mocha_core/integrate_database.py

Lines changed: 116 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,25 @@
44
import signal
55
import sys
66
import time
7+
import pdb
78
import traceback
89
from functools import partial
910

1011
import rclpy
12+
from rclpy.logging import LoggingSeverity
1113
import rclpy.logging
1214
import rclpy.time
15+
from rclpy.node import Node
16+
from rclpy.executors import MultiThreadedExecutor
17+
import threading
1318
import yaml
1419
import std_msgs.msg
1520
import subprocess
1621

1722
import mocha_core.database_server as ds
1823
import mocha_core.database_utils as du
1924
import mocha_core.synchronize_channel as sync
25+
from mocha_core.srv import AddUpdateDB, GetDataHeaderDB, SelectDB
2026

2127

2228
def ping(host):
@@ -30,58 +36,74 @@ def ping(host):
3036
return False
3137

3238

33-
class IntegrateDatabase:
34-
def __init__(self, node=None):
35-
# Initialize ROS2 if not already done
36-
if not rclpy.ok():
37-
rclpy.init()
38-
39-
# Create or use provided node
40-
if node is None:
41-
self.node = rclpy.create_node("integrate_database")
42-
else:
43-
self.node = node
44-
45-
self.logger = self.node.get_logger()
46-
47-
# In ROS2, parameters need to be declared before use
48-
self.node.declare_parameter("robot_name", "")
49-
self.node.declare_parameter("rssi_threshold", 20)
50-
self.node.declare_parameter("client_timeout", 6.0)
51-
self.node.declare_parameter("robot_configs", "")
52-
self.node.declare_parameter("radio_configs", "")
53-
self.node.declare_parameter("topic_configs", "")
54-
55-
self.this_robot = self.node.get_parameter("robot_name").get_parameter_value().string_value
56-
self.rssi_threshold = self.node.get_parameter("rssi_threshold").get_parameter_value().integer_value
57-
self.all_channels = []
39+
class IntegrateDatabase(Node):
40+
def __init__(self):
41+
super().__init__("integrate_database")
42+
43+
# Handle shutdown signal
44+
self.interrupted = False
45+
self.shutdownTriggered = threading.Event()
46+
self.shutdownTriggered.clear()
47+
def signal_handler(sig, frame):
48+
self.logger.warning(f"{self.this_robot} - Integrate - " +
49+
f"Got SIGINT. Triggering shutdown.")
50+
self.interrupted = True
51+
self.shutdown("Killed by user")
52+
signal.signal(signal.SIGINT, signal_handler)
53+
54+
self.logger = self.get_logger()
55+
# self.logger.set_level(LoggingSeverity.DEBUG)
56+
57+
# Declare parameters
58+
self.declare_parameter("robot_name", "")
59+
self.declare_parameter("rssi_threshold", 20)
60+
self.declare_parameter("client_timeout", 6.0)
61+
self.declare_parameter("robot_configs", "")
62+
self.declare_parameter("radio_configs", "")
63+
self.declare_parameter("topic_configs", "")
64+
65+
self.this_robot = self.get_parameter("robot_name").get_parameter_value().string_value
66+
self.rssi_threshold = self.get_parameter("rssi_threshold").get_parameter_value().integer_value
67+
68+
if len(self.this_robot) == 0:
69+
self.shutdown("Empty robot name")
70+
5871
self.logger.info(f"{self.this_robot} - Integrate - " +
5972
f"RSSI threshold: {self.rssi_threshold}")
60-
self.client_timeout = self.node.get_parameter("client_timeout").get_parameter_value().double_value
73+
self.client_timeout = self.get_parameter("client_timeout").get_parameter_value().double_value
6174
self.logger.info(f"{self.this_robot} - Integrate - " +
6275
f"Client timeout: {self.client_timeout}")
6376

6477
# Load and check robot configs
65-
self.robot_configs_file = self.node.get_parameter("robot_configs").get_parameter_value().string_value
66-
with open(self.robot_configs_file, "r") as f:
67-
self.robot_configs = yaml.load(f, Loader=yaml.FullLoader)
78+
self.robot_configs_file = self.get_parameter("robot_configs").get_parameter_value().string_value
79+
try:
80+
with open(self.robot_configs_file, "r") as f:
81+
self.robot_configs = yaml.load(f, Loader=yaml.FullLoader)
82+
except Exception as e:
83+
self.shutdown(f"Error opening robot_configs file: {e}")
6884
if self.this_robot not in self.robot_configs.keys():
6985
self.shutdown("Robot not in config file")
7086

7187
# Load and check radio configs
72-
self.radio_configs_file = self.node.get_parameter("radio_configs").get_parameter_value().string_value
73-
with open(self.radio_configs_file, "r") as f:
74-
self.radio_configs = yaml.load(f, Loader=yaml.FullLoader)
88+
self.radio_configs_file = self.get_parameter("radio_configs").get_parameter_value().string_value
89+
try:
90+
with open(self.radio_configs_file, "r") as f:
91+
self.radio_configs = yaml.load(f, Loader=yaml.FullLoader)
92+
except Exception as e:
93+
self.shutdown(f"Error opening radio_configs file: {e}")
7594
self.radio = self.robot_configs[self.this_robot]["using-radio"]
7695
if self.radio not in self.radio_configs.keys():
77-
self.shutdown("Radio not in config file")
96+
self.shutdown("Radio {self.radio} not in config file")
7897

7998
# Load and check topic configs
80-
self.topic_configs_file = self.node.get_parameter("topic_configs").get_parameter_value().string_value
81-
with open(self.topic_configs_file, "r") as f:
82-
self.topic_configs = yaml.load(f, Loader=yaml.FullLoader)
83-
self.node_type = self.robot_configs[self.this_robot]["node-type"]
84-
if self.node_type not in self.topic_configs.keys():
99+
self.topic_configs_file = self.get_parameter("topic_configs").get_parameter_value().string_value
100+
try:
101+
with open(self.topic_configs_file, "r") as f:
102+
self.topic_configs = yaml.load(f, Loader=yaml.FullLoader)
103+
except Exception as e:
104+
self.shutdown(f"Error opening topic_configs file: {e}")
105+
self_type = self.robot_configs[self.this_robot]["node-type"]
106+
if self_type not in self.topic_configs.keys():
85107
self.shutdown("Node type not in config file")
86108

87109
# Check that we can ping the radios
@@ -92,26 +114,18 @@ def __init__(self, node=None):
92114
self.shutdown("Cannot ping self")
93115
return
94116

95-
# Create a database server object with ROS2 node
117+
# Create database server
96118
self.DBServer = ds.DatabaseServer(self.robot_configs,
97-
self.topic_configs, self.this_robot, node=self.node)
119+
self.topic_configs, self.this_robot, self)
98120

99-
self.num_robot_in_comm = 0
100121

101-
# Handle possible interruptions
102-
self.interrupted = False
103-
104-
def signal_handler(sig, frame):
105-
self.logger.warning(f"{self.this_robot} - Integrate - " +
106-
f"Got signal. Killing comm nodes.")
107-
self.interrupted = True
108-
self.shutdown("Killed by user")
109-
signal.signal(signal.SIGINT, signal_handler)
122+
self.num_robot_in_comm = 0
110123

111124
self.logger.info(f"{self.this_robot} - Integrate - " +
112125
"Created all communication channels!")
113126

114127
# Start comm channels with other robots
128+
self.all_channels = []
115129
self.other_robots = [i for i in list(self.robot_configs.keys()) if i !=
116130
self.this_robot]
117131
for other_robot in self.other_robots:
@@ -126,7 +140,7 @@ def signal_handler(sig, frame):
126140
channel = sync.Channel(self.DBServer.dbl,
127141
self.this_robot,
128142
other_robot, self.robot_configs,
129-
self.client_timeout)
143+
self.client_timeout, self)
130144
self.all_channels.append(channel)
131145
channel.run()
132146

@@ -135,8 +149,8 @@ def signal_handler(sig, frame):
135149
# such as using a timer to periodically trigger the sync
136150
def make_callback(ch):
137151
return lambda msg: self.rssi_cb(msg, ch)
138-
139-
self.node.create_subscription(
152+
153+
self.create_subscription(
140154
std_msgs.msg.Int32,
141155
'ddb/rajant/rssi/' + other_robot,
142156
make_callback(channel),
@@ -149,26 +163,41 @@ def make_callback(ch):
149163
self.shutdown("Killed while waiting for other robots")
150164
return
151165
time.sleep(0.1)
166+
# Node is ready
167+
self.mtexecutor = MultiThreadedExecutor(num_threads=4)
168+
self.mtexecutor.add_node(self)
152169

153-
# Spin!
154-
try:
155-
rclpy.spin(self.node)
156-
except KeyboardInterrupt:
157-
pass
170+
def run(self):
171+
self.mtexecutor.spin()
158172

159173
def shutdown(self, reason):
174+
# Only trigger shutdown once
175+
if self.shutdownTriggered.is_set():
176+
sys.exit(1)
177+
return
178+
self.shutdownTriggered.set()
179+
160180
assert isinstance(reason, str)
161181
self.logger.error(f"{self.this_robot} - Integrate - " + reason)
162-
for channel in self.all_channels:
163-
channel.comm_node.terminate()
164-
self.all_channels.remove(channel)
165-
self.logger.warning(f"{self.this_robot} - Integrate - " + "Killed Channels")
166-
if hasattr(self, 'DBServer') and self.DBServer is not None:
167-
self.DBServer.shutdown()
168-
self.logger.warning(f"{self.this_robot} - Integrate - " + "Killed DB")
169-
if rclpy.ok():
170-
rclpy.shutdown()
171-
self.logger.warning(f"{self.this_robot} - Integrate - " + "Shutting down")
182+
# Shutting down communication channels
183+
if hasattr(self, 'all_channels') and len(self.all_channels) != 0:
184+
for channel in self.all_channels:
185+
channel.stop()
186+
self.all_channels.remove(channel)
187+
self.logger.warning(f"{self.this_robot} - Integrate - " + "Killed Channels")
188+
# Wait for all the channels to be gone. This needs to be slightly
189+
# larger than RECV_TIMEOUT
190+
time.sleep(3.5)
191+
192+
pdb.set_trace()
193+
194+
# Stop the executor to exit the spin loop
195+
if hasattr(self, 'mtexecutor') and self.mtexecutor is not None:
196+
self.mtexecutor.shutdown()
197+
self.logger.warning(f"{self.this_robot} - Integrate - " + "Executor shut down")
198+
self.destroy_node()
199+
self.logger.warning(f"{self.this_robot} - Integrate - " + "Node destroyed")
200+
sys.exit(1)
172201

173202
def rssi_cb(self, data, comm_node):
174203
rssi = data.data
@@ -181,9 +210,26 @@ def rssi_cb(self, data, comm_node):
181210
traceback.print_exception(*sys.exc_info())
182211

183212

184-
def main():
213+
def main(args=None):
214+
# Initialize ROS2 with command line arguments
215+
rclpy.init(args=args)
216+
185217
# Start the node
186-
IntegrateDatabase()
218+
try:
219+
integrate_db = IntegrateDatabase()
220+
except RuntimeError as e:
221+
print(f"Node initialization failed: {e}")
222+
return
223+
224+
try:
225+
integrate_db.run()
226+
except KeyboardInterrupt:
227+
print("Keyboard Interrupt")
228+
pass
229+
except Exception as e:
230+
print(e)
231+
finally:
232+
integrate_db.shutdown("Shutting down")
187233

188234
if __name__ == "__main__":
189235
main()

0 commit comments

Comments
 (0)