Skip to content
This repository was archived by the owner on Dec 9, 2025. It is now read-only.
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions lib/mqtt_abstraction_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,11 @@ void MQTTAbstractionImpl::on_mqtt_message(const Message& message) {
}
lock.unlock();

// TODO(kai): this should maybe not even be a warning since it can happen that we unsubscribe from a topic and
// have removed the message handler but the MQTT unsubscribe didn't complete yet and we still receive messages
// on this topic that we can just ignore
if (!found) {
EVLOG_AND_THROW(
EverestInternalError(fmt::format("Internal error: topic '{}' should have a matching handler!", topic)));
EVLOG_warning << fmt::format("Internal error: topic '{}' should have a matching handler!", topic);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not really an error, as is written in the comment, this is an valid but unlikely event.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to a verbose log and slightly reworded the comment

}
} catch (boost::exception& e) {
EVLOG_critical << fmt::format("Caught MQTT on_message boost::exception:\n{}",
Expand Down Expand Up @@ -451,15 +453,15 @@ void MQTTAbstractionImpl::register_handler(const std::string& topic, std::shared
if (this->message_handlers.count(topic) == 0) {
this->message_handlers.emplace(std::piecewise_construct, std::forward_as_tuple(topic), std::forward_as_tuple());
}
this->message_handlers[topic].add_handler(handler);
this->message_handlers.at(topic).add_handler(handler);

// only subscribe for this topic if we aren't already and the mqtt client is connected
// if we are not connected the on_mqtt_connect() callback will subscribe to the topic
if (this->mqtt_is_connected && this->message_handlers[topic].count_handlers() == 1) {
if (this->mqtt_is_connected && this->message_handlers.at(topic).count_handlers() == 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seeing this the first time, why should count_handlers() == 1?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's because if count_handler == 1 after registering a new handler it was the first handler being registered which should then trigger a subscribe

EVLOG_verbose << fmt::format("Subscribing to {}", topic);
this->subscribe(topic, qos);
}
EVLOG_verbose << fmt::format("#handler[{}] = {}", topic, this->message_handlers[topic].count_handlers());
EVLOG_verbose << fmt::format("#handler[{}] = {}", topic, this->message_handlers.at(topic).count_handlers());
}

void MQTTAbstractionImpl::unregister_handler(const std::string& topic, const Token& token) {
Expand All @@ -484,6 +486,7 @@ void MQTTAbstractionImpl::unregister_handler(const std::string& topic, const Tok
EVLOG_verbose << fmt::format("Unsubscribing from {}", topic);
this->unsubscribe(topic);
}
this->message_handlers.erase(topic);
}

const std::string handler_count = (number_of_handlers == 0) ? "None" : std::to_string(number_of_handlers);
Expand Down