|
16 | 16 |
|
17 | 17 | namespace srsran { |
18 | 18 |
|
19 | | -/// Scoped token that notifies the associated sync_event when it gets destroyed or reset. |
20 | | -class scoped_sync_token |
| 19 | +class scoped_sync_token; |
| 20 | + |
| 21 | +/// \brief Synchronization event to wait until all tokens to be reset. |
| 22 | +/// |
| 23 | +/// This class operates like a std::barrier, where the "arrive" call is performed by "tokens" in an RAII manner and |
| 24 | +/// "wait" is performed by the \c sync_event. Contrarily to the std::barrier, the number of participants is not fixed. |
| 25 | +class sync_event |
21 | 26 | { |
22 | 27 | public: |
23 | | - scoped_sync_token() = default; |
24 | | - scoped_sync_token(std::atomic<uint32_t>& token_count_, std::atomic<bool>& dtor_guard_) : |
25 | | - token_count(&token_count_), dtor_guard(&dtor_guard_) |
| 28 | + ~sync_event() |
26 | 29 | { |
27 | | - inc_token(); |
28 | | - } |
| 30 | + wait(); |
29 | 31 |
|
30 | | - scoped_sync_token(const scoped_sync_token& other) : token_count(other.token_count), dtor_guard(other.dtor_guard) |
31 | | - { |
32 | | - inc_token(); |
| 32 | + // Polite spinning in case the token got preempted between the fetch_sub and futex wake. |
| 33 | + while (dtor_guard.load(std::memory_order_acquire)) { |
| 34 | + std::this_thread::yield(); |
| 35 | + } |
33 | 36 | } |
34 | 37 |
|
35 | | - scoped_sync_token(scoped_sync_token&& other) noexcept : |
36 | | - token_count(std::exchange(other.token_count, nullptr)), dtor_guard(std::exchange(other.dtor_guard, nullptr)) |
| 38 | + /// Creates a new observer of wait() requests. |
| 39 | + [[nodiscard]] scoped_sync_token get_token(); |
| 40 | + |
| 41 | + /// Waits for all tokens to be reset. At the end of this call, all tokens are guaranteed to be reset. |
| 42 | + void wait() |
37 | 43 | { |
| 44 | + // Block waiting until all observers are gone. |
| 45 | + auto cur = token_count.load(std::memory_order_acquire); |
| 46 | + while (cur > 0) { |
| 47 | + futex_util::wait(token_count, cur); |
| 48 | + cur = token_count.load(std::memory_order_acquire); |
| 49 | + } |
38 | 50 | } |
39 | 51 |
|
| 52 | + [[nodiscard]] uint32_t nof_tokens_approx() const { return token_count.load(std::memory_order_relaxed); } |
| 53 | + |
| 54 | +private: |
| 55 | + friend class scoped_sync_token; |
| 56 | + |
| 57 | + std::atomic<uint32_t> token_count{0}; |
| 58 | + /// \brief Variable use to protect token_count from destruction when token still has to call futex wake. |
| 59 | + /// Useful reference about the issue: https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2023/p2616r4.html |
| 60 | + std::atomic<bool> dtor_guard{false}; |
| 61 | +}; |
| 62 | + |
| 63 | +/// Scoped token that notifies the associated sync_event when it gets destroyed or reset. |
| 64 | +class scoped_sync_token |
| 65 | +{ |
| 66 | +public: |
| 67 | + scoped_sync_token() = default; |
| 68 | + explicit scoped_sync_token(sync_event& parent_) : parent(&parent_) { inc_token(); } |
| 69 | + |
| 70 | + scoped_sync_token(const scoped_sync_token& other) : parent(other.parent) { inc_token(); } |
| 71 | + |
| 72 | + scoped_sync_token(scoped_sync_token&& other) noexcept : parent(std::exchange(other.parent, nullptr)) {} |
| 73 | + |
40 | 74 | scoped_sync_token& operator=(const scoped_sync_token& other) |
41 | 75 | { |
42 | 76 | if (this != &other) { |
43 | 77 | reset(); |
44 | | - token_count = other.token_count; |
45 | | - dtor_guard = other.dtor_guard; |
| 78 | + parent = other.parent; |
46 | 79 | inc_token(); |
47 | 80 | } |
48 | 81 | return *this; |
49 | 82 | } |
50 | 83 | scoped_sync_token& operator=(scoped_sync_token&& other) noexcept |
51 | 84 | { |
52 | 85 | reset(); |
53 | | - token_count = std::exchange(other.token_count, nullptr); |
54 | | - dtor_guard = std::exchange(other.dtor_guard, nullptr); |
| 86 | + parent = std::exchange(other.parent, nullptr); |
55 | 87 | return *this; |
56 | 88 | } |
57 | 89 |
|
58 | 90 | ~scoped_sync_token() |
59 | 91 | { |
60 | | - if (token_count != nullptr) { |
61 | | - auto cur = token_count->fetch_sub(1, std::memory_order_acq_rel) - 1; |
| 92 | + if (parent != nullptr) { |
| 93 | + auto cur = parent->token_count.fetch_sub(1, std::memory_order_acq_rel) - 1; |
62 | 94 | if (cur == 0) { |
63 | 95 | // Count is zero. Wake all waiters. |
64 | | - futex_util::wake_all(*token_count); |
| 96 | + futex_util::wake_all(parent->token_count); |
65 | 97 | // Update dtor guard. |
66 | | - dtor_guard->store(false, std::memory_order_release); |
| 98 | + parent->dtor_guard.store(false, std::memory_order_release); |
67 | 99 | } |
68 | | - token_count = nullptr; |
69 | | - dtor_guard = nullptr; |
| 100 | + parent = nullptr; |
70 | 101 | } |
71 | 102 | } |
72 | 103 |
|
73 | 104 | /// Destroys the token and potentially unlocks sync_event::wait(). |
74 | 105 | void reset() { scoped_sync_token{}.swap(*this); } |
75 | 106 |
|
76 | | - void swap(scoped_sync_token& other) noexcept |
77 | | - { |
78 | | - std::swap(token_count, other.token_count); |
79 | | - std::swap(dtor_guard, other.dtor_guard); |
80 | | - } |
| 107 | + void swap(scoped_sync_token& other) noexcept { std::swap(parent, other.parent); } |
81 | 108 |
|
82 | 109 | private: |
83 | 110 | void inc_token() |
84 | 111 | { |
85 | | - if (token_count != nullptr) { |
86 | | - if (token_count->fetch_add(1, std::memory_order_relaxed) == 0) { |
| 112 | + if (parent != nullptr) { |
| 113 | + if (parent->token_count.fetch_add(1, std::memory_order_relaxed) == 0) { |
87 | 114 | // Transition from 0 to 1. Lock dtor guard. |
88 | | - dtor_guard->store(true, std::memory_order_release); |
| 115 | + parent->dtor_guard.store(true, std::memory_order_release); |
89 | 116 | } |
90 | 117 | } |
91 | 118 | } |
92 | 119 |
|
93 | | - std::atomic<uint32_t>* token_count = nullptr; |
94 | | - std::atomic<bool>* dtor_guard = nullptr; |
| 120 | + sync_event* parent = nullptr; |
95 | 121 | }; |
96 | 122 |
|
97 | | -/// \brief Synchronization event to wait until all tokens to be reset. |
98 | | -/// |
99 | | -/// This class operates like a std::barrier, where the "arrive" call is performed by "tokens" in an RAII manner and |
100 | | -/// "wait" is performed by the \c sync_event. Contrarily to the std::barrier, the number of participants is not fixed. |
101 | | -class sync_event |
| 123 | +inline scoped_sync_token sync_event::get_token() |
102 | 124 | { |
103 | | -public: |
104 | | - ~sync_event() |
105 | | - { |
106 | | - wait(); |
107 | | - |
108 | | - // Polite spinning in case the token got preempted between the fetch_sub and futex wake. |
109 | | - while (dtor_guard.load(std::memory_order_acquire)) { |
110 | | - std::this_thread::yield(); |
111 | | - } |
112 | | - } |
113 | | - |
114 | | - /// Creates a new observer of stop() requests. |
115 | | - [[nodiscard]] scoped_sync_token get_token() { return scoped_sync_token{token_count, dtor_guard}; } |
116 | | - |
117 | | - /// Waits for all tokens to be reset. At the end of this call, all tokens are guaranteed to be reset. |
118 | | - void wait() |
119 | | - { |
120 | | - // Block waiting until all observers are gone. |
121 | | - auto cur = token_count.load(std::memory_order_acquire); |
122 | | - while (cur > 0) { |
123 | | - futex_util::wait(token_count, cur); |
124 | | - cur = token_count.load(std::memory_order_acquire); |
125 | | - } |
126 | | - } |
127 | | - |
128 | | - [[nodiscard]] uint32_t nof_tokens_approx() const { return token_count.load(std::memory_order_relaxed); } |
129 | | - |
130 | | -private: |
131 | | - std::atomic<uint32_t> token_count{0}; |
132 | | - /// \brief Variable use to protect token_count from destruction when token still has to call futex wake. |
133 | | - /// Useful reference about the issue: https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2023/p2616r4.html |
134 | | - std::atomic<bool> dtor_guard{false}; |
135 | | -}; |
| 125 | + return scoped_sync_token{*this}; |
| 126 | +} |
136 | 127 |
|
137 | 128 | } // namespace srsran |
0 commit comments