|
22 | 22 | #include "utils/ProcessorConfigUtils.h" |
23 | 23 |
|
24 | 24 | namespace org::apache::nifi::minifi::processors { |
25 | | -namespace { |
26 | | -template<typename RecordSetIO> |
27 | | -std::shared_ptr<RecordSetIO> getRecordSetIO(core::ProcessContext& context, const core::PropertyReference& property, const utils::Identifier& processor_uuid) { |
28 | | - std::string service_name = context.getProperty(property).value_or(""); |
29 | | - if (!IsNullOrEmpty(service_name)) { |
30 | | - auto record_set_io = std::dynamic_pointer_cast<RecordSetIO>(context.getControllerService(service_name, processor_uuid)); |
31 | | - if (!record_set_io) |
32 | | - return nullptr; |
33 | | - return record_set_io; |
34 | | - } |
35 | | - return nullptr; |
36 | | -} |
37 | | -} // namespace |
38 | 25 |
|
39 | 26 | void ConvertRecord::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { |
40 | | - record_set_reader_ = getRecordSetIO<core::RecordSetReader>(context, RecordReader, getUUID()); |
41 | | - if (!record_set_reader_) { |
42 | | - throw Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Record Reader property is missing or invalid"); |
43 | | - } |
44 | | - record_set_writer_ = getRecordSetIO<core::RecordSetWriter>(context, RecordWriter, getUUID()); |
45 | | - if (!record_set_writer_) { |
46 | | - throw Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Record Writer property is missing or invalid"); |
47 | | - } |
| 27 | + record_set_reader_ = utils::parseControllerService<core::RecordSetReader>(context, RecordReader, getUUID()); |
| 28 | + record_set_writer_ = utils::parseControllerService<core::RecordSetWriter>(context, RecordWriter, getUUID()); |
48 | 29 | include_zero_record_flow_files_ = utils::parseBoolProperty(context, IncludeZeroRecordFlowFiles); |
49 | 30 | } |
50 | 31 |
|
51 | 32 | void ConvertRecord::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { |
| 33 | + gsl_Expects(record_set_reader_ && record_set_writer_); |
52 | 34 | const auto flow_file = session.get(); |
53 | 35 | if (!flow_file) { |
54 | 36 | context.yield(); |
|
0 commit comments