Skip to content

Commit 4965d2f

Browse files
Enable support for MQTT stitcher in stirling (#1918)
Summary: This PR adds the stitcher component of MQTT (v5), a newly added protocol. Related issues: #341 Type of change: /kind feature Test Plan: Added tests --------- Signed-off-by: Chinmay <[email protected]>
1 parent 03184cc commit 4965d2f

File tree

9 files changed

+918
-65
lines changed

9 files changed

+918
-65
lines changed

src/stirling/source_connectors/socket_tracer/protocols/mqtt/BUILD.bazel

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,11 @@ pl_cc_test(
4848
":cc_library",
4949
],
5050
)
51+
52+
pl_cc_test(
53+
name = "stitcher_test",
54+
srcs = ["stitcher_test.cc"],
55+
deps = [
56+
":cc_library",
57+
],
58+
)

src/stirling/source_connectors/socket_tracer/protocols/mqtt/parse.cc

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <iomanip>
2020
#include <map>
2121
#include <string_view>
22+
#include <tuple>
2223

2324
#include "src/common/base/base.h"
2425
#include "src/stirling/source_connectors/socket_tracer/protocols/mqtt/parse.h"
@@ -37,24 +38,6 @@ namespace protocols {
3738

3839
namespace mqtt {
3940

40-
enum class MqttControlPacketType : uint8_t {
41-
CONNECT = 1,
42-
CONNACK = 2,
43-
PUBLISH = 3,
44-
PUBACK = 4,
45-
PUBREC = 5,
46-
PUBREL = 6,
47-
PUBCOMP = 7,
48-
SUBSCRIBE = 8,
49-
SUBACK = 9,
50-
UNSUBSCRIBE = 10,
51-
UNSUBACK = 11,
52-
PINGREQ = 12,
53-
PINGRESP = 13,
54-
DISCONNECT = 14,
55-
AUTH = 15
56-
};
57-
5841
enum class PropertyCode : uint8_t {
5942
PayloadFormatIndicator = 0x01,
6043
MessageExpiryInterval = 0x02,
@@ -653,7 +636,8 @@ ParseState ParsePayload(Message* result, BinaryDecoder* decoder,
653636
}
654637
}
655638

656-
ParseState ParseFrame(message_type_t type, std::string_view* buf, Message* result) {
639+
ParseState ParseFrame(message_type_t type, std::string_view* buf, Message* result,
640+
mqtt::StateWrapper* state) {
657641
CTX_DCHECK(type == message_type_t::kRequest || type == message_type_t::kResponse);
658642
if (buf->size() < 2) {
659643
return ParseState::kNeedsMoreData;
@@ -723,6 +707,19 @@ ParseState ParseFrame(message_type_t type, std::string_view* buf, Message* resul
723707
return ParseState::kInvalid;
724708
}
725709

710+
// Updating the state for PUBLISH based on whether it is duplicate
711+
if (control_packet_type == MqttControlPacketType::PUBLISH) {
712+
if (result->dup) {
713+
if (type == message_type_t::kRequest) {
714+
state->send[std::tuple<uint32_t, uint32_t>(result->header_fields["packet_identifier"],
715+
result->header_fields["qos"])] += 1;
716+
} else {
717+
state->recv[std::tuple<uint32_t, uint32_t>(result->header_fields["packet_identifier"],
718+
result->header_fields["qos"])] += 1;
719+
}
720+
}
721+
}
722+
726723
if (ParsePayload(result, &decoder, control_packet_type) == ParseState::kInvalid) {
727724
return ParseState::kInvalid;
728725
}
@@ -735,8 +732,8 @@ ParseState ParseFrame(message_type_t type, std::string_view* buf, Message* resul
735732

736733
template <>
737734
ParseState ParseFrame(message_type_t type, std::string_view* buf, mqtt::Message* result,
738-
NoState* /*state*/) {
739-
return mqtt::ParseFrame(type, buf, result);
735+
mqtt::StateWrapper* state) {
736+
return mqtt::ParseFrame(type, buf, result, state);
740737
}
741738

742739
template <>
@@ -745,6 +742,11 @@ size_t FindFrameBoundary<mqtt::Message>(message_type_t /*type*/, std::string_vie
745742
return start_pos + buf.length();
746743
}
747744

745+
template <>
746+
mqtt::packet_id_t GetStreamID(mqtt::Message* message) {
747+
return message->header_fields["packet_identifier"];
748+
}
749+
748750
} // namespace protocols
749751
} // namespace stirling
750752
} // namespace px

src/stirling/source_connectors/socket_tracer/protocols/mqtt/parse.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,14 @@ namespace protocols {
3535

3636
template <>
3737
ParseState ParseFrame(message_type_t type, std::string_view* buf, mqtt::Message* frame,
38-
NoState* state);
38+
mqtt::StateWrapper* state);
3939

4040
template <>
4141
size_t FindFrameBoundary<mqtt::Message>(message_type_t type, std::string_view buf, size_t start_pos,
42-
NoState* state);
42+
mqtt::StateWrapper* state);
43+
44+
template <>
45+
mqtt::packet_id_t GetStreamID(mqtt::Message* message);
4346

4447
} // namespace protocols
4548
} // namespace stirling

0 commit comments

Comments
 (0)