1919from threading import Thread
2020
2121import rclpy
22+ from rclpy .event_handler import QoSSubscriptionMatchedInfo
23+ from rclpy .event_handler import SubscriptionEventCallbacks
2224from rclpy .executors import SingleThreadedExecutor
2325from rclpy .node import Node
2426
@@ -50,12 +52,32 @@ def method_2():
5052 print(wait_for_topics.topics_received()) # Should be {'topic_1', 'topic_2'}
5153 print(wait_for_topics.messages_received('topic_1')) # Should be [message_1, ...]
5254 wait_for_topics.shutdown()
55+
56+ # Method3, calling a trigger function before the wait. The trigger function takes
57+ # the WaitForTopics node object as the first argument. Any additional arguments have
58+ # to be passed to the wait(*args, **kwargs) method directly.
59+ def trigger_function(node, arg=""):
60+ node.get_logger().info('Trigger function called with argument: ' + arg)
61+
62+ def method_3():
63+ topic_list = [('topic_1', String), ('topic_2', String)]
64+ wait_for_topics = WaitForTopics(topic_list, timeout=5.0, trigger=trigger_function)
65+ # The trigger function will be called inside the wait() method after the
66+ # subscribers are created and before the publishers are connected.
67+ assert wait_for_topics.wait("Hello World!")
68+ print('Given topics are receiving messages !')
69+ wait_for_topics.shutdown()
5370 """
5471
55- def __init__ (self , topic_tuples , timeout = 5.0 , messages_received_buffer_length = 10 ) -> None :
72+ def __init__ (self , topic_tuples , timeout = 5.0 , messages_received_buffer_length = 10 ,
73+ trigger = None , node_namespace = None ) -> None :
5674 self .topic_tuples = topic_tuples
5775 self .timeout = timeout
5876 self .messages_received_buffer_length = messages_received_buffer_length
77+ self .trigger = trigger
78+ self .node_namespace = node_namespace
79+ if self .trigger is not None and not callable (self .trigger ):
80+ raise TypeError ('The passed trigger is not callable' )
5981 self .__ros_context = rclpy .Context ()
6082 rclpy .init (context = self .__ros_context )
6183 self .__ros_executor = SingleThreadedExecutor (context = self .__ros_context )
@@ -80,11 +102,15 @@ def _prepare_ros_node(self):
80102 name = node_name ,
81103 node_context = self .__ros_context ,
82104 messages_received_buffer_length = self .messages_received_buffer_length ,
105+ node_namespace = self .node_namespace
83106 )
84107 self .__ros_executor .add_node (self .__ros_node )
85108
86- def wait (self ):
109+ def wait (self , * args , ** kwargs ):
87110 self .__ros_node .start_subscribers (self .topic_tuples )
111+ if self .trigger :
112+ self .trigger (self .__ros_node , * args , ** kwargs )
113+ self .__ros_node .any_publisher_connected .wait (self .timeout )
88114 return self .__ros_node .msg_event_object .wait (self .timeout )
89115
90116 def shutdown (self ):
@@ -121,16 +147,26 @@ class _WaitForTopicsNode(Node):
121147 """Internal node used for subscribing to a set of topics."""
122148
123149 def __init__ (
124- self , name = 'test_node' , node_context = None , messages_received_buffer_length = None
150+ self , name = 'test_node' ,
151+ node_context = None ,
152+ messages_received_buffer_length = None ,
153+ node_namespace = None
125154 ) -> None :
126- super ().__init__ (node_name = name , context = node_context ) # type: ignore
155+ super ().__init__ (node_name = name , context = node_context , namespace = node_namespace )
127156 self .msg_event_object = Event ()
128157 self .messages_received_buffer_length = messages_received_buffer_length
129158 self .subscriber_list = []
130159 self .topic_tuples = []
131160 self .expected_topics = set ()
132161 self .received_topics = set ()
133162 self .received_messages_buffer = {}
163+ self .any_publisher_connected = Event ()
164+
165+ def _sub_matched_event_callback (self , info : QoSSubscriptionMatchedInfo ):
166+ if info .current_count != 0 :
167+ self .any_publisher_connected .set ()
168+ else :
169+ self .any_publisher_connected .clear ()
134170
135171 def _reset (self ):
136172 self .msg_event_object .clear ()
@@ -149,12 +185,16 @@ def start_subscribers(self, topic_tuples):
149185 maxlen = self .messages_received_buffer_length
150186 )
151187 # Create a subscriber
188+ sub_event_callback = SubscriptionEventCallbacks (
189+ matched = self ._sub_matched_event_callback
190+ )
152191 self .subscriber_list .append (
153192 self .create_subscription (
154193 topic_type ,
155194 topic_name ,
156195 self .callback_template (topic_name ),
157- 10
196+ 10 ,
197+ event_callbacks = sub_event_callback ,
158198 )
159199 )
160200
0 commit comments