Skip to content

Commit 160d84f

Browse files
committed
Review update
1 parent abf19bf commit 160d84f

File tree

4 files changed

+30
-19
lines changed

4 files changed

+30
-19
lines changed

extensions/mqtt/processors/ConsumeMQTT.cpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,22 +69,22 @@ void ConsumeMQTT::readProperties(core::ProcessContext& context) {
6969
receive_maximum_ = gsl::narrow<uint16_t>(utils::parseU64Property(context, ReceiveMaximum));
7070
}
7171

72-
void ConsumeMQTT::addAttributesAsRecordFields(core::RecordSet& new_records, const std::queue<SmartMessage>& msg_queue) const {
72+
void ConsumeMQTT::addAttributesAsRecordFields(core::RecordSet& new_records, const SmartMessage& message) const {
7373
if (!add_attributes_as_fields_) {
7474
return;
7575
}
7676

77-
for (auto& record: new_records) {
78-
record.emplace("_topic", core::RecordField(msg_queue.front().topic));
79-
auto topic_segments = utils::string::split(msg_queue.front().topic, "/");
77+
for (auto& record : new_records) {
78+
record.emplace("_topic", core::RecordField(message.topic));
79+
auto topic_segments = utils::string::split(message.topic, "/");
8080
core::RecordArray topic_segments_array;
8181
for (size_t i = 0; i < topic_segments.size(); ++i) {
8282
topic_segments_array.emplace_back(core::RecordField(topic_segments[i]));
8383
}
8484
record.emplace("_topicSegments", core::RecordField(std::move(topic_segments_array)));
85-
record.emplace("_qos", core::RecordField(msg_queue.front().contents->qos));
86-
record.emplace("_isDuplicate", core::RecordField(msg_queue.front().contents->dup > 0));
87-
record.emplace("_isRetained", core::RecordField(msg_queue.front().contents->retained > 0));
85+
record.emplace("_qos", core::RecordField(message.contents->qos));
86+
record.emplace("_isDuplicate", core::RecordField(message.contents->dup > 0));
87+
record.emplace("_isRetained", core::RecordField(message.contents->retained > 0));
8888
}
8989
}
9090

@@ -102,7 +102,7 @@ void ConsumeMQTT::transferMessagesAsRecords(core::ProcessSession& session) {
102102
continue;
103103
}
104104
auto& new_records = new_records_result.value();
105-
addAttributesAsRecordFields(new_records, msg_queue);
105+
addAttributesAsRecordFields(new_records, msg_queue.front());
106106
record_set.reserve(record_set.size() + new_records.size());
107107
record_set.insert(record_set.end(), std::make_move_iterator(new_records.begin()), std::make_move_iterator(new_records.end()));
108108
msg_queue.pop();

extensions/mqtt/processors/ConsumeMQTT.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
221221
void setProcessorSpecificMqtt5ConnectOptions(MQTTProperties& connect_props) const override;
222222

223223
void transferMessagesAsRecords(core::ProcessSession& session);
224-
void addAttributesAsRecordFields(core::RecordSet& new_records, const std::queue<SmartMessage>& msg_queue) const;
224+
void addAttributesAsRecordFields(core::RecordSet& new_records, const SmartMessage& message) const;
225225
void transferMessagesAsFlowFiles(core::ProcessSession& session);
226226

227227
std::string topic_;

extensions/mqtt/tests/ConsumeMQTTTests.cpp

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,19 +42,29 @@ void verifyXmlJsonResult(const std::string& json_content, size_t expected_record
4242

4343
if (add_attributes_as_fields) {
4444
string_result = current_record["_topic"].GetString();
45-
CHECK(string_result == "mytopic/segment");
45+
CHECK(string_result == "mytopic/segment/" + std::to_string(i));
4646
auto array = current_record["_topicSegments"].GetArray();
47-
CHECK(array.Size() == 2);
47+
CHECK(array.Size() == 3);
4848
string_result = array[0].GetString();
4949
CHECK(string_result == "mytopic");
5050
string_result = array[1].GetString();
5151
CHECK(string_result == "segment");
52+
string_result = array[2].GetString();
53+
CHECK(string_result == std::to_string(i));
5254
int_result = current_record["_qos"].GetInt64();
53-
CHECK(int_result == 1);
55+
CHECK(int_result == i);
5456
bool bool_result = current_record["_isDuplicate"].GetBool();
55-
CHECK_FALSE(bool_result);
57+
if (i == 0) {
58+
CHECK_FALSE(bool_result);
59+
} else {
60+
CHECK(bool_result);
61+
}
5662
bool_result = current_record["_isRetained"].GetBool();
57-
CHECK_FALSE(bool_result);
63+
if (i == 0) {
64+
CHECK_FALSE(bool_result);
65+
} else {
66+
CHECK(bool_result);
67+
}
5868
} else {
5969
CHECK_FALSE(current_record.HasMember("_topic"));
6070
CHECK_FALSE(current_record.HasMember("_qos"));
@@ -268,9 +278,10 @@ TEST_CASE_METHOD(ConsumeMqttTestFixture, "Read XML messages and write them to js
268278
const std::string payload = R"(<root><int_value>42</int_value><string_value>test</string_value></root>)";
269279
for (size_t i = 0; i < expected_record_count; ++i) {
270280
TestConsumeMQTTProcessor::SmartMessage message{std::unique_ptr<MQTTAsync_message, TestConsumeMQTTProcessor::MQTTMessageDeleter>(
271-
new MQTTAsync_message{.struct_id = {'M', 'Q', 'T', 'M'}, .struct_version = 1, .payloadlen = gsl::narrow<int>(payload.size()),
272-
.payload = const_cast<char*>(payload.data()), .qos = 1, .retained = 0, .dup = 0, .msgid = 42, .properties = {}}),
273-
std::string{"mytopic/segment"}};
281+
new MQTTAsync_message{.struct_id = {'M', 'Q', 'T', 'M'}, .struct_version = gsl::narrow<int>(i), .payloadlen = gsl::narrow<int>(payload.size()),
282+
.payload = const_cast<char*>(payload.data()), .qos = gsl::narrow<int>(i), .retained = gsl::narrow<int>(i), .dup = gsl::narrow<int>(i),
283+
.msgid = gsl::narrow<int>(i + 1), .properties = {}}),
284+
std::string{"mytopic/segment/" + std::to_string(i)}};
274285
consume_mqtt_processor_->enqueueReceivedMQTTMsg(std::move(message));
275286
}
276287
const auto trigger_results = test_controller_.trigger();
@@ -359,7 +370,7 @@ TEST_CASE_METHOD(ConsumeMqttTestFixture, "Read MQTT message and write it to a fl
359370
}
360371
}
361372

362-
TEST_CASE_METHOD(ConsumeMqttTestFixture, "Test scheduling failure if non-existant recordset reader or writer is set", "[consumeMQTTTest]") {
373+
TEST_CASE_METHOD(ConsumeMqttTestFixture, "Test scheduling failure if non-existent recordset reader or writer is set", "[consumeMQTTTest]") {
363374
test_controller_.plan->addController("XMLReader", "XMLReader");
364375
test_controller_.plan->addController("JsonRecordSetWriter", "JsonRecordSetWriter");
365376
REQUIRE(consume_mqtt_processor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));

extensions/mqtt/tests/PublishMQTTTests.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ TEST_CASE_METHOD(PublishMQTTTestFixture, "Test sending XML message records", "[p
148148
CHECK(string_content == R"(<?xml version="1.0"?><root><record><element2>42</element2></record></root>)");
149149
}
150150

151-
TEST_CASE_METHOD(PublishMQTTTestFixture, "Test scheduling failure if non-existant recordset reader or writer is set", "[publishMQTTTest]") {
151+
TEST_CASE_METHOD(PublishMQTTTestFixture, "Test scheduling failure if non-existent recordset reader or writer is set", "[publishMQTTTest]") {
152152
test_controller_.plan->addController("XMLReader", "XMLReader");
153153
test_controller_.plan->addController("JsonRecordSetWriter", "JsonRecordSetWriter");
154154
REQUIRE(publish_mqtt_processor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));

0 commit comments

Comments
 (0)