This repository was archived by the owner on Dec 9, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy pathmessage_queue.hpp
More file actions
93 lines (72 loc) · 2.54 KB
/
message_queue.hpp
File metadata and controls
93 lines (72 loc) · 2.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// SPDX-License-Identifier: Apache-2.0
// Copyright Pionix GmbH and Contributors to EVerest
#ifndef UTILS_MESSAGE_QUEUE_HPP
#define UTILS_MESSAGE_QUEUE_HPP
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <unordered_set>
#include <nlohmann/json.hpp>
#include <utils/types.hpp>
namespace Everest {
/// \brief A raw message: a payload together with the topic it was received on
struct Message {
    /// The MQTT topic where this message originated from
    std::string topic;
    /// The message payload
    std::string payload;
};
/// \brief Pairs a topic with a json value
/// \note The `json` type is declared outside this header (TODO confirm: expected to come from utils/types.hpp)
struct ParsedMessage {
std::string topic; ///< The topic associated with this message
json data; ///< The json value carried by the message (presumably the parsed payload - confirm against the producer)
};
/// \brief Callable that is invoked with a const reference to a Message and returns nothing
using MessageCallback = std::function<void(const Message&)>;
/// \brief Simple message queue that accepts Message objects and delivers them to a message callback
///
/// Only the interface is declared here; the member functions are defined outside this header.
class MessageQueue {
private:
    std::thread worker_thread; ///< Worker thread (presumably runs the delivery loop - confirm in the implementation)
    std::queue<std::unique_ptr<Message>> message_queue; ///< Pending messages; the queue owns them
    std::mutex queue_ctrl_mutex;      ///< Mutex (by its name, guards the queue state - confirm in the implementation)
    MessageCallback message_callback; ///< Callback the messages are delivered to
    std::condition_variable cv;       ///< Condition variable (presumably wakes the worker thread - confirm)
    bool running;                     ///< Run flag; expected to be initialised by the constructor - confirm

public:
    /// \brief Creates a message queue with the provided \p message_callback
    explicit MessageQueue(MessageCallback);

    /// \brief Destructor, defined in the implementation file
    ~MessageQueue();

    // The std::thread and std::mutex members already make this type non-copyable and
    // non-movable; the policy is spelled out so it is visible next to the destructor.
    MessageQueue(const MessageQueue&) = delete;
    MessageQueue& operator=(const MessageQueue&) = delete;
    MessageQueue(MessageQueue&&) = delete;
    MessageQueue& operator=(MessageQueue&&) = delete;

    /// \brief Adds a \p message to the message queue which will then be delivered to the message callback
    ///
    /// Ownership of the message is transferred to the queue.
    void add(std::unique_ptr<Message>);

    /// \brief Stops the message queue
    void stop();
};
/// \brief Contains a message queue driven list of handler callbacks
///
/// Only the interface is declared here; the member functions are defined outside this header.
class MessageHandler {
private:
    std::unordered_set<std::shared_ptr<TypedHandler>> handlers; ///< Registered handlers (shared ownership)
    std::thread handler_thread; ///< Handler thread (presumably runs the dispatch loop - confirm in the implementation)
    std::queue<std::shared_ptr<ParsedMessage>> message_queue; ///< Pending parsed messages
    std::mutex handler_ctrl_mutex; ///< Mutex (by its name, guards queue/run state - confirm in the implementation)
    std::mutex handler_list_mutex; ///< Mutex (by its name, guards the handler set - confirm in the implementation)
    std::condition_variable cv;    ///< Condition variable (presumably wakes the handler thread - confirm)
    bool running;                  ///< Run flag; expected to be initialised by the constructor - confirm

public:
    /// \brief Creates the message handler
    MessageHandler();

    /// \brief Destructor, defined in the implementation file
    ~MessageHandler();

    // The std::thread and std::mutex members already make this type non-copyable and
    // non-movable; the policy is spelled out so it is visible next to the destructor.
    MessageHandler(const MessageHandler&) = delete;
    MessageHandler& operator=(const MessageHandler&) = delete;
    MessageHandler(MessageHandler&&) = delete;
    MessageHandler& operator=(MessageHandler&&) = delete;

    /// \brief Adds a \p message to the message queue which will be delivered to the registered handlers
    void add(std::shared_ptr<ParsedMessage>);

    /// \brief Stops the message handler
    void stop();

    /// \brief Adds a \p handler that will receive messages from the queue.
    void add_handler(const std::shared_ptr<TypedHandler>& handler);

    /// \brief Removes a specific \p handler
    void remove_handler(const std::shared_ptr<TypedHandler>& handler);

    /// \brief \returns the number of registered handlers
    std::size_t count_handlers();
};
} // namespace Everest
#endif // UTILS_MESSAGE_QUEUE_HPP