66#include < everest/logging.hpp>
77#include < fmt/format.h>
88
9+ #include < optional>
10+
911namespace Everest {
1012
1113namespace {
@@ -62,7 +64,7 @@ bool check_topic_matches(const std::string& full_topic, const std::string& wildc
6264} // namespace
6365
6466MessageHandler::MessageHandler () {
65- operation_worker_thread = std::thread ([this ] { run_operation_message_worker (); });
67+ operation_dispatcher_thread = std::thread ([this ] { run_operation_dispatcher (); });
6668 result_worker_thread = std::thread ([this ] { run_result_message_worker (); });
6769 external_mqtt_worker_thread = std::thread ([this ] { run_external_mqtt_worker (); });
6870}
@@ -120,9 +122,13 @@ void MessageHandler::stop() {
120122 result_cv.notify_all ();
121123 external_mqtt_cv.notify_all ();
122124
123- if (operation_worker_thread.joinable ()) {
124- operation_worker_thread.join ();
125+ // Join the dispatcher first: it must not be able to call schedule_operation_message()
126+ // (which dereferences operation_thread_pool) after the pool is destroyed.
127+ if (operation_dispatcher_thread.joinable ()) {
128+ operation_dispatcher_thread.join ();
125129 }
130+ // The thread_pool destructor handles stopping and joining its workers.
131+ operation_thread_pool.reset ();
126132 if (result_worker_thread.joinable ()) {
127133 result_worker_thread.join ();
128134 }
@@ -134,7 +140,7 @@ void MessageHandler::stop() {
134140 }
135141}
136142
137- void MessageHandler::run_operation_message_worker () {
143+ void MessageHandler::run_operation_dispatcher () {
138144 while (true ) {
139145 std::unique_lock<std::mutex> lock (operation_queue_mutex);
140146 operation_cv.wait (lock, [this ] { return !operation_message_queue.empty () || !running; });
@@ -145,9 +151,73 @@ void MessageHandler::run_operation_message_worker() {
145151 operation_message_queue.pop ();
146152 lock.unlock ();
147153
148- handle_operation_message (message.topic , message.data );
154+ dispatch_operation_message (std::move (message));
155+ }
156+ EVLOG_info << " Operation dispatcher thread stopped" ;
157+ }
158+
159+ void MessageHandler::dispatch_operation_message (ParsedMessage&& message) {
160+ {
161+ std::lock_guard<std::mutex> lock (operation_topic_state_mutex);
162+ if (operation_topics_in_flight.find (message.topic ) != operation_topics_in_flight.end ()) {
163+ pending_operation_messages_by_topic[message.topic ].push (std::move (message));
164+ return ;
165+ }
166+
167+ operation_topics_in_flight.insert (message.topic );
168+ }
169+
170+ schedule_operation_message (std::move (message));
171+ }
172+
173+ // NOLINTNEXTLINE(misc-no-recursion)
174+ void MessageHandler::schedule_operation_message (ParsedMessage&& message) {
175+ // NOLINTNEXTLINE(misc-no-recursion)
176+ auto operation = [this , message = std::move (message)]() {
177+ // Wrap in try-catch so that on_operation_message_done is always called: an exception in
178+ // the handler must not leave the topic permanently stuck in operation_topics_in_flight,
179+ // which would block all subsequent messages for that topic.
180+ try {
181+ handle_operation_message (message.topic , message.data );
182+ } catch (const std::exception& e) {
183+ EVLOG_error << " Exception while handling operation message on topic '" << message.topic
184+ << " ': " << e.what ();
185+ } catch (...) {
186+ EVLOG_error << " Unknown exception while handling operation message on topic '" << message.topic << " '" ;
187+ }
188+ on_operation_message_done (message.topic );
189+ };
190+
191+ if (operation_thread_pool) {
192+ operation_thread_pool->run (std::move (operation));
193+ }
194+ }
195+
196+ // NOLINTNEXTLINE(misc-no-recursion)
197+ void MessageHandler::on_operation_message_done (const std::string& topic) {
198+ std::optional<ParsedMessage> next_message;
199+ {
200+ std::lock_guard<std::mutex> lock (operation_topic_state_mutex);
201+ if (!running) {
202+ // Shutting down: stop scheduling and release the in-flight slot.
203+ operation_topics_in_flight.erase (topic);
204+ return ;
205+ }
206+ auto pending_it = pending_operation_messages_by_topic.find (topic);
207+ if (pending_it != pending_operation_messages_by_topic.end () && !pending_it->second .empty ()) {
208+ next_message = std::move (pending_it->second .front ());
209+ pending_it->second .pop ();
210+
211+ if (pending_it->second .empty ()) {
212+ pending_operation_messages_by_topic.erase (pending_it);
213+ }
214+ } else {
215+ operation_topics_in_flight.erase (topic);
216+ return ;
217+ }
149218 }
150- EVLOG_info << " Main worker thread stopped" ;
219+
220+ schedule_operation_message (std::move (*next_message));
151221}
152222
153223void MessageHandler::run_result_message_worker () {
0 commit comments