From d7a01311d73e8961b7520a6f1ce651810df95486 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 18 Jul 2025 21:47:30 +0200 Subject: [PATCH 01/12] handle spurious wakeup fixes #339 --- keyvi/include/keyvi/index/internal/index_writer_worker.h | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/keyvi/include/keyvi/index/internal/index_writer_worker.h b/keyvi/include/keyvi/index/internal/index_writer_worker.h index ea2f30e2d..bc05a73fc 100644 --- a/keyvi/include/keyvi/index/internal/index_writer_worker.h +++ b/keyvi/include/keyvi/index/internal/index_writer_worker.h @@ -211,17 +211,22 @@ class IndexWriterWorker final { } else { std::condition_variable c; std::unique_lock lock(payload_.flush_mutex_); + std::atomic_bool flushed{false}; - compiler_active_object_([&c](IndexPayload& payload) { + compiler_active_object_([&c, &flushed](IndexPayload& payload) { { PersistDeletes(&payload); Compile(&payload); std::unique_lock lock(payload.flush_mutex_); + flushed = true; } c.notify_all(); }); - c.wait(lock); + // condition may be unblocked spuriously, check flushed + while (flushed == false) { + c.wait(lock); + } } } From 6304ab94261f248b301a2c3b10d1222d951d6b7d Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Sat, 19 Jul 2025 09:58:54 +0200 Subject: [PATCH 02/12] testing only: run index limit test 1000 times --- keyvi/tests/keyvi/index/index_limits_test.cpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/keyvi/tests/keyvi/index/index_limits_test.cpp b/keyvi/tests/keyvi/index/index_limits_test.cpp index a41ee501f..7bd914fc0 100644 --- a/keyvi/tests/keyvi/index/index_limits_test.cpp +++ b/keyvi/tests/keyvi/index/index_limits_test.cpp @@ -92,6 +92,12 @@ BOOST_AUTO_TEST_CASE(filedescriptor_limit) { limit_filedescriptors(old_limit); } +BOOST_AUTO_TEST_CASE(repeat_limit_test) { + for (size_t i = 0; i < 1000; i++) { + IndexLimitsTests::filedescriptor_limit_invoker(); + } +} + BOOST_AUTO_TEST_SUITE_END() } // namespace index From 3fb110f36257f34676a7eb6b49823a413b3acb9e Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Sat, 19 Jul 2025 10:33:42 +0200 Subject: [PATCH 03/12] ensure all is done on destruction --- .../index/internal/index_writer_worker.h | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/keyvi/include/keyvi/index/internal/index_writer_worker.h b/keyvi/include/keyvi/index/internal/index_writer_worker.h index bc05a73fc..fa42099f7 100644 --- a/keyvi/include/keyvi/index/internal/index_writer_worker.h +++ b/keyvi/include/keyvi/index/internal/index_writer_worker.h @@ -127,13 +127,27 @@ class IndexWriterWorker final { TRACE("destruct worker: %s", payload_.index_directory_.c_str()); payload_.merge_enabled_ = false; + std::condition_variable c; + std::unique_lock lock(payload_.flush_mutex_); + std::atomic_bool no_pending_ops{false}; + // push a function to finish all pending merges - compiler_active_object_([](IndexPayload& payload) { - Compile(&payload); - for (MergeJob& p : payload.merge_jobs_) { - p.Finalize(); + compiler_active_object_([&c, &no_pending_ops](IndexPayload& payload) { + { + Compile(&payload); + for (MergeJob& p : payload.merge_jobs_) { + p.Finalize(); + } + std::unique_lock lock(payload.flush_mutex_); + no_pending_ops = true; } + c.notify_all(); }); + + // wait until everything has been executed + while (no_pending_ops == false) { + c.wait(lock); + } } const_segments_t Segments() { From 992fc0c37391fb5fbac9ec8a356a15360a002fcb Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Sat, 19 Jul 2025 22:24:20 +0200 Subject: [PATCH 04/12] use own mutex --- keyvi/include/keyvi/index/internal/index_writer_worker.h | 8 +++++--- keyvi/tests/keyvi/index/index_limits_test.cpp | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/keyvi/include/keyvi/index/internal/index_writer_worker.h b/keyvi/include/keyvi/index/internal/index_writer_worker.h index fa42099f7..8f54dca61 100644 --- a/keyvi/include/keyvi/index/internal/index_writer_worker.h +++ b/keyvi/include/keyvi/index/internal/index_writer_worker.h @@ -127,20 +127,22 @@ class IndexWriterWorker final { TRACE("destruct worker: %s", payload_.index_directory_.c_str()); payload_.merge_enabled_ = false; + std::mutex m; std::condition_variable c; std::unique_lock lock(payload_.flush_mutex_); std::atomic_bool no_pending_ops{false}; // push a function to finish all pending merges - compiler_active_object_([&c, &no_pending_ops](IndexPayload& payload) { + compiler_active_object_([&m, &c, &no_pending_ops](IndexPayload& payload) { { Compile(&payload); for (MergeJob& p : payload.merge_jobs_) { p.Finalize(); } - std::unique_lock lock(payload.flush_mutex_); - no_pending_ops = true; + std::unique_lock lock(m); } + no_pending_ops = true; + c.notify_all(); }); diff --git a/keyvi/tests/keyvi/index/index_limits_test.cpp b/keyvi/tests/keyvi/index/index_limits_test.cpp index 7bd914fc0..d61ca9fbd 100644 --- a/keyvi/tests/keyvi/index/index_limits_test.cpp +++ b/keyvi/tests/keyvi/index/index_limits_test.cpp @@ -93,7 +93,7 @@ BOOST_AUTO_TEST_CASE(filedescriptor_limit) { } BOOST_AUTO_TEST_CASE(repeat_limit_test) { - for (size_t i = 0; i < 1000; i++) { + for (size_t i = 0; i < 100; i++) { IndexLimitsTests::filedescriptor_limit_invoker(); } } From ba132336a0821435fa732ee398a3d45023ba8797 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Sat, 19 Jul 2025 22:55:13 +0200 Subject: [PATCH 05/12] make mutex a class variable --- keyvi/include/keyvi/index/internal/index_writer_worker.h | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/keyvi/include/keyvi/index/internal/index_writer_worker.h b/keyvi/include/keyvi/index/internal/index_writer_worker.h index 8f54dca61..2e0896cf2 100644 --- a/keyvi/include/keyvi/index/internal/index_writer_worker.h +++ b/keyvi/include/keyvi/index/internal/index_writer_worker.h @@ -114,6 +114,7 @@ class IndexWriterWorker final { explicit IndexWriterWorker(const std::string& index_directory, const keyvi::util::parameters_t& params) : payload_(index_directory, params), merge_policy_(merge_policy(keyvi::util::mapGet(params, MERGE_POLICY, DEFAULT_MERGE_POLICY))), + destruct_mutex_(), compiler_active_object_(&payload_, std::bind(&index::internal::IndexWriterWorker::ScheduledTask, this), std::chrono::milliseconds(payload_.index_refresh_interval_)) { TRACE("construct worker: %s", payload_.index_directory_.c_str()); @@ -127,19 +128,18 @@ class IndexWriterWorker final { TRACE("destruct worker: %s", payload_.index_directory_.c_str()); payload_.merge_enabled_ = false; - std::mutex m; std::condition_variable c; - std::unique_lock lock(payload_.flush_mutex_); + std::unique_lock lock(destruct_mutex_); std::atomic_bool no_pending_ops{false}; // push a function to finish all pending merges - compiler_active_object_([&m, &c, &no_pending_ops](IndexPayload& payload) { + compiler_active_object_([this, &c, &no_pending_ops](IndexPayload& payload) { { Compile(&payload); for (MergeJob& p : payload.merge_jobs_) { p.Finalize(); } - std::unique_lock lock(m); + std::unique_lock lock(destruct_mutex_); } no_pending_ops = true; @@ -270,6 +270,7 @@ class IndexWriterWorker final { private: IndexPayload payload_; merge_policy_t merge_policy_; + std::mutex destruct_mutex_; util::ActiveObject compiler_active_object_; void CompileIfThresholdIsHit() { From 9c07c3645ff15fc0686a9f248e1845a210dfe599 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Sat, 19 Jul 2025 23:08:26 +0200 Subject: [PATCH 06/12] test if leaking, avoids destruction problems --- keyvi/include/keyvi/index/internal/index_writer_worker.h | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/keyvi/include/keyvi/index/internal/index_writer_worker.h b/keyvi/include/keyvi/index/internal/index_writer_worker.h index 2e0896cf2..ea58c47f2 100644 --- a/keyvi/include/keyvi/index/internal/index_writer_worker.h +++ b/keyvi/include/keyvi/index/internal/index_writer_worker.h @@ -128,18 +128,19 @@ class IndexWriterWorker final { TRACE("destruct worker: %s", payload_.index_directory_.c_str()); payload_.merge_enabled_ = false; + std::mutex* leaked_mutex = new std::mutex(); std::condition_variable c; - std::unique_lock lock(destruct_mutex_); + std::unique_lock lock(*leaked_mutex); std::atomic_bool no_pending_ops{false}; // push a function to finish all pending merges - compiler_active_object_([this, &c, &no_pending_ops](IndexPayload& payload) { + compiler_active_object_([leaked_mutex, &c, &no_pending_ops](IndexPayload& payload) { { Compile(&payload); for (MergeJob& p : payload.merge_jobs_) { p.Finalize(); } - std::unique_lock lock(destruct_mutex_); + std::unique_lock lock(*leaked_mutex); } no_pending_ops = true; From b6db3e1db1ea7735bb746584806036f955ed939b Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Sat, 19 Jul 2025 23:19:12 +0200 Subject: [PATCH 07/12] move atomic outside of context --- keyvi/include/keyvi/index/internal/index_writer_worker.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/keyvi/include/keyvi/index/internal/index_writer_worker.h b/keyvi/include/keyvi/index/internal/index_writer_worker.h index ea58c47f2..366391436 100644 --- a/keyvi/include/keyvi/index/internal/index_writer_worker.h +++ b/keyvi/include/keyvi/index/internal/index_writer_worker.h @@ -235,8 +235,8 @@ class IndexWriterWorker final { PersistDeletes(&payload); Compile(&payload); std::unique_lock lock(payload.flush_mutex_); - flushed = true; } + flushed = true; c.notify_all(); }); From 11135e7e896708cbc7d2b375f00517c6219ec23f Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Sun, 20 Jul 2025 07:23:38 +0200 Subject: [PATCH 08/12] remove destructor changes --- .../index/internal/index_writer_worker.h | 27 ++++++++++--------- keyvi/tests/keyvi/index/index_limits_test.cpp | 8 +++--- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/keyvi/include/keyvi/index/internal/index_writer_worker.h b/keyvi/include/keyvi/index/internal/index_writer_worker.h index 366391436..816139e6c 100644 --- a/keyvi/include/keyvi/index/internal/index_writer_worker.h +++ b/keyvi/include/keyvi/index/internal/index_writer_worker.h @@ -114,7 +114,7 @@ class IndexWriterWorker final { explicit IndexWriterWorker(const std::string& index_directory, const keyvi::util::parameters_t& params) : payload_(index_directory, params), merge_policy_(merge_policy(keyvi::util::mapGet(params, MERGE_POLICY, DEFAULT_MERGE_POLICY))), - destruct_mutex_(), + //destruct_mutex_(), compiler_active_object_(&payload_, std::bind(&index::internal::IndexWriterWorker::ScheduledTask, this), std::chrono::milliseconds(payload_.index_refresh_interval_)) { TRACE("construct worker: %s", payload_.index_directory_.c_str()); @@ -128,29 +128,30 @@ class IndexWriterWorker final { TRACE("destruct worker: %s", payload_.index_directory_.c_str()); payload_.merge_enabled_ = false; - std::mutex* leaked_mutex = new std::mutex(); - std::condition_variable c; - std::unique_lock lock(*leaked_mutex); - std::atomic_bool no_pending_ops{false}; + //std::mutex* leaked_mutex = new std::mutex(); + //std::condition_variable c; + //std::unique_lock lock(*leaked_mutex); + //std::atomic_bool no_pending_ops{false}; // push a function to finish all pending merges - compiler_active_object_([leaked_mutex, &c, &no_pending_ops](IndexPayload& payload) { + //compiler_active_object_([leaked_mutex, &c, &no_pending_ops](IndexPayload& payload) { + compiler_active_object_([](IndexPayload& payload) { { Compile(&payload); for (MergeJob& p : payload.merge_jobs_) { p.Finalize(); } - std::unique_lock lock(*leaked_mutex); + //std::unique_lock lock(*leaked_mutex); } - no_pending_ops = true; + //no_pending_ops = true; - c.notify_all(); + //c.notify_all(); }); // wait until everything has been executed - while (no_pending_ops == false) { - c.wait(lock); - } + //while (no_pending_ops == false) { + // c.wait(lock); + //} } const_segments_t Segments() { @@ -271,7 +272,7 @@ class IndexWriterWorker final { private: IndexPayload payload_; merge_policy_t merge_policy_; - std::mutex destruct_mutex_; + //std::mutex destruct_mutex_; util::ActiveObject compiler_active_object_; void CompileIfThresholdIsHit() { diff --git a/keyvi/tests/keyvi/index/index_limits_test.cpp b/keyvi/tests/keyvi/index/index_limits_test.cpp index d61ca9fbd..7bd0857eb 100644 --- a/keyvi/tests/keyvi/index/index_limits_test.cpp +++ b/keyvi/tests/keyvi/index/index_limits_test.cpp @@ -49,7 +49,7 @@ inline size_t limit_filedescriptors(size_t file_descriptor_limit) { struct rlimit limit; getrlimit(RLIMIT_NOFILE, &limit); - size_t old_limit = limit.rlim_cur; + const size_t old_limit = limit.rlim_cur; limit.rlim_cur = file_descriptor_limit; BOOST_CHECK(setrlimit(RLIMIT_NOFILE, &limit) == 0); getrlimit(RLIMIT_NOFILE, &limit); @@ -65,7 +65,7 @@ BOOST_AUTO_TEST_CASE(filedescriptor_limit) { using boost::filesystem::temp_directory_path; using boost::filesystem::unique_path; - size_t old_limit = limit_filedescriptors(40); + const size_t old_limit = limit_filedescriptors(40); auto tmp_path = temp_directory_path(); tmp_path /= unique_path("index-limits-test-temp-index-%%%%-%%%%-%%%%-%%%%"); @@ -80,13 +80,13 @@ BOOST_AUTO_TEST_CASE(filedescriptor_limit) { } writer.Flush(); BOOST_CHECK(writer.Contains("a")); - dictionary::match_t m = writer["a"]; + const dictionary::match_t m = writer["a"]; BOOST_CHECK_EQUAL("{\"id\":4999}", m->GetValueAsString()); } boost::filesystem::remove_all(tmp_path); - size_t increased_file_descriptors = keyvi::util::OsUtils::TryIncreaseFileDescriptors(); + const size_t increased_file_descriptors = keyvi::util::OsUtils::TryIncreaseFileDescriptors(); BOOST_CHECK(increased_file_descriptors > 40); limit_filedescriptors(old_limit); From c6a5c3a828a9a5a465b4b3ed91daf75153908aae Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Sun, 20 Jul 2025 07:37:41 +0200 Subject: [PATCH 09/12] keep lock when calling notify_all --- keyvi/include/keyvi/index/internal/index_writer_worker.h | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/keyvi/include/keyvi/index/internal/index_writer_worker.h b/keyvi/include/keyvi/index/internal/index_writer_worker.h index 816139e6c..d34143bc9 100644 --- a/keyvi/include/keyvi/index/internal/index_writer_worker.h +++ b/keyvi/include/keyvi/index/internal/index_writer_worker.h @@ -232,11 +232,9 @@ class IndexWriterWorker final { std::atomic_bool flushed{false}; compiler_active_object_([&c, &flushed](IndexPayload& payload) { - { - PersistDeletes(&payload); - Compile(&payload); - std::unique_lock lock(payload.flush_mutex_); - } + std::unique_lock lock(payload.flush_mutex_); + PersistDeletes(&payload); + Compile(&payload); flushed = true; c.notify_all(); }); From 94ca8474b257c369a9fe8324e943e8ea289e7838 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Sun, 20 Jul 2025 08:51:01 +0200 Subject: [PATCH 10/12] remove test debug code --- .../index/internal/index_writer_worker.h | 25 +++---------------- 1 file changed, 3 insertions(+), 22 deletions(-) diff --git a/keyvi/include/keyvi/index/internal/index_writer_worker.h b/keyvi/include/keyvi/index/internal/index_writer_worker.h index d34143bc9..78d4709ed 100644 --- a/keyvi/include/keyvi/index/internal/index_writer_worker.h +++ b/keyvi/include/keyvi/index/internal/index_writer_worker.h @@ -114,7 +114,6 @@ class IndexWriterWorker final { explicit IndexWriterWorker(const std::string& index_directory, const keyvi::util::parameters_t& params) : payload_(index_directory, params), merge_policy_(merge_policy(keyvi::util::mapGet(params, MERGE_POLICY, DEFAULT_MERGE_POLICY))), - //destruct_mutex_(), compiler_active_object_(&payload_, std::bind(&index::internal::IndexWriterWorker::ScheduledTask, this), std::chrono::milliseconds(payload_.index_refresh_interval_)) { TRACE("construct worker: %s", payload_.index_directory_.c_str()); @@ -128,30 +127,13 @@ class IndexWriterWorker final { TRACE("destruct worker: %s", payload_.index_directory_.c_str()); payload_.merge_enabled_ = false; - //std::mutex* leaked_mutex = new std::mutex(); - //std::condition_variable c; - //std::unique_lock lock(*leaked_mutex); - //std::atomic_bool no_pending_ops{false}; - // push a function to finish all pending merges - //compiler_active_object_([leaked_mutex, &c, &no_pending_ops](IndexPayload& payload) { compiler_active_object_([](IndexPayload& payload) { - { - Compile(&payload); - for (MergeJob& p : payload.merge_jobs_) { - p.Finalize(); - } - //std::unique_lock lock(*leaked_mutex); + Compile(&payload); + for (MergeJob& p : payload.merge_jobs_) { + p.Finalize(); } - //no_pending_ops = true; - - //c.notify_all(); }); - - // wait until everything has been executed - //while (no_pending_ops == false) { - // c.wait(lock); - //} } const_segments_t Segments() { @@ -270,7 +252,6 @@ class IndexWriterWorker final { private: IndexPayload payload_; merge_policy_t merge_policy_; - //std::mutex destruct_mutex_; util::ActiveObject compiler_active_object_; void CompileIfThresholdIsHit() { From 000e513c1dc71b4e3e2c2cf51ba3fdf95d53b557 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Sun, 20 Jul 2025 09:55:16 +0200 Subject: [PATCH 11/12] fix clang-tidy warnings --- keyvi/tests/keyvi/index/index_limits_test.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/keyvi/tests/keyvi/index/index_limits_test.cpp b/keyvi/tests/keyvi/index/index_limits_test.cpp index 7bd0857eb..d9cee868f 100644 --- a/keyvi/tests/keyvi/index/index_limits_test.cpp +++ b/keyvi/tests/keyvi/index/index_limits_test.cpp @@ -37,7 +37,7 @@ #include "keyvi/index/index.h" inline std::string get_keyvimerger_bin() { - boost::filesystem::path path{std::getenv("KEYVI_UNITTEST_BASEPATH")}; + boost::filesystem::path path{std::getenv("KEYVI_UNITTEST_BASEPATH")}; // NOLINT path /= DEFAULT_KEYVIMERGER_BIN; BOOST_CHECK(boost::filesystem::is_regular_file(path)); @@ -46,7 +46,9 @@ inline std::string get_keyvimerger_bin() { } inline size_t limit_filedescriptors(size_t file_descriptor_limit) { - struct rlimit limit; + struct rlimit limit { + 0 + }; getrlimit(RLIMIT_NOFILE, &limit); const size_t old_limit = limit.rlim_cur; From c1af918bab722406608ee4c9ae87a4125250cbea Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Sun, 20 Jul 2025 22:38:11 +0200 Subject: [PATCH 12/12] remove repeat_limit_test --- keyvi/tests/keyvi/index/index_limits_test.cpp | 6 ------ 1 file changed, 6 deletions(-) diff --git a/keyvi/tests/keyvi/index/index_limits_test.cpp b/keyvi/tests/keyvi/index/index_limits_test.cpp index d9cee868f..141e82cb9 100644 --- a/keyvi/tests/keyvi/index/index_limits_test.cpp +++ b/keyvi/tests/keyvi/index/index_limits_test.cpp @@ -94,12 +94,6 @@ BOOST_AUTO_TEST_CASE(filedescriptor_limit) { limit_filedescriptors(old_limit); } -BOOST_AUTO_TEST_CASE(repeat_limit_test) { - for (size_t i = 0; i < 100; i++) { - IndexLimitsTests::filedescriptor_limit_invoker(); - } -} - BOOST_AUTO_TEST_SUITE_END() } // namespace index