Skip to content

Commit 7740428

Browse files
committed
MINIFICPP-2600 Change RecordSetReader interface
1 parent da07086 commit 7740428

File tree

6 files changed

+22
-21
lines changed

6 files changed

+22
-21
lines changed

extensions/python/types/PyRecordSetReader.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,12 @@ PyObject* PyRecordSetReader::read(PyRecordSetReader* self, PyObject* args) {
8686
return nullptr;
8787
}
8888

89-
auto read_result = record_set_reader->read(flow_file, process_session->getSession());
89+
nonstd::expected<core::RecordSet, std::error_code> read_result;
90+
process_session->getSession().read(flow_file, [&record_set_reader, &read_result](const std::shared_ptr<io::InputStream>& input_stream) {
91+
read_result = record_set_reader->read(*input_stream);
92+
return gsl::narrow<int64_t>(input_stream->size());
93+
});
94+
9095
if (!read_result) {
9196
std::string error_message = "failed to read record set with the following error: " + read_result.error().message();
9297
PyErr_SetString(PyExc_RuntimeError, error_message.c_str());

extensions/standard-processors/controllers/JsonTreeReader.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -113,12 +113,12 @@ bool readAsArray(const std::string& content, core::RecordSet& record_set) {
113113
return true;
114114
}
115115

116-
nonstd::expected<core::RecordSet, std::error_code> JsonTreeReader::read(const std::shared_ptr<core::FlowFile>& flow_file, core::ProcessSession& session) {
116+
nonstd::expected<core::RecordSet, std::error_code> JsonTreeReader::read(io::InputStream& input_stream) {
117117
core::RecordSet record_set{};
118-
const auto read_result = session.read(flow_file, [&record_set](const std::shared_ptr<io::InputStream>& input_stream) -> int64_t {
118+
const auto read_result = [&record_set](io::InputStream& input_stream) -> int64_t {
119119
std::string content;
120-
content.resize(input_stream->size());
121-
const auto read_ret = gsl::narrow<int64_t>(input_stream->read(as_writable_bytes(std::span(content))));
120+
content.resize(input_stream.size());
121+
const auto read_ret = gsl::narrow<int64_t>(input_stream.read(as_writable_bytes(std::span(content))));
122122
if (io::isError(read_ret)) {
123123
return -1;
124124
}
@@ -128,7 +128,7 @@ nonstd::expected<core::RecordSet, std::error_code> JsonTreeReader::read(const st
128128
readAsJsonLines(content, record_set);
129129
}
130130
return read_ret;
131-
});
131+
}(input_stream);
132132
if (io::isError(read_result))
133133
return nonstd::make_unexpected(std::make_error_code(std::errc::invalid_argument));
134134
return record_set;

extensions/standard-processors/controllers/JsonTreeReader.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
#pragma once
1818

1919
#include "controllers/RecordSetReader.h"
20-
#include "core/FlowFile.h"
21-
#include "core/ProcessSession.h"
2220

2321
namespace org::apache::nifi::minifi::standard {
2422

@@ -46,7 +44,7 @@ class JsonTreeReader final : public core::RecordSetReaderImpl {
4644
EXTENSIONAPI static constexpr auto ImplementsApis = std::array{ RecordSetReader::ProvidesApi };
4745
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES
4846

49-
nonstd::expected<core::RecordSet, std::error_code> read(const std::shared_ptr<core::FlowFile>& flow_file, core::ProcessSession& session) override;
47+
nonstd::expected<core::RecordSet, std::error_code> read(io::InputStream& input_stream) override;
5048

5149
void initialize() override {
5250
setSupportedProperties(Properties);

extensions/standard-processors/processors/SplitRecord.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,11 @@ void SplitRecord::onTrigger(core::ProcessContext& context, core::ProcessSession&
4949
return;
5050
}
5151

52-
auto record_set = record_set_reader_->read(original_flow_file, session);
52+
nonstd::expected<core::RecordSet, std::error_code> record_set;
53+
session.read(original_flow_file, [this, &record_set](const std::shared_ptr<io::InputStream>& input_stream) {
54+
record_set = record_set_reader_->read(*input_stream);
55+
return gsl::narrow<int64_t>(input_stream->size());
56+
});
5357
if (!record_set) {
5458
logger_->log_error("Failed to read record set from flow file: {}", record_set.error().message());
5559
session.transfer(original_flow_file, Failure);

libminifi/test/libtest/unit/RecordSetTesters.h

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,10 @@ bool testRecordWriter(RecordSetWriter& record_set_writer, const RecordSet& recor
6262
}
6363

6464
inline bool testRecordReader(RecordSetReader& record_set_reader, const std::string_view serialized_record_set, const RecordSet& expected_record_set) {
65-
const RecordSetFixture fixture;
66-
ProcessSession& process_session = fixture.processSession();
67-
68-
const auto flow_file = process_session.create();
69-
process_session.writeBuffer(flow_file, serialized_record_set);
70-
process_session.transfer(flow_file, fixture.getRelationship());
71-
process_session.commit();
65+
io::BufferStream buffer_stream;
66+
buffer_stream.write(reinterpret_cast<const uint8_t*>(serialized_record_set.data()), serialized_record_set.size());
7267

73-
const auto record_set = record_set_reader.read(flow_file, process_session);
68+
const auto record_set = record_set_reader.read(buffer_stream);
7469
if (!record_set)
7570
return false;
7671

minifi-api/include/minifi-cpp/controllers/RecordSetReader.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,10 @@
1919

2020
#include "core/controller/ControllerService.h"
2121

22-
#include "minifi-cpp/core/FlowFile.h"
23-
#include "minifi-cpp/core/ProcessSession.h"
2422
#include "minifi-cpp/core/Record.h"
2523
#include "utils/Enum.h"
2624
#include "utils/ProcessorConfigUtils.h"
25+
#include "minifi-cpp/io/InputStream.h"
2726

2827

2928
namespace org::apache::nifi::minifi::core {
@@ -36,7 +35,7 @@ class RecordSetReader : public virtual controller::ControllerService {
3635
.type = "org.apache.nifi.minifi.core.RecordSetReader",
3736
};
3837

39-
virtual nonstd::expected<RecordSet, std::error_code> read(const std::shared_ptr<FlowFile>& flow_file, ProcessSession& session) = 0;
38+
virtual nonstd::expected<RecordSet, std::error_code> read(io::InputStream& input_stream) = 0;
4039
};
4140

4241
} // namespace org::apache::nifi::minifi::core

0 commit comments

Comments
 (0)