Skip to content

Commit 8c09c07

Browse files
committed
Add mqtt process plugin
1 parent 3a7a546 commit 8c09c07

File tree

3 files changed

+445
-0
lines changed

3 files changed

+445
-0
lines changed

include/ipfixprobe/ipfix-elements.hpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,14 @@ namespace ipxp {
295295
#define NTS_TIME_DISTRIBUTION(F) F(8057, 1031, 4, nullptr)
296296
#define NTS_SWITCHING_RATIO(F) F(8057, 1032, 4, nullptr)
297297

298+
#define MQTT_TYPE_CUMULATIVE(F) F(8057, 1033, 2, nullptr)
299+
#define MQTT_VERSION(F) F(8057, 1034, 1, nullptr)
300+
#define MQTT_CONNECTION_FLAGS(F) F(8057, 1035, 1, nullptr)
301+
#define MQTT_KEEP_ALIVE(F) F(8057, 1036, 2, nullptr)
302+
#define MQTT_LAST_RETURN_CODE(F) F(8057, 1037, 1, nullptr)
303+
#define MQTT_PUBLISH_FLAGS(F) F(8057, 1038, 1, nullptr)
304+
#define MQTT_TOPICS(F) F(8057, 1039, -1, nullptr)
305+
298306
#define MPLS_TOP_LABEL_STACK_SECTION F(0, 70, -1, nullptr)
299307

300308

@@ -443,6 +451,15 @@ namespace ipxp {
443451
F(SIP_USER_AGENT) \
444452
F(SIP_REQUEST_URI) \
445453
F(SIP_VIA)
454+
455+
#define IPFIX_MQTT_TEMPLATE(F) \
456+
F(MQTT_TYPE_CUMULATIVE) \
457+
F(MQTT_VERSION) \
458+
F(MQTT_CONNECTION_FLAGS) \
459+
F(MQTT_KEEP_ALIVE) \
460+
F(MQTT_LAST_RETURN_CODE) \
461+
F(MQTT_PUBLISH_FLAGS) \
462+
F(MQTT_TOPICS)
446463

447464
#define IPFIX_PSTATS_TEMPLATE(F) \
448465
F(STATS_PCKT_SIZES) \

process/mqtt.cpp

Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
/**
2+
* \file mqtt.hpp
3+
* \brief MQTT plugin for ipfixprobe
4+
* \author Damir Zainullin <[email protected]>
5+
* \date 2024
6+
*/
7+
/*
8+
* Copyright (C) 2023 CESNET
9+
*
10+
* LICENSE TERMS
11+
*
12+
* Redistribution and use in source and binary forms, with or without
13+
* modification, are permitted provided that the following conditions
14+
* are met:
15+
* 1. Redistributions of source code must retain the above copyright
16+
* notice, this list of conditions and the following disclaimer.
17+
* 2. Redistributions in binary form must reproduce the above copyright
18+
* notice, this list of conditions and the following disclaimer in
19+
* the documentation and/or other materials provided with the
20+
* distribution.
21+
* 3. Neither the name of the Company nor the names of its contributors
22+
* may be used to endorse or promote products derived from this
23+
* software without specific prior written permission.
24+
*/
25+
#include "mqtt.hpp"
26+
#include "cstring"
27+
#ifdef DEBUG_MQTT
28+
static const bool debug_mqtt = true;
29+
#else
30+
static const bool debug_mqtt = false;
31+
#endif
32+
namespace ipxp {
33+
34+
int RecordExtMQTT::REGISTERED_ID = -1;
35+
36+
__attribute__((constructor)) static void register_this_plugin(){
37+
static PluginRecord rec = PluginRecord("mqtt", []() { return new MQTTPlugin(); });
38+
register_plugin(&rec);
39+
RecordExtMQTT::REGISTERED_ID = register_extension();
40+
}
41+
42+
int MQTTPlugin::post_create(Flow &rec, const Packet &pkt){
43+
if (has_mqtt_protocol_name(reinterpret_cast<const char*>(pkt.payload), pkt.payload_len))
44+
add_ext_mqtt(reinterpret_cast<const char*>(pkt.payload), pkt.payload_len, rec);
45+
return 0;
46+
}
47+
48+
int MQTTPlugin::pre_update(Flow &rec, Packet &pkt){
49+
const char* payload = reinterpret_cast<const char*>(pkt.payload);
50+
RecordExt* ext = rec.get_extension(RecordExtMQTT::REGISTERED_ID);
51+
if (ext == nullptr) {
52+
return 0;
53+
}else{
54+
parse_mqtt(payload, pkt.payload_len, static_cast<RecordExtMQTT*>(ext));
55+
}
56+
return 0;
57+
}
58+
59+
/**
60+
* \brief Read variable integer as defined in http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html.
61+
* \param [in] data Pointer to IP payload.
62+
* \param [in] payload_len IP payload length.
63+
* \param [in] last_byte Next after last read byte.
64+
* \return Pair of read integer and bool. Bool is false in case read was unsuccessful.
65+
*/
66+
std::pair<uint32_t, bool> MQTTPlugin::read_variable_int(const char* data, int payload_len, uint32_t& last_byte)const noexcept{
67+
uint32_t res = 0;
68+
bool next;
69+
for( next = true; next && last_byte < (uint32_t)payload_len; last_byte++){
70+
res <<= 8;
71+
res |= data[last_byte];
72+
next = (data[last_byte] & 0b1000'0000);
73+
}
74+
return last_byte == (uint32_t)payload_len && next ? std::make_pair(0u, false) : std::make_pair(res, true);
75+
}
76+
77+
/**
78+
* \brief Read utf8 encoded string as defined in http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html.
79+
* \param [in] data Pointer to IP payload.
80+
* \param [in] payload_len IP payload length.
81+
* \param [in] last_byte Next after last read byte.
82+
* \return Tuple of read string, its length and bool. Bool is false in case read was unsuccessful.
83+
*/
84+
std::tuple<uint32_t, std::string_view, bool> MQTTPlugin::read_utf8_string(const char* data, int payload_len, uint32_t& last_byte) const noexcept{
85+
if (last_byte + 2 >= (uint32_t)payload_len)
86+
return {0, {}, false};
87+
uint16_t string_length = ntohs(*(uint16_t*)&data[last_byte]);
88+
last_byte += 2;
89+
if (last_byte + string_length >= (uint32_t)payload_len)
90+
return {0, {}, false};
91+
return {string_length, std::string_view(&data[last_byte], string_length), true};
92+
}
93+
94+
/**
95+
* \brief Parse buffer to check if it contains MQTT packets.
96+
* \param [in] data Pointer to IP payload.
97+
* \param [in] payload_len IP payload length.
98+
* \param [in,out] rec Record to write MQTT data in.
99+
* \return True if buffer contains set of valid mqtt packets.
100+
*/
101+
bool MQTTPlugin::parse_mqtt(const char* data, int payload_len, RecordExtMQTT* rec) noexcept{
102+
if (payload_len <= 0)
103+
return false;
104+
uint32_t last_byte = 0;
105+
// Each tcp segment may contain more MQTT packets
106+
while(last_byte < (uint32_t)payload_len) {
107+
uint8_t type, flags;
108+
type = flags = data[last_byte++];
109+
type >>= 4;
110+
flags &= 0b00001111;
111+
rec->type_cumulative |= (0b1 << type);
112+
113+
auto [remaining_length, success] = read_variable_int(data, payload_len, last_byte);
114+
if (!success || last_byte + remaining_length > (uint32_t) payload_len) {
115+
if constexpr (debug_mqtt)
116+
std::cout << "Invalid remaining length read" << std::endl;
117+
return false;
118+
}
119+
auto first_byte_after_payload = remaining_length + last_byte;
120+
// Connect packet
121+
if (type == 1) {
122+
if (!has_mqtt_protocol_name(data, payload_len)){
123+
if constexpr (debug_mqtt)
124+
std::cout << "Connection packet doesn't have MQTT label" << std::endl;
125+
return false;
126+
}
127+
last_byte += 6; // Skip "MQTT" label(and its 2-byte length)
128+
rec->version = data[last_byte++];
129+
//Only MQTT v3.1.1 and v5.0 are supported
130+
if (rec->version != 4 && rec->version != 5){
131+
if constexpr (debug_mqtt)
132+
std::cout << "Unsupported mqtt version" << std::endl;
133+
return false;
134+
}
135+
rec->connection_flags = data[last_byte++];
136+
rec->keep_alive = ntohs(*(uint16_t*) &data[last_byte]);
137+
}
138+
// Connect ACK packet
139+
else if (type == 2) {
140+
rec->session_present_flag = data[last_byte++] & 0b1; /// Connect Acknowledge Flags
141+
rec->connection_return_code = data[last_byte++];
142+
}
143+
// Publish packet
144+
else if (type == 3) {
145+
rec->publish_flags |= flags;
146+
auto [str_len, str, success] = read_utf8_string(data, payload_len, last_byte);
147+
if (!success) {
148+
if constexpr (debug_mqtt)
149+
std::cout << "Invalid utf8 string read" << std::endl;
150+
return false;
151+
}
152+
if (str.find('#') != std::string::npos) {
153+
if constexpr (debug_mqtt)
154+
std::cout << "Topic name contains wildcard char" << std::endl;
155+
return false;
156+
}
157+
// Use '#' as delimiter, as '#' and '?' are only forbidden characters for topic name
158+
if (rec->topics.count++ < maximal_topic_count) {
159+
rec->topics.str += std::move(std::string(str.begin(), str.end()).append("#"));
160+
}
161+
}
162+
// Disconnect packet
163+
else if (type == 14) {
164+
flow_flush = true;
165+
}
166+
// Read packet identifier
167+
/*if ( (rec->type >= 3 && rec->type <= 11) || (rec->type == 3 && ((rec->flags >> 1) & 0b11)!= 0)){
168+
rec->id = *((const uint16_t*)(&data[last_byte])); last_byte += 2;
169+
}*/
170+
171+
// Read properties (only for MQTT v5.0)
172+
/*if (rec->type <= 11 || rec->type >= 14){
173+
rec->property_length = read_variable_int(data, payload_len, last_byte);
174+
read_properties(data, payload_len, last_byte, rec);
175+
}*/
176+
177+
last_byte = first_byte_after_payload; // Skip rest of payload
178+
}
179+
return true;
180+
}
181+
182+
int MQTTPlugin::post_update(Flow &rec, const Packet &pkt){
183+
if (flow_flush){
184+
flow_flush = false;
185+
return FLOW_FLUSH;
186+
}
187+
return 0;
188+
}
189+
190+
void MQTTPlugin::pre_export(Flow &rec){
191+
if constexpr (debug_mqtt){
192+
RecordExtMQTT* ext = static_cast<RecordExtMQTT*>(rec.get_extension(RecordExtMQTT::REGISTERED_ID));
193+
if (ext == nullptr)
194+
return;
195+
/*std::cout << "MQTT Exports: type cumulative = " << std::to_string(ext->type_cumulative)
196+
<< ", version = " << std::to_string(ext->version) << ", con_flags = "
197+
<< std::to_string(ext->connection_flags) << ",keep_alive = "
198+
<< std::to_string(ext->keep_alive)<< ",\nsession_present= "
199+
<< std::to_string(ext->session_present_flag) << ", last return code= "
200+
<< std::to_string(ext->last_connect_return_code) << ", publish flags= "
201+
<< std::to_string(ext->publish_flags) << ",\npublished topics: ";
202+
for(auto it = ext->topics.begin(); it != ext->topics.end(); it++)
203+
std::cout << (it == ext->topics.begin() ? "" : ", ") << *it;
204+
std::cout << std::endl;*/
205+
}
206+
return;
207+
}
208+
209+
/**
210+
* \brief Parse buffer to check if it contains MQTT packets.
211+
* \param [in] data Pointer to IP payload.
212+
* \param [in] payload_len IP payload length.
213+
* \return True if buffer starts with MQTT label as part of connection mqtt packet.
214+
*/
215+
bool MQTTPlugin::has_mqtt_protocol_name(const char* data, int payload_len) const noexcept{
216+
if (payload_len <= 1)
217+
return false;
218+
auto pos = 1u;
219+
if (auto [_, success] = read_variable_int(data,payload_len, pos); !success)
220+
return false;
221+
auto [string_length, str, success] = read_utf8_string(data,payload_len, pos);
222+
return success && str == "MQTT";
223+
}
224+
225+
void MQTTPlugin::add_ext_mqtt(const char* data, int payload_len, Flow& flow)
226+
{
227+
if (recPrealloc == nullptr) {
228+
recPrealloc = new RecordExtMQTT();
229+
}
230+
if (!parse_mqtt(data, payload_len, recPrealloc))
231+
return ;
232+
flow.add_extension(recPrealloc);
233+
recPrealloc = nullptr;
234+
}
235+
236+
void MQTTPlugin::init(const char *params){
237+
MQTTOptionsParser parser;
238+
try {
239+
parser.parse(params);
240+
} catch (ParserError &e) {
241+
throw PluginError(e.what());
242+
}
243+
maximal_topic_count = parser.m_maximal_topic_count;
244+
}
245+
246+
ProcessPlugin* MQTTPlugin::copy()
247+
{
248+
return new MQTTPlugin(*this);
249+
}
250+
251+
} // namespace ipxp

0 commit comments

Comments
 (0)