Skip to content

Commit 1b0793c

Browse files
committed
First version of the test_translator storm
1 parent 9d9c129 commit 1b0793c

File tree

2 files changed

+118
-9
lines changed

2 files changed

+118
-9
lines changed

mocha_core/mocha_core/translator.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,15 @@
1111
import mocha_core.srv
1212
# Get the database utils module
1313
from rclpy.executors import MultiThreadedExecutor
14+
from rclpy.callback_groups import ReentrantCallbackGroup
1415
import mocha_core.database_utils as du
1516
from mocha_core.srv import AddUpdateDB, GetDataHeaderDB, SelectDB
1617
import mocha_core.database_server as ds
1718

1819

1920
class Translator():
2021
def __init__(
21-
self, this_robot, this_robot_id, topic_name, topic_id, msg_type, ros_node
22+
self, this_robot, this_robot_id, topic_name, topic_id, msg_type, ros_node, callback_group=None
2223
):
2324

2425
self.logger = ros_node.get_logger()
@@ -39,9 +40,10 @@ def __init__(
3940
if wait_counter % 5 == 0:
4041
self.logger.warn("Translator - Waiting for add_update_db for more than 5 seconds")
4142

42-
# Create subscriber
43+
# Create subscriber with callback group
4344
self.subscription = self.ros_node.create_subscription(
44-
msg_type, self.topic_name, self.translator_cb, 10
45+
msg_type, self.topic_name, self.translator_cb, 10,
46+
callback_group=callback_group
4547
)
4648
self.logger.info(f"Translator created for {self.topic_name}")
4749

@@ -78,6 +80,9 @@ def __init__(self, this_robot=None, robot_configs=None, topic_configs=None):
7880
self.declare_parameter("topic_configs", "")
7981
self.logger = self.get_logger()
8082

83+
# Create reentrant callback group like DatabaseServer
84+
self.callback_group = ReentrantCallbackGroup()
85+
8186
self.this_robot = self.get_parameter("robot_name").get_parameter_value().string_value if this_robot is None else this_robot
8287

8388
if len(self.this_robot) == 0:
@@ -121,7 +126,7 @@ def __init__(self, this_robot=None, robot_configs=None, topic_configs=None):
121126
robot_id = du.get_robot_id_from_name(self.robot_configs, self.this_robot)
122127
obj = self.msg_types[robot_id][topic_id]["obj"]
123128
translator = Translator(
124-
self.this_robot, robot_id, topic["msg_topic"], topic_id, obj, self
129+
self.this_robot, robot_id, topic["msg_topic"], topic_id, obj, self, self.callback_group
125130
)
126131
translators.append(translator)
127132

mocha_core/test/test_translator.py

Lines changed: 109 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
class Database_server_test(Node):
3131
def __init__(self):
32+
# Important to match the topic that the translator expects
3233
super().__init__("integrate_database")
3334

3435

@@ -56,7 +57,7 @@ def setUp(self):
5657

5758
rclpy.init()
5859
self.test_server_node = Database_server_test()
59-
self.executor = MultiThreadedExecutor(num_threads=4)
60+
self.executor = MultiThreadedExecutor(num_threads=1)
6061
self.executor.add_node(self.test_server_node)
6162
executor_thread = threading.Thread(target=self.executor.spin, daemon=True)
6263
executor_thread.start()
@@ -77,7 +78,7 @@ def setUpTranslator(self):
7778
robot_configs=self.robot_configs_path,
7879
topic_configs=self.topic_configs_path
7980
)
80-
executor = SingleThreadedExecutor()
81+
executor = MultiThreadedExecutor(num_threads=4)
8182
executor.add_node(test_translator_node)
8283
logger = test_translator_node.get_logger()
8384
executor_thread = threading.Thread(target=executor.spin, daemon=True)
@@ -134,7 +135,7 @@ def test_translator_processes_messages(self):
134135
sample_msg.point.x = random.random()
135136
sample_msg.point.y = random.random()
136137
sample_msg.point.z = random.random()
137-
sample_msg.header.stamp = rclpy.clock.Clock().now().to_msg()
138+
sample_msg.header.stamp = translator_node.get_clock().now().to_msg()
138139

139140
# Publish the message (translator should receive and store it)
140141
test_publisher.publish(sample_msg)
@@ -148,8 +149,8 @@ def test_translator_processes_messages(self):
148149
# Create request to check database
149150
req = SelectDB.Request()
150151
req.robot_id = robot_id
151-
req.topic_id = 0 # All topics
152-
req.ts_limit = rclpy.clock.Clock().now().to_msg()
152+
req.topic_id = 0 # Not being used
153+
req.ts_limit = translator_node.get_clock().now().to_msg() # Not being used
153154

154155
# Use the translator node's service client to query the database
155156
select_client = translator_node.create_client(
@@ -208,5 +209,108 @@ def test_create_translators_for_robot(self):
208209
self.assertEqual(len(subscriptions_info), 2,
209210
f"Expected exactly 1 subscription for {topic_name} topic, got {len(subscriptions_info)}")
210211

212+
def test_translator_storm(self):
213+
"""Storm test: publish many messages to translator topics and verify they're all stored"""
214+
MSGS_PER_TOPIC = 20
215+
216+
# Create translator node
217+
translator_node, logger, _ = self.setUpTranslator()
218+
219+
# Wait for translator to be fully set up
220+
time.sleep(1.0)
221+
222+
# Get available topics for this robot
223+
this_robot_topics = self.topic_configs[self.robot_configs[self.robot_name]["node-type"]]
224+
msg_types = du.msg_types(self.robot_configs, self.topic_configs, translator_node)
225+
robot_id = du.get_robot_id_from_name(self.robot_configs, self.robot_name)
226+
227+
def topic_publisher(topic_id, topic_info, msgs_count):
228+
"""Publisher function for a single topic - runs in its own thread"""
229+
topic_name = topic_info["msg_topic"]
230+
231+
# Create a separate node for publishing
232+
pub_node = Node(f"topic_publisher_{topic_name.replace('/', '_')}")
233+
234+
# Get message type for this topic
235+
obj = msg_types[robot_id][topic_id]["obj"]
236+
pub = pub_node.create_publisher(obj, topic_name, 10)
237+
238+
# Wait for publisher to connect
239+
time.sleep(0.5)
240+
241+
# Publish messages with 2ms delay between each
242+
for i in range(msgs_count):
243+
# Create random message based on topic type
244+
if topic_name == "/pose":
245+
msg = PointStamped()
246+
msg.header.frame_id = "world"
247+
msg.point.x = random.random()
248+
msg.point.y = random.random()
249+
msg.point.z = random.random()
250+
msg.header.stamp = pub_node.get_clock().now().to_msg()
251+
elif topic_name == "/odometry":
252+
msg = Odometry()
253+
msg.header.frame_id = "world"
254+
msg.pose.pose.position.x = random.random()
255+
msg.pose.pose.position.y = random.random()
256+
msg.pose.pose.position.z = random.random()
257+
msg.header.stamp = pub_node.get_clock().now().to_msg()
258+
else:
259+
continue # Skip unknown topic types
260+
261+
pub.publish(msg)
262+
# Wait 300 ms between messages to avoid double publication
263+
time.sleep(0.3)
264+
265+
pub_node.destroy_node()
266+
267+
# Launch one thread per topic
268+
threads = []
269+
for topic_id, topic in enumerate(this_robot_topics):
270+
thread = threading.Thread(target=topic_publisher, args=(topic_id, topic, MSGS_PER_TOPIC))
271+
thread.start()
272+
threads.append(thread)
273+
274+
# Wait for all publishing threads to complete
275+
for thread in threads:
276+
thread.join()
277+
278+
# Wait for translator to process all messages
279+
time.sleep(1.0)
280+
281+
# Query the database to count stored messages
282+
try:
283+
robot_id = du.get_robot_id_from_name(self.robot_configs, self.robot_name)
284+
req = SelectDB.Request()
285+
req.robot_id = robot_id
286+
req.topic_id = 0 # This is not used as filtering is not implemented
287+
req.ts_limit = translator_node.get_clock().now().to_msg() # Not used
288+
289+
select_client = translator_node.create_client(
290+
SelectDB,
291+
"/integrate_database/select_db",
292+
qos_profile=ds.DatabaseServer.QOS_PROFILE
293+
)
294+
while not select_client.wait_for_service(timeout_sec=1.0):
295+
pass
296+
297+
future = select_client.call_async(req)
298+
rclpy.spin_until_future_complete(translator_node, future)
299+
answ = future.result()
300+
returned_headers = du.deserialize_headers(answ.headers)
301+
302+
# Calculate expected total messages
303+
# Each topic gets MSGS_PER_TOPIC messages
304+
# charon robot has 2 topics: /pose and /odometry
305+
expected_total = MSGS_PER_TOPIC * len(this_robot_topics)
306+
307+
# Verify all messages were stored
308+
self.assertEqual(len(returned_headers), expected_total,
309+
f"Expected {expected_total} messages in database, got {len(returned_headers)}")
310+
311+
except Exception as exc:
312+
print(f"Database query failed: {exc}")
313+
self.assertTrue(False)
314+
211315
if __name__ == "__main__":
212316
unittest.main()

0 commit comments

Comments
 (0)