Skip to content
This repository was archived by the owner on Dec 9, 2025. It is now read-only.
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 50 additions & 62 deletions lib/everest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -549,36 +549,39 @@ void Everest::subscribe_error(const Requirement& req, const error::ErrorType& er
return;
}

const auto raise_handler = [this, requirement_module_id, requirement_impl_id, error_type,
callback](const std::string&, json const& data) {
EVLOG_debug << fmt::format("Incoming error {}->{}",
this->config.printable_identifier(requirement_module_id, requirement_impl_id),
error_type);

callback(data.get<error::Error>());
};

const auto clear_handler = [this, requirement_module_id, requirement_impl_id, error_type,
const auto error_handler = [this, requirement_module_id, requirement_impl_id, error_type, callback,
clear_callback](const std::string&, json const& data) {
EVLOG_debug << fmt::format("Error cleared {}->{}",
this->config.printable_identifier(requirement_module_id, requirement_impl_id),
error_type);
clear_callback(data.get<error::Error>());
};
auto error = data.get<error::Error>();
if (error.type != error_type) {
// error type doesn't match, ignoring
return;
}

const std::string raise_topic =
fmt::format("{}/error/{}", this->config.mqtt_prefix(requirement_module_id, requirement_impl_id), error_type);
switch (error.state) {
case error::State::Active:
EVLOG_debug << fmt::format("Incoming error {}->{}",
this->config.printable_identifier(requirement_module_id, requirement_impl_id),
error_type);

callback(error);
break;
case error::State::ClearedByModule:
case error::State::ClearedByReboot:
EVLOG_debug << fmt::format("Error cleared {}->{}",
this->config.printable_identifier(requirement_module_id, requirement_impl_id),
error_type);
clear_callback(error);
break;
}
};

const std::string clear_topic = fmt::format(
"{}/error-cleared/{}", this->config.mqtt_prefix(requirement_module_id, requirement_impl_id), error_type);
const std::string error_topic =
fmt::format("{}/error", this->config.mqtt_prefix(requirement_module_id, requirement_impl_id));

const std::shared_ptr<TypedHandler> raise_token = std::make_shared<TypedHandler>(
error_type, HandlerType::SubscribeError, std::make_shared<Handler>(raise_handler));
const std::shared_ptr<TypedHandler> clear_token = std::make_shared<TypedHandler>(
error_type, HandlerType::SubscribeError, std::make_shared<Handler>(clear_handler));
const std::shared_ptr<TypedHandler> error_token = std::make_shared<TypedHandler>(
error_type, HandlerType::SubscribeError, std::make_shared<Handler>(error_handler));

this->mqtt_abstraction->register_handler(raise_topic, raise_token, QOS::QOS2);
this->mqtt_abstraction->register_handler(clear_topic, clear_token, QOS::QOS2);
this->mqtt_abstraction->register_handler(error_topic, error_token, QOS::QOS2);
}

std::shared_ptr<error::ErrorManagerImpl> Everest::get_error_manager_impl(const std::string& impl_id) {
Expand Down Expand Up @@ -647,64 +650,49 @@ void Everest::subscribe_global_all_errors(const error::ErrorCallback& callback,
return;
}

const auto raise_handler = [this, callback](const std::string&, json const& data) {
const auto error_handler = [this, callback, clear_callback](const std::string&, json const& data) {
error::Error error = data.get<error::Error>();
EVLOG_debug << fmt::format(
"Incoming error {}->{}",
this->config.printable_identifier(error.origin.module_id, error.origin.implementation_id), error.type);
callback(error);
};

const auto clear_handler = [this, clear_callback](const std::string&, json const& data) {
error::Error error = data.get<error::Error>();
EVLOG_debug << fmt::format(
"Incoming error cleared {}->{}",
this->config.printable_identifier(error.origin.module_id, error.origin.implementation_id), error.type);
clear_callback(error);
switch (error.state) {
case error::State::Active:
EVLOG_debug << fmt::format(
"Incoming error {}->{}",
this->config.printable_identifier(error.origin.module_id, error.origin.implementation_id), error.type);
callback(error);
break;
case error::State::ClearedByModule:
case error::State::ClearedByReboot:
EVLOG_debug << fmt::format(
"Incoming error cleared {}->{}",
this->config.printable_identifier(error.origin.module_id, error.origin.implementation_id), error.type);
clear_callback(error);
break;
}
Comment on lines +655 to +668
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part of the handler can probably be shared with the other one

};

for (const auto& [module_id, module_name] : this->config.get_module_names()) {
const json provides = this->config.get_manifests().at(module_name).at("provides");
for (const auto& impl : provides.items()) {
const std::string& impl_id = impl.key();
const std::string& interface = impl.value().at("interface");
const json errors = this->config.get_interface_definition(interface).at("errors");
for (const auto& error_namespace_it : errors.items()) {
const std::string& error_type_namespace = error_namespace_it.key();
for (const auto& error_name_it : error_namespace_it.value().items()) {
const std::string& error_type_name = error_name_it.key();
const std::string raise_topic =
fmt::format("{}/error/{}/{}", this->config.mqtt_prefix(module_id, impl_id),
error_type_namespace, error_type_name);
const std::shared_ptr<TypedHandler> raise_token = std::make_shared<TypedHandler>(
HandlerType::SubscribeError, std::make_shared<Handler>(raise_handler));
this->mqtt_abstraction->register_handler(raise_topic, raise_token, QOS::QOS2);

const std::string clear_topic =
fmt::format("{}/error-cleared/{}/{}", this->config.mqtt_prefix(module_id, impl_id),
error_type_namespace, error_type_name);
const std::shared_ptr<TypedHandler> clear_token = std::make_shared<TypedHandler>(
HandlerType::SubscribeError, std::make_shared<Handler>(clear_handler));
this->mqtt_abstraction->register_handler(clear_topic, clear_token, QOS::QOS2);
}
}
const std::string error_topic = fmt::format("{}/error", this->config.mqtt_prefix(module_id, impl_id));
const std::shared_ptr<TypedHandler> error_token =
std::make_shared<TypedHandler>(HandlerType::SubscribeError, std::make_shared<Handler>(error_handler));
this->mqtt_abstraction->register_handler(error_topic, error_token, QOS::QOS2);
}
}
}

void Everest::publish_raised_error(const std::string& impl_id, const error::Error& error) {
BOOST_LOG_FUNCTION();

const auto error_topic = fmt::format("{}/error/{}", this->config.mqtt_prefix(this->module_id, impl_id), error.type);
const auto error_topic = fmt::format("{}/error", this->config.mqtt_prefix(this->module_id, impl_id));

this->mqtt_abstraction->publish(error_topic, json(error), QOS::QOS2);
}

void Everest::publish_cleared_error(const std::string& impl_id, const error::Error& error) {
BOOST_LOG_FUNCTION();

const auto error_topic =
fmt::format("{}/error-cleared/{}", this->config.mqtt_prefix(this->module_id, impl_id), error.type);
const auto error_topic = fmt::format("{}/error", this->config.mqtt_prefix(this->module_id, impl_id));

this->mqtt_abstraction->publish(error_topic, json(error), QOS::QOS2);
}
Expand Down