Skip to content

Commit f616af2

Browse files
authored
Merge pull request #209 from Zadamsa/mqtt
MQTT plugin v3.1.1
2 parents 6de8a04 + a2e029f commit f616af2

File tree

9 files changed

+487
-4
lines changed

9 files changed

+487
-4
lines changed

Makefile.am

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ DISTCHECK_CONFIGURE_FLAGS="--with-systemdsystemunitdir=$$dc_install_base/$(syste
1111

1212
ipfixprobe_LDFLAGS=-lpthread -ldl -latomic
1313
ipfixprobe_CFLAGS=-I$(srcdir)/include/ -fPIC
14-
ipfixprobe_CXXFLAGS=-std=gnu++11 -Wno-write-strings -I$(srcdir)/include/ -fPIC
14+
ipfixprobe_CXXFLAGS=-std=gnu++17 -Wno-write-strings -I$(srcdir)/include/ -fPIC
1515

1616
if OS_CYGWIN
1717
ipfixprobe_CXXFLAGS+=-Wl,--export-all-symbols
@@ -143,7 +143,9 @@ ipfixprobe_process_src=\
143143
process/flow_hash.hpp \
144144
process/flow_hash.cpp \
145145
process/mpls.hpp \
146-
process/mpls.cpp
146+
process/mpls.cpp \
147+
process/mqtt.hpp \
148+
process/mqtt.cpp
147149

148150
if WITH_QUIC
149151
ipfixprobe_process_src+=\

README.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,20 @@ List of unirec fields exported together with basic flow fields on interface by P
352352
| DNS_RR_TTL | uint32 | resource record TTL field |
353353
| DNS_IP | ipaddr | IP address from PTR, A or AAAA record |
354354

355+
356+
### MQTT
357+
List of unirec fields exported together with basic flow fields on interface by MQTT plugin.
358+
359+
| Output field | Type | Description |
360+
|:-----------------------------:|:------:|:-----------------------------------------------------:|
361+
| MQTT_TYPE_CUMULATIVE | uint16 | types of packets and session present flag cumulative |
362+
| MQTT_VERSION | uint8 | MQTT version |
363+
| MQTT_CONNECTION_FLAGS | uint8 | last CONNECT packet flags |
364+
| MQTT_KEEP_ALIVE | uint16 | last CONNECT keep alive |
365+
| MQTT_CONNECTION_RETURN_CODE | uint8 | last CONNECT return code |
366+
| MQTT_PUBLISH_FLAGS | uint8 | cumulative of PUBLISH packet flags |
367+
| MQTT_TOPICS | string | topics from PUBLISH packets headers |
368+
355369
### SIP
356370
List of unirec fields exported together with basic flow fields on interface by SIP plugin.
357371

include/ipfixprobe/ipfix-elements.hpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,14 @@ namespace ipxp {
295295
#define NTS_TIME_DISTRIBUTION(F) F(8057, 1031, 4, nullptr)
296296
#define NTS_SWITCHING_RATIO(F) F(8057, 1032, 4, nullptr)
297297

298+
#define MQTT_TYPE_CUMULATIVE(F) F(8057, 1033, 2, nullptr)
299+
#define MQTT_VERSION(F) F(8057, 1034, 1, nullptr)
300+
#define MQTT_CONNECTION_FLAGS(F) F(8057, 1035, 1, nullptr)
301+
#define MQTT_KEEP_ALIVE(F) F(8057, 1036, 2, nullptr)
302+
#define MQTT_LAST_RETURN_CODE(F) F(8057, 1037, 1, nullptr)
303+
#define MQTT_PUBLISH_FLAGS(F) F(8057, 1038, 1, nullptr)
304+
#define MQTT_TOPICS(F) F(8057, 1039, -1, nullptr)
305+
298306
#define MPLS_TOP_LABEL_STACK_SECTION F(0, 70, -1, nullptr)
299307

300308

@@ -443,6 +451,15 @@ namespace ipxp {
443451
F(SIP_USER_AGENT) \
444452
F(SIP_REQUEST_URI) \
445453
F(SIP_VIA)
454+
455+
#define IPFIX_MQTT_TEMPLATE(F) \
456+
F(MQTT_TYPE_CUMULATIVE) \
457+
F(MQTT_VERSION) \
458+
F(MQTT_CONNECTION_FLAGS) \
459+
F(MQTT_KEEP_ALIVE) \
460+
F(MQTT_LAST_RETURN_CODE) \
461+
F(MQTT_PUBLISH_FLAGS) \
462+
F(MQTT_TOPICS)
446463

447464
#define IPFIX_PSTATS_TEMPLATE(F) \
448465
F(STATS_PCKT_SIZES) \

pcaps/mqtt.pcap

3.07 KB
Binary file not shown.

process/mqtt.cpp

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
/**
2+
* \file mqtt.hpp
3+
* \brief MQTT plugin for ipfixprobe
4+
* \author Damir Zainullin <[email protected]>
5+
* \date 2024
6+
*/
7+
/*
8+
* Copyright (C) 2023 CESNET
9+
*
10+
* LICENSE TERMS
11+
*
12+
* Redistribution and use in source and binary forms, with or without
13+
* modification, are permitted provided that the following conditions
14+
* are met:
15+
* 1. Redistributions of source code must retain the above copyright
16+
* notice, this list of conditions and the following disclaimer.
17+
* 2. Redistributions in binary form must reproduce the above copyright
18+
* notice, this list of conditions and the following disclaimer in
19+
* the documentation and/or other materials provided with the
20+
* distribution.
21+
* 3. Neither the name of the Company nor the names of its contributors
22+
* may be used to endorse or promote products derived from this
23+
* software without specific prior written permission.
24+
*/
25+
26+
#include "mqtt.hpp"
27+
28+
#include <cstring>
29+
30+
#ifdef DEBUG_MQTT
31+
static const bool debug_mqtt = true;
32+
#else
33+
static const bool debug_mqtt = false;
34+
#endif
35+
namespace ipxp {
36+
37+
int RecordExtMQTT::REGISTERED_ID = -1;
38+
39+
__attribute__((constructor)) static void register_this_plugin()
40+
{
41+
static PluginRecord rec = PluginRecord("mqtt", []() { return new MQTTPlugin(); });
42+
register_plugin(&rec);
43+
RecordExtMQTT::REGISTERED_ID = register_extension();
44+
}
45+
46+
int MQTTPlugin::post_create(Flow& rec, const Packet& pkt)
47+
{
48+
if (has_mqtt_protocol_name(reinterpret_cast<const char*>(pkt.payload), pkt.payload_len))
49+
add_ext_mqtt(reinterpret_cast<const char*>(pkt.payload), pkt.payload_len, rec);
50+
return 0;
51+
}
52+
53+
int MQTTPlugin::pre_update(Flow& rec, Packet& pkt)
54+
{
55+
const char* payload = reinterpret_cast<const char*>(pkt.payload);
56+
RecordExt* ext = rec.get_extension(RecordExtMQTT::REGISTERED_ID);
57+
if (ext == nullptr) {
58+
return 0;
59+
} else {
60+
parse_mqtt(payload, pkt.payload_len, static_cast<RecordExtMQTT*>(ext));
61+
}
62+
return 0;
63+
}
64+
65+
/**
66+
* \brief Read variable integer as defined in
67+
* http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html. \param [in] data Pointer to IP
68+
* payload. \param [in] payload_len IP payload length. \param [in] last_byte Next after last read
69+
* byte. \return Pair of read integer and bool. Bool is false in case read was unsuccessful.
70+
*/
71+
std::pair<uint32_t, bool>
72+
MQTTPlugin::read_variable_int(const char* data, int payload_len, uint32_t& last_byte) const noexcept
73+
{
74+
uint32_t res = 0;
75+
bool next;
76+
for (next = true; next && last_byte < (uint32_t) payload_len; last_byte++) {
77+
res <<= 8;
78+
res |= data[last_byte];
79+
next = (data[last_byte] & 0b1000'0000);
80+
}
81+
return last_byte == (uint32_t) payload_len && next ? std::make_pair(0u, false)
82+
: std::make_pair(res, true);
83+
}
84+
85+
/**
86+
* \brief Read utf8 encoded string as defined in
87+
* http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html. \param [in] data Pointer to IP
88+
* payload. \param [in] payload_len IP payload length. \param [in] last_byte Next after last read
89+
* byte. \return Tuple of read string, its length and bool. Bool is false in case read was
90+
* unsuccessful.
91+
*/
92+
std::tuple<uint32_t, std::string_view, bool>
93+
MQTTPlugin::read_utf8_string(const char* data, int payload_len, uint32_t& last_byte) const noexcept
94+
{
95+
if (last_byte + 2 >= (uint32_t) payload_len)
96+
return {0, {}, false};
97+
uint16_t string_length = ntohs(*(uint16_t*) &data[last_byte]);
98+
last_byte += 2;
99+
if (last_byte + string_length >= (uint32_t) payload_len)
100+
return {0, {}, false};
101+
return {string_length, std::string_view(&data[last_byte], string_length), true};
102+
}
103+
104+
/**
105+
* \brief Parse buffer to check if it contains MQTT packets.
106+
* \param [in] data Pointer to IP payload.
107+
* \param [in] payload_len IP payload length.
108+
* \param [in,out] rec Record to write MQTT data in.
109+
* \return True if buffer contains set of valid mqtt packets.
110+
*/
111+
bool MQTTPlugin::parse_mqtt(const char* data, int payload_len, RecordExtMQTT* rec) noexcept
112+
{
113+
if (payload_len <= 0)
114+
return false;
115+
uint32_t last_byte = 0;
116+
// Each tcp segment may contain more MQTT packets
117+
while (last_byte < (uint32_t) payload_len) {
118+
uint8_t type, flags;
119+
type = flags = data[last_byte++];
120+
type >>= 4;
121+
flags &= 0b00001111;
122+
rec->type_cumulative |= (0b1 << type);
123+
124+
auto [remaining_length, success] = read_variable_int(data, payload_len, last_byte);
125+
if (!success || last_byte + remaining_length > (uint32_t) payload_len) {
126+
if constexpr (debug_mqtt)
127+
std::cout << "Invalid remaining length read" << std::endl;
128+
return false;
129+
}
130+
auto first_byte_after_payload = remaining_length + last_byte;
131+
// Connect packet
132+
if (type == 1) {
133+
if (!has_mqtt_protocol_name(data, payload_len)) {
134+
if constexpr (debug_mqtt)
135+
std::cout << "Connection packet doesn't have MQTT label" << std::endl;
136+
return false;
137+
}
138+
last_byte += 6; // Skip "MQTT" label(and its 2-byte length)
139+
rec->version = data[last_byte++];
140+
// Only MQTT v3.1.1 and v5.0 are supported
141+
if (rec->version != 4 && rec->version != 5) {
142+
if constexpr (debug_mqtt)
143+
std::cout << "Unsupported mqtt version" << std::endl;
144+
return false;
145+
}
146+
rec->connection_flags = data[last_byte++];
147+
rec->keep_alive = ntohs(*(uint16_t*) &data[last_byte]);
148+
}
149+
// Connect ACK packet
150+
else if (type == 2) {
151+
rec->session_present_flag = data[last_byte++] & 0b1; /// Connect Acknowledge Flags
152+
rec->connection_return_code = data[last_byte++];
153+
}
154+
// Publish packet
155+
else if (type == 3) {
156+
rec->publish_flags |= flags;
157+
auto [str_len, str, success] = read_utf8_string(data, payload_len, last_byte);
158+
if (!success) {
159+
if constexpr (debug_mqtt)
160+
std::cout << "Invalid utf8 string read" << std::endl;
161+
return false;
162+
}
163+
if (str.find('#') != std::string::npos) {
164+
if constexpr (debug_mqtt)
165+
std::cout << "Topic name contains wildcard char" << std::endl;
166+
return false;
167+
}
168+
// Use '#' as delimiter, as '#' and '?' are only forbidden characters for topic name
169+
if (rec->topics.count++ < maximal_topic_count) {
170+
rec->topics.str += std::move(std::string(str.begin(), str.end()).append("#"));
171+
}
172+
}
173+
// Disconnect packet
174+
else if (type == 14) {
175+
flow_flush = true;
176+
}
177+
178+
last_byte = first_byte_after_payload; // Skip rest of payload
179+
}
180+
return true;
181+
}
182+
183+
int MQTTPlugin::post_update(Flow& rec, const Packet& pkt)
184+
{
185+
if (flow_flush) {
186+
flow_flush = false;
187+
return FLOW_FLUSH;
188+
}
189+
return 0;
190+
}
191+
192+
/**
193+
* \brief Parse buffer to check if it contains MQTT packets.
194+
* \param [in] data Pointer to IP payload.
195+
* \param [in] payload_len IP payload length.
196+
* \return True if buffer starts with MQTT label as part of connection mqtt packet.
197+
*/
198+
bool MQTTPlugin::has_mqtt_protocol_name(const char* data, int payload_len) const noexcept
199+
{
200+
if (payload_len <= 1)
201+
return false;
202+
auto pos = 1u;
203+
if (auto [_, success] = read_variable_int(data, payload_len, pos); !success)
204+
return false;
205+
auto [string_length, str, success] = read_utf8_string(data, payload_len, pos);
206+
return success && str == "MQTT";
207+
}
208+
209+
void MQTTPlugin::add_ext_mqtt(const char* data, int payload_len, Flow& flow)
210+
{
211+
if (recPrealloc == nullptr) {
212+
recPrealloc = new RecordExtMQTT();
213+
}
214+
if (!parse_mqtt(data, payload_len, recPrealloc))
215+
return;
216+
flow.add_extension(recPrealloc);
217+
recPrealloc = nullptr;
218+
}
219+
220+
void MQTTPlugin::init(const char* params)
221+
{
222+
MQTTOptionsParser parser;
223+
try {
224+
parser.parse(params);
225+
} catch (ParserError& e) {
226+
throw PluginError(e.what());
227+
}
228+
maximal_topic_count = parser.m_maximal_topic_count;
229+
}
230+
231+
ProcessPlugin* MQTTPlugin::copy()
232+
{
233+
return new MQTTPlugin(*this);
234+
}
235+
236+
} // namespace ipxp

0 commit comments

Comments
 (0)