diff --git a/libminifi/include/core/ProcessContextImpl.h b/libminifi/include/core/ProcessContextImpl.h index 31f6ad1882..9017de7247 100644 --- a/libminifi/include/core/ProcessContextImpl.h +++ b/libminifi/include/core/ProcessContextImpl.h @@ -206,7 +206,7 @@ class ProcessContextImpl : public core::VariableRegistryImpl, public virtual Pro gsl::not_null> configure_; std::unique_ptr info_; - mutable std::mutex mutex_; + // each ProcessContextImpl instance is only accessed from one thread at a time, so no synchronization is needed on these caches mutable std::unordered_map> cached_expressions_; mutable std::unordered_map> cached_dynamic_expressions_; }; diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp index 05cbb11e6a..c37603e4d5 100644 --- a/libminifi/src/ThreadedSchedulingAgent.cpp +++ b/libminifi/src/ThreadedSchedulingAgent.cpp @@ -77,15 +77,12 @@ void ThreadedSchedulingAgent::schedule(core::Processor* processor) { processor->onSchedule(*process_context, *session_factory); - std::vector threads; - ThreadedSchedulingAgent *agent = this; for (uint8_t i = 0; i < processor->getMaxConcurrentTasks(); i++) { - // reference the disable function from serviceNode processor->incrementActiveTasks(); - - std::function f_ex = [agent, processor, process_context, session_factory] () { - return agent->run(processor, process_context, session_factory); + auto thread_process_context = std::make_shared(*processor, controller_service_provider_, repo_, flow_repo_, configure_, content_repo_); + std::function f_ex = [agent, processor, thread_process_context, session_factory] () { + return agent->run(processor, thread_process_context, session_factory); }; std::future future; diff --git a/libminifi/src/core/ProcessContextImpl.cpp b/libminifi/src/core/ProcessContextImpl.cpp index 94caeab6c9..f0ba1e2150 100644 --- a/libminifi/src/core/ProcessContextImpl.cpp +++ b/libminifi/src/core/ProcessContextImpl.cpp @@ -77,7 +77,6 @@ bool ProcessContextImpl::hasNonEmptyProperty(std::string_view name) const { std::vector ProcessContextImpl::getDynamicPropertyKeys() const { return processor_.getDynamicPropertyKeys(); } std::map ProcessContextImpl::getDynamicProperties(const FlowFile* flow_file) const { - std::lock_guard lock(mutex_); auto dynamic_props = processor_.getDynamicProperties(); const expression::Parameters params{this, flow_file}; for (auto& [dynamic_property_name, dynamic_property_value]: dynamic_props) { @@ -101,7 +100,6 @@ uint8_t ProcessContextImpl::getMaxConcurrentTasks() const { return processor_.ge void ProcessContextImpl::yield() { processor_.yield(); } nonstd::expected ProcessContextImpl::getProperty(const std::string_view name, const FlowFile* flow_file) const { - std::lock_guard lock(mutex_); const auto property = getProcessorInfo().getSupportedProperty(name); if (!property) { return nonstd::make_unexpected(PropertyErrorCode::NotSupportedProperty); @@ -124,19 +122,16 @@ nonstd::expected ProcessContextImpl::getProperty(c } nonstd::expected ProcessContextImpl::setProperty(const std::string_view name, std::string value) { - std::lock_guard lock(mutex_); cached_expressions_.erase(std::string{name}); return getProcessor().setProperty(name, std::move(value)); } nonstd::expected ProcessContextImpl::clearProperty(const std::string_view name) { - std::lock_guard lock(mutex_); cached_expressions_.erase(std::string{name}); return getProcessor().clearProperty(name); } nonstd::expected ProcessContextImpl::getDynamicProperty(const std::string_view name, const FlowFile* flow_file) const { - std::lock_guard lock(mutex_); if (!cached_dynamic_expressions_.contains(name)) { auto expression_str = getProcessor().getDynamicProperty(name); if (!expression_str) { return expression_str; } @@ -155,7 +150,6 @@ nonstd::expected ProcessContextImpl::getRawDynamic } nonstd::expected ProcessContextImpl::setDynamicProperty(std::string name, std::string value) { - std::lock_guard lock(mutex_); cached_dynamic_expressions_.erase(name); return getProcessor().setDynamicProperty(std::move(name), std::move(value)); } diff --git a/libminifi/test/unit/ProcessContextExprTests.cpp b/libminifi/test/unit/ProcessContextExprTests.cpp index a8cd6a1177..0bca2a8915 100644 --- a/libminifi/test/unit/ProcessContextExprTests.cpp +++ b/libminifi/test/unit/ProcessContextExprTests.cpp @@ -159,28 +159,3 @@ TEST_CASE("ProcessContextExpr can use expression language in dynamic properties" } } } - -TEST_CASE("ProcessContextExpr is mutex guarded properly") { - TestController test_controller; - const std::shared_ptr test_plan = test_controller.createPlan(); - std::ignore = test_plan->addProcessor("DummyProcessor", "dummy_processor"); - test_plan->runNextProcessor(); - const auto context = test_plan->getCurrentContext(); - REQUIRE(dynamic_pointer_cast(context) != nullptr); - - auto play_with_context = [=]() { - for (auto i = 0; i < 100; ++i) { - CHECK(context->setDynamicProperty("foo", fmt::format("${{literal('{}')}}", std::this_thread::get_id()))); - const auto dynamic_properties = context->getDynamicProperties(); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } - }; - - std::thread thread_one{play_with_context}; - std::thread thread_two{play_with_context}; - std::thread thread_three{play_with_context}; - - REQUIRE_NOTHROW(thread_one.join()); - REQUIRE_NOTHROW(thread_two.join()); - REQUIRE_NOTHROW(thread_three.join()); -}