diff --git a/rosbridge_library/src/rosbridge_library/capabilities/advertise.py b/rosbridge_library/src/rosbridge_library/capabilities/advertise.py index 9397809d..046ddd92 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/advertise.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/advertise.py @@ -36,6 +36,14 @@ import fnmatch from typing import TYPE_CHECKING, Any +from rclpy.qos import ( + DurabilityPolicy, + HistoryPolicy, + LivelinessPolicy, + QoSProfile, + ReliabilityPolicy, +) + from rosbridge_library.capability import Capability from rosbridge_library.internal.publishers import manager @@ -67,7 +75,12 @@ def unregister(self) -> None: manager.unregister(self.client_id, self.topic) def register_advertisement( - self, msg_type: str, adv_id: str | None = None, latch: bool = False, queue_size: int = 100 + self, + msg_type: str, + adv_id: str | None = None, + latch: bool = False, + queue_size: int = 100, + qos: QoSProfile | None = None, ) -> None: # Register with the publisher manager, propagating any exception manager.register( @@ -77,6 +90,7 @@ def register_advertisement( msg_type=msg_type, latch=latch, queue_size=queue_size, + qos=qos, ) if adv_id is not None: @@ -122,6 +136,21 @@ def advertise(self, message: dict[str, Any]) -> None: msg_type: str = message["type"] latch: bool = message.get("latch", False) queue_size: int = message.get("queue_size", 100) + qos: QoSProfile | None = QoSProfile( + depth=message.get("qos_depth", 10), + durability=message.get("durability_policy", DurabilityPolicy.SYSTEM_DEFAULT), + reliability=message.get("reliability_policy", ReliabilityPolicy.SYSTEM_DEFAULT), + history=message.get("history_policy", HistoryPolicy.SYSTEM_DEFAULT), + liveliness=message.get("liveliness_policy", LivelinessPolicy.SYSTEM_DEFAULT), + ) + + if message.keys() & { + "durability_policy", + "history_policy", + "liveliness_policy", + "reliability_policy", + }: + qos = None if self.topics_glob is not None: self.protocol.log("debug", "Topic security glob enabled, checking topic: " + topic) @@ -149,7 +178,9 @@ def advertise(self, message: dict[str, Any]) -> None: self._registrations[topic] = Registration(client_id, topic, self.protocol.node_handle) # Register, propagating any exceptions - self._registrations[topic].register_advertisement(msg_type, aid, latch, queue_size) + self._registrations[topic].register_advertisement( + msg_type=msg_type, adv_id=aid, latch=latch, queue_size=queue_size, qos=qos + ) def unadvertise(self, message: dict[str, Any]) -> None: # Pull out the ID diff --git a/rosbridge_library/src/rosbridge_library/capabilities/subscribe.py b/rosbridge_library/src/rosbridge_library/capabilities/subscribe.py index 518fbfbf..c11cf5a6 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/subscribe.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/subscribe.py @@ -37,6 +37,14 @@ from threading import Lock from typing import TYPE_CHECKING, Any, Generic +from rclpy.qos import ( + DurabilityPolicy, + HistoryPolicy, + LivelinessPolicy, + QoSProfile, + ReliabilityPolicy, +) + from rosbridge_library.capability import Capability from rosbridge_library.internal.pngcompression import encode as encode_png from rosbridge_library.internal.subscribers import manager @@ -116,6 +124,7 @@ def subscribe( queue_length: int = 0, fragment_size: int | None = None, compression: str = "none", + qos: QoSProfile | None = None, ) -> None: """ Add another client's subscription request. @@ -156,6 +165,7 @@ def subscribe( self.node_handle, msg_type=msg_type, raw=raw, + qos=qos, ) def unsubscribe(self, sid: str | None = None) -> None: @@ -245,6 +255,7 @@ class Subscribe(Capability): (False, "fragment_size", int), (False, "queue_length", int), (False, "compression", str), + (False, "qos", QoSProfile), ) unsubscribe_msg_fields = ((True, "topic", str),) @@ -307,6 +318,17 @@ def subscribe(self, msg: dict[str, Any]) -> None: "fragment_size": msg.get("fragment_size"), "queue_length": msg.get("queue_length", 0), "compression": msg.get("compression", "none"), + "qos": ( + QoSProfile( + depth=msg.get("qos_depth", 10), + durability=msg.get("durability_policy", DurabilityPolicy.SYSTEM_DEFAULT), + reliability=msg.get("reliability_policy", ReliabilityPolicy.SYSTEM_DEFAULT), + history=msg.get("history_policy", HistoryPolicy.SYSTEM_DEFAULT), + liveliness=msg.get("liveliness_policy", LivelinessPolicy.SYSTEM_DEFAULT), + ) + if "qos" in msg + else None + ), } self._subscriptions[topic].subscribe(**subscribe_args) diff --git a/rosbridge_library/src/rosbridge_library/internal/publishers.py b/rosbridge_library/src/rosbridge_library/internal/publishers.py index 6901fe64..f7a5133f 100644 --- a/rosbridge_library/src/rosbridge_library/internal/publishers.py +++ b/rosbridge_library/src/rosbridge_library/internal/publishers.py @@ -67,6 +67,7 @@ def __init__( msg_type: str | None = None, latched_client_id: str | None = None, queue_size: int = 100, + qos: QoSProfile | None = None, ) -> None: """ Register a publisher on the specified topic. @@ -119,23 +120,28 @@ def __init__( self.topic = topic self.node_handle = node_handle self.msg_class = msg_class + self.publisher_qos: QoSProfile | None = None + # Adding a lifespan solves the problem of late-joining subscribers # without the need of a custom message publisher implementation. - publisher_qos = QoSProfile( - depth=queue_size, - durability=DurabilityPolicy.TRANSIENT_LOCAL, - ) + if qos is None: + self.publisher_qos = QoSProfile( + depth=queue_size, + durability=DurabilityPolicy.TRANSIENT_LOCAL, + ) + else: + self.publisher_qos = qos # For latched clients, no lifespan has to be specified (i.e. latch forever). # Otherwise we want to keep the messages for a second to prevent late-joining subscribers from # missing messages. if latched_client_id is None: - publisher_qos.lifespan = Duration(seconds=1) + self.publisher_qos.lifespan = Duration(seconds=1) else: - publisher_qos.depth = 1 + self.publisher_qos.depth = 1 self.publisher: Publisher[ROSMessageT] = node_handle.create_publisher( - msg_class, topic, qos_profile=publisher_qos + msg_class, topic, qos_profile=self.publisher_qos ) def unregister(self) -> None: @@ -222,6 +228,7 @@ def register( msg_type: str | None = None, latch: bool = False, queue_size: int = 100, + qos: QoSProfile | None = None, ) -> None: """ Register a publisher on the specified topic. @@ -247,6 +254,7 @@ def register( msg_type=msg_type, latched_client_id=latched_client_id, queue_size=queue_size, + qos=qos, ) elif latch and self._publishers[topic].latched_client_id != client_id: node_handle.get_logger().warning( diff --git a/rosbridge_library/src/rosbridge_library/internal/subscribers.py b/rosbridge_library/src/rosbridge_library/internal/subscribers.py index 083bec68..80d35924 100644 --- a/rosbridge_library/src/rosbridge_library/internal/subscribers.py +++ b/rosbridge_library/src/rosbridge_library/internal/subscribers.py @@ -38,7 +38,11 @@ from typing import TYPE_CHECKING, Generic, cast from rclpy.callback_groups import MutuallyExclusiveCallbackGroup -from rclpy.qos import DurabilityPolicy, QoSProfile, ReliabilityPolicy +from rclpy.qos import ( + DurabilityPolicy, + QoSProfile, + ReliabilityPolicy, +) from rosbridge_library.internal import ros_loader from rosbridge_library.internal.message_conversion import msg_class_type_repr @@ -77,6 +81,7 @@ def __init__( node_handle: Node, msg_type: str | None = None, raw: bool = False, + qos: QoSProfile | None = None, ) -> None: """ Register a subscriber on the specified topic. @@ -134,21 +139,22 @@ def __init__( # - https://docs.ros.org/en/rolling/Concepts/About-Quality-of-Service-Settings.html # - https://github.com/RobotWebTools/rosbridge_suite/issues/551 # - https://github.com/RobotWebTools/rosbridge_suite/issues/769 - qos = QoSProfile( - depth=10, - durability=DurabilityPolicy.VOLATILE, - reliability=ReliabilityPolicy.BEST_EFFORT, - ) + if qos is None: + qos = QoSProfile( + depth=10, + durability=DurabilityPolicy.VOLATILE, + reliability=ReliabilityPolicy.BEST_EFFORT, + ) - infos = node_handle.get_publishers_info_by_topic(topic) + infos = node_handle.get_publishers_info_by_topic(topic) - if len(infos) > 0 and all( - pub.qos_profile.durability == DurabilityPolicy.TRANSIENT_LOCAL for pub in infos - ): - qos.durability = DurabilityPolicy.TRANSIENT_LOCAL - qos.reliability = ReliabilityPolicy.RELIABLE - if any(pub.qos_profile.reliability == ReliabilityPolicy.BEST_EFFORT for pub in infos): - qos.reliability = ReliabilityPolicy.BEST_EFFORT + if len(infos) > 0 and all( + pub.qos_profile.durability == DurabilityPolicy.TRANSIENT_LOCAL for pub in infos + ): + qos.durability = DurabilityPolicy.TRANSIENT_LOCAL + qos.reliability = ReliabilityPolicy.RELIABLE + if any(pub.qos_profile.reliability == ReliabilityPolicy.BEST_EFFORT for pub in infos): + qos.reliability = ReliabilityPolicy.BEST_EFFORT # Create the subscriber and associated member variables # Subscriptions is initialized with the current client to start with. @@ -304,6 +310,7 @@ def subscribe( node_handle: Node, msg_type: str | None = None, raw: bool = False, + qos: QoSProfile | None = None, ) -> None: """ Subscribe to a topic. @@ -316,7 +323,7 @@ def subscribe( with self._lock: if topic not in self._subscribers: self._subscribers[topic] = MultiSubscriber( - topic, client_id, callback, node_handle, msg_type=msg_type, raw=raw + topic, client_id, callback, node_handle, msg_type=msg_type, raw=raw, qos=qos ) else: self._subscribers[topic].subscribe(client_id, callback)