Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 42 additions & 23 deletions rosbridge_library/src/rosbridge_library/capabilities/subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from __future__ import annotations

import fnmatch
import uuid
from functools import partial
from threading import Lock
from typing import TYPE_CHECKING, Any, Generic
Expand Down Expand Up @@ -135,6 +136,20 @@ def subscribe(
:param compression: "none" if no compression, or some other value if
compression is to be used (current valid values are 'png')
"""
if sid is None:
self.protocol.log(
"WARNING: subscribe called with no subscription id, "
+ "this is not supported by ROSBridge"
)
sid = uuid.uuid4()

if sid in self.clients:
self.protocol.log(
"WARNING: subscribe called with existing subscription id, "
+ "this is not supported by ROSBridge"
)
sid = uuid.uuid4()

client_details = {
"throttle_rate": throttle_rate,
"queue_length": queue_length,
Expand Down Expand Up @@ -347,29 +362,33 @@ def publish(
:param compression: (optional) compress the message. valid values are
'png' and 'none'
"""
# TODO: fragmentation, proper ids

outgoing_msg: dict[str, Any] | bytes = {}
outgoing_msg_raw: dict[str, Any] = {"op": "publish", "topic": topic}
if compression == "png":
outgoing_msg_raw["msg"] = message.get_json_values()
outgoing_msg_dumped: str = encode_json(outgoing_msg_raw)
outgoing_msg = {"op": "png", "data": encode_png(outgoing_msg_dumped)}
elif compression == "cbor":
outgoing_msg = message.get_cbor(outgoing_msg_raw)
elif compression == "cbor-raw":
(secs, nsecs) = self.protocol.node_handle.get_clock().now().seconds_nanoseconds()
outgoing_msg_raw["msg"] = {
"secs": secs,
"nsecs": nsecs,
"bytes": message.message,
}
outgoing_msg = message.get_cbor_raw(outgoing_msg_raw)
else:
outgoing_msg_raw["msg"] = message.get_json_values()
outgoing_msg = outgoing_msg_raw

self.protocol.send(outgoing_msg, compression=compression)
subscription = self._subscriptions.get(topic)
if subscription:
for sid in subscription.clients:
outgoing_msg = {"op": "publish", "topic": topic, "id": sid}
if compression == "png":
outgoing_msg["msg"] = message.get_json_values()
outgoing_msg_dumped = encode_json(outgoing_msg)
outgoing_msg = {"op": "png", "data": encode_png(outgoing_msg_dumped), "id": sid}
elif compression == "cbor":
outgoing_msg["id"] = sid
outgoing_msg = message.get_cbor(outgoing_msg)
elif compression == "cbor-raw":
(secs, nsecs) = (
self.protocol.node_handle.get_clock().now().seconds_nanoseconds()
)
outgoing_msg["msg"] = {
"secs": secs,
"nsecs": nsecs,
"bytes": message.message,
}
outgoing_msg["id"] = sid
outgoing_msg = message.get_cbor_raw(outgoing_msg)
else:
outgoing_msg["msg"] = message.get_json_values()
outgoing_msg["id"] = sid

self.protocol.send(outgoing_msg, compression=compression)

def finish(self) -> None:
for subscription in self._subscriptions.values():
Expand Down
Loading