Skip to content
This repository was archived by the owner on Dec 9, 2025. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions lib/mqtt_abstraction_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,10 @@ void MQTTAbstractionImpl::on_mqtt_message(const Message& message) {
}
lock.unlock();

// It can happen that we unsubscribe from a topic and have removed the message handler but the MQTT unsubscribe
// didn't complete yet and we still receive messages on this topic that we can just ignore
if (!found) {
EVLOG_AND_THROW(
EverestInternalError(fmt::format("Internal error: topic '{}' should have a matching handler!", topic)));
EVLOG_verbose << fmt::format("Topic '{}' should have a matching handler!", topic);
}
} catch (boost::exception& e) {
EVLOG_critical << fmt::format("Caught MQTT on_message boost::exception:\n{}",
Expand Down Expand Up @@ -451,15 +452,17 @@ void MQTTAbstractionImpl::register_handler(const std::string& topic, std::shared
if (this->message_handlers.count(topic) == 0) {
this->message_handlers.emplace(std::piecewise_construct, std::forward_as_tuple(topic), std::forward_as_tuple());
}
this->message_handlers[topic].add_handler(handler);

// only subscribe for this topic if we aren't already and the mqtt client is connected
// if we are not connected the on_mqtt_connect() callback will subscribe to the topic
if (this->mqtt_is_connected && this->message_handlers[topic].count_handlers() == 1) {
const auto subscription_necessary =
(this->mqtt_is_connected && this->message_handlers.at(topic).count_handlers() == 0);

this->message_handlers.at(topic).add_handler(handler);

if (subscription_necessary) {
EVLOG_verbose << fmt::format("Subscribing to {}", topic);
this->subscribe(topic, qos);
}
EVLOG_verbose << fmt::format("#handler[{}] = {}", topic, this->message_handlers[topic].count_handlers());
EVLOG_verbose << fmt::format("#handler[{}] = {}", topic, this->message_handlers.at(topic).count_handlers());
}

void MQTTAbstractionImpl::unregister_handler(const std::string& topic, const Token& token) {
Expand All @@ -484,6 +487,7 @@ void MQTTAbstractionImpl::unregister_handler(const std::string& topic, const Tok
EVLOG_verbose << fmt::format("Unsubscribing from {}", topic);
this->unsubscribe(topic);
}
this->message_handlers.erase(topic);
}

const std::string handler_count = (number_of_handlers == 0) ? "None" : std::to_string(number_of_handlers);
Expand Down