2828from rclpy .node import Node
2929
3030
31- @pytest .mark .launch_test
32- @launch_testing .markers .keep_alive
33- def generate_test_description ():
34- launch_actions = []
35- node_names = []
36-
37- for i in range (3 ):
38- launch_actions .append (
39- launch_ros .actions .Node (
40- executable = 'talker' ,
41- package = 'demo_nodes_cpp' ,
42- name = 'demo_node_' + str (i )
43- )
44- )
45- node_names .append ('demo_node_' + str (i ))
46-
47- launch_actions .append (launch_testing .actions .ReadyToTest ())
48- return launch .LaunchDescription (launch_actions ), {'node_list' : node_names }
49-
50-
51- class CheckMultipleNodesLaunched (unittest .TestCase ):
52-
53- def test_nodes_successful (self , node_list ):
54- """Check if all the nodes were launched correctly."""
55- # Method 1
56- wait_for_nodes_1 = WaitForNodes (node_list , timeout = 10.0 )
57- assert wait_for_nodes_1 .wait ()
58- assert wait_for_nodes_1 .get_nodes_not_found () == set ()
59- wait_for_nodes_1 .shutdown ()
60-
61- # Method 2
62- with WaitForNodes (node_list , timeout = 10.0 ) as wait_for_nodes_2 :
63- print ('All nodes were found !' )
64- assert wait_for_nodes_2 .get_nodes_not_found () == set ()
65-
66- def test_node_does_not_exist (self , node_list ):
67- """Insert a invalid node name that should not exist."""
68- invalid_node_list = node_list + ['invalid_node' ]
69-
70- # Method 1
71- wait_for_nodes_1 = WaitForNodes (invalid_node_list , timeout = 10.0 )
72- assert not wait_for_nodes_1 .wait ()
73- assert wait_for_nodes_1 .get_nodes_not_found () == {'invalid_node' }
74- wait_for_nodes_1 .shutdown ()
75-
76- # Method 2
77- with pytest .raises (RuntimeError ):
78- with WaitForNodes (invalid_node_list , timeout = 10.0 ):
79- pass
80-
81-
8231# TODO (adityapande-1995): Move WaitForNodes implementation to launch_testing_ros
8332# after https://github.com/ros2/rclpy/issues/831 is resolved
8433class WaitForNodes :
@@ -90,45 +39,41 @@ class WaitForNodes:
9039 # Method 1, calling wait() and shutdown() manually
9140 def method_1():
9241 node_list = ['foo', 'bar']
93- wait_for_nodes = WaitForNodes(node_list, timeout=5 .0)
42+ wait_for_nodes = WaitForNodes(node_list, timeout=15 .0)
9443 assert wait_for_nodes.wait()
9544 print('Nodes found!')
9645 assert wait_for_nodes.get_nodes_not_found() == set()
9746 wait_for_nodes.shutdown()
9847
9948 # Method 2, using the 'with' keyword
10049 def method_2():
101- with WaitForNodes(['foo', 'bar'], timeout=5 .0) as wait_for_nodes:
50+ with WaitForNodes(['foo', 'bar'], timeout=15 .0) as wait_for_nodes:
10251 assert wait_for_nodes.get_nodes_not_found() == set()
10352 print('Nodes found!')
10453 """
10554
106- def __init__ (self , node_names , timeout = 5 .0 ):
55+ def __init__ (self , node_names , timeout = 15 .0 ):
10756 self .node_names = node_names
10857 self .timeout = timeout
10958 self .__ros_context = rclpy .Context ()
11059 rclpy .init (context = self .__ros_context )
111- self ._prepare_node ()
112-
113- self .__expected_nodes_set = set (node_names )
114- self .__nodes_found = None
115-
116- def _prepare_node (self ):
11760 self .__node_name = '_test_node_' + \
11861 '' .join (random .choices (string .ascii_uppercase + string .digits , k = 10 ))
11962 self .__ros_node = Node (node_name = self .__node_name , context = self .__ros_context )
12063
64+ self .__expected_nodes_set = set (node_names )
65+ self .__nodes_found = set ()
66+
12167 def wait (self ):
12268 start = time .time ()
123- flag = False
124- print ('Waiting for nodes' )
125- while time .time () - start < self .timeout and not flag :
126- flag = all (name in self .__ros_node .get_node_names () for name in self .node_names )
127- time .sleep (0.3 )
69+ finished = False
70+ while time .time () - start < self .timeout and not finished :
71+ finished = all (name in self .__ros_node .get_node_names () for name in self .node_names )
72+ time .sleep (0.1 )
12873
12974 self .__nodes_found = set (self .__ros_node .get_node_names ())
13075 self .__nodes_found .remove (self .__node_name )
131- return flag
76+ return finished
13277
13378 def shutdown (self ):
13479 self .__ros_node .destroy_node ()
@@ -150,3 +95,55 @@ def get_nodes_found(self):
15095
15196 def get_nodes_not_found (self ):
15297 return self .__expected_nodes_set - self .__nodes_found
98+
99+
100+ @pytest .mark .launch_test
101+ @launch_testing .markers .keep_alive
102+ def generate_test_description ():
103+ launch_actions = []
104+ node_names = []
105+
106+ for i in range (3 ):
107+ node_name = 'demo_node_' + str (i )
108+ launch_actions .append (
109+ launch_ros .actions .Node (
110+ executable = 'talker' ,
111+ package = 'demo_nodes_cpp' ,
112+ name = node_name
113+ )
114+ )
115+ node_names .append (node_name )
116+
117+ launch_actions .append (launch_testing .actions .ReadyToTest ())
118+ return launch .LaunchDescription (launch_actions ), {'node_list' : node_names }
119+
120+
121+ class CheckMultipleNodesLaunched (unittest .TestCase ):
122+
123+ def test_nodes_successful (self , node_list ):
124+ """Check if all the nodes were launched correctly."""
125+ # Method 1
126+ wait_for_nodes_1 = WaitForNodes (node_list )
127+ assert wait_for_nodes_1 .wait ()
128+ assert wait_for_nodes_1 .get_nodes_not_found () == set ()
129+ wait_for_nodes_1 .shutdown ()
130+
131+ # Method 2
132+ with WaitForNodes (node_list ) as wait_for_nodes_2 :
133+ print ('All nodes were found !' )
134+ assert wait_for_nodes_2 .get_nodes_not_found () == set ()
135+
136+ def test_node_does_not_exist (self , node_list ):
137+ """Insert a invalid node name that should not exist."""
138+ invalid_node_list = node_list + ['invalid_node' ]
139+
140+ # Method 1
141+ wait_for_nodes_1 = WaitForNodes (invalid_node_list )
142+ assert not wait_for_nodes_1 .wait ()
143+ assert wait_for_nodes_1 .get_nodes_not_found () == {'invalid_node' }
144+ wait_for_nodes_1 .shutdown ()
145+
146+ # Method 2
147+ with pytest .raises (RuntimeError ):
148+ with WaitForNodes (invalid_node_list ):
149+ pass
0 commit comments