Skip to content

Commit b2fe116

Browse files
MBkktAndrei Lobov
andauthored
Separate WaitGroup (iresearch-toolkit#572)
* Separate WaitGroup * Update core/utils/wait_group.hpp Co-authored-by: Andrei Lobov <[email protected]> * Exact count * fix lints --------- Co-authored-by: Andrei Lobov <[email protected]>
1 parent b37f93e commit b2fe116

File tree

2 files changed

+78
-29
lines changed

2 files changed

+78
-29
lines changed

core/index/index_writer.hpp

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
#include "utils/object_pool.hpp"
4646
#include "utils/string.hpp"
4747
#include "utils/thread_utils.hpp"
48+
#include "utils/wait_group.hpp"
4849

4950
#include <absl/container/flat_hash_map.h>
5051

@@ -813,35 +814,7 @@ class IndexWriter : private util::noncopyable {
813814
std::deque<PendingSegmentContext> pending_segments_;
814815
// entries from 'pending_segments_' that are available for reuse
815816
Freelist pending_freelist_;
816-
// TODO(MBkkt) Considered to replace with YACLib in ArangoDB 3.11+ or ...
817-
struct WaitGroup {
818-
std::mutex& Mutex() noexcept { return m_; }
819-
820-
void Add() noexcept { counter_.fetch_add(1, std::memory_order_relaxed); }
821-
822-
void Done() noexcept {
823-
if (counter_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
824-
std::lock_guard lock{m_};
825-
cv_.notify_one();
826-
}
827-
}
828-
829-
void Wait() noexcept {
830-
if (counter_.fetch_sub(1, std::memory_order_acq_rel) != 1) {
831-
std::unique_lock lock{m_};
832-
while (counter_.load(std::memory_order_acquire) != 0) {
833-
cv_.wait(lock);
834-
}
835-
}
836-
// We can put acquire here and remove above, but is it worth?
837-
counter_.store(1, std::memory_order_relaxed);
838-
}
839-
840-
private:
841-
std::atomic_size_t counter_{1};
842-
std::mutex m_;
843-
std::condition_variable cv_;
844-
} pending_;
817+
WaitGroup pending_;
845818

846819
// set of segments to be removed from the index upon commit
847820
ConsolidatingSegments segment_mask_;

core/utils/wait_group.hpp

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
////////////////////////////////////////////////////////////////////////////////
2+
/// DISCLAIMER
3+
///
4+
/// Copyright 2016 by EMC Corporation, All Rights Reserved
5+
///
6+
/// Licensed under the Apache License, Version 2.0 (the "License");
7+
/// you may not use this file except in compliance with the License.
8+
/// You may obtain a copy of the License at
9+
///
10+
/// http://www.apache.org/licenses/LICENSE-2.0
11+
///
12+
/// Unless required by applicable law or agreed to in writing, software
13+
/// distributed under the License is distributed on an "AS IS" BASIS,
14+
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
/// See the License for the specific language governing permissions and
16+
/// limitations under the License.
17+
///
18+
/// Copyright holder is EMC Corporation
19+
///
20+
/// @author Valery Mironov
21+
////////////////////////////////////////////////////////////////////////////////
22+
23+
#pragma once
24+
25+
#include <atomic>
26+
#include <condition_variable>
27+
#include <mutex>
28+
29+
namespace irs {
30+
31+
// TODO(MBkkt) Considered to replace with YACLib
32+
struct WaitGroup {
33+
explicit WaitGroup(size_t counter = 0) noexcept : counter_{2 * counter + 1} {}
34+
35+
void Add(size_t counter = 1) noexcept {
36+
counter_.fetch_add(2 * counter, std::memory_order_relaxed);
37+
}
38+
39+
void Done(size_t counter = 1) noexcept {
40+
if (counter_.fetch_sub(2 * counter, std::memory_order_acq_rel) ==
41+
2 * counter) {
42+
std::lock_guard lock{m_};
43+
cv_.notify_one();
44+
}
45+
}
46+
47+
// Multiple parallel Wait not supported, if needed check YACLib
48+
void Wait(size_t counter = 0) noexcept {
49+
if (counter_.fetch_sub(1, std::memory_order_acq_rel) != 1) {
50+
std::unique_lock lock{m_};
51+
while (counter_.load(std::memory_order_acquire) != 0) {
52+
cv_.wait(lock);
53+
}
54+
}
55+
// We can put acquire here and remove above, but is it worth?
56+
Reset(counter);
57+
}
58+
59+
// It shouldn't used for synchronization
60+
size_t Count() const noexcept {
61+
return counter_.load(std::memory_order_relaxed) / 2;
62+
}
63+
64+
void Reset(size_t counter) noexcept {
65+
counter_.store(2 * counter + 1, std::memory_order_relaxed);
66+
}
67+
68+
std::mutex& Mutex() noexcept { return m_; }
69+
70+
private:
71+
std::atomic_size_t counter_;
72+
std::condition_variable cv_;
73+
std::mutex m_;
74+
};
75+
76+
} // namespace irs

0 commit comments

Comments
 (0)