Skip to content
This repository was archived by the owner on Dec 9, 2025. It is now read-only.

Commit de31429

Browse files
authored
Add function to cleanup retained topics to MQTTAbstraction (#233)
* Add --retain-topics flag to manager to keep retained topics after startup Remove them by default now but allow them to be explicitly kept for inspection * Remove old cleanup_retained_topics functions since this is now provided by the MQTT abstraction * Also clear retained topics when a module terminates unexpectedly --------- Signed-off-by: Kai-Uwe Hermann <kai-uwe.hermann@pionix.de>
1 parent 695995b commit de31429

File tree

5 files changed

+53
-34
lines changed

5 files changed

+53
-34
lines changed

include/utils/mqtt_abstraction.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ class MQTTAbstraction {
6464
/// \copydoc MQTTAbstractionImpl::unsubscribe(const std::string&)
6565
void unsubscribe(const std::string& topic);
6666

67+
///
68+
/// \copydoc MQTTAbstractionImpl::clear_retained_topics()
69+
void clear_retained_topics();
70+
6771
///
6872
/// \copydoc MQTTAbstractionImpl::get(const std::string&, QOS)
6973
nlohmann::json get(const std::string& topic, QOS qos);

include/utils/mqtt_abstraction_impl.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ class MQTTAbstractionImpl {
8080
/// \brief unsubscribes from the given \p topic
8181
void unsubscribe(const std::string& topic);
8282

83+
///
84+
/// \brief clears any previously published topics that had the retain flag set
85+
void clear_retained_topics();
86+
8387
///
8488
/// \brief subscribe and wait for value on the subscribed topic
8589
nlohmann::json get(const std::string& topic, QOS qos);
@@ -121,6 +125,8 @@ class MQTTAbstractionImpl {
121125
MessageQueue message_queue;
122126
std::vector<std::shared_ptr<MessageWithQOS>> messages_before_connected;
123127
std::mutex messages_before_connected_mutex;
128+
std::mutex retained_topics_mutex;
129+
std::vector<std::string> retained_topics;
124130

125131
Thread mqtt_mainloop_thread;
126132
std::shared_future<void> main_loop_future;

lib/mqtt_abstraction.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@ void MQTTAbstraction::unsubscribe(const std::string& topic) {
7171
mqtt_abstraction->unsubscribe(topic);
7272
}
7373

74+
void MQTTAbstraction::clear_retained_topics() {
75+
BOOST_LOG_FUNCTION();
76+
mqtt_abstraction->clear_retained_topics();
77+
}
78+
7479
json MQTTAbstraction::get(const std::string& topic, QOS qos) {
7580
BOOST_LOG_FUNCTION();
7681
return mqtt_abstraction->get(topic, qos);

lib/mqtt_abstraction_impl.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,13 @@ void MQTTAbstractionImpl::publish(const std::string& topic, const std::string& d
155155

156156
if (retain) {
157157
publish_flags |= MQTT_PUBLISH_RETAIN;
158+
if (not(data.empty() and qos == QOS::QOS0)) {
159+
// topic should be retained, so save the topic in retained_topics
160+
// do not save the topic when the payload is empty and QOS is set to 0 which means a retained topic is to be
161+
// cleared
162+
const std::lock_guard<std::mutex> lock(retained_topics_mutex);
163+
this->retained_topics.push_back(topic);
164+
}
158165
}
159166

160167
if (!this->mqtt_is_connected) {
@@ -207,6 +214,18 @@ void MQTTAbstractionImpl::unsubscribe(const std::string& topic) {
207214
notify_write_data();
208215
}
209216

217+
void MQTTAbstractionImpl::clear_retained_topics() {
218+
BOOST_LOG_FUNCTION();
219+
const std::lock_guard<std::mutex> lock(retained_topics_mutex);
220+
221+
for (const auto& retained_topic : retained_topics) {
222+
this->publish(retained_topic, std::string(), QOS::QOS0, true);
223+
EVLOG_verbose << "Cleared retained topic: " << retained_topic;
224+
}
225+
226+
retained_topics.clear();
227+
}
228+
210229
json MQTTAbstractionImpl::get(const std::string& topic, QOS qos) {
211230
BOOST_LOG_FUNCTION();
212231
std::promise<json> res_promise;

src/manager.cpp

Lines changed: 19 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -265,37 +265,11 @@ struct ModuleReadyInfo {
265265
std::map<std::string, ModuleReadyInfo> modules_ready;
266266
std::mutex modules_ready_mutex;
267267

268-
void cleanup_retained_topics(ManagerConfig& config, MQTTAbstraction& mqtt_abstraction,
269-
const std::string& mqtt_everest_prefix) {
270-
const auto& interface_definitions = config.get_interface_definitions();
271-
272-
mqtt_abstraction.publish(fmt::format("{}interfaces", mqtt_everest_prefix), std::string(), QOS::QOS2, true);
273-
274-
for (const auto& interface_definition : interface_definitions.items()) {
275-
mqtt_abstraction.publish(
276-
fmt::format("{}interface_definitions/{}", mqtt_everest_prefix, interface_definition.key()), std::string(),
277-
QOS::QOS2, true);
278-
}
279-
280-
mqtt_abstraction.publish(fmt::format("{}types", mqtt_everest_prefix), std::string(), QOS::QOS2, true);
281-
282-
mqtt_abstraction.publish(fmt::format("{}module_provides", mqtt_everest_prefix), std::string(), QOS::QOS2, true);
283-
284-
mqtt_abstraction.publish(fmt::format("{}settings", mqtt_everest_prefix), std::string(), QOS::QOS2, true);
285-
286-
mqtt_abstraction.publish(fmt::format("{}schemas", mqtt_everest_prefix), std::string(), QOS::QOS2, true);
287-
288-
mqtt_abstraction.publish(fmt::format("{}manifests", mqtt_everest_prefix), std::string(), QOS::QOS2, true);
289-
290-
mqtt_abstraction.publish(fmt::format("{}error_types_map", mqtt_everest_prefix), std::string(), QOS::QOS2, true);
291-
292-
mqtt_abstraction.publish(fmt::format("{}module_config_cache", mqtt_everest_prefix), std::string(), QOS::QOS2, true);
293-
}
294-
295268
static std::map<pid_t, std::string> start_modules(ManagerConfig& config, MQTTAbstraction& mqtt_abstraction,
296269
const std::vector<std::string>& ignored_modules,
297270
const std::vector<std::string>& standalone_modules,
298-
const ManagerSettings& ms, StatusFifo& status_fifo) {
271+
const ManagerSettings& ms, StatusFifo& status_fifo,
272+
bool retain_topics) {
299273
BOOST_LOG_FUNCTION();
300274

301275
std::vector<ModuleStartInfo> modules_to_spawn;
@@ -405,8 +379,8 @@ static std::map<pid_t, std::string> start_modules(ManagerConfig& config, MQTTAbs
405379
}
406380

407381
const Handler module_ready_handler = [module_name, &mqtt_abstraction, &config, standalone_modules,
408-
mqtt_everest_prefix = ms.mqtt_settings.everest_prefix,
409-
&status_fifo](const std::string&, const nlohmann::json& json) {
382+
mqtt_everest_prefix = ms.mqtt_settings.everest_prefix, &status_fifo,
383+
retain_topics](const std::string&, const nlohmann::json& json) {
410384
EVLOG_debug << fmt::format("received module ready signal for module: {}({})", module_name, json.dump());
411385
const std::unique_lock<std::mutex> lock(modules_ready_mutex);
412386
// FIXME (aw): here are race conditions, if the ready handler gets called while modules are shut down!
@@ -428,11 +402,16 @@ static std::map<pid_t, std::string> start_modules(ManagerConfig& config, MQTTAbs
428402
[](const auto& element) { return element.second.ready; })) {
429403
const auto complete_end_time = std::chrono::system_clock::now();
430404
status_fifo.update(StatusFifo::ALL_MODULES_STARTED);
405+
if (not retain_topics) {
406+
EVLOG_info << "Clearing retained topics published by manager during startup";
407+
mqtt_abstraction.clear_retained_topics();
408+
} else {
409+
EVLOG_info << "Keeping retained topics published by manager during startup for inspection";
410+
}
431411
EVLOG_info << fmt::format(
432412
TERMINAL_STYLE_OK, "🚙🚙🚙 All modules are initialized. EVerest up and running [{}ms] 🚙🚙🚙",
433413
std::chrono::duration_cast<std::chrono::milliseconds>(complete_end_time - complete_start_time)
434414
.count());
435-
// cleanup_retained_topics(config, mqtt_abstraction, mqtt_everest_prefix);
436415
mqtt_abstraction.publish(fmt::format("{}ready", mqtt_everest_prefix), nlohmann::json(true));
437416
} else if (!standalone_modules.empty()) {
438417
if (modules_spawned == modules_ready.size() - standalone_modules.size()) {
@@ -666,6 +645,8 @@ int boot(const po::variables_map& vm) {
666645
return EXIT_SUCCESS;
667646
}
668647

648+
const bool retain_topics = (vm.count("retain-topics") != 0);
649+
669650
const auto start_time = std::chrono::system_clock::now();
670651
std::unique_ptr<ManagerConfig> config;
671652
try {
@@ -759,7 +740,7 @@ int boot(const po::variables_map& vm) {
759740
mqtt_abstraction.spawn_main_loop_thread();
760741

761742
auto module_handles =
762-
start_modules(*config, mqtt_abstraction, ignored_modules, standalone_modules, ms, status_fifo);
743+
start_modules(*config, mqtt_abstraction, ignored_modules, standalone_modules, ms, status_fifo, retain_topics);
763744
bool modules_started = true;
764745
bool restart_modules = false;
765746

@@ -814,6 +795,8 @@ int boot(const po::variables_map& vm) {
814795
shutdown_modules(module_handles, *config, mqtt_abstraction);
815796
modules_started = false;
816797

798+
mqtt_abstraction.clear_retained_topics();
799+
817800
// Exit if a module died, this gives systemd a change to restart manager
818801
EVLOG_critical << "Exiting manager.";
819802
return EXIT_FAILURE;
@@ -824,8 +807,8 @@ int boot(const po::variables_map& vm) {
824807

825808
#ifdef ENABLE_ADMIN_PANEL
826809
if (module_handles.size() == 0 && restart_modules) {
827-
module_handles =
828-
start_modules(*config, mqtt_abstraction, ignored_modules, standalone_modules, ms, status_fifo);
810+
module_handles = start_modules(*config, mqtt_abstraction, ignored_modules, standalone_modules, ms,
811+
status_fifo, retain_topics);
829812
restart_modules = false;
830813
modules_started = true;
831814
}
@@ -889,6 +872,8 @@ int main(int argc, char* argv[]) {
889872
"looked up in the default config directory");
890873
desc.add_options()("status-fifo", po::value<std::string>()->default_value(""),
891874
"Path to a named pipe, that shall be used for status updates from the manager");
875+
desc.add_options()("retain-topics", "Retain configuration MQTT topics setup by manager for inspection, by default "
876+
"these will be cleared after startup");
892877

893878
po::variables_map vm;
894879

0 commit comments

Comments
 (0)