Skip to content

Commit 4d6d255

Browse files
committed
add handling for overwriting existing connections to ros2mqtt as well
1 parent aacc47e commit 4d6d255

File tree

2 files changed

+15
-35
lines changed

2 files changed

+15
-35
lines changed

mqtt_client/include/mqtt_client/MqttClient.ros2.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,7 @@ class MqttClient : public rclcpp::Node,
396396
subscriber; ///< generic ROS subscriber
397397
std::string msg_type; ///< message type of subscriber
398398
int queue_size = 1; ///< ROS subscriber queue size
399+
bool is_stale = false; ///< whether a new generic publisher/subscriber is required
399400
} ros; ///< ROS-related variables
400401
struct {
401402
std::string topic; ///< MQTT topic
@@ -421,11 +422,11 @@ class MqttClient : public rclcpp::Node,
421422
latency_publisher; ///< ROS publisher for latency
422423
int queue_size = 1; ///< ROS publisher queue size
423424
bool latched = false; ///< whether to latch ROS message
425+
bool is_stale = false; ///< whether a new generic publisher/subscriber is required
424426
} ros; ///< ROS-related variables
425427
bool primitive = false; ///< whether to publish as primitive message (if
426428
///< coming from non-ROS MQTT client)
427429
bool stamped = false; ///< whether timestamp is injected
428-
bool is_stale = false; ///< whether a new generic publisher/subscriber is required
429430
};
430431

431432
protected:

mqtt_client/src/MqttClient.ros2.cpp

Lines changed: 13 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -475,9 +475,10 @@ void MqttClient::setupSubscriptions() {
475475
for (auto& [ros_topic, ros2mqtt] : ros2mqtt_) {
476476
if (all_topics_and_types.count(ros_topic)) {
477477

478-
// check if message type has changed
478+
// check if message type has changed or if mapping is stale
479479
const std::string& msg_type = all_topics_and_types.at(ros_topic)[0];
480-
if (msg_type == ros2mqtt.ros.msg_type) continue;
480+
if (msg_type == ros2mqtt.ros.msg_type && !ros2mqtt.ros.is_stale) continue;
481+
ros2mqtt.ros.is_stale = false;
481482
ros2mqtt.ros.msg_type = msg_type;
482483

483484
// create new generic subscription, if message type has changed
@@ -836,7 +837,7 @@ void MqttClient::mqtt2primitive(mqtt::const_message_ptr mqtt_msg) {
836837
}
837838

838839
// if ROS message type has changed or if mapping is stale
839-
if (ros_msg_type != mqtt2ros.ros.msg_type || mqtt2ros.is_stale) {
840+
if (ros_msg_type != mqtt2ros.ros.msg_type || mqtt2ros.ros.is_stale) {
840841

841842
mqtt2ros.ros.msg_type = ros_msg_type;
842843
RCLCPP_INFO(get_logger(),
@@ -852,7 +853,7 @@ void MqttClient::mqtt2primitive(mqtt::const_message_ptr mqtt_msg) {
852853
e.what());
853854
return;
854855
}
855-
mqtt2ros.is_stale = false;
856+
mqtt2ros.ros.is_stale = false;
856857
}
857858

858859
// publish
@@ -918,6 +919,7 @@ void MqttClient::newRos2MqttBridge(
918919

919920
// add mapping definition to ros2mqtt_
920921
Ros2MqttInterface& ros2mqtt = ros2mqtt_[request->ros_topic];
922+
ros2mqtt.ros.is_stale = true;
921923
ros2mqtt.mqtt.topic = request->mqtt_topic;
922924
ros2mqtt.primitive = request->primitive;
923925
ros2mqtt.stamped = request->inject_timestamp;
@@ -939,32 +941,10 @@ void MqttClient::newRos2MqttBridge(
939941
ros2mqtt.mqtt.topic.c_str(),
940942
ros2mqtt.stamped ? "and measuring latency" : "");
941943

942-
// setup subscription
944+
// (re-)setup ROS subscriptions
945+
setupSubscriptions();
943946

944-
// check if ROS topic exists
945-
const auto all_topics_and_types = get_topic_names_and_types();
946-
if (all_topics_and_types.count(request->ros_topic)) {
947-
948-
const std::string& msg_type = all_topics_and_types.at(request->ros_topic)[0];
949-
ros2mqtt.ros.msg_type = msg_type;
950-
951-
// create new generic subscription, if message type has changed
952-
std::function<void(const std::shared_ptr<rclcpp::SerializedMessage> msg)>
953-
bound_callback_func = std::bind(&MqttClient::ros2mqtt, this,
954-
std::placeholders::_1, request->ros_topic);
955-
try {
956-
ros2mqtt.ros.subscriber = create_generic_subscription(
957-
request->ros_topic, msg_type, ros2mqtt.ros.queue_size, bound_callback_func);
958-
} catch (rclcpp::exceptions::RCLError& e) {
959-
RCLCPP_ERROR(get_logger(), "Failed to create generic subscriber: %s",
960-
e.what());
961-
response->success = false;
962-
return;
963-
}
964-
RCLCPP_INFO(get_logger(), "Subscribed ROS topic '%s' of type '%s'",
965-
request->ros_topic.c_str(), msg_type.c_str());
966-
response->success = true;
967-
}
947+
response->success = true;
968948
}
969949

970950
void MqttClient::newMqtt2RosBridge(
@@ -973,6 +953,7 @@ void MqttClient::newMqtt2RosBridge(
973953

974954
// add mapping definition to mqtt2ros_
975955
Mqtt2RosInterface& mqtt2ros = mqtt2ros_[request->mqtt_topic];
956+
mqtt2ros.ros.is_stale = true;
976957
mqtt2ros.ros.topic = request->ros_topic;
977958
mqtt2ros.primitive = request->primitive;
978959
mqtt2ros.mqtt.qos = request->mqtt_qos;
@@ -995,10 +976,8 @@ void MqttClient::newMqtt2RosBridge(
995976
mqtt_topic_to_subscribe = kRosMsgTypeMqttTopicPrefix + request->mqtt_topic;
996977
client_->subscribe(mqtt_topic_to_subscribe, mqtt2ros.mqtt.qos);
997978
RCLCPP_INFO(get_logger(), "Subscribed MQTT topic '%s'", mqtt_topic_to_subscribe.c_str());
998-
response->success = true;
999979

1000-
// mark as stale to force creation of new ROS publisher
1001-
mqtt2ros.is_stale = true;
980+
response->success = true;
1002981
}
1003982

1004983
void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) {
@@ -1044,7 +1023,7 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) {
10441023
Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_data_topic];
10451024

10461025
// if ROS message type has changed or if mapping is stale
1047-
if (ros_msg_type.name != mqtt2ros.ros.msg_type || mqtt2ros.is_stale) {
1026+
if (ros_msg_type.name != mqtt2ros.ros.msg_type || mqtt2ros.ros.is_stale) {
10481027

10491028
mqtt2ros.ros.msg_type = ros_msg_type.name;
10501029
mqtt2ros.stamped = ros_msg_type.stamped;
@@ -1061,7 +1040,7 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) {
10611040
e.what());
10621041
return;
10631042
}
1064-
mqtt2ros.is_stale = false;
1043+
mqtt2ros.ros.is_stale = false;
10651044

10661045
// subscribe to MQTT topic with actual ROS messages
10671046
client_->subscribe(mqtt_data_topic, mqtt2ros.mqtt.qos);

0 commit comments

Comments
 (0)