|
| 1 | +# Copyright 2021 Open Source Robotics Foundation, Inc. |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +# you may not use this file except in compliance with the License. |
| 5 | +# You may obtain a copy of the License at |
| 6 | +# |
| 7 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +# See the License for the specific language governing permissions and |
| 13 | +# limitations under the License. |
| 14 | + |
| 15 | +import random |
| 16 | +import string |
| 17 | +from threading import Event |
| 18 | +from threading import Thread |
| 19 | + |
| 20 | +import rclpy |
| 21 | +from rclpy.executors import SingleThreadedExecutor |
| 22 | +from rclpy.node import Node |
| 23 | + |
| 24 | + |
| 25 | +class WaitForTopics: |
| 26 | + """ |
| 27 | + Wait to receive messages on supplied topics. |
| 28 | +
|
| 29 | + Example usage: |
| 30 | + -------------- |
| 31 | +
|
| 32 | + from std_msgs.msg import String |
| 33 | +
|
| 34 | + # Method 1, using the 'with' keyword |
| 35 | + def method_1(): |
| 36 | + topic_list = [('topic_1', String), ('topic_2', String)] |
| 37 | + with WaitForTopics(topic_list, timeout=5.0): |
| 38 | + # 'topic_1' and 'topic_2' received at least one message each |
| 39 | + print('Given topics are receiving messages !') |
| 40 | +
|
| 41 | + # Method 2, calling wait() and shutdown() manually |
| 42 | + def method_2(): |
| 43 | + topic_list = [('topic_1', String), ('topic_2', String)] |
| 44 | + wait_for_topics = WaitForTopics(topic_list, timeout=5.0) |
| 45 | + assert wait_for_topics.wait() |
| 46 | + print('Given topics are receiving messages !') |
| 47 | + print(wait_for_topics.topics_not_received()) # Should be an empty set |
| 48 | + print(wait_for_topics.topics_received()) # Should be {'topic_1', 'topic_2'} |
| 49 | + wait_for_topics.shutdown() |
| 50 | + """ |
| 51 | + |
| 52 | + def __init__(self, topic_tuples, timeout=5.0): |
| 53 | + self.topic_tuples = topic_tuples |
| 54 | + self.timeout = timeout |
| 55 | + self.__ros_context = rclpy.Context() |
| 56 | + rclpy.init(context=self.__ros_context) |
| 57 | + self.__ros_executor = SingleThreadedExecutor(context=self.__ros_context) |
| 58 | + |
| 59 | + self._prepare_ros_node() |
| 60 | + |
| 61 | + # Start spinning |
| 62 | + self.__running = True |
| 63 | + self.__ros_spin_thread = Thread(target=self._spin_function) |
| 64 | + self.__ros_spin_thread.start() |
| 65 | + |
| 66 | + def _prepare_ros_node(self): |
| 67 | + node_name = '_test_node_' +\ |
| 68 | + ''.join(random.choices(string.ascii_uppercase + string.digits, k=10)) |
| 69 | + self.__ros_node = _WaitForTopicsNode(name=node_name, node_context=self.__ros_context) |
| 70 | + self.__ros_executor.add_node(self.__ros_node) |
| 71 | + |
| 72 | + def _spin_function(self): |
| 73 | + while self.__running: |
| 74 | + self.__ros_executor.spin_once(1.0) |
| 75 | + |
| 76 | + def wait(self): |
| 77 | + self.__ros_node.start_subscribers(self.topic_tuples) |
| 78 | + return self.__ros_node.msg_event_object.wait(self.timeout) |
| 79 | + |
| 80 | + def shutdown(self): |
| 81 | + self.__running = False |
| 82 | + self.__ros_spin_thread.join() |
| 83 | + self.__ros_node.destroy_node() |
| 84 | + rclpy.shutdown(context=self.__ros_context) |
| 85 | + |
| 86 | + def topics_received(self): |
| 87 | + """Topics that received at least one message.""" |
| 88 | + return self.__ros_node.received_topics |
| 89 | + |
| 90 | + def topics_not_received(self): |
| 91 | + """Topics that did not receive any messages.""" |
| 92 | + return self.__ros_node.expected_topics - self.__ros_node.received_topics |
| 93 | + |
| 94 | + def __enter__(self): |
| 95 | + if not self.wait(): |
| 96 | + raise RuntimeError('Did not receive messages on these topics: ', |
| 97 | + self.topics_not_received()) |
| 98 | + return self |
| 99 | + |
| 100 | + def __exit__(self, exep_type, exep_value, trace): |
| 101 | + if exep_type is not None: |
| 102 | + raise Exception('Exception occured, value: ', exep_value) |
| 103 | + self.shutdown() |
| 104 | + |
| 105 | + |
| 106 | +class _WaitForTopicsNode(Node): |
| 107 | + """Internal node used for subscribing to a set of topics.""" |
| 108 | + |
| 109 | + def __init__(self, name='test_node', node_context=None): |
| 110 | + super().__init__(node_name=name, context=node_context) |
| 111 | + self.msg_event_object = Event() |
| 112 | + |
| 113 | + def start_subscribers(self, topic_tuples): |
| 114 | + self.subscriber_list = [] |
| 115 | + self.expected_topics = {name for name, _ in topic_tuples} |
| 116 | + self.received_topics = set() |
| 117 | + |
| 118 | + for topic_name, topic_type in topic_tuples: |
| 119 | + # Create a subscriber |
| 120 | + self.subscriber_list.append( |
| 121 | + self.create_subscription( |
| 122 | + topic_type, |
| 123 | + topic_name, |
| 124 | + self.callback_template(topic_name), |
| 125 | + 10 |
| 126 | + ) |
| 127 | + ) |
| 128 | + |
| 129 | + def callback_template(self, topic_name): |
| 130 | + |
| 131 | + def topic_callback(data): |
| 132 | + if topic_name not in self.received_topics: |
| 133 | + self.get_logger().debug('Message received for ' + topic_name) |
| 134 | + self.received_topics.add(topic_name) |
| 135 | + if self.received_topics == self.expected_topics: |
| 136 | + self.msg_event_object.set() |
| 137 | + |
| 138 | + return topic_callback |
0 commit comments