diff --git a/rosapi/src/rosapi/params.py b/rosapi/src/rosapi/params.py index 20c79c35d..3af991ebc 100644 --- a/rosapi/src/rosapi/params.py +++ b/rosapi/src/rosapi/params.py @@ -39,7 +39,7 @@ from rcl_interfaces.srv import GetParameters, ListParameters, SetParameters from rclpy.callback_groups import MutuallyExclusiveCallbackGroup from rclpy.node import Node -from rclpy.parameter import get_parameter_value +from rclpy.parameter import Parameter from rclpy.task import Future from ros2node.api import get_absolute_node_name from rosapi.async_helper import futures_wait_for diff --git a/rosbridge_library/src/rosbridge_library/capabilities/subscribe.py b/rosbridge_library/src/rosbridge_library/capabilities/subscribe.py index 0f523ab22..1aab0ed5a 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/subscribe.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/subscribe.py @@ -31,6 +31,7 @@ # POSSIBILITY OF SUCH DAMAGE. import fnmatch +import uuid from functools import partial from threading import Lock @@ -110,6 +111,19 @@ def subscribe( compression is to be used (current valid values are 'png') """ + if sid is None: + self.protocol.log( + "WARNING: subscribe called with no subscription id, " + + "this is not supported by ROSBridge" + ) + sid = uuid.uuid4() + + if sid in self.clients: + self.protocol.log( + "WARNING: subscribe called with existing subscription id, " + + "this is not supported by ROSBridge" + ) + sid = uuid.uuid4() client_details = { "throttle_rate": throttle_rate, @@ -307,27 +321,34 @@ def publish(self, topic, message, fragment_size=None, compression="none"): 'png' and 'none' """ - # TODO: fragmentation, proper ids - - outgoing_msg = {"op": "publish", "topic": topic} - if compression == "png": - outgoing_msg["msg"] = message.get_json_values() - outgoing_msg_dumped = encode_json(outgoing_msg) - outgoing_msg = {"op": "png", "data": encode_png(outgoing_msg_dumped)} - elif compression == "cbor": - outgoing_msg = message.get_cbor(outgoing_msg) - elif compression == "cbor-raw": - (secs, nsecs) = self.protocol.node_handle.get_clock().now().seconds_nanoseconds() - outgoing_msg["msg"] = { - "secs": secs, - "nsecs": nsecs, - "bytes": message.message, - } - outgoing_msg = message.get_cbor_raw(outgoing_msg) - else: - outgoing_msg["msg"] = message.get_json_values() - self.protocol.send(outgoing_msg, compression=compression) + subscription = self._subscriptions.get(topic) + if subscription: + for sid in subscription.clients: + outgoing_msg = {"op": "publish", "topic": topic, "id": sid} + if compression == "png": + outgoing_msg["msg"] = message.get_json_values() + outgoing_msg_dumped = encode_json(outgoing_msg) + outgoing_msg = {"op": "png", "data": encode_png(outgoing_msg_dumped), "id": sid} + elif compression == "cbor": + outgoing_msg["id"] = sid + outgoing_msg = message.get_cbor(outgoing_msg) + elif compression == "cbor-raw": + (secs, nsecs) = ( + self.protocol.node_handle.get_clock().now().seconds_nanoseconds() + ) + outgoing_msg["msg"] = { + "secs": secs, + "nsecs": nsecs, + "bytes": message.message, + } + outgoing_msg["id"] = sid + outgoing_msg = message.get_cbor_raw(outgoing_msg) + else: + outgoing_msg["msg"] = message.get_json_values() + outgoing_msg["id"] = sid + + self.protocol.send(outgoing_msg, compression=compression) def finish(self): for subscription in self._subscriptions.values():