Skip to content

Commit 0c9f756

Browse files
committed
Add LockFreeFifoDyn (dynamic memory lockfree fifo)
1 parent b1e4c15 commit 0c9f756

File tree

1 file changed

+163
-0
lines changed

1 file changed

+163
-0
lines changed

util/lockfree_fifo_spsc_dyn.hh

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
#pragma once
2+
#include "util/math.hh"
3+
#include <atomic>
4+
#include <optional>
5+
#include <vector>
6+
7+
// Thread-safe Lock-free Single-Producer Single-Consumer
8+
// Allocates a vector on construction. Re-allocates (resizes) on resize()
9+
// For cheaply copied types
10+
// Will not overwrite -- put() returns false if buffer is full, and get() returns nullopt if buffer is empty.
11+
// Provide the buffer size in the constructor.
12+
// max_size_ MUST be a power of 2. If it's not, the next-highest power of 2 will be used for the size.
13+
//
14+
// Edge-case: max_size cannot be more than half the largest integer representable by size_t
15+
template<class T>
16+
class LockFreeFifoSpscDyn {
17+
public:
18+
size_t max_size_;
19+
size_t SIZE_MASK;
20+
21+
LockFreeFifoSpscDyn(size_t max_size)
22+
: max_size_{MathTools::next_power_of_2(max_size)}
23+
, SIZE_MASK{max_size_ - 1}
24+
, head_{0}
25+
, tail_{0}
26+
, buf_(max_size_) {
27+
}
28+
29+
// Initialize with an offset between get and put
30+
// Useful, for example, if get() and put() happen at the same rates
31+
// and you want to provide a fixed delay
32+
LockFreeFifoSpscDyn(size_t max_size, size_t head)
33+
: max_size_{MathTools::next_power_of_2(max_size)}
34+
, SIZE_MASK{max_size_ - 1}
35+
, head_{head}
36+
, tail_{0}
37+
, buf_(max_size_) {
38+
}
39+
40+
// Resets and resizes the vector. All contents are lost
41+
// max_size must be a power of 2 or else the next highest power of 2 will be used
42+
// Edge-case: max_size cannot be more than half the largest integer representable by size_t
43+
void resize(size_t max_size) {
44+
reset();
45+
max_size_ = MathTools::next_power_of_2(max_size);
46+
SIZE_MASK = max_size_ - 1;
47+
buf_.resize(max_size_);
48+
}
49+
50+
//
51+
// Producer
52+
//
53+
bool put(T item) {
54+
auto tmp_head = head_.load(std::memory_order_relaxed);
55+
56+
if ((tmp_head - tail_.load(std::memory_order_acquire)) == max_size_)
57+
return false;
58+
59+
buf_[tmp_head & SIZE_MASK] = item;
60+
tmp_head++;
61+
std::atomic_signal_fence(std::memory_order_release);
62+
head_.store(tmp_head, std::memory_order_release);
63+
return true;
64+
}
65+
66+
// Number of elements available to write
67+
size_t num_free() const {
68+
return max_size_ - (head_.load(std::memory_order_relaxed) - tail_.load(std::memory_order_acquire));
69+
}
70+
71+
// Not safe, but useful for debugging:
72+
// size_t head() const {
73+
// return head_ & SIZE_MASK;
74+
// }
75+
// size_t tail() const {
76+
// return tail_ & SIZE_MASK;
77+
// }
78+
79+
bool full() const {
80+
return num_free() == 0;
81+
}
82+
83+
//
84+
// Consumer
85+
//
86+
std::optional<T> get() {
87+
auto tmp_tail = tail_.load(std::memory_order_relaxed);
88+
89+
if (tmp_tail == head_.load(std::memory_order_relaxed)) {
90+
return std::nullopt;
91+
}
92+
93+
auto item = buf_[tmp_tail & SIZE_MASK];
94+
tmp_tail++;
95+
std::atomic_signal_fence(std::memory_order_release);
96+
tail_.store(tmp_tail, std::memory_order_release);
97+
return item;
98+
}
99+
100+
bool get_move(T &t) {
101+
auto tmp_tail = tail_.load(std::memory_order_relaxed);
102+
103+
if (tmp_tail == head_.load(std::memory_order_relaxed)) {
104+
return false;
105+
}
106+
107+
t = std::move(buf_[tmp_tail & SIZE_MASK]);
108+
tmp_tail++;
109+
std::atomic_signal_fence(std::memory_order_release);
110+
tail_.store(tmp_tail, std::memory_order_release);
111+
return true;
112+
}
113+
114+
T get_or_default() {
115+
return get().value_or(T{});
116+
}
117+
118+
void remove_first() {
119+
auto tmp_tail = tail_.load(std::memory_order_relaxed);
120+
121+
if (tmp_tail != head_.load(std::memory_order_relaxed))
122+
tail_.store(++tmp_tail, std::memory_order_release);
123+
}
124+
125+
// Number of elements available to read
126+
size_t num_filled() const {
127+
return head_.load(std::memory_order_acquire) - tail_.load(std::memory_order_relaxed);
128+
}
129+
130+
bool empty() const {
131+
return num_filled() == 0;
132+
}
133+
134+
void set_read_pos(size_t pos) {
135+
tail_.store(pos, std::memory_order_release);
136+
}
137+
138+
// Sets read head `offset` samples before write head, if possible
139+
void set_read_offset(size_t offset) {
140+
auto head = head_.load(std::memory_order_acquire);
141+
if (offset <= head)
142+
tail_.store(head - offset, std::memory_order_release);
143+
}
144+
145+
void set_write_pos(size_t pos) {
146+
head_.store(pos, std::memory_order_release);
147+
}
148+
149+
// Reset can be done by consumer or producer
150+
void reset() {
151+
tail_.store(head_.load(std::memory_order_relaxed), std::memory_order_relaxed);
152+
}
153+
154+
// Maximum number of elements it can hold
155+
constexpr size_t max_size() const {
156+
return max_size_;
157+
}
158+
159+
private:
160+
std::atomic<size_t> head_ = 0;
161+
std::atomic<size_t> tail_ = 0;
162+
std::vector<T> buf_;
163+
};

0 commit comments

Comments
 (0)