Skip to content

Commit c151058

Browse files
committed
Check if record reader and writer are valid controller services
1 parent 4f234e2 commit c151058

File tree

2 files changed

+21
-1
lines changed

2 files changed

+21
-1
lines changed

extensions/mqtt/processors/ConsumeMQTT.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ std::shared_ptr<RecordSetIO> getRecordSetIO(core::ProcessContext& context, const
3838
std::string service_name = context.getProperty(property).value_or("");
3939
if (!IsNullOrEmpty(service_name)) {
4040
auto record_set_io = std::dynamic_pointer_cast<RecordSetIO>(context.getControllerService(service_name, processor_uuid));
41-
if (!record_set_io) { return nullptr; }
41+
if (!record_set_io) {
42+
throw Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, fmt::format("'{}' property is set to invalid controller service '{}'", property.name, service_name));
43+
}
4244
return record_set_io;
4345
}
4446
return nullptr;

extensions/mqtt/tests/ConsumeMQTTTests.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,4 +383,22 @@ TEST_CASE_METHOD(ConsumeMqttTestFixture, "Read MQTT message and write it to a fl
383383
}
384384
}
385385

386+
TEST_CASE_METHOD(ConsumeMqttTestFixture, "Test scheduling failure if non-existant recordset reader or writer is set", "[consumeMQTTTest]") {
387+
test_controller_.plan->addController("SparkplugBReader", "SparkplugBReader");
388+
test_controller_.plan->addController("JsonRecordSetWriter", "JsonRecordSetWriter");
389+
REQUIRE(consume_mqtt_processor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
390+
REQUIRE(consume_mqtt_processor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
391+
SECTION("RecordReader is set to invalid controller service") {
392+
REQUIRE(consume_mqtt_processor_->setProperty(minifi::processors::ConsumeMQTT::RecordReader.name, "invalid_reader"));
393+
REQUIRE(consume_mqtt_processor_->setProperty(minifi::processors::ConsumeMQTT::RecordWriter.name, "JsonRecordSetWriter"));
394+
REQUIRE_THROWS_WITH(test_controller_.trigger(), Catch::Matchers::EndsWith("'Record Reader' property is set to invalid controller service 'invalid_reader'"));
395+
}
396+
397+
SECTION("RecordWriter is set to invalid controller service") {
398+
REQUIRE(consume_mqtt_processor_->setProperty(minifi::processors::ConsumeMQTT::RecordReader.name, "SparkplugBReader"));
399+
REQUIRE(consume_mqtt_processor_->setProperty(minifi::processors::ConsumeMQTT::RecordWriter.name, "invalid_writer"));
400+
REQUIRE_THROWS_WITH(test_controller_.trigger(), Catch::Matchers::EndsWith("'Record Writer' property is set to invalid controller service 'invalid_writer'"));
401+
}
402+
}
403+
386404
} // namespace org::apache::nifi::minifi::test

0 commit comments

Comments
 (0)