Skip to content

Commit 7595401

Browse files
committed
Fixed rogue thread issue on integrate_database
1 parent 5fca798 commit 7595401

File tree

2 files changed

+40
-48
lines changed

2 files changed

+40
-48
lines changed

mocha_core/mocha_core/integrate_database.py

Lines changed: 35 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import mocha_core.database_server as ds
2323
import mocha_core.database_utils as du
2424
import mocha_core.synchronize_channel as sync
25-
from mocha_core.srv import AddUpdateDB, GetDataHeaderDB, SelectDB
2625

2726

2827
def ping(host):
@@ -41,13 +40,11 @@ def __init__(self):
4140
super().__init__("integrate_database")
4241

4342
# Handle shutdown signal
44-
self.interrupted = False
4543
self.shutdownTriggered = threading.Event()
4644
self.shutdownTriggered.clear()
45+
4746
def signal_handler(sig, frame):
48-
self.logger.warning(f"{self.this_robot} - Integrate - " +
49-
f"Got SIGINT. Triggering shutdown.")
50-
self.interrupted = True
47+
self.logger.warning(f"{self.this_robot} - Integrate - Got SIGINT. Triggering shutdown.")
5148
self.shutdown("Killed by user")
5249
signal.signal(signal.SIGINT, signal_handler)
5350

@@ -66,7 +63,8 @@ def signal_handler(sig, frame):
6663
self.rssi_threshold = self.get_parameter("rssi_threshold").get_parameter_value().integer_value
6764

6865
if len(self.this_robot) == 0:
69-
self.shutdown("Empty robot name")
66+
self.logger.error(f"{self.this_robot} - Integrate - Empty robot name")
67+
raise ValueError("Empty robot name")
7068

7169
self.logger.info(f"{self.this_robot} - Integrate - " +
7270
f"RSSI threshold: {self.rssi_threshold}")
@@ -80,39 +78,44 @@ def signal_handler(sig, frame):
8078
with open(self.robot_configs_file, "r") as f:
8179
self.robot_configs = yaml.load(f, Loader=yaml.FullLoader)
8280
except Exception as e:
83-
self.shutdown(f"Error opening robot_configs file: {e}")
81+
self.logger.error(f"{self.this_robot} - Integrate - robot_configs file")
82+
raise e
8483
if self.this_robot not in self.robot_configs.keys():
85-
self.shutdown("Robot not in config file")
84+
self.logger.error(f"{self.this_robot} - Integrate - robot_configs file")
85+
raise ValueError("Robot not in config file")
8686

8787
# Load and check radio configs
8888
self.radio_configs_file = self.get_parameter("radio_configs").get_parameter_value().string_value
8989
try:
9090
with open(self.radio_configs_file, "r") as f:
9191
self.radio_configs = yaml.load(f, Loader=yaml.FullLoader)
9292
except Exception as e:
93-
self.shutdown(f"Error opening radio_configs file: {e}")
93+
self.logger.error(f"{self.this_robot} - Integrate - radio_configs file")
94+
raise e
9495
self.radio = self.robot_configs[self.this_robot]["using-radio"]
9596
if self.radio not in self.radio_configs.keys():
96-
self.shutdown("Radio {self.radio} not in config file")
97+
self.logger.error(f"{self.this_robot} - Integrate - radio_configs file")
98+
raise ValueError("Radio {self.radio} not in config file")
9799

98100
# Load and check topic configs
99101
self.topic_configs_file = self.get_parameter("topic_configs").get_parameter_value().string_value
100102
try:
101103
with open(self.topic_configs_file, "r") as f:
102104
self.topic_configs = yaml.load(f, Loader=yaml.FullLoader)
103105
except Exception as e:
104-
self.shutdown(f"Error opening topic_configs file: {e}")
106+
self.logger.error(f"{self.this_robot} - Integrate - topics_configs file")
107+
raise e
105108
self_type = self.robot_configs[self.this_robot]["node-type"]
106109
if self_type not in self.topic_configs.keys():
107-
self.shutdown("Node type not in config file")
110+
self.logger.error(f"{self.this_robot} - Integrate - topics_configs file")
111+
raise ValueError("Node type not in config file")
108112

109113
# Check that we can ping the radios
110114
ip = self.robot_configs[self.this_robot]["IP-address"]
111115
if not ping(ip):
112116
self.logger.error(f"{self.this_robot} - Integrate - " +
113117
f"Cannot ping self {ip}. Is the radio on?")
114-
self.shutdown("Cannot ping self")
115-
return
118+
raise ValueError("Cannot ping self")
116119

117120
# Create database server
118121
self.DBServer = ds.DatabaseServer(self.robot_configs,
@@ -141,8 +144,8 @@ def signal_handler(sig, frame):
141144
self.this_robot,
142145
other_robot, self.robot_configs,
143146
self.client_timeout, self)
144-
self.all_channels.append(channel)
145147
channel.run()
148+
self.all_channels.append(channel)
146149

147150
# Attach a radio trigger to each channel. This will be triggered
148151
# when the RSSI is high enough. You can use another approach here
@@ -157,23 +160,9 @@ def make_callback(ch):
157160
10
158161
)
159162

160-
# Wait for all the robots to start
161-
for _ in range(100):
162-
if self.interrupted or not rclpy.ok():
163-
self.shutdown("Killed while waiting for other robots")
164-
return
165-
time.sleep(0.1)
166-
# Node is ready
167-
self.mtexecutor = MultiThreadedExecutor(num_threads=4)
168-
self.mtexecutor.add_node(self)
169-
170-
def run(self):
171-
self.mtexecutor.spin()
172-
173163
def shutdown(self, reason):
174164
# Only trigger shutdown once
175165
if self.shutdownTriggered.is_set():
176-
sys.exit(1)
177166
return
178167
self.shutdownTriggered.set()
179168

@@ -183,21 +172,11 @@ def shutdown(self, reason):
183172
if hasattr(self, 'all_channels') and len(self.all_channels) != 0:
184173
for channel in self.all_channels:
185174
channel.stop()
186-
self.all_channels.remove(channel)
187175
self.logger.warning(f"{self.this_robot} - Integrate - " + "Killed Channels")
188176
# Wait for all the channels to be gone. This needs to be slightly
189177
# larger than RECV_TIMEOUT
190178
time.sleep(3.5)
191-
192-
pdb.set_trace()
193-
194-
# Stop the executor to exit the spin loop
195-
if hasattr(self, 'mtexecutor') and self.mtexecutor is not None:
196-
self.mtexecutor.shutdown()
197-
self.logger.warning(f"{self.this_robot} - Integrate - " + "Executor shut down")
198-
self.destroy_node()
199-
self.logger.warning(f"{self.this_robot} - Integrate - " + "Node destroyed")
200-
sys.exit(1)
179+
self.logger.warning(f"{self.this_robot} - Integrate - " + "Shutdown complete")
201180

202181
def rssi_cb(self, data, comm_node):
203182
rssi = data.data
@@ -217,19 +196,30 @@ def main(args=None):
217196
# Start the node
218197
try:
219198
integrate_db = IntegrateDatabase()
220-
except RuntimeError as e:
199+
except Exception as e:
221200
print(f"Node initialization failed: {e}")
201+
rclpy.shutdown()
222202
return
223203

204+
# Load mtexecutor
205+
mtexecutor = MultiThreadedExecutor(num_threads=4)
206+
mtexecutor.add_node(integrate_db)
207+
208+
# Use context manager for clean shutdown
224209
try:
225-
integrate_db.run()
210+
# Spin with periodic checking for shutdown
211+
while rclpy.ok() and not integrate_db.shutdownTriggered.is_set():
212+
mtexecutor.spin_once(timeout_sec=0.1)
226213
except KeyboardInterrupt:
227214
print("Keyboard Interrupt")
228-
pass
215+
integrate_db.shutdown("KeyboardInterrupt")
229216
except Exception as e:
230-
print(e)
217+
print(f"Exception: {e}")
218+
integrate_db.shutdown(f"Exception: {e}")
231219
finally:
232-
integrate_db.shutdown("Shutting down")
220+
# Clean up node and ROS2 from main thread (safe)
221+
integrate_db.destroy_node()
222+
rclpy.shutdown()
233223

234224
if __name__ == "__main__":
235225
main()

mocha_core/mocha_core/zmq_comm_node.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ def __init__(self, this_node, client_node, robot_configs,
8282

8383
# Start server thread
8484
self.th = threading.Thread(target=self.server_thread, args=())
85-
self.server_running = True
85+
self.server_running = threading.Event()
86+
self.server_running.clear()
8687
self.th.start()
8788

8889
def connect_send_message(self, msg):
@@ -185,6 +186,7 @@ def connect_send_message(self, msg):
185186
self.syncStatus_lock.release()
186187

187188
def server_thread(self):
189+
self.server_running.set()
188190
# This timer does not have a big impact as it is only the timer until
189191
# the recv times out Most calls from the client are very lightweight
190192
RECV_TIMEOUT = 1000
@@ -195,7 +197,7 @@ def server_thread(self):
195197

196198
self.server.bind("tcp://*:" + str(port))
197199

198-
while self.server_running:
200+
while self.server_running.is_set():
199201
try:
200202
request = self.server.recv()
201203
except zmq.ZMQError as e:
@@ -230,6 +232,6 @@ def server_thread(self):
230232
self.context.term()
231233

232234
def terminate(self):
233-
self.server_running = False
235+
self.server_running.clear()
234236
self.th.join()
235237
self.logger.warn(f"Node {self.this_node} <- {self.client_node} - Terminating server")

0 commit comments

Comments
 (0)