Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions rosbridge_library/src/rosbridge_library/capabilities/advertise.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@
import fnmatch
from typing import TYPE_CHECKING, Any

from rclpy.qos import (
DurabilityPolicy,
HistoryPolicy,
LivelinessPolicy,
QoSProfile,
ReliabilityPolicy,
)

from rosbridge_library.capability import Capability
from rosbridge_library.internal.publishers import manager

Expand Down Expand Up @@ -67,7 +75,12 @@ def unregister(self) -> None:
manager.unregister(self.client_id, self.topic)

def register_advertisement(
self, msg_type: str, adv_id: str | None = None, latch: bool = False, queue_size: int = 100
self,
msg_type: str,
adv_id: str | None = None,
latch: bool = False,
queue_size: int = 100,
qos: QoSProfile | None = None,
) -> None:
# Register with the publisher manager, propagating any exception
manager.register(
Expand All @@ -77,6 +90,7 @@ def register_advertisement(
msg_type=msg_type,
latch=latch,
queue_size=queue_size,
qos=qos,
)

if adv_id is not None:
Expand Down Expand Up @@ -122,6 +136,21 @@ def advertise(self, message: dict[str, Any]) -> None:
msg_type: str = message["type"]
latch: bool = message.get("latch", False)
queue_size: int = message.get("queue_size", 100)
qos: QoSProfile | None = QoSProfile(
depth=message.get("qos_depth", 10),
durability=message.get("durability_policy", DurabilityPolicy.SYSTEM_DEFAULT),
reliability=message.get("reliability_policy", ReliabilityPolicy.SYSTEM_DEFAULT),
history=message.get("history_policy", HistoryPolicy.SYSTEM_DEFAULT),
liveliness=message.get("liveliness_policy", LivelinessPolicy.SYSTEM_DEFAULT),
)

if message.keys() & {
"durability_policy",
"history_policy",
"liveliness_policy",
"reliability_policy",
}:
qos = None

if self.topics_glob is not None:
self.protocol.log("debug", "Topic security glob enabled, checking topic: " + topic)
Expand Down Expand Up @@ -149,7 +178,9 @@ def advertise(self, message: dict[str, Any]) -> None:
self._registrations[topic] = Registration(client_id, topic, self.protocol.node_handle)

# Register, propagating any exceptions
self._registrations[topic].register_advertisement(msg_type, aid, latch, queue_size)
self._registrations[topic].register_advertisement(
msg_type=msg_type, adv_id=aid, latch=latch, queue_size=queue_size, qos=qos
)

def unadvertise(self, message: dict[str, Any]) -> None:
# Pull out the ID
Expand Down
22 changes: 22 additions & 0 deletions rosbridge_library/src/rosbridge_library/capabilities/subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@
from threading import Lock
from typing import TYPE_CHECKING, Any, Generic

from rclpy.qos import (
DurabilityPolicy,
HistoryPolicy,
LivelinessPolicy,
QoSProfile,
ReliabilityPolicy,
)

from rosbridge_library.capability import Capability
from rosbridge_library.internal.pngcompression import encode as encode_png
from rosbridge_library.internal.subscribers import manager
Expand Down Expand Up @@ -116,6 +124,7 @@ def subscribe(
queue_length: int = 0,
fragment_size: int | None = None,
compression: str = "none",
qos: QoSProfile | None = None,
) -> None:
"""
Add another client's subscription request.
Expand Down Expand Up @@ -156,6 +165,7 @@ def subscribe(
self.node_handle,
msg_type=msg_type,
raw=raw,
qos=qos,
)

def unsubscribe(self, sid: str | None = None) -> None:
Expand Down Expand Up @@ -245,6 +255,7 @@ class Subscribe(Capability):
(False, "fragment_size", int),
(False, "queue_length", int),
(False, "compression", str),
(False, "qos", QoSProfile),
)
unsubscribe_msg_fields = ((True, "topic", str),)

Expand Down Expand Up @@ -307,6 +318,17 @@ def subscribe(self, msg: dict[str, Any]) -> None:
"fragment_size": msg.get("fragment_size"),
"queue_length": msg.get("queue_length", 0),
"compression": msg.get("compression", "none"),
"qos": (
QoSProfile(
depth=msg.get("qos_depth", 10),
durability=msg.get("durability_policy", DurabilityPolicy.SYSTEM_DEFAULT),
reliability=msg.get("reliability_policy", ReliabilityPolicy.SYSTEM_DEFAULT),
history=msg.get("history_policy", HistoryPolicy.SYSTEM_DEFAULT),
liveliness=msg.get("liveliness_policy", LivelinessPolicy.SYSTEM_DEFAULT),
)
if "qos" in msg
else None
),
}
self._subscriptions[topic].subscribe(**subscribe_args)

Expand Down
22 changes: 15 additions & 7 deletions rosbridge_library/src/rosbridge_library/internal/publishers.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def __init__(
msg_type: str | None = None,
latched_client_id: str | None = None,
queue_size: int = 100,
qos: QoSProfile | None = None,
) -> None:
"""
Register a publisher on the specified topic.
Expand Down Expand Up @@ -119,23 +120,28 @@ def __init__(
self.topic = topic
self.node_handle = node_handle
self.msg_class = msg_class
self.publisher_qos: QoSProfile | None = None

# Adding a lifespan solves the problem of late-joining subscribers
# without the need of a custom message publisher implementation.
publisher_qos = QoSProfile(
depth=queue_size,
durability=DurabilityPolicy.TRANSIENT_LOCAL,
)
if qos is None:
self.publisher_qos = QoSProfile(
depth=queue_size,
durability=DurabilityPolicy.TRANSIENT_LOCAL,
)
else:
self.publisher_qos = qos

# For latched clients, no lifespan has to be specified (i.e. latch forever).
# Otherwise we want to keep the messages for a second to prevent late-joining subscribers from
# missing messages.
if latched_client_id is None:
publisher_qos.lifespan = Duration(seconds=1)
self.publisher_qos.lifespan = Duration(seconds=1)
else:
publisher_qos.depth = 1
self.publisher_qos.depth = 1

self.publisher: Publisher[ROSMessageT] = node_handle.create_publisher(
msg_class, topic, qos_profile=publisher_qos
msg_class, topic, qos_profile=self.publisher_qos
)

def unregister(self) -> None:
Expand Down Expand Up @@ -222,6 +228,7 @@ def register(
msg_type: str | None = None,
latch: bool = False,
queue_size: int = 100,
qos: QoSProfile | None = None,
) -> None:
"""
Register a publisher on the specified topic.
Expand All @@ -247,6 +254,7 @@ def register(
msg_type=msg_type,
latched_client_id=latched_client_id,
queue_size=queue_size,
qos=qos,
)
elif latch and self._publishers[topic].latched_client_id != client_id:
node_handle.get_logger().warning(
Expand Down
37 changes: 22 additions & 15 deletions rosbridge_library/src/rosbridge_library/internal/subscribers.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@
from typing import TYPE_CHECKING, Generic, cast

from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from rclpy.qos import DurabilityPolicy, QoSProfile, ReliabilityPolicy
from rclpy.qos import (
DurabilityPolicy,
QoSProfile,
ReliabilityPolicy,
)

from rosbridge_library.internal import ros_loader
from rosbridge_library.internal.message_conversion import msg_class_type_repr
Expand Down Expand Up @@ -77,6 +81,7 @@ def __init__(
node_handle: Node,
msg_type: str | None = None,
raw: bool = False,
qos: QoSProfile | None = None,
) -> None:
"""
Register a subscriber on the specified topic.
Expand Down Expand Up @@ -134,21 +139,22 @@ def __init__(
# - https://docs.ros.org/en/rolling/Concepts/About-Quality-of-Service-Settings.html
# - https://github.com/RobotWebTools/rosbridge_suite/issues/551
# - https://github.com/RobotWebTools/rosbridge_suite/issues/769
qos = QoSProfile(
depth=10,
durability=DurabilityPolicy.VOLATILE,
reliability=ReliabilityPolicy.BEST_EFFORT,
)
if qos is None:
qos = QoSProfile(
depth=10,
durability=DurabilityPolicy.VOLATILE,
reliability=ReliabilityPolicy.BEST_EFFORT,
)

infos = node_handle.get_publishers_info_by_topic(topic)
infos = node_handle.get_publishers_info_by_topic(topic)

if len(infos) > 0 and all(
pub.qos_profile.durability == DurabilityPolicy.TRANSIENT_LOCAL for pub in infos
):
qos.durability = DurabilityPolicy.TRANSIENT_LOCAL
qos.reliability = ReliabilityPolicy.RELIABLE
if any(pub.qos_profile.reliability == ReliabilityPolicy.BEST_EFFORT for pub in infos):
qos.reliability = ReliabilityPolicy.BEST_EFFORT
if len(infos) > 0 and all(
pub.qos_profile.durability == DurabilityPolicy.TRANSIENT_LOCAL for pub in infos
):
qos.durability = DurabilityPolicy.TRANSIENT_LOCAL
qos.reliability = ReliabilityPolicy.RELIABLE
if any(pub.qos_profile.reliability == ReliabilityPolicy.BEST_EFFORT for pub in infos):
qos.reliability = ReliabilityPolicy.BEST_EFFORT

# Create the subscriber and associated member variables
# Subscriptions is initialized with the current client to start with.
Expand Down Expand Up @@ -304,6 +310,7 @@ def subscribe(
node_handle: Node,
msg_type: str | None = None,
raw: bool = False,
qos: QoSProfile | None = None,
) -> None:
"""
Subscribe to a topic.
Expand All @@ -316,7 +323,7 @@ def subscribe(
with self._lock:
if topic not in self._subscribers:
self._subscribers[topic] = MultiSubscriber(
topic, client_id, callback, node_handle, msg_type=msg_type, raw=raw
topic, client_id, callback, node_handle, msg_type=msg_type, raw=raw, qos=qos
)
else:
self._subscribers[topic].subscribe(client_id, callback)
Expand Down