@@ -461,7 +461,6 @@ void MqttClient::setup() {
461461 " new_mqtt2ros_bridge" , std::bind (&MqttClient::newMqtt2RosBridge, this ,
462462 std::placeholders::_1, std::placeholders::_2));
463463
464-
465464 // setup subscribers; check for new types every second
466465 check_subscriptions_timer_ =
467466 create_wall_timer (std::chrono::duration<double >(1.0 ),
@@ -919,9 +918,7 @@ void MqttClient::newRos2MqttBridge(
919918 mqtt_client_interfaces::srv::NewRos2MqttBridge::Request::SharedPtr request,
920919 mqtt_client_interfaces::srv::NewRos2MqttBridge::Response::SharedPtr response){
921920
922-
923-
924- // Add mapping definition to ros2mqtt_
921+ // add mapping definition to ros2mqtt_
925922 Ros2MqttInterface& ros2mqtt = ros2mqtt_[request->ros_topic ];
926923 ros2mqtt.mqtt .topic = request->mqtt_topic ;
927924 ros2mqtt.primitive = request->primitive ;
@@ -944,33 +941,30 @@ void MqttClient::newRos2MqttBridge(
944941 ros2mqtt.mqtt .topic .c_str (),
945942 ros2mqtt.stamped ? " and measuring latency" : " " );
946943
947- // Setup subscription
944+ // setup subscription
948945
949946 // check if ROS topic exists
950-
951947 const auto all_topics_and_types = get_topic_names_and_types ();
952- if (all_topics_and_types.count (request->ros_topic )){
948+ if (all_topics_and_types.count (request->ros_topic )) {
953949
950+ const std::string& msg_type = all_topics_and_types.at (request->ros_topic )[0 ];
951+ ros2mqtt.ros .msg_type = msg_type;
954952
955-
956- const std::string& msg_type = all_topics_and_types.at (request->ros_topic )[0 ];
957- ros2mqtt.ros .msg_type = msg_type;
958-
959- // create new generic subscription, if message type has changed
960- std::function<void (const std::shared_ptr<rclcpp::SerializedMessage> msg)>
961- bound_callback_func = std::bind (&MqttClient::ros2mqtt, this ,
962- std::placeholders::_1, request->ros_topic );
963- try {
964- ros2mqtt.ros .subscriber = create_generic_subscription (
965- request->ros_topic , msg_type, ros2mqtt.ros .queue_size , bound_callback_func);
966- } catch (rclcpp::exceptions::RCLError& e) {
967- RCLCPP_ERROR (get_logger (), " Failed to create generic subscriber: %s" ,
968- e.what ());
969- response->success = false ;
970- return ;
971- }
972- RCLCPP_INFO (get_logger (), " Subscribed ROS topic '%s' of type '%s'" ,
973- request->ros_topic .c_str (), msg_type.c_str ());
953+ // create new generic subscription, if message type has changed
954+ std::function<void (const std::shared_ptr<rclcpp::SerializedMessage> msg)>
955+ bound_callback_func = std::bind (&MqttClient::ros2mqtt, this ,
956+ std::placeholders::_1, request->ros_topic );
957+ try {
958+ ros2mqtt.ros .subscriber = create_generic_subscription (
959+ request->ros_topic , msg_type, ros2mqtt.ros .queue_size , bound_callback_func);
960+ } catch (rclcpp::exceptions::RCLError& e) {
961+ RCLCPP_ERROR (get_logger (), " Failed to create generic subscriber: %s" ,
962+ e.what ());
963+ response->success = false ;
964+ return ;
965+ }
966+ RCLCPP_INFO (get_logger (), " Subscribed ROS topic '%s' of type '%s'" ,
967+ request->ros_topic .c_str (), msg_type.c_str ());
974968 response->success = true ;
975969 }
976970}
@@ -979,15 +973,14 @@ void MqttClient::newMqtt2RosBridge(
979973 mqtt_client_interfaces::srv::NewMqtt2RosBridge::Request::SharedPtr request,
980974 mqtt_client_interfaces::srv::NewMqtt2RosBridge::Response::SharedPtr response){
981975
982-
983- // Add mapping definition to mqtt2ros_
976+ // add mapping definition to mqtt2ros_
984977 Mqtt2RosInterface& mqtt2ros = mqtt2ros_[request->mqtt_topic ];
985978 mqtt2ros.ros .topic = request->ros_topic ;
986979 mqtt2ros.primitive = request->primitive ;
987980 mqtt2ros.mqtt .qos = request->mqtt_qos ;
988981 mqtt2ros.ros .queue_size = request->ros_queue_size ;
989982 mqtt2ros.ros .latched = request->ros_latched ;
990- if (mqtt2ros.ros .latched ){
983+ if (mqtt2ros.ros .latched ) {
991984 RCLCPP_WARN (get_logger (),
992985 fmt::format (" Parameter 'bridge.mqtt2ros.{}.advanced.ros.latched' is ignored "
993986 " since ROS 2 does not easily support latched topics." , request->mqtt_topic ).c_str ());
@@ -998,14 +991,15 @@ void MqttClient::newMqtt2RosBridge(
998991 request->mqtt_topic .c_str (), mqtt2ros.primitive ? " primitive " : " " ,
999992 mqtt2ros.ros .topic .c_str ());
1000993
1001- // Subscribe to the MQTT topic
994+ // subscribe to the MQTT topic
1002995 std::string mqtt_topic_to_subscribe = request->mqtt_topic ;
1003- if (!mqtt2ros.primitive )
996+ if (!mqtt2ros.primitive )
1004997 mqtt_topic_to_subscribe = kRosMsgTypeMqttTopicPrefix + request->mqtt_topic ;
1005998 client_->subscribe (mqtt_topic_to_subscribe, mqtt2ros.mqtt .qos );
1006999 RCLCPP_INFO (get_logger (), " Subscribed MQTT topic '%s'" , mqtt_topic_to_subscribe.c_str ());
10071000 response->success = true ;
1008- // Mark as stale to force creation of new publisher
1001+
1002+ // mark as stale to force creation of new ROS publisher
10091003 mqtt2ros.is_stale = true ;
10101004}
10111005
@@ -1070,6 +1064,7 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) {
10701064 return ;
10711065 }
10721066 mqtt2ros.is_stale = false ;
1067+
10731068 // subscribe to MQTT topic with actual ROS messages
10741069 client_->subscribe (mqtt_data_topic, mqtt2ros.mqtt .qos );
10751070 RCLCPP_DEBUG (get_logger (), " Subscribed MQTT topic '%s'" ,
0 commit comments