Skip to content

Commit 0c9f233

Browse files
authored
Add support for topic QOS for ros2topic bw, delay and hz (#935)
* Add support for topic QOS for ros2topic bw, delay and hz Signed-off-by: Anthony Welte <[email protected]> * Fix style according to flake8 Signed-off-by: Anthony Welte <[email protected]> * Fix instability in test_topic_bw Signed-off-by: Anthony Welte <[email protected]> * Add tests for bw, delay and hz qos parameters Signed-off-by: Anthony Welte <[email protected]> * Remove extract_qos_arguments and move choose_qos - Remove extract_qos_arguments since it's doing nothing useful. The QoS arguments are passed to choose_qos with other argument unfiltered - Move choose_qos outside of the _rostopic_verb method for bw and delay. The _rostopic_verb method takes a QoSProfile instead of the cli arguments Signed-off-by: Anthony Welte <[email protected]> --------- Signed-off-by: Anthony Welte <[email protected]>
1 parent f5e5a79 commit 0c9f233

File tree

7 files changed

+297
-85
lines changed

7 files changed

+297
-85
lines changed

ros2topic/ros2topic/api/__init__.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121

2222
from rclpy.duration import Duration
2323
from rclpy.expand_topic_name import expand_topic_name
24+
from rclpy.qos import QoSDurabilityPolicy
25+
from rclpy.qos import QoSPresetProfiles
26+
from rclpy.qos import QoSReliabilityPolicy
2427
from rclpy.topic_or_service_is_hidden import topic_or_service_is_hidden
2528
from rclpy.validate_full_topic_name import validate_full_topic_name
2629
from ros2cli.node.strategy import NodeStrategy
@@ -259,3 +262,64 @@ def add_qos_arguments(
259262
f'Quality of service liveliness lease duration setting to {entity_type} '
260263
'with (overrides liveliness lease duration value of --qos-profile option, default: '
261264
f'{default_profile.liveliness_lease_duration})'))
265+
266+
267+
def choose_qos(node, topic_name: str, qos_args):
268+
if (qos_args.qos_reliability is not None or
269+
qos_args.qos_durability is not None or
270+
qos_args.qos_depth is not None or
271+
qos_args.qos_history is not None or
272+
qos_args.qos_liveliness is not None or
273+
qos_args.qos_liveliness_lease_duration_seconds is not None):
274+
275+
return qos_profile_from_short_keys(
276+
qos_args.qos_profile,
277+
reliability=qos_args.qos_reliability,
278+
durability=qos_args.qos_durability,
279+
depth=qos_args.qos_depth,
280+
history=qos_args.qos_history,
281+
liveliness=qos_args.qos_liveliness,
282+
liveliness_lease_duration_s=qos_args.qos_liveliness_lease_duration_seconds)
283+
284+
qos_profile = QoSPresetProfiles.get_from_short_key(qos_args.qos_profile)
285+
reliability_reliable_endpoints_count = 0
286+
durability_transient_local_endpoints_count = 0
287+
288+
pubs_info = node.get_publishers_info_by_topic(topic_name)
289+
publishers_count = len(pubs_info)
290+
if publishers_count == 0:
291+
return qos_profile
292+
293+
for info in pubs_info:
294+
if (info.qos_profile.reliability == QoSReliabilityPolicy.RELIABLE):
295+
reliability_reliable_endpoints_count += 1
296+
if (info.qos_profile.durability == QoSDurabilityPolicy.TRANSIENT_LOCAL):
297+
durability_transient_local_endpoints_count += 1
298+
299+
# If all endpoints are reliable, ask for reliable
300+
if reliability_reliable_endpoints_count == publishers_count:
301+
qos_profile.reliability = QoSReliabilityPolicy.RELIABLE
302+
else:
303+
if reliability_reliable_endpoints_count > 0:
304+
print(
305+
'Some, but not all, publishers are offering '
306+
'QoSReliabilityPolicy.RELIABLE. Falling back to '
307+
'QoSReliabilityPolicy.BEST_EFFORT as it will connect '
308+
'to all publishers'
309+
)
310+
qos_profile.reliability = QoSReliabilityPolicy.BEST_EFFORT
311+
312+
# If all endpoints are transient_local, ask for transient_local
313+
if durability_transient_local_endpoints_count == publishers_count:
314+
qos_profile.durability = QoSDurabilityPolicy.TRANSIENT_LOCAL
315+
else:
316+
if durability_transient_local_endpoints_count > 0:
317+
print(
318+
'Some, but not all, publishers are offering '
319+
'QoSDurabilityPolicy.TRANSIENT_LOCAL. Falling back to '
320+
'QoSDurabilityPolicy.VOLATILE as it will connect '
321+
'to all publishers'
322+
)
323+
qos_profile.durability = QoSDurabilityPolicy.VOLATILE
324+
325+
return qos_profile

ros2topic/ros2topic/verb/bw.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,10 @@
3434
import traceback
3535

3636
import rclpy
37-
from rclpy.qos import qos_profile_sensor_data
3837
from ros2cli.node.direct import add_arguments as add_direct_node_arguments
3938
from ros2cli.node.direct import DirectNode
39+
from ros2topic.api import add_qos_arguments
40+
from ros2topic.api import choose_qos
4041
from ros2topic.api import get_msg_class
4142
from ros2topic.api import positive_int
4243
from ros2topic.api import TopicNameCompleter
@@ -62,19 +63,21 @@ class BwVerb(VerbExtension):
6263

6364
def add_arguments(self, parser, cli_name):
6465
arg = parser.add_argument(
65-
'topic',
66+
'topic_name',
6667
help='Topic name to monitor for bandwidth utilization')
6768
arg.completer = TopicNameCompleter(
6869
include_hidden_topics_key='include_hidden_topics')
70+
add_qos_arguments(parser, 'subscribe', 'sensor_data')
6971
parser.add_argument(
70-
'--window', '-w', type=positive_int, default=DEFAULT_WINDOW_SIZE,
72+
'--window', '-w', dest='window_size', type=positive_int, default=DEFAULT_WINDOW_SIZE,
7173
help='maximum window size, in # of messages, for calculating rate '
7274
f'(default: {DEFAULT_WINDOW_SIZE})', metavar='WINDOW')
7375
add_direct_node_arguments(parser)
7476

7577
def main(self, *, args):
7678
with DirectNode(args) as node:
77-
_rostopic_bw(node.node, args.topic, window_size=args.window)
79+
qos_profile = choose_qos(node.node, topic_name=args.topic_name, qos_args=args)
80+
_rostopic_bw(node.node, args.topic_name, qos_profile, window_size=args.window_size)
7881

7982

8083
class ROSTopicBandwidth(object):
@@ -150,7 +153,7 @@ def print_bw(self):
150153
print(f'{bw} from {n} messages\n\tMessage size mean: {mean} min: {min_s} max: {max_s}')
151154

152155

153-
def _rostopic_bw(node, topic, window_size=DEFAULT_WINDOW_SIZE):
156+
def _rostopic_bw(node, topic, qos_profile, window_size=DEFAULT_WINDOW_SIZE):
154157
"""Periodically print the received bandwidth of a topic to console until shutdown."""
155158
# pause bw until topic is published
156159
msg_class = get_msg_class(node, topic, blocking=True, include_hidden_topics=True)
@@ -163,7 +166,7 @@ def _rostopic_bw(node, topic, window_size=DEFAULT_WINDOW_SIZE):
163166
msg_class,
164167
topic,
165168
rt.callback,
166-
qos_profile_sensor_data,
169+
qos_profile,
167170
raw=True
168171
)
169172

ros2topic/ros2topic/verb/delay.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,11 @@
3333

3434
import rclpy
3535

36-
from rclpy.qos import qos_profile_sensor_data
3736
from rclpy.time import Time
3837
from ros2cli.node.direct import add_arguments as add_direct_node_arguments
3938
from ros2cli.node.direct import DirectNode
39+
from ros2topic.api import add_qos_arguments
40+
from ros2topic.api import choose_qos
4041
from ros2topic.api import get_msg_class
4142
from ros2topic.api import positive_int
4243
from ros2topic.api import TopicNameCompleter
@@ -50,12 +51,13 @@ class DelayVerb(VerbExtension):
5051

5152
def add_arguments(self, parser, cli_name):
5253
arg = parser.add_argument(
53-
'topic',
54+
'topic_name',
5455
help='Topic name to calculate the delay for')
5556
arg.completer = TopicNameCompleter(
5657
include_hidden_topics_key='include_hidden_topics')
58+
add_qos_arguments(parser, 'subscribe', 'sensor_data')
5759
parser.add_argument(
58-
'--window', '-w', type=positive_int, default=DEFAULT_WINDOW_SIZE,
60+
'--window', '-w', dest='window_size', type=positive_int, default=DEFAULT_WINDOW_SIZE,
5961
help='window size, in # of messages, for calculating rate, '
6062
'string to (default: %d)' % DEFAULT_WINDOW_SIZE)
6163
add_direct_node_arguments(parser)
@@ -66,8 +68,8 @@ def main(self, *, args):
6668

6769
def main(args):
6870
with DirectNode(args) as node:
69-
_rostopic_delay(
70-
node.node, args.topic, window_size=args.window)
71+
qos_profile = choose_qos(node.node, topic_name=args.topic_name, qos_args=args)
72+
_rostopic_delay(node.node, args.topic_name, qos_profile, window_size=args.window_size)
7173

7274

7375
class ROSTopicDelay(object):
@@ -155,11 +157,12 @@ def print_delay(self):
155157
% (delay * 1e-9, min_delta * 1e-9, max_delta * 1e-9, std_dev * 1e-9, window))
156158

157159

158-
def _rostopic_delay(node, topic, window_size=DEFAULT_WINDOW_SIZE):
160+
def _rostopic_delay(node, topic, qos_profile, window_size=DEFAULT_WINDOW_SIZE):
159161
"""
160162
Periodically print the publishing delay of a topic to console until shutdown.
161163
162164
:param topic: topic name, ``str``
165+
:param qos_profile: qos profile of the subscriber
163166
:param window_size: number of messages to average over, ``unsigned_int``
164167
:param blocking: pause delay until topic is published, ``bool``
165168
"""
@@ -175,7 +178,7 @@ def _rostopic_delay(node, topic, window_size=DEFAULT_WINDOW_SIZE):
175178
msg_class,
176179
topic,
177180
rt.callback_delay,
178-
qos_profile_sensor_data)
181+
qos_profile)
179182

180183
timer = node.create_timer(1, rt.print_delay)
181184
while rclpy.ok():

ros2topic/ros2topic/verb/echo.py

Lines changed: 2 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,15 @@
1919
from rclpy.event_handler import SubscriptionEventCallbacks
2020
from rclpy.event_handler import UnsupportedEventTypeError
2121
from rclpy.node import Node
22-
from rclpy.qos import QoSDurabilityPolicy
23-
from rclpy.qos import QoSPresetProfiles
2422
from rclpy.qos import QoSProfile
25-
from rclpy.qos import QoSReliabilityPolicy
2623
from rclpy.task import Future
2724
from ros2cli.helpers import unsigned_int
2825
from ros2cli.node.strategy import add_arguments as add_strategy_node_arguments
2926
from ros2cli.node.strategy import NodeStrategy
3027
from ros2topic.api import add_qos_arguments
28+
from ros2topic.api import choose_qos
3129
from ros2topic.api import get_msg_class
3230
from ros2topic.api import positive_float
33-
from ros2topic.api import qos_profile_from_short_keys
3431
from ros2topic.api import TopicNameCompleter
3532
from ros2topic.verb import VerbExtension
3633
from rosidl_runtime_py import message_to_csv
@@ -116,67 +113,6 @@ def add_arguments(self, parser, cli_name):
116113
'-n', '--node-name', type=str, default=None,
117114
help='The name of the echoing node; by default, will be a hidden node name')
118115

119-
def choose_qos(self, node, args):
120-
121-
if (args.qos_reliability is not None or
122-
args.qos_durability is not None or
123-
args.qos_depth is not None or
124-
args.qos_history is not None or
125-
args.qos_liveliness is not None or
126-
args.qos_liveliness_lease_duration_seconds is not None):
127-
128-
return qos_profile_from_short_keys(
129-
args.qos_profile,
130-
reliability=args.qos_reliability,
131-
durability=args.qos_durability,
132-
depth=args.qos_depth,
133-
history=args.qos_history,
134-
liveliness=args.qos_liveliness,
135-
liveliness_lease_duration_s=args.qos_liveliness_lease_duration_seconds)
136-
137-
qos_profile = QoSPresetProfiles.get_from_short_key(args.qos_profile)
138-
reliability_reliable_endpoints_count = 0
139-
durability_transient_local_endpoints_count = 0
140-
141-
pubs_info = node.get_publishers_info_by_topic(args.topic_name)
142-
publishers_count = len(pubs_info)
143-
if publishers_count == 0:
144-
return qos_profile
145-
146-
for info in pubs_info:
147-
if (info.qos_profile.reliability == QoSReliabilityPolicy.RELIABLE):
148-
reliability_reliable_endpoints_count += 1
149-
if (info.qos_profile.durability == QoSDurabilityPolicy.TRANSIENT_LOCAL):
150-
durability_transient_local_endpoints_count += 1
151-
152-
# If all endpoints are reliable, ask for reliable
153-
if reliability_reliable_endpoints_count == publishers_count:
154-
qos_profile.reliability = QoSReliabilityPolicy.RELIABLE
155-
else:
156-
if reliability_reliable_endpoints_count > 0:
157-
print(
158-
'Some, but not all, publishers are offering '
159-
'QoSReliabilityPolicy.RELIABLE. Falling back to '
160-
'QoSReliabilityPolicy.BEST_EFFORT as it will connect '
161-
'to all publishers'
162-
)
163-
qos_profile.reliability = QoSReliabilityPolicy.BEST_EFFORT
164-
165-
# If all endpoints are transient_local, ask for transient_local
166-
if durability_transient_local_endpoints_count == publishers_count:
167-
qos_profile.durability = QoSDurabilityPolicy.TRANSIENT_LOCAL
168-
else:
169-
if durability_transient_local_endpoints_count > 0:
170-
print(
171-
'Some, but not all, publishers are offering '
172-
'QoSDurabilityPolicy.TRANSIENT_LOCAL. Falling back to '
173-
'QoSDurabilityPolicy.VOLATILE as it will connect '
174-
'to all publishers'
175-
)
176-
qos_profile.durability = QoSDurabilityPolicy.VOLATILE
177-
178-
return qos_profile
179-
180116
def main(self, *, args):
181117

182118
self.csv = args.csv
@@ -210,7 +146,7 @@ def main(self, *, args):
210146

211147
with NodeStrategy(args, node_name=args.node_name) as node:
212148

213-
qos_profile = self.choose_qos(node, args)
149+
qos_profile = choose_qos(node, topic_name=args.topic_name, qos_args=args)
214150

215151
if args.message_type is None:
216152
message_type = get_msg_class(

ros2topic/ros2topic/verb/hz.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,10 @@
4141
from rclpy.clock import Clock
4242
from rclpy.clock import ClockType
4343
from rclpy.executors import ExternalShutdownException
44-
from rclpy.qos import qos_profile_sensor_data
4544
from ros2cli.node.direct import add_arguments as add_direct_node_arguments
4645
from ros2cli.node.direct import DirectNode
46+
from ros2topic.api import add_qos_arguments
47+
from ros2topic.api import choose_qos
4748
from ros2topic.api import get_msg_class
4849
from ros2topic.api import positive_int
4950
from ros2topic.api import TopicNameCompleter
@@ -62,6 +63,7 @@ def add_arguments(self, parser, cli_name):
6263
help="Names of the ROS topic to listen to (e.g. '/chatter')")
6364
arg.completer = TopicNameCompleter(
6465
include_hidden_topics_key='include_hidden_topics')
66+
add_qos_arguments(parser, 'subscribe', 'sensor_data')
6567
parser.add_argument(
6668
'--window', '-w',
6769
dest='window_size', type=positive_int, default=DEFAULT_WINDOW_SIZE,
@@ -94,8 +96,8 @@ def eval_fn(m):
9496
filter_expr = None
9597

9698
with DirectNode(args) as node:
97-
_rostopic_hz(node.node, topics, window_size=args.window_size, filter_expr=filter_expr,
98-
use_wtime=args.use_wtime)
99+
_rostopic_hz(node.node, topics, qos_args=args, window_size=args.window_size,
100+
filter_expr=filter_expr, use_wtime=args.use_wtime)
99101

100102

101103
class ROSTopicHz(object):
@@ -278,11 +280,13 @@ def _get_ascii_table(header, cols):
278280
return table
279281

280282

281-
def _rostopic_hz(node, topics, window_size=DEFAULT_WINDOW_SIZE, filter_expr=None, use_wtime=False):
283+
def _rostopic_hz(node, topics, qos_args, window_size=DEFAULT_WINDOW_SIZE, filter_expr=None,
284+
use_wtime=False):
282285
"""
283286
Periodically print the publishing rate of a topic to console until shutdown.
284287
285288
:param topics: list of topic names, ``list`` of ``str``
289+
:param qos_args: qos arguments used to pick the qos profile of the subscriber
286290
:param window_size: number of messages to average over, -1 for infinite, ``int``
287291
:param filter_expr: Python filter expression that is called with m, the message instance
288292
"""
@@ -299,11 +303,13 @@ def _rostopic_hz(node, topics, window_size=DEFAULT_WINDOW_SIZE, filter_expr=None
299303
print('WARNING: failed to find message type for topic [%s]' % topic)
300304
continue
301305

306+
qos_profile = choose_qos(node, topic_name=topic, qos_args=qos_args)
307+
302308
node.create_subscription(
303309
msg_class,
304310
topic,
305311
functools.partial(rt.callback_hz, topic=topic),
306-
qos_profile_sensor_data)
312+
qos_profile)
307313
if topics_len > 1:
308314
print('Subscribed to [%s]' % topic)
309315

0 commit comments

Comments
 (0)