Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions dependencies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@ ftxui:
git: https://github.com/ArthurSonzogni/ftxui
git_tag: v6.1.9
cmake_condition: "EVEREST_DEPENDENCY_ENABLED_FTXUI"
# everest/io library
mqttc:
git: https://github.com/LiamBindle/MQTT-C.git
git_tag: v1.1.6
options: ["MQTT_C_EXAMPLES OFF", "CMAKE_POSITION_INDEPENDENT_CODE ON"]
# Linux_Systemd_Rauc module
sdbus-cpp:
git: https://github.com/Kistler-Group/sdbus-cpp.git
Expand All @@ -68,6 +63,7 @@ catch2:
git: https://github.com/catchorg/Catch2.git
git_tag: v3.9.0
cmake_condition: "EVEREST_CORE_BUILD_TESTING"
# everest/io library
mosquitto:
git: https://github.com/eclipse-mosquitto/mosquitto
git_tag: v2.0.22
Expand All @@ -76,3 +72,4 @@ mosquitto:
- WITH_BROKER OFF
- WITH_APPS OFF
- WITH_PLUGINS OFF
- WITH_TESTS OFF
4 changes: 3 additions & 1 deletion lib/everest/framework/everestpy/src/everest/misc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ Everest::MQTTSettings get_mqtt_settings_from_env() {
const auto mqtt_broker_socket_path = std::getenv(Everest::EV_MQTT_BROKER_SOCKET_PATH);
const auto mqtt_broker_host = std::getenv(Everest::EV_MQTT_BROKER_HOST);
const auto mqtt_broker_port = std::getenv(Everest::EV_MQTT_BROKER_PORT);
const auto mqtt_bind_address =
get_variable_from_env(Everest::EV_MQTT_BIND_ADDRESS, Everest::defaults::MQTT_BIND_ADDRESS);

if (mqtt_broker_socket_path == nullptr) {
if (mqtt_broker_host == nullptr or mqtt_broker_port == nullptr) {
Expand All @@ -48,7 +50,7 @@ Everest::MQTTSettings get_mqtt_settings_from_env() {
EVLOG_warning << "Could not parse MQTT broker port, using default: " << mqtt_broker_port_;
}
return Everest::create_mqtt_settings(mqtt_broker_host, mqtt_broker_port_, mqtt_everest_prefix,
mqtt_external_prefix);
mqtt_external_prefix, mqtt_bind_address);
} else {
return Everest::create_mqtt_settings(mqtt_broker_socket_path, mqtt_everest_prefix, mqtt_external_prefix);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ std::unique_ptr<Module> mod;

const Module& create_module(rust::Str module_id, rust::Str prefix, rust::Str mqtt_broker_socket_path,
rust::Str mqtt_broker_host, const unsigned int& mqtt_broker_port,
rust::Str mqtt_everest_prefix, rust::Str mqtt_external_prefix) {
rust::Str mqtt_everest_prefix, rust::Str mqtt_external_prefix,
rust::Str mqtt_bind_address) {
std::call_once(mod_flag, [&]() {
auto socket_path = std::string(mqtt_broker_socket_path);
Everest::MQTTSettings mqtt_settings;
Expand All @@ -88,7 +89,8 @@ const Module& create_module(rust::Str module_id, rust::Str prefix, rust::Str mqt
std::string(mqtt_external_prefix));
} else {
Everest::populate_mqtt_settings(mqtt_settings, std::string(mqtt_broker_host), mqtt_broker_port,
std::string(mqtt_everest_prefix), std::string(mqtt_external_prefix));
std::string(mqtt_everest_prefix), std::string(mqtt_external_prefix),
std::string(mqtt_bind_address));
}
mod = std::make_unique<Module>(std::string(module_id), std::string(prefix), mqtt_settings);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class Module {

const Module& create_module(rust::Str module_id, rust::Str prefix, rust::Str mqtt_broker_socket_path,
rust::Str mqtt_broker_host, const unsigned int& mqtt_broker_port,
rust::Str mqtt_everest_prefix, rust::Str mqtt_external_prefix);
rust::Str mqtt_everest_prefix, rust::Str mqtt_external_prefix, rust::Str mqtt_bind_address);

int init_logging(rust::Str module_id, rust::Str prefix, rust::Str logging_config_file);
void log2cxx(int level, int line, rust::Str file, rust::Str message);
7 changes: 7 additions & 0 deletions lib/everest/framework/everestrs/everestrs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ mod ffi {
mqtt_broker_port: &u32,
mqtt_everest_prefix: &str,
mqtt_external_prefix: &str,
mqtt_bind_address: &str,
) -> &'static Module;

/// Returns the manifest.
Expand Down Expand Up @@ -397,6 +398,10 @@ struct Args {
#[arg(long = "mqtt_broker_host")]
pub mqtt_broker_host: String,

/// MQTT bind address
#[arg(long = "mqtt_bind_address")]
pub mqtt_bind_address: String,

/// MQTT broker port
#[arg(long = "mqtt_broker_port")]
pub mqtt_broker_port: u32,
Expand Down Expand Up @@ -596,6 +601,7 @@ impl Runtime {
&args.mqtt_broker_port,
&args.mqtt_everest_prefix,
&args.mqtt_external_prefix,
&args.mqtt_bind_address,
);
Arc::pin(Self {
cpp_module,
Expand Down Expand Up @@ -746,6 +752,7 @@ pub fn get_module_configs() -> HashMap<String, HashMap<String, Config>> {
&args.mqtt_broker_port,
&args.mqtt_everest_prefix,
&args.mqtt_external_prefix,
&args.mqtt_bind_address,
);
let raw_config = cpp_module.get_module_configs(&args.module);

Expand Down
2 changes: 2 additions & 0 deletions lib/everest/framework/include/framework/runtime.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ inline constexpr auto EV_MQTT_EVEREST_PREFIX = "EV_MQTT_EVEREST_PREFIX";
inline constexpr auto EV_MQTT_EXTERNAL_PREFIX = "EV_MQTT_EXTERNAL_PREFIX";
inline constexpr auto EV_MQTT_BROKER_SOCKET_PATH = "EV_MQTT_BROKER_SOCKET_PATH";
inline constexpr auto EV_MQTT_BROKER_HOST = "EV_MQTT_BROKER_HOST";
inline constexpr auto EV_MQTT_BIND_ADDRESS = "EV_MQTT_BIND_ADDRESS";
inline constexpr auto EV_MQTT_BROKER_PORT = "EV_MQTT_BROKER_PORT";
inline constexpr auto EV_VALIDATE_SCHEMA = "EV_VALIDATE_SCHEMA";
inline constexpr auto VERSION_INFORMATION_FILE = "version_information.txt";
Expand Down Expand Up @@ -90,6 +91,7 @@ inline constexpr auto CONTROLLER_PORT = 8849;
inline constexpr auto CONTROLLER_RPC_TIMEOUT_MS = 2000;
inline constexpr auto MQTT_BROKER_SOCKET_PATH = "/tmp/mqtt_broker.sock";
inline constexpr auto MQTT_BROKER_HOST = "localhost";
inline constexpr auto MQTT_BIND_ADDRESS = "127.0.0.1";
inline constexpr auto MQTT_BROKER_PORT = 1883;
inline constexpr auto MQTT_EVEREST_PREFIX = "everest";
inline constexpr auto MQTT_EXTERNAL_PREFIX = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ struct MQTTSettings {
std::string broker_socket_path; ///< A path to a socket the MQTT broker uses in socket mode. If this is set
///< broker_host and broker_port are ignored
std::string broker_host; ///< The hostname of the MQTT broker
std::string bind_address; ///< Bind address of the MQTT client
int broker_port = 0; ///< The port the MQTT broker listens on
std::string everest_prefix; ///< MQTT topic prefix for the "everest" topic
std::string external_prefix; ///< MQTT topic prefix for external topics
Expand All @@ -29,7 +30,8 @@ MQTTSettings create_mqtt_settings(const std::string& mqtt_broker_socket_path, co
/// \brief Creates MQTTSettings for IP based connections with the provided \p mqtt_broker_host
/// and \p mqtt_broker_port using the \p mqtt_everest_prefix and \p mqtt_external_prefix
MQTTSettings create_mqtt_settings(const std::string& mqtt_broker_host, int mqtt_broker_port,
const std::string& mqtt_everest_prefix, const std::string& mqtt_external_prefix);
const std::string& mqtt_everest_prefix, const std::string& mqtt_external_prefix,
const std::string& mqtt_bind_address);

/// \brief Populates the given MQTTSettings \p mqtt_settings with a Unix Domain Socket with the provided \p
/// mqtt_broker_socket_path using the \p mqtt_everest_prefix and \p mqtt_external_prefix
Expand All @@ -39,7 +41,8 @@ void populate_mqtt_settings(MQTTSettings& mqtt_settings, const std::string& mqtt
/// \brief Populates the given MQTTSettings \p mqtt_settings for IP based connections with the provided \p
/// mqtt_broker_host and \p mqtt_broker_port using the \p mqtt_everest_prefix and \p mqtt_external_prefix
void populate_mqtt_settings(MQTTSettings& mqtt_settings, const std::string& mqtt_broker_host, int mqtt_broker_port,
const std::string& mqtt_everest_prefix, const std::string& mqtt_external_prefix);
const std::string& mqtt_everest_prefix, const std::string& mqtt_external_prefix,
const std::string& mqtt_bind_address);

} // namespace Everest

Expand Down
1 change: 1 addition & 0 deletions lib/everest/framework/include/utils/config/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ struct Settings {
std::optional<int> controller_rpc_timeout_ms;
std::optional<std::string> mqtt_broker_socket_path;
std::optional<std::string> mqtt_broker_host;
std::optional<std::string> mqtt_bind_address;
std::optional<int> mqtt_broker_port;
std::optional<std::string> mqtt_everest_prefix;
std::optional<std::string> mqtt_external_prefix;
Expand Down
3 changes: 3 additions & 0 deletions lib/everest/framework/include/utils/message_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ namespace Everest {
struct Message {
std::string topic; ///< The MQTT topic where this message originated from
std::string payload; ///< The message payload

Message(const std::string& topic_, const std::string& payload_) : topic(topic_), payload(payload_) {
}
};

struct ParsedMessage {
Expand Down
37 changes: 11 additions & 26 deletions lib/everest/framework/include/utils/mqtt_abstraction_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@
#include <thread>
#include <vector>

#include <mqtt.h>
#include <nlohmann/json.hpp>

#include <everest/io/event/fd_event_handler.hpp>

Check warning on line 17 in lib/everest/framework/include/utils/mqtt_abstraction_impl.hpp

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

lib/everest/framework/include/utils/mqtt_abstraction_impl.hpp#L17

Include file: <everest/io/event/fd_event_handler.hpp> not found. Please note: Cppcheck does not need standard library headers to get proper results.
#include <everest/io/event/timer_fd.hpp>

Check warning on line 18 in lib/everest/framework/include/utils/mqtt_abstraction_impl.hpp

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

lib/everest/framework/include/utils/mqtt_abstraction_impl.hpp#L18

Include file: <everest/io/event/timer_fd.hpp> not found. Please note: Cppcheck does not need standard library headers to get proper results.
#include <everest/io/mqtt/mqtt_client.hpp>

Check warning on line 19 in lib/everest/framework/include/utils/mqtt_abstraction_impl.hpp

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

lib/everest/framework/include/utils/mqtt_abstraction_impl.hpp#L19

Include file: <everest/io/mqtt/mqtt_client.hpp> not found. Please note: Cppcheck does not need standard library headers to get proper results.

#include <utils/config/mqtt_settings.hpp>

Check warning on line 21 in lib/everest/framework/include/utils/mqtt_abstraction_impl.hpp

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

lib/everest/framework/include/utils/mqtt_abstraction_impl.hpp#L21

Include file: <utils/config/mqtt_settings.hpp> not found. Please note: Cppcheck does not need standard library headers to get proper results.
#include <utils/message_handler.hpp>
#include <utils/message_queue.hpp>
#include <utils/types.hpp>

#include <utils/thread.hpp>

constexpr auto MQTT_BUF_SIZE = 500 * std::size_t{1024};
#include <utils/types.hpp>

namespace Everest {
/// \brief Contains a payload and the topic it was received on with additional QOS
Expand All @@ -37,10 +38,7 @@
///
class MQTTAbstractionImpl {
public:
MQTTAbstractionImpl(const std::string& mqtt_server_address, const std::string& mqtt_server_port,
const std::string& mqtt_everest_prefix, const std::string& mqtt_external_prefix);
MQTTAbstractionImpl(const std::string& mqtt_server_socket_path, const std::string& mqtt_everest_prefix,
const std::string& mqtt_external_prefix);
MQTTAbstractionImpl(const MQTTSettings& mqtt_settings);

~MQTTAbstractionImpl();

Expand Down Expand Up @@ -137,13 +135,9 @@
/// \returns true if the topic matches, false otherwise
static bool check_topic_matches(const std::string& full_topic, const std::string& wildcard_topic);

///
/// \brief callback that is called from the mqtt implementation whenever a message is received
static void publish_callback(void** unused, struct mqtt_response_publish* published);

private:
static constexpr int mqtt_poll_timeout_ms{300000};
bool mqtt_is_connected;
std::atomic_bool running;
MessageHandler message_handler;
MessageQueue message_queue;
std::vector<std::shared_ptr<MessageWithQOS>> messages_before_connected;
Expand All @@ -158,25 +152,16 @@

std::string mqtt_server_socket_path;
std::string mqtt_server_address;
std::string mqtt_bind_address;
std::string mqtt_server_port;
std::string mqtt_everest_prefix;
std::string mqtt_external_prefix;
struct mqtt_client mqtt_client;
std::array<uint8_t, MQTT_BUF_SIZE> sendbuf;
std::array<uint8_t, MQTT_BUF_SIZE> recvbuf;

static int open_nb_socket(const char* addr, const char* port);
bool connectBroker(std::string& socket_path);
bool connectBroker(const char* host, const char* port);
std::unique_ptr<everest::lib::io::mqtt::mqtt_client> mqtt_client;

void on_mqtt_message(const Message& message);
void on_mqtt_connect();
static void on_mqtt_disconnect();

void notify_write_data();

int mqtt_socket_fd{-1};
int event_fd{-1};
int disconnect_event_fd{-1};
};
} // namespace Everest

Expand Down
2 changes: 1 addition & 1 deletion lib/everest/framework/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ target_link_libraries(framework
${EVEREST_FRAMEWORK_BOOST_SYSTEM_LINK_LIBRARY}
Boost::thread
Boost::program_options
mqttc
everest::io
ryml::ryml
)

Expand Down
11 changes: 7 additions & 4 deletions lib/everest/framework/lib/config/mqtt_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ MQTTSettings create_mqtt_settings(const std::string& mqtt_broker_socket_path, co
}

MQTTSettings create_mqtt_settings(const std::string& mqtt_broker_host, int mqtt_broker_port,
const std::string& mqtt_everest_prefix, const std::string& mqtt_external_prefix) {
const std::string& mqtt_everest_prefix, const std::string& mqtt_external_prefix,
const std::string& mqtt_bind_address) {
MQTTSettings mqtt_settings;
populate_mqtt_settings(mqtt_settings, mqtt_broker_host, mqtt_broker_port, mqtt_everest_prefix,
mqtt_external_prefix);
populate_mqtt_settings(mqtt_settings, mqtt_broker_host, mqtt_broker_port, mqtt_everest_prefix, mqtt_external_prefix,
mqtt_bind_address);
return mqtt_settings;
}

Expand All @@ -34,11 +35,13 @@ void populate_mqtt_settings(MQTTSettings& mqtt_settings, const std::string& mqtt
}

void populate_mqtt_settings(MQTTSettings& mqtt_settings, const std::string& mqtt_broker_host, int mqtt_broker_port,
const std::string& mqtt_everest_prefix, const std::string& mqtt_external_prefix) {
const std::string& mqtt_everest_prefix, const std::string& mqtt_external_prefix,
const std::string& mqtt_bind_address) {
mqtt_settings.broker_host = mqtt_broker_host;
mqtt_settings.broker_port = mqtt_broker_port;
mqtt_settings.everest_prefix = mqtt_everest_prefix;
mqtt_settings.external_prefix = mqtt_external_prefix;
mqtt_settings.bind_address = mqtt_bind_address;
}

} // namespace Everest
3 changes: 3 additions & 0 deletions lib/everest/framework/lib/config/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ Settings parse_settings(const json& settings_json) {
if (auto it = settings_json.find("mqtt_broker_host"); it != settings_json.end()) {
settings.mqtt_broker_host = it->get<std::string>();
}
if (auto it = settings_json.find("mqtt_bind_address"); it != settings_json.end()) {
settings.mqtt_bind_address = it->get<std::string>();
}
if (auto it = settings_json.find("mqtt_broker_port"); it != settings_json.end()) {
settings.mqtt_broker_port = it->get<int>();
}
Expand Down
11 changes: 11 additions & 0 deletions lib/everest/framework/lib/error/error_manager_req.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,17 @@ void ErrorManagerReq::on_error_raised(const Error& error) {
EVLOG_error << ss.str();
return;
}
std::list<ErrorPtr> errors =
database->get_errors({ErrorFilter(TypeFilter(error.type)), ErrorFilter(SubTypeFilter(error.sub_type)),
ErrorFilter(OriginFilter(error.origin))});
if (!errors.empty()) {
std::stringstream ss;
ss << "Error of type '" << error.type << "' and sub type '" << error.sub_type
<< "' is already raised, ignoring new error";
ss << std::endl << "Error object: " << nlohmann::json(error).dump(2);
EVLOG_error << ss.str();
return;
}
database->add_error(std::make_shared<Error>(error));
on_error(error, true);
}
Expand Down
9 changes: 1 addition & 8 deletions lib/everest/framework/lib/mqtt_abstraction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,7 @@ namespace Everest {

namespace {
std::unique_ptr<MQTTAbstractionImpl> create_mqtt_client(const MQTTSettings& mqtt_settings) {
if (mqtt_settings.uses_socket()) {
return std::make_unique<MQTTAbstractionImpl>(mqtt_settings.broker_socket_path, mqtt_settings.everest_prefix,
mqtt_settings.external_prefix);
} else {
return std::make_unique<MQTTAbstractionImpl>(mqtt_settings.broker_host,
std::to_string(mqtt_settings.broker_port),
mqtt_settings.everest_prefix, mqtt_settings.external_prefix);
}
return std::make_unique<MQTTAbstractionImpl>(mqtt_settings);
}
} // namespace

Expand Down
Loading
Loading