Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libminifi/include/core/ProcessContextImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ class ProcessContextImpl : public core::VariableRegistryImpl, public virtual Pro
gsl::not_null<std::shared_ptr<Configure>> configure_;
std::unique_ptr<ProcessorInfo> info_;

mutable std::mutex mutex_;
// each ProcessContextImpl instance is only accessed from one thread at a time, so no synchronization is needed on these caches
mutable std::unordered_map<std::string, expression::Expression, utils::string::transparent_string_hash, std::equal_to<>> cached_expressions_;
mutable std::unordered_map<std::string, expression::Expression, utils::string::transparent_string_hash, std::equal_to<>> cached_dynamic_expressions_;
};
Expand Down
9 changes: 3 additions & 6 deletions libminifi/src/ThreadedSchedulingAgent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,12 @@ void ThreadedSchedulingAgent::schedule(core::Processor* processor) {

processor->onSchedule(*process_context, *session_factory);

std::vector<std::thread *> threads;

ThreadedSchedulingAgent *agent = this;
for (uint8_t i = 0; i < processor->getMaxConcurrentTasks(); i++) {
// reference the disable function from serviceNode
processor->incrementActiveTasks();

std::function<utils::TaskRescheduleInfo()> f_ex = [agent, processor, process_context, session_factory] () {
return agent->run(processor, process_context, session_factory);
auto thread_process_context = std::make_shared<core::ProcessContextImpl>(*processor, controller_service_provider_, repo_, flow_repo_, configure_, content_repo_);
std::function<utils::TaskRescheduleInfo()> f_ex = [agent, processor, thread_process_context, session_factory] () {
return agent->run(processor, thread_process_context, session_factory);
};

std::future<utils::TaskRescheduleInfo> future;
Expand Down
6 changes: 0 additions & 6 deletions libminifi/src/core/ProcessContextImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ bool ProcessContextImpl::hasNonEmptyProperty(std::string_view name) const {
std::vector<std::string> ProcessContextImpl::getDynamicPropertyKeys() const { return processor_.getDynamicPropertyKeys(); }

std::map<std::string, std::string> ProcessContextImpl::getDynamicProperties(const FlowFile* flow_file) const {
std::lock_guard<std::mutex> lock(mutex_);
auto dynamic_props = processor_.getDynamicProperties();
const expression::Parameters params{this, flow_file};
for (auto& [dynamic_property_name, dynamic_property_value]: dynamic_props) {
Expand All @@ -101,7 +100,6 @@ uint8_t ProcessContextImpl::getMaxConcurrentTasks() const { return processor_.ge
void ProcessContextImpl::yield() { processor_.yield(); }

nonstd::expected<std::string, std::error_code> ProcessContextImpl::getProperty(const std::string_view name, const FlowFile* flow_file) const {
std::lock_guard<std::mutex> lock(mutex_);
const auto property = getProcessorInfo().getSupportedProperty(name);
if (!property) {
return nonstd::make_unexpected(PropertyErrorCode::NotSupportedProperty);
Expand All @@ -124,19 +122,16 @@ nonstd::expected<std::string, std::error_code> ProcessContextImpl::getProperty(c
}

nonstd::expected<void, std::error_code> ProcessContextImpl::setProperty(const std::string_view name, std::string value) {
std::lock_guard<std::mutex> lock(mutex_);
cached_expressions_.erase(std::string{name});
return getProcessor().setProperty(name, std::move(value));
}

nonstd::expected<void, std::error_code> ProcessContextImpl::clearProperty(const std::string_view name) {
std::lock_guard<std::mutex> lock(mutex_);
cached_expressions_.erase(std::string{name});
return getProcessor().clearProperty(name);
}

nonstd::expected<std::string, std::error_code> ProcessContextImpl::getDynamicProperty(const std::string_view name, const FlowFile* flow_file) const {
std::lock_guard<std::mutex> lock(mutex_);
if (!cached_dynamic_expressions_.contains(name)) {
auto expression_str = getProcessor().getDynamicProperty(name);
if (!expression_str) { return expression_str; }
Expand All @@ -155,7 +150,6 @@ nonstd::expected<std::string, std::error_code> ProcessContextImpl::getRawDynamic
}

nonstd::expected<void, std::error_code> ProcessContextImpl::setDynamicProperty(std::string name, std::string value) {
std::lock_guard<std::mutex> lock(mutex_);
cached_dynamic_expressions_.erase(name);
return getProcessor().setDynamicProperty(std::move(name), std::move(value));
}
Expand Down
25 changes: 0 additions & 25 deletions libminifi/test/unit/ProcessContextExprTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,28 +159,3 @@ TEST_CASE("ProcessContextExpr can use expression language in dynamic properties"
}
}
}

TEST_CASE("ProcessContextExpr is mutex guarded properly") {
TestController test_controller;
const std::shared_ptr<TestPlan> test_plan = test_controller.createPlan();
std::ignore = test_plan->addProcessor("DummyProcessor", "dummy_processor");
test_plan->runNextProcessor();
const auto context = test_plan->getCurrentContext();
REQUIRE(dynamic_pointer_cast<core::ProcessContextImpl>(context) != nullptr);

auto play_with_context = [=]() {
for (auto i = 0; i < 100; ++i) {
CHECK(context->setDynamicProperty("foo", fmt::format("${{literal('{}')}}", std::this_thread::get_id())));
const auto dynamic_properties = context->getDynamicProperties();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
};

std::thread thread_one{play_with_context};
std::thread thread_two{play_with_context};
std::thread thread_three{play_with_context};

REQUIRE_NOTHROW(thread_one.join());
REQUIRE_NOTHROW(thread_two.join());
REQUIRE_NOTHROW(thread_three.join());
}
Loading