44#include < utils/utils.h>
55
66constexpr auto LABEL = " mqtt" ;
7- constexpr auto KEEP_ALIVE = 60 ;
8- constexpr auto LOOP_TIMEOUT_MS = 100 ;
97constexpr auto QOS_SUB = 2 ;
10- constexpr auto RECONNECT_INTERVAL = std::chrono::seconds(1 );
118constexpr auto QUEUE_MAX_SIZE = 1000 ;
9+ constexpr auto LOOP_TIMEOUT = std::chrono::milliseconds(10 );
10+ constexpr auto RECONNECT_INTERVAL = std::chrono::seconds(5 );
11+ constexpr auto CONNECT_TIMEOUT = std::chrono::seconds(5 );
12+ constexpr auto KEEP_ALIVE = std::chrono::seconds(60 );
1213
13- Mqtt::Mqtt (const Config & config)
14- : m_client(mosquitto_new( nullptr , true , this ) ), m_isRunning(true ), m_thread([this , config]() {
14+ Mqtt::Mqtt (const Config& config)
15+ : m_config(config), m_client(config.mqttUrl(), "sdr-scanner" ), m_isRunning(true ), m_thread([this , config]() {
1516 Logger::info (LABEL, " started" );
16- mosquitto_username_pw_set (m_client, config.mqttUsername ().c_str (), config.mqttPassword ().c_str ());
17- mosquitto_connect_callback_set (m_client, [](mosquitto *, void *p, int ) { reinterpret_cast <Mqtt *>(p)->onConnect (); });
18- mosquitto_disconnect_callback_set (m_client, [](mosquitto *, void *p, int ) { reinterpret_cast <Mqtt *>(p)->onDisconnect (); });
19- mosquitto_message_callback_set (m_client, [](mosquitto *, void *p, const struct mosquitto_message *m) { reinterpret_cast <Mqtt *>(p)->onMessage (m); });
20- mosquitto_connect (m_client, config.mqttHostname ().c_str (), config.mqttPort (), KEEP_ALIVE);
17+ connect ();
2118 while (m_isRunning) {
22- mosquitto_loop (m_client, LOOP_TIMEOUT_MS, 1 );
23- while (m_isRunning && !m_messages.empty ()) {
24- const auto &[topic, data, qos] = m_messages.front ();
25- mosquitto_publish (m_client, nullptr , topic.c_str (), data.size (), data.data (), qos, false );
26- std::unique_lock lock (m_mutex);
27- m_messages.pop ();
19+ if (m_client.is_connected ()) {
20+ std::shared_ptr<const mqtt::message> message;
21+ while (m_client.try_consume_message_for (&message, LOOP_TIMEOUT) && message) {
22+ onMessage (message->get_topic (), message->get_payload ());
23+ }
24+ while (m_isRunning && !m_messages.empty ()) {
25+ const auto & [topic, data, qos] = m_messages.front ();
26+ m_client.publish (topic, data.data (), data.size (), qos, false );
27+ std::unique_lock lock (m_mutex);
28+ m_messages.pop ();
29+ }
30+ } else {
31+ onDisconnected ();
32+ while (m_isRunning && !m_client.is_connected ()) {
33+ Logger::info (LABEL, " reconnecting..." );
34+ connect ();
35+ if (!m_client.is_connected ()) {
36+ std::this_thread::sleep_for (RECONNECT_INTERVAL);
37+ }
38+ }
2839 }
2940 }
3041 Logger::info (LABEL, " stopped" );
@@ -33,75 +44,106 @@ Mqtt::Mqtt(const Config &config)
3344Mqtt::~Mqtt () {
3445 m_isRunning = false ;
3546 m_thread.join ();
36- mosquitto_disconnect (m_client);
37- mosquitto_destroy (m_client);
47+ if (m_client.is_connected ()) {
48+ m_client.disconnect ();
49+ }
3850}
3951
40- void Mqtt::publish (const std::string & topic, const std::string & data, int qos) {
52+ void Mqtt::publish (const std::string& topic, const std::string& data, int qos) {
4153 std::unique_lock lock (m_mutex);
4254 if (m_messages.size () < QUEUE_MAX_SIZE) {
4355 m_messages.emplace (topic, std::vector<uint8_t >{data.begin (), data.end ()}, qos);
4456 Logger::trace (LABEL, " queue size: {}" , m_messages.size ());
4557 }
4658}
4759
48- void Mqtt::publish (const std::string & topic, const std::vector<uint8_t > & data, int qos) {
60+ void Mqtt::publish (const std::string& topic, const std::vector<uint8_t >& data, int qos) {
4961 std::unique_lock lock (m_mutex);
5062 if (m_messages.size () < QUEUE_MAX_SIZE) {
5163 m_messages.emplace (topic, data, qos);
5264 Logger::trace (LABEL, " queue size: {}" , m_messages.size ());
5365 }
5466}
5567
56- void Mqtt::publish (const std::string & topic, const std::vector<uint8_t > && data, int qos) {
68+ void Mqtt::publish (const std::string& topic, const std::vector<uint8_t >&& data, int qos) {
5769 std::unique_lock lock (m_mutex);
5870 if (m_messages.size () < QUEUE_MAX_SIZE) {
5971 m_messages.emplace (topic, std::move (data), qos);
6072 Logger::trace (LABEL, " queue size: {}" , m_messages.size ());
6173 }
6274}
6375
64- void Mqtt::setMessageCallback (const std::string & topic, std::function<void (const std::string &)> callback) {
76+ void Mqtt::setMessageCallback (const std::string& topic, std::function<void (const std::string&)> callback) {
6577 subscribe (topic);
6678 m_callbacks.emplace_back (topic, callback);
6779}
6880
69- void Mqtt::subscribe (const std::string &topic) {
70- if (m_topics.count (topic) == 0 ) {
71- mosquitto_subscribe (m_client, nullptr , topic.c_str (), QOS_SUB);
72- m_topics.insert (topic);
81+ void Mqtt::connect () {
82+ mqtt::ssl_options ssl_options;
83+ ssl_options.ca_path (" /etc/ssl/certs" );
84+
85+ const auto options = mqtt::connect_options_builder ()
86+ .mqtt_version (MQTTVERSION_3_1_1)
87+ .ssl (ssl_options)
88+ .user_name (m_config.mqttUsername ())
89+ .password (m_config.mqttPassword ())
90+ .keep_alive_interval (KEEP_ALIVE)
91+ .connect_timeout (CONNECT_TIMEOUT)
92+ .automatic_reconnect (false )
93+ .clean_session (true )
94+ .finalize ();
95+
96+ try {
97+ const auto response = m_client.connect (options);
98+ if (response.is_session_present ()) {
99+ Logger::info (LABEL, " session already present" );
100+ } else {
101+ Logger::info (LABEL, " new session created" );
102+ }
103+ } catch (const std::runtime_error& exception) {
104+ Logger::warn (LABEL, " exception: {}" , exception.what ());
105+ }
106+ if (m_client.is_connected ()) {
107+ onConnected ();
73108 }
74109}
75110
76- void Mqtt::onConnect () {
111+ void Mqtt::onConnected () {
77112 Logger::info (LABEL, " connected" );
78- for (const auto &topic : m_topics) {
113+ std::unique_lock lock (m_mutex);
114+
115+ for (const auto & topic : m_topics) {
79116 Logger::info (LABEL, " subscribe: {}" , colored (GREEN, " {}" , topic));
80- mosquitto_subscribe ( m_client, nullptr , topic. c_str () , QOS_SUB);
117+ m_client. subscribe (topic , QOS_SUB);
81118 }
82- }
83119
84- void Mqtt::onDisconnect () {
85- if (!m_isRunning) {
86- return ;
87- }
88- Logger::warn (LABEL, " disconnected" );
89- while (m_isRunning && mosquitto_reconnect (m_client) != MOSQ_ERR_SUCCESS) {
90- Logger::info (LABEL, " reconnecting" );
91- std::this_thread::sleep_for (RECONNECT_INTERVAL);
120+ for (const auto & topic : m_waitingTopics) {
121+ Logger::info (LABEL, " subscribe: {}" , colored (GREEN, " {}" , topic));
122+ m_client.subscribe (topic, QOS_SUB);
123+ m_topics.insert (topic);
92124 }
93- Logger::info (LABEL, " reconnecting success" );
125+ m_waitingTopics.clear ();
126+ }
127+
128+ void Mqtt::onDisconnected () { Logger::info (LABEL, " disconnected" ); }
129+
130+ void Mqtt::subscribe (const std::string& topic) {
94131 std::unique_lock lock (m_mutex);
95- while (!m_messages.empty ()) {
96- m_messages.pop ();
132+ if (m_client.is_connected ()) {
133+ if (m_topics.count (topic) == 0 ) {
134+ Logger::info (LABEL, " subscribe: {}" , colored (GREEN, " {}" , topic));
135+ m_client.subscribe (topic, QOS_SUB);
136+ m_topics.insert (topic);
137+ }
138+ } else {
139+ m_waitingTopics.insert (topic);
97140 }
98141}
99142
100- void Mqtt::onMessage (const mosquitto_message *message) {
101- Logger::debug (LABEL, " topic: {}, data: {}" , message->topic , static_cast <char *>(message->payload ));
102- const std::string data (static_cast <char *>(message->payload ), message->payloadlen );
103- for (auto &[topic, callback] : m_callbacks) {
104- if (strcmp (message->topic , topic.c_str ()) == 0 ) {
143+ void Mqtt::onMessage (const std::string& topic, const std::string& data) {
144+ Logger::debug (LABEL, " topic: {}, data: {}" , topic, data);
145+ for (auto & [callbackTopic, callback] : m_callbacks) {
146+ if (topic == callbackTopic) {
105147 callback (data);
106148 }
107149 }
0 commit comments