11/* *
2- * \file mqtt.hpp
3- * \brief MQTT plugin for ipfixprobe
4- * \author Damir Zainullin <[email protected] > 5- * \date 2024
2+ * \file mqtt.hpp
3+ * \brief MQTT plugin for ipfixprobe
4+ * \author Damir Zainullin <[email protected] >5+ * \date 2024
66 */
77/*
8- * Copyright (C) 2023 CESNET
9- *
10- * LICENSE TERMS
11- *
12- * Redistribution and use in source and binary forms, with or without
13- * modification, are permitted provided that the following conditions
14- * are met:
15- * 1. Redistributions of source code must retain the above copyright
16- * notice, this list of conditions and the following disclaimer.
17- * 2. Redistributions in binary form must reproduce the above copyright
18- * notice, this list of conditions and the following disclaimer in
19- * the documentation and/or other materials provided with the
20- * distribution.
21- * 3. Neither the name of the Company nor the names of its contributors
22- * may be used to endorse or promote products derived from this
23- * software without specific prior written permission.
8+ * Copyright (C) 2023 CESNET
9+ *
10+ * LICENSE TERMS
11+ *
12+ * Redistribution and use in source and binary forms, with or without
13+ * modification, are permitted provided that the following conditions
14+ * are met:
15+ * 1. Redistributions of source code must retain the above copyright
16+ * notice, this list of conditions and the following disclaimer.
17+ * 2. Redistributions in binary form must reproduce the above copyright
18+ * notice, this list of conditions and the following disclaimer in
19+ * the documentation and/or other materials provided with the
20+ * distribution.
21+ * 3. Neither the name of the Company nor the names of its contributors
22+ * may be used to endorse or promote products derived from this
23+ * software without specific prior written permission.
2424 */
25+
2526#include " mqtt.hpp"
26- #include " cstring"
27+
28+ #include < cstring>
29+
2730#ifdef DEBUG_MQTT
2831static const bool debug_mqtt = true ;
2932#else
@@ -33,60 +36,67 @@ namespace ipxp {
3336
3437int RecordExtMQTT::REGISTERED_ID = -1 ;
3538
36- __attribute__ ((constructor)) static void register_this_plugin (){
39+ __attribute__ ((constructor)) static void register_this_plugin ()
40+ {
3741 static PluginRecord rec = PluginRecord (" mqtt" , []() { return new MQTTPlugin (); });
3842 register_plugin (&rec);
3943 RecordExtMQTT::REGISTERED_ID = register_extension ();
4044}
4145
42- int MQTTPlugin::post_create (Flow &rec, const Packet &pkt){
46+ int MQTTPlugin::post_create (Flow& rec, const Packet& pkt)
47+ {
4348 if (has_mqtt_protocol_name (reinterpret_cast <const char *>(pkt.payload ), pkt.payload_len ))
4449 add_ext_mqtt (reinterpret_cast <const char *>(pkt.payload ), pkt.payload_len , rec);
4550 return 0 ;
4651}
4752
48- int MQTTPlugin::pre_update (Flow &rec, Packet &pkt){
53+ int MQTTPlugin::pre_update (Flow& rec, Packet& pkt)
54+ {
4955 const char * payload = reinterpret_cast <const char *>(pkt.payload );
5056 RecordExt* ext = rec.get_extension (RecordExtMQTT::REGISTERED_ID);
5157 if (ext == nullptr ) {
5258 return 0 ;
53- }else {
59+ } else {
5460 parse_mqtt (payload, pkt.payload_len , static_cast <RecordExtMQTT*>(ext));
5561 }
5662 return 0 ;
5763}
5864
5965/* *
60- * \brief Read variable integer as defined in http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html.
61- * \param [in] data Pointer to IP payload.
62- * \param [in] payload_len IP payload length.
63- * \param [in] last_byte Next after last read byte.
64- * \return Pair of read integer and bool. Bool is false in case read was unsuccessful.
66+ * \brief Read variable integer as defined in
67+ * http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html. \param [in] data Pointer to IP
68+ * payload. \param [in] payload_len IP payload length. \param [in] last_byte Next after last read
69+ * byte. \return Pair of read integer and bool. Bool is false in case read was unsuccessful.
6570 */
66- std::pair<uint32_t , bool > MQTTPlugin::read_variable_int (const char * data, int payload_len, uint32_t & last_byte)const noexcept {
71+ std::pair<uint32_t , bool >
72+ MQTTPlugin::read_variable_int (const char * data, int payload_len, uint32_t & last_byte) const noexcept
73+ {
6774 uint32_t res = 0 ;
6875 bool next;
69- for ( next = true ; next && last_byte < (uint32_t )payload_len; last_byte++){
76+ for ( next = true ; next && last_byte < (uint32_t ) payload_len; last_byte++) {
7077 res <<= 8 ;
7178 res |= data[last_byte];
7279 next = (data[last_byte] & 0b1000'0000 );
7380 }
74- return last_byte == (uint32_t )payload_len && next ? std::make_pair (0u , false ) : std::make_pair (res, true );
81+ return last_byte == (uint32_t ) payload_len && next ? std::make_pair (0u , false )
82+ : std::make_pair (res, true );
7583}
7684
7785/* *
78- * \brief Read utf8 encoded string as defined in http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html.
79- * \param [in] data Pointer to IP payload.
80- * \param [in] payload_len IP payload length.
81- * \param [in] last_byte Next after last read byte.
82- * \return Tuple of read string, its length and bool. Bool is false in case read was unsuccessful.
86+ * \brief Read utf8 encoded string as defined in
87+ * http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html. \param [in] data Pointer to IP
88+ * payload. \param [in] payload_len IP payload length. \param [in] last_byte Next after last read
89+ * byte. \return Tuple of read string, its length and bool. Bool is false in case read was
90+ * unsuccessful.
8391 */
84- std::tuple<uint32_t , std::string_view, bool > MQTTPlugin::read_utf8_string (const char * data, int payload_len, uint32_t & last_byte) const noexcept {
85- if (last_byte + 2 >= (uint32_t )payload_len)
92+ std::tuple<uint32_t , std::string_view, bool >
93+ MQTTPlugin::read_utf8_string (const char * data, int payload_len, uint32_t & last_byte) const noexcept
94+ {
95+ if (last_byte + 2 >= (uint32_t ) payload_len)
8696 return {0 , {}, false };
87- uint16_t string_length = ntohs (*(uint16_t *)&data[last_byte]);
97+ uint16_t string_length = ntohs (*(uint16_t *) &data[last_byte]);
8898 last_byte += 2 ;
89- if (last_byte + string_length >= (uint32_t )payload_len)
99+ if (last_byte + string_length >= (uint32_t ) payload_len)
90100 return {0 , {}, false };
91101 return {string_length, std::string_view (&data[last_byte], string_length), true };
92102}
@@ -98,12 +108,13 @@ std::tuple<uint32_t, std::string_view, bool> MQTTPlugin::read_utf8_string(const
98108 * \param [in,out] rec Record to write MQTT data in.
99109 * \return True if buffer contains set of valid mqtt packets.
100110 */
101- bool MQTTPlugin::parse_mqtt (const char * data, int payload_len, RecordExtMQTT* rec) noexcept {
111+ bool MQTTPlugin::parse_mqtt (const char * data, int payload_len, RecordExtMQTT* rec) noexcept
112+ {
102113 if (payload_len <= 0 )
103114 return false ;
104115 uint32_t last_byte = 0 ;
105116 // Each tcp segment may contain more MQTT packets
106- while (last_byte < (uint32_t )payload_len) {
117+ while (last_byte < (uint32_t ) payload_len) {
107118 uint8_t type, flags;
108119 type = flags = data[last_byte++];
109120 type >>= 4 ;
@@ -119,15 +130,15 @@ bool MQTTPlugin::parse_mqtt(const char* data, int payload_len, RecordExtMQTT* re
119130 auto first_byte_after_payload = remaining_length + last_byte;
120131 // Connect packet
121132 if (type == 1 ) {
122- if (!has_mqtt_protocol_name (data, payload_len)){
133+ if (!has_mqtt_protocol_name (data, payload_len)) {
123134 if constexpr (debug_mqtt)
124135 std::cout << " Connection packet doesn't have MQTT label" << std::endl;
125136 return false ;
126137 }
127138 last_byte += 6 ; // Skip "MQTT" label(and its 2-byte length)
128139 rec->version = data[last_byte++];
129- // Only MQTT v3.1.1 and v5.0 are supported
130- if (rec->version != 4 && rec->version != 5 ){
140+ // Only MQTT v3.1.1 and v5.0 are supported
141+ if (rec->version != 4 && rec->version != 5 ) {
131142 if constexpr (debug_mqtt)
132143 std::cout << " Unsupported mqtt version" << std::endl;
133144 return false ;
@@ -163,62 +174,35 @@ bool MQTTPlugin::parse_mqtt(const char* data, int payload_len, RecordExtMQTT* re
163174 else if (type == 14 ) {
164175 flow_flush = true ;
165176 }
166- // Read packet identifier
167- /* if ( (rec->type >= 3 && rec->type <= 11) || (rec->type == 3 && ((rec->flags >> 1) & 0b11)!= 0)){
168- rec->id = *((const uint16_t*)(&data[last_byte])); last_byte += 2;
169- }*/
170-
171- // Read properties (only for MQTT v5.0)
172- /* if (rec->type <= 11 || rec->type >= 14){
173- rec->property_length = read_variable_int(data, payload_len, last_byte);
174- read_properties(data, payload_len, last_byte, rec);
175- }*/
176177
177178 last_byte = first_byte_after_payload; // Skip rest of payload
178179 }
179180 return true ;
180181}
181182
182- int MQTTPlugin::post_update (Flow &rec, const Packet &pkt){
183- if (flow_flush){
183+ int MQTTPlugin::post_update (Flow& rec, const Packet& pkt)
184+ {
185+ if (flow_flush) {
184186 flow_flush = false ;
185187 return FLOW_FLUSH;
186188 }
187189 return 0 ;
188190}
189191
190- void MQTTPlugin::pre_export (Flow &rec){
191- if constexpr (debug_mqtt){
192- RecordExtMQTT* ext = static_cast <RecordExtMQTT*>(rec.get_extension (RecordExtMQTT::REGISTERED_ID));
193- if (ext == nullptr )
194- return ;
195- /* std::cout << "MQTT Exports: type cumulative = " << std::to_string(ext->type_cumulative)
196- << ", version = " << std::to_string(ext->version) << ", con_flags = "
197- << std::to_string(ext->connection_flags) << ",keep_alive = "
198- << std::to_string(ext->keep_alive)<< ",\nsession_present= "
199- << std::to_string(ext->session_present_flag) << ", last return code= "
200- << std::to_string(ext->last_connect_return_code) << ", publish flags= "
201- << std::to_string(ext->publish_flags) << ",\npublished topics: ";
202- for(auto it = ext->topics.begin(); it != ext->topics.end(); it++)
203- std::cout << (it == ext->topics.begin() ? "" : ", ") << *it;
204- std::cout << std::endl;*/
205- }
206- return ;
207- }
208-
209192/* *
210193 * \brief Parse buffer to check if it contains MQTT packets.
211194 * \param [in] data Pointer to IP payload.
212195 * \param [in] payload_len IP payload length.
213196 * \return True if buffer starts with MQTT label as part of connection mqtt packet.
214197 */
215- bool MQTTPlugin::has_mqtt_protocol_name (const char * data, int payload_len) const noexcept {
198+ bool MQTTPlugin::has_mqtt_protocol_name (const char * data, int payload_len) const noexcept
199+ {
216200 if (payload_len <= 1 )
217201 return false ;
218202 auto pos = 1u ;
219- if (auto [_, success] = read_variable_int (data,payload_len, pos); !success)
203+ if (auto [_, success] = read_variable_int (data, payload_len, pos); !success)
220204 return false ;
221- auto [string_length, str, success] = read_utf8_string (data,payload_len, pos);
205+ auto [string_length, str, success] = read_utf8_string (data, payload_len, pos);
222206 return success && str == " MQTT" ;
223207}
224208
@@ -228,16 +212,17 @@ void MQTTPlugin::add_ext_mqtt(const char* data, int payload_len, Flow& flow)
228212 recPrealloc = new RecordExtMQTT ();
229213 }
230214 if (!parse_mqtt (data, payload_len, recPrealloc))
231- return ;
215+ return ;
232216 flow.add_extension (recPrealloc);
233217 recPrealloc = nullptr ;
234218}
235219
236- void MQTTPlugin::init (const char *params){
220+ void MQTTPlugin::init (const char * params)
221+ {
237222 MQTTOptionsParser parser;
238223 try {
239224 parser.parse (params);
240- } catch (ParserError & e) {
225+ } catch (ParserError& e) {
241226 throw PluginError (e.what ());
242227 }
243228 maximal_topic_count = parser.m_maximal_topic_count ;
0 commit comments