Skip to content

Commit bf24a9e

Browse files
committed
Add RercordConverter struct
1 parent 034d4d1 commit bf24a9e

File tree

11 files changed

+91
-66
lines changed

11 files changed

+91
-66
lines changed

extensions/mqtt/processors/AbstractMQTTProcessor.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,20 @@ void AbstractMQTTProcessor::onSchedule(core::ProcessContext& context, core::Proc
109109
readProperties(context);
110110
checkProperties(context);
111111
initializeClient();
112+
113+
auto record_set_reader = utils::parseOptionalControllerService<core::RecordSetReader>(context, RecordReader, getUUID());
114+
auto record_set_writer = utils::parseOptionalControllerService<core::RecordSetWriter>(context, RecordWriter, getUUID());
115+
116+
if ((record_set_reader == nullptr) != (record_set_writer == nullptr)) {
117+
throw Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "ConsumeMQTT requires both or neither Record Reader and Record Writer to be set");
118+
}
119+
120+
if (record_set_reader) {
121+
record_converter_ = core::RecordConverter{
122+
.record_set_reader = gsl::make_not_null(std::move(record_set_reader)),
123+
.record_set_writer = gsl::make_not_null(std::move(record_set_writer)),
124+
};
125+
}
112126
}
113127

114128
void AbstractMQTTProcessor::initializeClient() {

extensions/mqtt/processors/AbstractMQTTProcessor.h

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <vector>
2424
#include <shared_mutex>
2525
#include <future>
26+
#include <optional>
2627

2728
#include "minifi-cpp/core/PropertyDefinition.h"
2829
#include "core/ProcessorImpl.h"
@@ -32,8 +33,7 @@
3233
#include "core/logging/LoggerFactory.h"
3334
#include "utils/Enum.h"
3435
#include "MQTTAsync.h"
35-
#include "controllers/RecordSetReader.h"
36-
#include "controllers/RecordSetWriter.h"
36+
#include "minifi-cpp/controllers/RecordConverter.h"
3737

3838
namespace org::apache::nifi::minifi::processors::mqtt {
3939
enum class MqttVersions {
@@ -168,6 +168,14 @@ class AbstractMQTTProcessor : public core::ProcessorImpl {
168168
.withDescription("Private key passphrase")
169169
.isSensitive(true)
170170
.build();
171+
EXTENSIONAPI static constexpr auto RecordReader = core::PropertyDefinitionBuilder<>::createProperty("Record Reader")
172+
.withDescription("The Record Reader to use for parsing received MQTT Messages into Records.")
173+
.withAllowedTypes<minifi::core::RecordSetReader>()
174+
.build();
175+
EXTENSIONAPI static constexpr auto RecordWriter = core::PropertyDefinitionBuilder<>::createProperty("Record Writer")
176+
.withDescription("The Record Writer to use for serializing Records before writing them to a FlowFile.")
177+
.withAllowedTypes<minifi::core::RecordSetWriter>()
178+
.build();
171179
EXTENSIONAPI static constexpr auto BasicProperties = std::to_array<core::PropertyReference>({
172180
BrokerURI,
173181
ClientID,
@@ -188,7 +196,9 @@ class AbstractMQTTProcessor : public core::ProcessorImpl {
188196
SecurityCA,
189197
SecurityCert,
190198
SecurityPrivateKey,
191-
SecurityPrivateKeyPassword
199+
SecurityPrivateKeyPassword,
200+
RecordReader,
201+
RecordWriter
192202
});
193203

194204
void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& factory) override;
@@ -258,8 +268,7 @@ class AbstractMQTTProcessor : public core::ProcessorImpl {
258268
std::optional<std::chrono::seconds> maximum_session_expiry_interval_;
259269
std::optional<std::chrono::seconds> server_keep_alive_;
260270

261-
std::shared_ptr<core::RecordSetReader> record_set_reader_;
262-
std::shared_ptr<core::RecordSetWriter> record_set_writer_;
271+
std::optional<core::RecordConverter> record_converter_;
263272

264273
private:
265274
using ConnectFinishedTask = std::packaged_task<void(MQTTAsync_successData*, MQTTAsync_successData5*, MQTTAsync_failureData*, MQTTAsync_failureData5*)>;

extensions/mqtt/processors/ConsumeMQTT.cpp

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,7 @@ void ConsumeMQTT::initialize() {
3939

4040
void ConsumeMQTT::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& factory) {
4141
AbstractMQTTProcessor::onSchedule(context, factory);
42-
record_set_reader_ = utils::parseOptionalControllerService<core::RecordSetReader>(context, RecordReader, getUUID());
43-
record_set_writer_ = utils::parseOptionalControllerService<core::RecordSetWriter>(context, RecordWriter, getUUID());
4442

45-
if ((record_set_reader_ == nullptr) != (record_set_writer_ == nullptr)) {
46-
throw Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "ConsumeMQTT requires both or neither Record Reader and Record Writer to be set");
47-
}
4843
add_attributes_as_fields_ = utils::parseBoolProperty(context, AddAttributesAsFields);
4944
}
5045

@@ -89,13 +84,13 @@ void ConsumeMQTT::addAttributesAsRecordFields(core::RecordSet& new_records, cons
8984
}
9085

9186
void ConsumeMQTT::transferMessagesAsRecords(core::ProcessSession& session) {
92-
gsl_Expects(record_set_reader_ && record_set_writer_);
87+
gsl_Expects(record_converter_);
9388
auto msg_queue = getReceivedMqttMessages();
9489
core::RecordSet record_set;
9590
while (!msg_queue.empty()) {
9691
io::BufferStream buffer_stream;
9792
buffer_stream.write(reinterpret_cast<const uint8_t*>(msg_queue.front().contents->payload), gsl::narrow<size_t>(msg_queue.front().contents->payloadlen));
98-
auto new_records_result = record_set_reader_->read(buffer_stream);
93+
auto new_records_result = record_converter_->record_set_reader->read(buffer_stream);
9994
if (!new_records_result) {
10095
logger_->log_error("Failed to read records from MQTT message: {}", new_records_result.error());
10196
msg_queue.pop();
@@ -112,7 +107,7 @@ void ConsumeMQTT::transferMessagesAsRecords(core::ProcessSession& session) {
112107
return;
113108
}
114109
std::shared_ptr<core::FlowFile> flow_file = session.create();
115-
record_set_writer_->write(record_set, flow_file, session);
110+
record_converter_->record_set_writer->write(record_set, flow_file, session);
116111
session.putAttribute(*flow_file, RecordCountOutputAttribute.name, std::to_string(record_set.size()));
117112
session.putAttribute(*flow_file, BrokerOutputAttribute.name, uri_);
118113
session.transfer(flow_file, Success);
@@ -152,7 +147,7 @@ void ConsumeMQTT::transferMessagesAsFlowFiles(core::ProcessSession& session) {
152147
}
153148

154149
void ConsumeMQTT::onTriggerImpl(core::ProcessContext&, core::ProcessSession& session) {
155-
if (record_set_reader_) {
150+
if (record_converter_) {
156151
transferMessagesAsRecords(session);
157152
} else {
158153
transferMessagesAsFlowFiles(session);

extensions/mqtt/processors/ConsumeMQTT.h

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -82,14 +82,6 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
8282
.withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR)
8383
.withDefaultValue(MQTT_MAX_RECEIVE_MAXIMUM_STR)
8484
.build();
85-
EXTENSIONAPI static constexpr auto RecordReader = core::PropertyDefinitionBuilder<>::createProperty("Record Reader")
86-
.withDescription("The Record Reader to use for parsing received MQTT Messages into Records.")
87-
.withAllowedTypes<minifi::core::RecordSetReader>()
88-
.build();
89-
EXTENSIONAPI static constexpr auto RecordWriter = core::PropertyDefinitionBuilder<>::createProperty("Record Writer")
90-
.withDescription("The Record Writer to use for serializing Records before writing them to a FlowFile.")
91-
.withAllowedTypes<minifi::core::RecordSetWriter>()
92-
.build();
9385
EXTENSIONAPI static constexpr auto AddAttributesAsFields = core::PropertyDefinitionBuilder<>::createProperty("Add Attributes As Fields")
9486
.withDescription("If setting this property to true, default fields are going to be added in each record: _topic, _qos, _isDuplicate, _isRetained.")
9587
.withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR)
@@ -104,8 +96,6 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
10496
AttributeFromContentType,
10597
TopicAliasMaximum,
10698
ReceiveMaximum,
107-
RecordReader,
108-
RecordWriter,
10999
AddAttributesAsFields
110100
}), AbstractMQTTProcessor::AdvancedProperties);
111101

extensions/mqtt/processors/PublishMQTT.cpp

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,16 +44,6 @@ void PublishMQTT::initialize() {
4444
setSupportedRelationships(Relationships);
4545
}
4646

47-
void PublishMQTT::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& factory) {
48-
AbstractMQTTProcessor::onSchedule(context, factory);
49-
record_set_reader_ = utils::parseOptionalControllerService<core::RecordSetReader>(context, RecordReader, getUUID());
50-
record_set_writer_ = utils::parseOptionalControllerService<core::RecordSetWriter>(context, RecordWriter, getUUID());
51-
52-
if ((record_set_reader_ == nullptr) != (record_set_writer_ == nullptr)) {
53-
throw Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "PublishMQTT requires both or neither Record Reader and Record Writer to be set");
54-
}
55-
}
56-
5747
void PublishMQTT::readProperties(core::ProcessContext& context) {
5848
if (!context.getProperty(Topic).has_value()) {
5949
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "PublishMQTT: Topic is required");
@@ -74,10 +64,10 @@ void PublishMQTT::onTriggerImpl(core::ProcessContext& context, core::ProcessSess
7464
}
7565

7666
std::vector<std::shared_ptr<core::FlowFile>> flow_files;
77-
if (record_set_reader_) {
67+
if (record_converter_) {
7868
nonstd::expected<core::RecordSet, std::error_code> record_set;
7969
session.read(original_flow_file, [this, &record_set](const std::shared_ptr<io::InputStream>& input_stream) {
80-
record_set = record_set_reader_->read(*input_stream);
70+
record_set = record_converter_->record_set_reader->read(*input_stream);
8171
return gsl::narrow<int64_t>(input_stream->size());
8272
});
8373

@@ -95,7 +85,7 @@ void PublishMQTT::onTriggerImpl(core::ProcessContext& context, core::ProcessSess
9585
}
9686
std::vector<core::Record> records;
9787
records.emplace_back(std::move(record));
98-
record_set_writer_->write(records, new_flow_file, session);
88+
record_converter_->record_set_writer->write(records, new_flow_file, session);
9989
flow_files.push_back(std::move(new_flow_file));
10090
}
10191

extensions/mqtt/processors/PublishMQTT.h

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -64,21 +64,11 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
6464
.withDescription("Content type of the message. MQTT 5.x only.")
6565
.supportsExpressionLanguage(true)
6666
.build();
67-
EXTENSIONAPI static constexpr auto RecordReader = core::PropertyDefinitionBuilder<>::createProperty("Record Reader")
68-
.withDescription("The Record Reader to use for parsing the incoming FlowFile into Records.")
69-
.withAllowedTypes<minifi::core::RecordSetReader>()
70-
.build();
71-
EXTENSIONAPI static constexpr auto RecordWriter = core::PropertyDefinitionBuilder<>::createProperty("Record Writer")
72-
.withDescription("The Record Writer to use for serializing Records before publishing them as an MQTT Message.")
73-
.withAllowedTypes<minifi::core::RecordSetWriter>()
74-
.build();
7567
EXTENSIONAPI static constexpr auto Properties = utils::array_cat(AbstractMQTTProcessor::BasicProperties, std::to_array<core::PropertyReference>({
7668
Topic,
7769
Retain,
7870
MessageExpiryInterval,
79-
ContentType,
80-
RecordReader,
81-
RecordWriter
71+
ContentType
8272
}), AbstractMQTTProcessor::AdvancedProperties);
8373

8474
EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"};
@@ -92,7 +82,6 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
9282

9383
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
9484

95-
void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& factory) override;
9685
void readProperties(core::ProcessContext& context) override;
9786
void onTriggerImpl(core::ProcessContext& context, core::ProcessSession& session) override;
9887
void initialize() override;

extensions/standard-processors/processors/ConvertRecord.cpp

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,20 @@
2020
#include "nonstd/expected.hpp"
2121
#include "utils/GeneralUtils.h"
2222
#include "utils/ProcessorConfigUtils.h"
23+
#include "minifi-cpp/utils/gsl.h"
2324

2425
namespace org::apache::nifi::minifi::processors {
2526

2627
void ConvertRecord::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
27-
record_set_reader_ = utils::parseControllerService<core::RecordSetReader>(context, RecordReader, getUUID());
28-
record_set_writer_ = utils::parseControllerService<core::RecordSetWriter>(context, RecordWriter, getUUID());
28+
record_converter_ = core::RecordConverter{
29+
.record_set_reader = utils::parseControllerService<core::RecordSetReader>(context, RecordReader, getUUID()),
30+
.record_set_writer = utils::parseControllerService<core::RecordSetWriter>(context, RecordWriter, getUUID())
31+
};
2932
include_zero_record_flow_files_ = utils::parseBoolProperty(context, IncludeZeroRecordFlowFiles);
3033
}
3134

3235
void ConvertRecord::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
33-
gsl_Expects(record_set_reader_ && record_set_writer_);
36+
gsl_Expects(record_converter_);
3437
const auto flow_file = session.get();
3538
if (!flow_file) {
3639
context.yield();
@@ -39,7 +42,7 @@ void ConvertRecord::onTrigger(core::ProcessContext& context, core::ProcessSessio
3942

4043
nonstd::expected<core::RecordSet, std::error_code> record_set;
4144
session.read(flow_file, [this, &record_set](const std::shared_ptr<io::InputStream>& input_stream) {
42-
record_set = record_set_reader_->read(*input_stream);
45+
record_set = record_converter_->record_set_reader->read(*input_stream);
4346
return gsl::narrow<int64_t>(input_stream->size());
4447
});
4548
if (!record_set) {
@@ -55,7 +58,7 @@ void ConvertRecord::onTrigger(core::ProcessContext& context, core::ProcessSessio
5558
return;
5659
}
5760

58-
record_set_writer_->write(*record_set, flow_file, session);
61+
record_converter_->record_set_writer->write(*record_set, flow_file, session);
5962
flow_file->setAttribute(processors::ConvertRecord::RecordCountOutputAttribute.name, std::to_string(record_set->size()));
6063
session.transfer(flow_file, Success);
6164
}

extensions/standard-processors/processors/ConvertRecord.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@
1919
#include <memory>
2020
#include <string_view>
2121
#include <utility>
22+
#include <optional>
2223

2324
#include "core/AbstractProcessor.h"
2425
#include "core/ProcessSession.h"
2526
#include "core/PropertyDefinitionBuilder.h"
2627
#include "minifi-cpp/core/RelationshipDefinition.h"
27-
#include "controllers/RecordSetReader.h"
28-
#include "controllers/RecordSetWriter.h"
28+
#include "minifi-cpp/controllers/RecordConverter.h"
2929

3030
namespace org::apache::nifi::minifi::processors {
3131

@@ -77,8 +77,7 @@ class ConvertRecord : public core::AbstractProcessor<ConvertRecord> {
7777
void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;
7878

7979
private:
80-
std::shared_ptr<core::RecordSetReader> record_set_reader_;
81-
std::shared_ptr<core::RecordSetWriter> record_set_writer_;
80+
std::optional<core::RecordConverter> record_converter_;
8281
bool include_zero_record_flow_files_ = true;
8382
};
8483

extensions/standard-processors/processors/SplitRecord.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@
2020
#include "nonstd/expected.hpp"
2121
#include "utils/GeneralUtils.h"
2222
#include "utils/ProcessorConfigUtils.h"
23+
#include "minifi-cpp/utils/gsl.h"
2324

2425
namespace org::apache::nifi::minifi::processors {
2526

2627
void SplitRecord::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
27-
record_set_reader_ = utils::parseControllerService<core::RecordSetReader>(context, RecordReader, getUUID());
28-
record_set_writer_ = utils::parseControllerService<core::RecordSetWriter>(context, RecordWriter, getUUID());
28+
record_converter_ = core::RecordConverter{
29+
.record_set_reader = utils::parseControllerService<core::RecordSetReader>(context, RecordReader, getUUID()),
30+
.record_set_writer = utils::parseControllerService<core::RecordSetWriter>(context, RecordWriter, getUUID())
31+
};
2932
}
3033

3134
nonstd::expected<std::size_t, std::string> SplitRecord::readRecordsPerSplit(core::ProcessContext& context, const core::FlowFile& original_flow_file) {
@@ -37,6 +40,7 @@ nonstd::expected<std::size_t, std::string> SplitRecord::readRecordsPerSplit(core
3740
}
3841

3942
void SplitRecord::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
43+
gsl_Expects(record_converter_);
4044
const auto original_flow_file = session.get();
4145
if (!original_flow_file) {
4246
context.yield();
@@ -52,7 +56,7 @@ void SplitRecord::onTrigger(core::ProcessContext& context, core::ProcessSession&
5256

5357
nonstd::expected<core::RecordSet, std::error_code> record_set;
5458
session.read(original_flow_file, [this, &record_set](const std::shared_ptr<io::InputStream>& input_stream) {
55-
record_set = record_set_reader_->read(*input_stream);
59+
record_set = record_converter_->record_set_reader->read(*input_stream);
5660
return gsl::narrow<int64_t>(input_stream->size());
5761
});
5862
if (!record_set) {
@@ -85,7 +89,7 @@ void SplitRecord::onTrigger(core::ProcessContext& context, core::ProcessSession&
8589
split_flow_file->setAttribute("fragment.count", std::to_string(fragment_count));
8690
split_flow_file->setAttribute("segment.original.filename", original_flow_file->getAttribute("filename").value_or(""));
8791

88-
record_set_writer_->write(slice_record_set, split_flow_file, session);
92+
record_converter_->record_set_writer->write(slice_record_set, split_flow_file, session);
8993
session.transfer(split_flow_file, Splits);
9094
++fragment_index;
9195
}

extensions/standard-processors/processors/SplitRecord.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
#pragma once
1818

19+
#include <optional>
20+
1921
#include "minifi-cpp/core/Annotation.h"
2022
#include "minifi-cpp/core/ProcessContext.h"
2123
#include "core/ProcessSession.h"
@@ -24,8 +26,7 @@
2426
#include "core/PropertyDefinitionBuilder.h"
2527
#include "minifi-cpp/core/RelationshipDefinition.h"
2628
#include "minifi-cpp/core/logging/Logger.h"
27-
#include "minifi-cpp/controllers/RecordSetReader.h"
28-
#include "minifi-cpp/controllers/RecordSetWriter.h"
29+
#include "minifi-cpp/controllers/RecordConverter.h"
2930
#include "core/AbstractProcessor.h"
3031

3132
namespace org::apache::nifi::minifi::processors {
@@ -87,8 +88,7 @@ class SplitRecord final : public core::AbstractProcessor<SplitRecord> {
8788
private:
8889
static nonstd::expected<std::size_t, std::string> readRecordsPerSplit(core::ProcessContext& context, const core::FlowFile& original_flow_file);
8990

90-
std::shared_ptr<core::RecordSetReader> record_set_reader_;
91-
std::shared_ptr<core::RecordSetWriter> record_set_writer_;
91+
std::optional<core::RecordConverter> record_converter_;
9292
};
9393

9494
} // namespace org::apache::nifi::minifi::processors

0 commit comments

Comments
 (0)