Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core-framework/include/core/BufferedContentSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class BufferedContentSession : public ContentSessionImpl {

std::shared_ptr<io::BaseStream> read(const std::shared_ptr<ResourceClaim>& resource_id) override;

void remove(const std::shared_ptr<ResourceClaim>& resource_id) override;

void commit() override;

void rollback() override;
Expand Down
5 changes: 5 additions & 0 deletions core-framework/src/core/BufferedContentSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ void BufferedContentSession::commit() {
append_state_.clear();
}

void BufferedContentSession::remove(const std::shared_ptr<ResourceClaim>& resource_id) {
managed_resources_.erase(resource_id);
append_state_.erase(resource_id);
}

void BufferedContentSession::rollback() {
managed_resources_.clear();
append_state_.clear();
Expand Down
6 changes: 6 additions & 0 deletions extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,12 @@ TEST_CASE("ProcessSession::read can read zero length flowfiles without crash (Ro
ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared<core::repository::DatabaseContentRepository>());
}

TEST_CASE("ProcessSession::write can be cancelled") {
ContentRepositoryDependentTests::testOkWrite(std::make_shared<core::repository::DatabaseContentRepository>());
ContentRepositoryDependentTests::testErrWrite(std::make_shared<core::repository::DatabaseContentRepository>());
ContentRepositoryDependentTests::testCancelWrite(std::make_shared<core::repository::DatabaseContentRepository>());
}

size_t getDbSize(const std::filesystem::path& dir) {
auto db = minifi::internal::RocksDatabase::create({}, {}, dir.string(), {});
auto opendb = db->open();
Expand Down
2 changes: 2 additions & 0 deletions libminifi/include/core/ForwardingContentSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class ForwardingContentSession : public ContentSessionImpl {

std::shared_ptr<io::BaseStream> read(const std::shared_ptr<ResourceClaim>& resource_id) override;

void remove(const std::shared_ptr<ResourceClaim>& resource_id) override;

void commit() override;

void rollback() override;
Expand Down
6 changes: 6 additions & 0 deletions libminifi/src/core/ForwardingContentSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ std::shared_ptr<io::BaseStream> ForwardingContentSession::append(const std::shar
return repository_->write(*resource_id, true);
}

void ForwardingContentSession::remove(const std::shared_ptr<ResourceClaim>& resource_id) {
created_claims_.erase(resource_id);
append_state_.erase(resource_id);
repository_->remove(*resource_id);
}

void ForwardingContentSession::commit() {
created_claims_.clear();
append_state_.clear();
Expand Down
15 changes: 12 additions & 3 deletions libminifi/src/core/ProcessSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@
#include <vector>

#include "core/ProcessSessionReadCallback.h"
#include "io/StreamSlice.h"
#include "core/Processor.h"
#include "io/StreamPipe.h"
#include "io/StreamSlice.h"
#include "minifi-c/minifi-c.h"
#include "minifi-cpp/utils/gsl.h"
#include "core/Processor.h"

/* This implementation is only for native Windows systems. */
#if (defined _WIN32 || defined __WIN32__) && !defined __CYGWIN__
Expand Down Expand Up @@ -256,7 +257,15 @@ void ProcessSessionImpl::write(core::FlowFile &flow, const io::OutputStreamCallb
if (nullptr == stream) {
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for write");
}
if (callback(stream) < 0) {
const auto callback_result = callback(stream);
if (callback_result == MinifiIoStatus::MINIFI_IO_CANCEL) {
stream->close();
content_session_->remove(claim);
claim.reset();
return;
}

if (callback_result < 0) {
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
}

Expand Down
105 changes: 85 additions & 20 deletions libminifi/test/libtest/unit/ContentRepositoryDependentTests.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@
* limitations under the License.
*/

#pragma once

#include <array>
#include <memory>
#include <string>
#include <vector>

#include "catch2/catch_test_macros.hpp"
#include "core/Processor.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "unit/TestBase.h"
#include "unit/Catch.h"
#include "unit/DummyProcessor.h"
#include "core/Processor.h"
#include "io/StreamPipe.h"
#include "minifi-c/minifi-c.h"
#include "unit/DummyProcessor.h"
#include "unit/TestBase.h"

#pragma once

Expand All @@ -42,11 +42,11 @@ struct ReadUntilItCan {
std::array<std::byte, 1024> buffer{};
size_t bytes_read = 0;
while (true) {
size_t read_result = stream->read(buffer);
const size_t read_result = stream->read(buffer);
if (minifi::io::isError(read_result))
return -1;
if (read_result == 0)
return bytes_read;
return gsl::narrow<int64_t>(bytes_read);
bytes_read += read_result;
const auto char_view = gsl::make_span(buffer).subspan(0, read_result).as_span<const char>();
value_.append(std::begin(char_view), std::end(char_view));
Expand All @@ -69,18 +69,18 @@ class Fixture {
process_session_ = std::make_unique<core::ProcessSessionImpl>(context_);
}

core::ProcessSession &processSession() { return *process_session_; }
[[nodiscard]] core::ProcessSession& processSession() const { return *process_session_; }

void transferAndCommit(const std::shared_ptr<core::FlowFile>& flow_file) {
void transferAndCommit(const std::shared_ptr<core::FlowFile>& flow_file) const {
process_session_->transfer(flow_file, Success);
process_session_->commit();
}

void writeToFlowFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::string content) {
void writeToFlowFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::string_view content) const {
process_session_->writeBuffer(flow_file, content);
}

void appendToFlowFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::string content_to_append) {
void appendToFlowFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::string_view content_to_append) const {
process_session_->add(flow_file);
process_session_->appendBuffer(flow_file, content_to_append);
}
Expand All @@ -94,8 +94,8 @@ class Fixture {
std::unique_ptr<core::ProcessSession> process_session_;
};

void testReadOnSmallerClonedFlowFiles(std::shared_ptr<core::ContentRepository> content_repo) {
Fixture fixture = Fixture(std::move(content_repo));
inline void testReadOnSmallerClonedFlowFiles(std::shared_ptr<core::ContentRepository> content_repo) {
auto fixture = Fixture(std::move(content_repo));
core::ProcessSession& process_session = fixture.processSession();
const auto original_ff = process_session.create();
fixture.writeToFlowFile(original_ff, "foobar");
Expand Down Expand Up @@ -123,8 +123,8 @@ void testReadOnSmallerClonedFlowFiles(std::shared_ptr<core::ContentRepository> c
CHECK(read_until_it_can_callback.value_ == "bar");
}

void testAppendToUnmanagedFlowFile(std::shared_ptr<core::ContentRepository> content_repo) {
Fixture fixture = Fixture(std::move(content_repo));
inline void testAppendToUnmanagedFlowFile(std::shared_ptr<core::ContentRepository> content_repo) {
auto fixture = Fixture(std::move(content_repo));
core::ProcessSession& process_session = fixture.processSession();
const auto flow_file = process_session.create();
REQUIRE(flow_file);
Expand All @@ -142,8 +142,8 @@ void testAppendToUnmanagedFlowFile(std::shared_ptr<core::ContentRepository> cont
CHECK(read_until_it_can_callback.value_ == "myfoobar");
}

void testAppendToManagedFlowFile(std::shared_ptr<core::ContentRepository> content_repo) {
Fixture fixture = Fixture(std::move(content_repo));
inline void testAppendToManagedFlowFile(std::shared_ptr<core::ContentRepository> content_repo) {
auto fixture = Fixture(std::move(content_repo));
core::ProcessSession& process_session = fixture.processSession();
const auto flow_file = process_session.create();
REQUIRE(flow_file);
Expand All @@ -160,8 +160,8 @@ void testAppendToManagedFlowFile(std::shared_ptr<core::ContentRepository> conten
CHECK(read_until_it_can_callback.value_ == "myfoobar");
}

void testReadFromZeroLengthFlowFile(std::shared_ptr<core::ContentRepository> content_repo) {
Fixture fixture = Fixture(std::move(content_repo));
inline void testReadFromZeroLengthFlowFile(std::shared_ptr<core::ContentRepository> content_repo) {
const auto fixture = Fixture(std::move(content_repo));
core::ProcessSession& process_session = fixture.processSession();
const auto flow_file = process_session.create();
REQUIRE(flow_file);
Expand All @@ -171,4 +171,69 @@ void testReadFromZeroLengthFlowFile(std::shared_ptr<core::ContentRepository> con
REQUIRE_NOTHROW(process_session.readBuffer(flow_file));
REQUIRE_NOTHROW(process_session.read(flow_file, ReadUntilItCan{}));
}

inline void testErrWrite(std::shared_ptr<core::ContentRepository> content_repo) {
const auto fixture = Fixture(std::move(content_repo));
core::ProcessSession& process_session = fixture.processSession();
const auto flow_file = process_session.create();
fixture.writeToFlowFile(flow_file, "original_content");

REQUIRE_THROWS(
process_session.write(flow_file, [](const std::shared_ptr<minifi::io::OutputStream>& output_stream) {
std::string str = "new_content";
output_stream->write(as_bytes(std::span(str)));
return MinifiIoStatus::MINIFI_IO_ERROR;
}));

fixture.transferAndCommit(flow_file);

CHECK(flow_file->getSize() == 16);
ReadUntilItCan read_until_it_can_callback;
const auto read_result = process_session.readBuffer(flow_file);
process_session.read(flow_file, std::ref(read_until_it_can_callback));
CHECK(to_string(read_result) == "original_content");
}

inline void testOkWrite(std::shared_ptr<core::ContentRepository> content_repo) {
const auto fixture = Fixture(std::move(content_repo));
core::ProcessSession& process_session = fixture.processSession();
const auto flow_file = process_session.create();
fixture.writeToFlowFile(flow_file, "original_content");

CHECK(flow_file->getSize() == 16);

process_session.write(flow_file, [](const std::shared_ptr<minifi::io::OutputStream>& output_stream) {
std::string str = "new_content";
return output_stream->write(as_bytes(std::span(str)));
});

fixture.transferAndCommit(flow_file);

CHECK(flow_file->getSize() == 11);
ReadUntilItCan read_until_it_can_callback;
const auto read_result = process_session.readBuffer(flow_file);
process_session.read(flow_file, std::ref(read_until_it_can_callback));
CHECK(to_string(read_result) == "new_content");
}

inline void testCancelWrite(std::shared_ptr<core::ContentRepository> content_repo) {
const auto fixture = Fixture(std::move(content_repo));
core::ProcessSession& process_session = fixture.processSession();
const auto flow_file = process_session.create();
fixture.writeToFlowFile(flow_file, "original_content");

process_session.write(flow_file, [](const std::shared_ptr<minifi::io::OutputStream>& output_stream) {
std::string str = "new_content";
output_stream->write(as_bytes(std::span(str)));
return MinifiIoStatus::MINIFI_IO_CANCEL;
});

fixture.transferAndCommit(flow_file);

CHECK(flow_file->getSize() == 16);
ReadUntilItCan read_until_it_can_callback;
const auto read_result = process_session.readBuffer(flow_file);
process_session.read(flow_file, std::ref(read_until_it_can_callback));
CHECK(to_string(read_result) == "original_content");
}
} // namespace ContentRepositoryDependentTests
15 changes: 12 additions & 3 deletions libminifi/test/unit/ProcessSessionTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <string>

#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "unit/TestBase.h"
#include "unit/Catch.h"
#include "unit/ContentRepositoryDependentTests.h"
Expand All @@ -34,13 +33,13 @@ class Fixture {
public:
explicit Fixture(TestController::PlanConfig config = {}): plan_config_(std::move(config)) {}

minifi::core::ProcessSession &processSession() { return *process_session_; }
[[nodiscard]] minifi::core::ProcessSession& processSession() const { return *process_session_; }

private:
TestController test_controller_;
TestController::PlanConfig plan_config_;
std::shared_ptr<TestPlan> test_plan_ = test_controller_.createPlan(plan_config_);
minifi::core::Processor* dummy_processor_ = test_plan_->addProcessor("DummyProcessor", "dummyProcessor");
[[maybe_unused]] minifi::core::Processor* dummy_processor_ = test_plan_->addProcessor("DummyProcessor", "dummyProcessor");
std::shared_ptr<minifi::core::ProcessContext> context_ = [this] { test_plan_->runNextProcessor(); return test_plan_->getCurrentContext(); }();
std::unique_ptr<minifi::core::ProcessSession> process_session_ = std::make_unique<core::ProcessSessionImpl>(context_);
};
Expand Down Expand Up @@ -127,3 +126,13 @@ TEST_CASE("ProcessSession::read can read zero length flowfiles without crash", "
ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared<core::repository::VolatileContentRepository>());
ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared<core::repository::FileSystemRepository>());
}

TEST_CASE("Test ProcessSession::write's possible outcomes") {
ContentRepositoryDependentTests::testOkWrite(std::make_shared<core::repository::VolatileContentRepository>());
ContentRepositoryDependentTests::testErrWrite(std::make_shared<core::repository::VolatileContentRepository>());
ContentRepositoryDependentTests::testCancelWrite(std::make_shared<core::repository::VolatileContentRepository>());

ContentRepositoryDependentTests::testOkWrite(std::make_shared<core::repository::FileSystemRepository>());
ContentRepositoryDependentTests::testErrWrite(std::make_shared<core::repository::FileSystemRepository>());
ContentRepositoryDependentTests::testCancelWrite(std::make_shared<core::repository::FileSystemRepository>());
}
5 changes: 5 additions & 0 deletions minifi-api/include/minifi-c/minifi-c.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ extern "C" {

typedef bool MinifiBool;

typedef enum MinifiIoStatus : int64_t {
MINIFI_IO_ERROR = -1,
MINIFI_IO_CANCEL = -125
} MinifiIoStatus;

typedef enum MinifiInputRequirement : uint32_t {
MINIFI_INPUT_REQUIRED = 0,
MINIFI_INPUT_ALLOWED = 1,
Expand Down
2 changes: 2 additions & 0 deletions minifi-api/include/minifi-cpp/core/ContentSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class ContentSession {

virtual std::shared_ptr<io::BaseStream> read(const std::shared_ptr<ResourceClaim>& resource_id) = 0;

virtual void remove(const std::shared_ptr<ResourceClaim>& resource_id) = 0;

virtual void commit() = 0;

virtual void rollback() = 0;
Expand Down
Loading