Skip to content

Commit c5d5405

Browse files
authored
Merge pull request #35 from mvccogo/main
2 parents 6a1287c + f727680 commit c5d5405

File tree

6 files changed

+154
-9
lines changed

6 files changed

+154
-9
lines changed

README.md

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ Enables connected ROS-based devices or robots to exchange ROS messages via an MQ
383383

384384
###### Services
385385

386-
- `is_connected` ([`mqtt_client/srv/IsConnected`](mqtt_client_interfaces/srv/IsConnected.srv))
386+
- `~is_connected` ([`mqtt_client/srv/IsConnected`](mqtt_client_interfaces/srv/IsConnected.srv))
387387
Returns whether the client is connected to the MQTT broker.
388388

389389
###### Parameters
@@ -412,9 +412,17 @@ Enables connected ROS-based devices or robots to exchange ROS messages via an MQ
412412

413413
###### Services
414414

415-
- `is_connected` ([`mqtt_client/srv/IsConnected`](mqtt_client_interfaces/srv/IsConnected.srv))
415+
- `~/is_connected` ([`mqtt_client/srv/IsConnected`](mqtt_client_interfaces/srv/IsConnected.srv))
416416
Returns whether the client is connected to the MQTT broker.
417417

418+
- `~/new_ros2mqtt_bridge` ([`mqtt_client/srv/NewRos2MqttBridge`](mqtt_client_interfaces/srv/NewRos2MqttBridge.srv))
419+
Returns whether a new ROS -> MQTT bridge was created.
420+
421+
- `~/new_mqtt2ros_bridge` ([`mqtt_client/srv/NewMqtt2RosBridge`](mqtt_client_interfaces/srv/NewMqtt2RosBridge.srv))
422+
Returns whether a new MQTT -> ROS bridge was created.
423+
424+
425+
418426
###### Parameters
419427

420428
See [Configuration](#configuration).

mqtt_client/include/mqtt_client/MqttClient.ros2.hpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ SOFTWARE.
3535
#define FMT_HEADER_ONLY
3636
#include <fmt/format.h>
3737
#include <mqtt_client_interfaces/srv/is_connected.hpp>
38+
#include <mqtt_client_interfaces/srv/new_mqtt2_ros_bridge.hpp>
39+
#include <mqtt_client_interfaces/srv/new_ros2_mqtt_bridge.hpp>
3840
#include <mqtt/async_client.h>
3941
#include <rclcpp/rclcpp.hpp>
4042
#include <rclcpp/serialization.hpp>
@@ -277,6 +279,26 @@ class MqttClient : public rclcpp::Node,
277279
mqtt_client_interfaces::srv::IsConnected::Request::SharedPtr request,
278280
mqtt_client_interfaces::srv::IsConnected::Response::SharedPtr response);
279281

282+
/**
283+
* @brief ROS service that dynamically creates a ROS -> MQTT mapping.
284+
*
285+
* @param request service request
286+
* @param response service response
287+
*/
288+
void newRos2MqttBridge(
289+
mqtt_client_interfaces::srv::NewRos2MqttBridge::Request::SharedPtr request,
290+
mqtt_client_interfaces::srv::NewRos2MqttBridge::Response::SharedPtr response);
291+
292+
/**
293+
* @brief ROS service that dynamically creates an MQTT -> ROS mapping.
294+
*
295+
* @param request service request
296+
* @param response service response
297+
*/
298+
void newMqtt2RosBridge(
299+
mqtt_client_interfaces::srv::NewMqtt2RosBridge::Request::SharedPtr request,
300+
mqtt_client_interfaces::srv::NewMqtt2RosBridge::Response::SharedPtr response);
301+
280302
/**
281303
* @brief Callback for when the client receives a MQTT message from the
282304
* broker.
@@ -374,6 +396,7 @@ class MqttClient : public rclcpp::Node,
374396
subscriber; ///< generic ROS subscriber
375397
std::string msg_type; ///< message type of subscriber
376398
int queue_size = 1; ///< ROS subscriber queue size
399+
bool is_stale = false; ///< whether a new generic publisher/subscriber is required
377400
} ros; ///< ROS-related variables
378401
struct {
379402
std::string topic; ///< MQTT topic
@@ -399,6 +422,7 @@ class MqttClient : public rclcpp::Node,
399422
latency_publisher; ///< ROS publisher for latency
400423
int queue_size = 1; ///< ROS publisher queue size
401424
bool latched = false; ///< whether to latch ROS message
425+
bool is_stale = false; ///< whether a new generic publisher/subscriber is required
402426
} ros; ///< ROS-related variables
403427
bool primitive = false; ///< whether to publish as primitive message (if
404428
///< coming from non-ROS MQTT client)
@@ -432,6 +456,18 @@ class MqttClient : public rclcpp::Node,
432456
rclcpp::Service<mqtt_client_interfaces::srv::IsConnected>::SharedPtr
433457
is_connected_service_;
434458

459+
/**
460+
* @brief ROS Service server for providing dynamic ROS to MQTT mappings.
461+
*/
462+
rclcpp::Service<mqtt_client_interfaces::srv::NewRos2MqttBridge>::SharedPtr
463+
new_ros2mqtt_bridge_service_;
464+
465+
/**
466+
* @brief ROS Service server for providing dynamic MQTT to ROS mappings.
467+
*/
468+
rclcpp::Service<mqtt_client_interfaces::srv::NewMqtt2RosBridge>::SharedPtr
469+
new_mqtt2ros_bridge_service_;
470+
435471
/**
436472
* @brief Status variable keeping track of connection status to broker
437473
*/

mqtt_client/src/MqttClient.ros2.cpp

Lines changed: 85 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -446,9 +446,18 @@ void MqttClient::setup() {
446446
// create ROS service server
447447
is_connected_service_ =
448448
create_service<mqtt_client_interfaces::srv::IsConnected>(
449-
"is_connected", std::bind(&MqttClient::isConnectedService, this,
449+
"~/is_connected", std::bind(&MqttClient::isConnectedService, this,
450450
std::placeholders::_1, std::placeholders::_2));
451451

452+
// create dynamic mappings services
453+
new_ros2mqtt_bridge_service_ =
454+
create_service<mqtt_client_interfaces::srv::NewRos2MqttBridge>(
455+
"~/new_ros2mqtt_bridge", std::bind(&MqttClient::newRos2MqttBridge, this,
456+
std::placeholders::_1, std::placeholders::_2));
457+
new_mqtt2ros_bridge_service_ =
458+
create_service<mqtt_client_interfaces::srv::NewMqtt2RosBridge>(
459+
"~/new_mqtt2ros_bridge", std::bind(&MqttClient::newMqtt2RosBridge, this,
460+
std::placeholders::_1, std::placeholders::_2));
452461

453462
// setup subscribers; check for new types every second
454463
check_subscriptions_timer_ =
@@ -466,9 +475,10 @@ void MqttClient::setupSubscriptions() {
466475
for (auto& [ros_topic, ros2mqtt] : ros2mqtt_) {
467476
if (all_topics_and_types.count(ros_topic)) {
468477

469-
// check if message type has changed
478+
// check if message type has changed or if mapping is stale
470479
const std::string& msg_type = all_topics_and_types.at(ros_topic)[0];
471-
if (msg_type == ros2mqtt.ros.msg_type) continue;
480+
if (msg_type == ros2mqtt.ros.msg_type && !ros2mqtt.ros.is_stale) continue;
481+
ros2mqtt.ros.is_stale = false;
472482
ros2mqtt.ros.msg_type = msg_type;
473483

474484
// create new generic subscription, if message type has changed
@@ -826,8 +836,8 @@ void MqttClient::mqtt2primitive(mqtt::const_message_ptr mqtt_msg) {
826836
serializeRosMessage(msg, serialized_msg);
827837
}
828838

829-
// if ROS message type has changed
830-
if (ros_msg_type != mqtt2ros.ros.msg_type) {
839+
// if ROS message type has changed or if mapping is stale
840+
if (ros_msg_type != mqtt2ros.ros.msg_type || mqtt2ros.ros.is_stale) {
831841

832842
mqtt2ros.ros.msg_type = ros_msg_type;
833843
RCLCPP_INFO(get_logger(),
@@ -843,6 +853,7 @@ void MqttClient::mqtt2primitive(mqtt::const_message_ptr mqtt_msg) {
843853
e.what());
844854
return;
845855
}
856+
mqtt2ros.ros.is_stale = false;
846857
}
847858

848859
// publish
@@ -902,6 +913,72 @@ void MqttClient::isConnectedService(
902913
response->connected = isConnected();
903914
}
904915

916+
void MqttClient::newRos2MqttBridge(
917+
mqtt_client_interfaces::srv::NewRos2MqttBridge::Request::SharedPtr request,
918+
mqtt_client_interfaces::srv::NewRos2MqttBridge::Response::SharedPtr response){
919+
920+
// add mapping definition to ros2mqtt_
921+
Ros2MqttInterface& ros2mqtt = ros2mqtt_[request->ros_topic];
922+
ros2mqtt.ros.is_stale = true;
923+
ros2mqtt.mqtt.topic = request->mqtt_topic;
924+
ros2mqtt.primitive = request->primitive;
925+
ros2mqtt.stamped = request->inject_timestamp;
926+
ros2mqtt.ros.queue_size = request->ros_queue_size;
927+
ros2mqtt.mqtt.qos = request->mqtt_qos;
928+
ros2mqtt.mqtt.retained = request->mqtt_retained;
929+
930+
if (ros2mqtt.stamped && ros2mqtt.primitive) {
931+
RCLCPP_WARN(
932+
get_logger(),
933+
"Timestamp will not be injected into primitive messages on ROS "
934+
"topic '%s'",
935+
request->ros_topic.c_str());
936+
ros2mqtt.stamped = false;
937+
}
938+
939+
RCLCPP_INFO(get_logger(), "Bridging %sROS topic '%s' to MQTT topic '%s' %s",
940+
ros2mqtt.primitive ? "primitive " : "", request->ros_topic.c_str(),
941+
ros2mqtt.mqtt.topic.c_str(),
942+
ros2mqtt.stamped ? "and measuring latency" : "");
943+
944+
// (re-)setup ROS subscriptions
945+
setupSubscriptions();
946+
947+
response->success = true;
948+
}
949+
950+
void MqttClient::newMqtt2RosBridge(
951+
mqtt_client_interfaces::srv::NewMqtt2RosBridge::Request::SharedPtr request,
952+
mqtt_client_interfaces::srv::NewMqtt2RosBridge::Response::SharedPtr response){
953+
954+
// add mapping definition to mqtt2ros_
955+
Mqtt2RosInterface& mqtt2ros = mqtt2ros_[request->mqtt_topic];
956+
mqtt2ros.ros.is_stale = true;
957+
mqtt2ros.ros.topic = request->ros_topic;
958+
mqtt2ros.primitive = request->primitive;
959+
mqtt2ros.mqtt.qos = request->mqtt_qos;
960+
mqtt2ros.ros.queue_size = request->ros_queue_size;
961+
mqtt2ros.ros.latched = request->ros_latched;
962+
if (mqtt2ros.ros.latched) {
963+
RCLCPP_WARN(get_logger(),
964+
fmt::format("Parameter 'bridge.mqtt2ros.{}.advanced.ros.latched' is ignored "
965+
"since ROS 2 does not easily support latched topics.", request->mqtt_topic).c_str());
966+
967+
}
968+
969+
RCLCPP_INFO(get_logger(), "Bridging MQTT topic '%s' to %sROS topic '%s'",
970+
request->mqtt_topic.c_str(), mqtt2ros.primitive ? "primitive " : "",
971+
mqtt2ros.ros.topic.c_str());
972+
973+
// subscribe to the MQTT topic
974+
std::string mqtt_topic_to_subscribe = request->mqtt_topic;
975+
if (!mqtt2ros.primitive)
976+
mqtt_topic_to_subscribe = kRosMsgTypeMqttTopicPrefix + request->mqtt_topic;
977+
client_->subscribe(mqtt_topic_to_subscribe, mqtt2ros.mqtt.qos);
978+
RCLCPP_INFO(get_logger(), "Subscribed MQTT topic '%s'", mqtt_topic_to_subscribe.c_str());
979+
980+
response->success = true;
981+
}
905982

906983
void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) {
907984

@@ -945,8 +1022,8 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) {
9451022
kRosMsgTypeMqttTopicPrefix.length());
9461023
Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_data_topic];
9471024

948-
// if ROS message type has changed
949-
if (ros_msg_type.name != mqtt2ros.ros.msg_type) {
1025+
// if ROS message type has changed or if mapping is stale
1026+
if (ros_msg_type.name != mqtt2ros.ros.msg_type || mqtt2ros.ros.is_stale) {
9501027

9511028
mqtt2ros.ros.msg_type = ros_msg_type.name;
9521029
mqtt2ros.stamped = ros_msg_type.stamped;
@@ -963,6 +1040,7 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) {
9631040
e.what());
9641041
return;
9651042
}
1043+
mqtt2ros.ros.is_stale = false;
9661044

9671045
// subscribe to MQTT topic with actual ROS messages
9681046
client_->subscribe(mqtt_data_topic, mqtt2ros.mqtt.qos);

mqtt_client_interfaces/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ if(${ROS_VERSION} EQUAL 2)
1414
rosidl_generate_interfaces(${PROJECT_NAME}
1515
msg/RosMsgType.msg
1616
srv/IsConnected.srv
17+
srv/ros2/NewMqtt2RosBridge.srv
18+
srv/ros2/NewRos2MqttBridge.srv
1719
DEPENDENCIES std_msgs
1820
)
1921

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# This service is used to dynamically register new MQTT -> ROS mappings.
2+
3+
string ros_topic
4+
string mqtt_topic
5+
bool primitive false
6+
uint8 mqtt_qos 0
7+
uint32 ros_queue_size 1
8+
bool ros_latched false
9+
---
10+
bool success
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# This service is used to dynamically register new ROS -> MQTT mappings.
2+
3+
string ros_topic
4+
string mqtt_topic
5+
bool primitive false
6+
bool inject_timestamp false
7+
uint32 ros_queue_size 1
8+
uint8 mqtt_qos 0
9+
bool mqtt_retained false
10+
---
11+
bool success

0 commit comments

Comments
 (0)