@@ -433,7 +433,6 @@ void MqttClient::loadParameters() {
433433 if (get_parameter (fmt::format (" bridge.ros2mqtt.{}.ros_type" , ros_topic), ros_type_param)) {
434434 ros2mqtt.ros .msg_type = ros_type_param.as_string ();
435435 ros2mqtt.fixed_type = true ;
436- ros2mqtt.primitive = true ;
437436 RCLCPP_DEBUG (get_logger (), " Using explicit ROS message type '%s'" , ros2mqtt.ros .msg_type .c_str ());
438437 }
439438
@@ -532,7 +531,6 @@ void MqttClient::loadParameters() {
532531 if (get_parameter (fmt::format (" bridge.mqtt2ros.{}.ros_type" , mqtt_topic), ros_type_param)) {
533532 mqtt2ros.ros .msg_type = ros_type_param.as_string ();
534533 mqtt2ros.fixed_type = true ;
535- mqtt2ros.primitive = true ;
536534 RCLCPP_DEBUG (get_logger (), " Using explicit ROS message type '%s' for '%s'" , mqtt2ros.ros .msg_type .c_str (), ros_topic.c_str ());
537535 }
538536
@@ -1280,11 +1278,17 @@ void MqttClient::connected(const std::string& cause) {
12801278
12811279 // subscribe MQTT topics
12821280 for (const auto & [mqtt_topic, mqtt2ros] : mqtt2ros_) {
1283- std::string mqtt_topic_to_subscribe = mqtt_topic;
1284- if (!mqtt2ros.primitive ) // subscribe topics for ROS message types first
1285- mqtt_topic_to_subscribe = kRosMsgTypeMqttTopicPrefix + mqtt_topic;
1286- client_->subscribe (mqtt_topic_to_subscribe, mqtt2ros.mqtt .qos );
1287- RCLCPP_INFO (get_logger (), " Subscribed MQTT topic '%s'" , mqtt_topic_to_subscribe.c_str ());
1281+ if (!mqtt2ros.primitive ) {
1282+ std::string const mqtt_topic_to_subscribe = kRosMsgTypeMqttTopicPrefix + mqtt_topic;
1283+ client_->subscribe (mqtt_topic_to_subscribe, mqtt2ros.mqtt .qos );
1284+ RCLCPP_INFO (get_logger (), " Subscribed MQTT topic '%s'" , mqtt_topic_to_subscribe.c_str ());
1285+ }
1286+ // If not primitive and not fixed, we need the message type before we can public. In that case
1287+ // wait for the type to come in before subscribing to the data topic
1288+ if (mqtt2ros.primitive || mqtt2ros.fixed_type ) {
1289+ client_->subscribe (mqtt_topic, mqtt2ros.mqtt .qos );
1290+ RCLCPP_INFO (get_logger (), " Subscribed MQTT topic '%s'" , mqtt_topic.c_str ());
1291+ }
12881292 }
12891293}
12901294
@@ -1395,13 +1399,12 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) {
13951399 if (mqtt2ros_.count (mqtt_topic) > 0 ) {
13961400 Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic];
13971401
1398- if (mqtt2ros.fixed_type ) {
1399- mqtt2fixed (mqtt_msg);
1400- return ;
1401- }
1402-
14031402 if (mqtt2ros.primitive ) {
1404- mqtt2primitive (mqtt_msg);
1403+ if (mqtt2ros.fixed_type ) {
1404+ mqtt2fixed (mqtt_msg);
1405+ } else {
1406+ mqtt2primitive (mqtt_msg);
1407+ }
14051408 return ;
14061409 }
14071410 }
@@ -1432,6 +1435,21 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) {
14321435 // if ROS message type has changed or if mapping is stale
14331436 if (ros_msg_type.name != mqtt2ros.ros .msg_type || mqtt2ros.ros .is_stale ) {
14341437
1438+ if (mqtt2ros.fixed_type ) {
1439+ // We should never be in this situation if the type has been set explicitly. As fixed_type
1440+ // is not currently supported through the service based bridge creation and the type name
1441+ // not matching means the fixed type specified in the configuration does not match the
1442+ // one we just recieved
1443+ if (ros_msg_type.name != mqtt2ros.ros .msg_type )
1444+ RCLCPP_ERROR (get_logger (),
1445+ " Unexpected type name received for topic %s (expected %s but received %s)" ,
1446+ mqtt2ros.ros .topic .c_str (), mqtt2ros.ros .msg_type .c_str (), ros_msg_type.name .c_str ());
1447+ if (mqtt2ros.ros .is_stale )
1448+ RCLCPP_ERROR (get_logger (), " Topic %s has been unexpectedly marked stale" ,
1449+ mqtt2ros.ros .topic .c_str ());
1450+ return ;
1451+ }
1452+
14351453 mqtt2ros.ros .msg_type = ros_msg_type.name ;
14361454 mqtt2ros.stamped = ros_msg_type.stamped ;
14371455 RCLCPP_INFO (get_logger (),
0 commit comments