11#!/usr/bin/env python3
22
33import os
4- import sys
4+ import pdb
55import time
66import unittest
7- import warnings
7+ import threading
88
99import rclpy
10+ from rclpy .node import Node
1011import yaml
1112from colorama import Fore , Style
13+ import mocha_core .database_server as ds
14+ import mocha_core .topic_publisher as tp
15+ from mocha_core .database import DBMessage
16+ from rclpy .executors import MultiThreadedExecutor
17+ import geometry_msgs .msg
18+
19+ class Database_server_test (Node ):
20+ def __init__ (self ):
21+ super ().__init__ ("mocha" )
1222
1323
1424class test_topic_publisher (unittest .TestCase ):
25+ @classmethod
26+ def setUpClass (cls ):
27+ # Load configurations at the class level
28+ current_dir = os .path .dirname (os .path .abspath (__file__ ))
29+ ddb_path = os .path .join (current_dir , ".." )
30+
31+ # Load robot configs
32+ cls .robot_configs_path = os .path .join (ddb_path , "config/testConfigs/robot_configs.yaml" )
33+ with open (cls .robot_configs_path , "r" ) as f :
34+ cls .robot_configs = yaml .load (f , Loader = yaml .FullLoader )
35+
36+ # Load topic configs
37+ cls .topic_configs_path = os .path .join (ddb_path , "config/testConfigs/topic_configs.yaml" )
38+ with open (cls .topic_configs_path , "r" ) as f :
39+ cls .topic_configs = yaml .load (f , Loader = yaml .FullLoader )
40+ cls .robot_name = "charon"
41+
1542 def setUp (self ):
1643 test_name = self ._testMethodName
1744 print ("\n " , Fore .RED , 20 * "=" , test_name , 20 * "=" , Style .RESET_ALL )
45+ rclpy .init ()
46+ self .test_server_node = Database_server_test ()
47+ self .executor_db = MultiThreadedExecutor (num_threads = 4 )
48+ self .executor_db .add_node (self .test_server_node )
49+ executor_thread_db = threading .Thread (target = self .executor_db .spin , daemon = True )
50+ executor_thread_db .start ()
1851
19- # Ignore pesky warnings about sockets
20- warnings .filterwarnings (
21- action = "ignore" , message = "unclosed" , category = ResourceWarning
22- )
23-
24- # Initialize ROS2 if not already done
25- if not rclpy .ok ():
26- rclpy .init ()
52+ # Create a database server object with the node (this will create the services)
53+ self .dbs = ds .DatabaseServer (self .robot_configs , self .topic_configs ,
54+ self .robot_name , self .test_server_node )
2755
56+ # Create the topic_puiblisher node
57+ self .test_topic_publisher_node = tp .TopicPublisherNode (self .robot_name ,
58+ self .robot_configs_path ,
59+ self .topic_configs_path )
60+ self .executor_tp = MultiThreadedExecutor (num_threads = 4 )
61+ self .executor_tp .add_node (self .test_topic_publisher_node )
62+ self .logger = self .test_topic_publisher_node .get_logger ()
63+ executor_thread_tp = threading .Thread (target = self .executor_tp .spin , daemon = True )
64+ executor_thread_tp .start ()
65+ time .sleep (1 ) # Wait for nodes to be set up
2866 super ().setUp ()
2967
3068 def tearDown (self ):
31- # Cleanup topic publisher
32- if hasattr ( self , 'topic_publisher' ):
33- self .topic_publisher . shutdown ()
34- if hasattr ( self , 'publisher_node' ):
35- self .publisher_node .destroy_node ()
36-
37- time .sleep (1 )
69+ self . test_topic_publisher_node . shutdown ()
70+ self . executor_db . shutdown ()
71+ self .test_server_node . destroy_node ()
72+ self . executor_tp . shutdown ()
73+ self .test_topic_publisher_node .destroy_node ()
74+ rclpy . shutdown ()
75+ time .sleep (1 ) # Wait final teardown
3876 super ().tearDown ()
3977
4078 def test_topic_publisher_creates_topics (self ):
41- """Test that topic publisher creates expected topics based on testConfigs"""
42- # Create topic publisher node for basestation
43- self .publisher_node , self .topic_publisher = tp .create_topic_publisher_node ("basestation" )
44-
45- # Wait for setup
46- time .sleep (1.0 )
47-
79+ """ Verify that all the topics created are there with the corresponding
80+ namespace """
81+
4882 # Get all topics that have publishers
49- topic_names_and_types = self .publisher_node .get_topic_names_and_types ()
50- topic_names = [name for name , _ in topic_names_and_types ]
51-
52- # Based on testConfigs, we should have topics for:
53- # charon (ground_robot): /charon/odometry, /charon/pose
54- # quad1 (aerial_robot): /quad1/image, /quad1/pose
55- # basestation (base_station): /basestation/target_goals
56- expected_topics = [
57- "/charon/odometry" ,
58- "/charon/pose" ,
59- "/quad1/image" ,
60- "/quad1/pose" ,
61- "/basestation/target_goals"
62- ]
63-
64- # Verify that each expected topic exists
65- for expected_topic in expected_topics :
66- publishers = self .publisher_node .get_publishers_info_by_topic (expected_topic )
67- self .assertGreater (len (publishers ), 0 ,
68- f"No publisher found for expected topic: { expected_topic } " )
69-
70-
71- # Add the mocha_core module path for imports
72- current_dir = os .path .dirname (os .path .abspath (__file__ ))
73- mocha_core_path = os .path .join (current_dir , ".." , "mocha_core" )
74- sys .path .append (mocha_core_path )
75-
76- import topic_publisher as tp
83+ topic_names_and_types = self .test_topic_publisher_node .get_topic_names_and_types ()
84+
85+ expected_names_and_types = {'/basestation/target_goals' :
86+ 'geometry_msgs/msg/PoseArray' ,
87+ '/charon/odometry' :
88+ 'nav_msgs/msg/Odometry' ,
89+ '/charon/pose' :
90+ 'geometry_msgs/msg/PointStamped' ,
91+ '/mocha/database_cb' :
92+ 'mocha_core/msg/DatabaseCB' ,
93+ '/quad1/image' :
94+ 'sensor_msgs/msg/Image' ,
95+ '/quad1/pose' :
96+ 'geometry_msgs/msg/PointStamped' ,
97+ '/styx/odometry' :
98+ 'nav_msgs/msg/Odometry' ,
99+ '/styx/pose' :
100+ 'geometry_msgs/msg/PointStamped' }
101+
102+ # Verify that the topics are there
103+ for name , topictype in topic_names_and_types :
104+ # Skip generic topics
105+ if name == "/parameter_events" or name == "/rosout" :
106+ continue
107+ self .assertIn (name , expected_names_and_types .keys ())
108+ self .assertEqual (topictype [0 ], expected_names_and_types [name ])
109+
110+ def test_publisher_publish_msg (self ):
111+ """ Verify that when a message is added to the database we will get this
112+ message published by the publisher """
113+
114+ # Create a new topic to listen to the publisher
115+ class ListenerNode (Node ):
116+ def __init__ (self ):
117+ super ().__init__ ("listener_node" )
118+ self .subscriptor = self .create_subscription (
119+ geometry_msgs .msg .PointStamped ,
120+ '/charon/pose' ,
121+ self .topic_cb ,
122+ qos_profile = tp .QOS_PROFILE
123+ )
124+ self .result = None
125+ self .counter = 0
126+
127+ def topic_cb (self , data ):
128+ self .result = data
129+ self .counter += 1
130+
131+ test_listener_node = ListenerNode ()
132+ executor = MultiThreadedExecutor (num_threads = 4 )
133+ executor .add_node (test_listener_node )
134+ executor_thread = threading .Thread (target = executor .spin , daemon = True )
135+ executor_thread .start ()
136+ time .sleep (1 )
137+ # Insert a message known to be from charon, pose
138+ bin_data = b'\x04 "M\x18 h@,\x00 \x00 \x00 \x00 \x00 \x00 \x00 <\x12 \x00 \x00 \x00 5\x00 \x01 \x00 \x01 \x00 \x07 \x0b \x00 \x0c \x02 \x00 P\x00 \x00 \x00 \x00 \x00 \x00 \x00 \x00 \x00 '
139+ msg_count = 10
140+ for i in range (msg_count ):
141+ msg = DBMessage (1 , 1 , 0 , 0 ,
142+ rclpy .time .Time (nanoseconds = 1083902000000 + i * 1000000 ),
143+ bin_data )
144+ # Insert the message in the database in a crude way
145+ self .dbs .dbl .add_modify_data (msg )
146+ # Let the publishers publish
147+ time .sleep (3 )
148+ executor .shutdown ()
149+ test_listener_node .destroy_node ()
150+ time .sleep (1 )
151+ # We should have msg_count messages on the reader
152+ self .assertEqual (msg_count , test_listener_node .counter )
153+ self .assertIsInstance (test_listener_node .result ,
154+ geometry_msgs .msg ._point_stamped .PointStamped )
155+
77156
78157if __name__ == "__main__" :
79- unittest .main ()
158+ unittest .main (failfast = True )
0 commit comments