|
14 | 14 |
|
15 | 15 | import functools |
16 | 16 | import pathlib |
| 17 | +import re |
17 | 18 | import sys |
18 | 19 | import unittest |
19 | 20 |
|
|
32 | 33 | import rclpy |
33 | 34 | from rclpy.executors import SingleThreadedExecutor |
34 | 35 | from rclpy.qos import DurabilityPolicy |
| 36 | +from rclpy.qos import qos_check_compatible |
| 37 | +from rclpy.qos import QoSCompatibility |
35 | 38 | from rclpy.qos import QoSProfile |
36 | 39 | from rclpy.qos import ReliabilityPolicy |
37 | 40 | from rclpy.utilities import get_rmw_implementation_identifier |
@@ -124,12 +127,23 @@ def test_pub_basic(self, launch_service, proc_info, proc_output): |
124 | 127 | pub_extra_options = [ |
125 | 128 | '--qos-reliability', 'best_effort', |
126 | 129 | '--qos-durability', 'volatile'] |
| 130 | + # This QoS profile matched with the extra options defined above |
| 131 | + rostopic_qos_profile = QoSProfile( |
| 132 | + depth=10, |
| 133 | + reliability=ReliabilityPolicy.BEST_EFFORT, |
| 134 | + durability=DurabilityPolicy.VOLATILE) |
127 | 135 | subscription_qos_profile = QoSProfile( |
128 | 136 | depth=10, |
129 | 137 | reliability=ReliabilityPolicy.RELIABLE, |
130 | 138 | durability=DurabilityPolicy.TRANSIENT_LOCAL) |
131 | 139 | expected_maximum_message_count = 0 |
132 | 140 | expected_minimum_message_count = 0 |
| 141 | + # Skip this test if the QoS between the publisher and subscription |
| 142 | + # are compatible according to the underlying middleware. |
| 143 | + comp, reason = qos_check_compatible( |
| 144 | + rostopic_qos_profile, subscription_qos_profile) |
| 145 | + if comp == QoSCompatibility.OK: |
| 146 | + raise unittest.SkipTest() |
133 | 147 |
|
134 | 148 | future = rclpy.task.Future() |
135 | 149 |
|
@@ -363,6 +377,17 @@ def test_echo_basic(self, launch_service, proc_info, proc_output): |
363 | 377 | depth=10, |
364 | 378 | reliability=ReliabilityPolicy.BEST_EFFORT, |
365 | 379 | durability=DurabilityPolicy.VOLATILE) |
| 380 | + # This QoS profile matched with the extra options defined above |
| 381 | + rostopic_qos_profile = QoSProfile( |
| 382 | + depth=10, |
| 383 | + reliability=ReliabilityPolicy.RELIABLE, |
| 384 | + durability=DurabilityPolicy.TRANSIENT_LOCAL) |
| 385 | + # Skip this test if the QoS between the publisher and subscription |
| 386 | + # are compatible according to the underlying middleware. |
| 387 | + comp, reason = qos_check_compatible( |
| 388 | + rostopic_qos_profile, publisher_qos_profile) |
| 389 | + if comp == QoSCompatibility.OK or comp == QoSCompatibility.WARNING: |
| 390 | + raise unittest.SkipTest() |
366 | 391 | if not message_lost: |
367 | 392 | echo_extra_options.append('--no-lost-messages') |
368 | 393 | publisher = self.node.create_publisher(String, topic, publisher_qos_profile) |
@@ -494,9 +519,9 @@ def publish_message(): |
494 | 519 | ) |
495 | 520 | assert command.wait_for_output(functools.partial( |
496 | 521 | launch_testing.tools.expect_output, expected_lines=[ |
497 | | - "b'\\x00\\x01\\x00\\x00\\x06\\x00\\x00\\x00hello\\x00\\x00\\x00'", |
498 | | - '---', |
499 | | - ], strict=True |
| 522 | + re.compile(r"^b'\\x00\\x01\\x00\\x00\\x06\\x00\\x00\\x00hello(\\x00)*'$"), |
| 523 | + re.compile(r'^---$'), |
| 524 | + ], strict=False |
500 | 525 | ), timeout=10), 'Echo CLI did not print expected message' |
501 | 526 | assert command.wait_for_shutdown(timeout=10) |
502 | 527 |
|
|
0 commit comments