Skip to content

Commit 27c4154

Browse files
authored
Ruilizhenhu/feat(mpmc) close #13
* feat(test): complete mpmc and mpsc unit test * #13 refactor(mpmc): implement mpmc ring buffer using slots and atomic * #13 docs(mpmc): update readme
1 parent 72094c5 commit 27c4154

File tree

4 files changed

+191
-50
lines changed

4 files changed

+191
-50
lines changed

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ A header-only C++20 ring buffer library providing several synchronization strate
77
## Features
88

99
- Basic, SPSC, MPSC and MPMC variants
10-
- Implementations using mutexes, atomics and semi-atomic slots
10+
- Cache-line friendly memory layout
11+
- Implementations using atomics and slots
1112
- Simple integration through CMake's `find_package`
1213

1314
## Requirements
@@ -111,7 +112,7 @@ target_link_libraries(
111112
| `BasicRingBuffer<T>` | Basic | None (single-thread only) |
112113
| `SPSCRingBuffer<T>` | SPSC | atomics |
113114
| `MPSCRingBuffer<T>` | MPSC | atomics + slots |
114-
| `MPMCRingBuffer<T>` | MPMC | mutex |
115+
| `MPMCRingBuffer<T>` | MPMC | atomics + slots |
115116

116117
**Common API** (all variants):
117118

Lines changed: 97 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
#pragma once
22

3+
#include <atomic>
34
#include <cstddef>
45
#include <memory>
5-
#include <mutex>
66
#include <stdexcept>
77

88
#include "internal/common.h"
@@ -12,21 +12,26 @@ namespace RingBuffer { // interface
1212
template <typename T>
1313
// Fixed-capacity ring buffer whose push/pop operations coordinate through
// per-slot sequence counters and two atomic cursors (the "+" lines below);
// the mutex-based members on the "-" lines are the version being removed.
class alignas(Common::CACHELINE_SIZE) MPMCRingBuffer {
1414
private:
15-
std::unique_ptr<T[], Common::AlignedDeleter> buffer;
15+
// One ring cell. `seq` encodes whose turn it is for this cell:
//   seq == pos      -> a producer at position `pos` may write
//   seq == pos + 1  -> a consumer at position `pos` may read
// Both members are individually aligned to Common::CACHELINE_SIZE
// (presumably to keep the counter and the payload on separate cache lines).
struct Slot {
16+
alignas(Common::CACHELINE_SIZE) std::atomic<size_t> seq;
17+
alignas(Common::CACHELINE_SIZE) T data;
18+
};
19+
20+
// Slot array; allocated and placement-constructed in the constructor.
std::unique_ptr<Slot[], Common::AlignedDeleter> buffer;
1621
// Number of slots.
const size_t capacity;
22+
// Alignment passed to the constructor; stored here but not read by any code
// visible in this diff.
// NOTE(review): members are declared buffer, capacity, alignment, while the
// constructor's init list names them capacity, alignment, buffer. Members are
// always initialised in declaration order, so compilers may warn (-Wreorder).
const std::size_t alignment;
1723

18-
alignas(Common::CACHELINE_SIZE) mutable std::mutex mtx;
19-
alignas(Common::CACHELINE_SIZE) size_t head = 0;
20-
alignas(Common::CACHELINE_SIZE) size_t tail = 0;
24+
// Next position to push; only ever incremented, reduced modulo `capacity`
// when selecting a slot.
alignas(Common::CACHELINE_SIZE) std::atomic<size_t> tail = 0;
25+
// Next position to pop; same convention as `tail`.
alignas(Common::CACHELINE_SIZE) std::atomic<size_t> head = 0;
2126

2227
public:
2328
// Allocates `cap` slots with the given alignment; throws
// std::invalid_argument when `cap` is 0.
explicit MPMCRingBuffer(size_t cap,
2429
std::size_t align = Common::CACHELINE_SIZE);
25-
bool tryPush(const T& value);
26-
bool tryPush(T&& value);
30+
// Non-blocking push by copy; returns false if the target slot is not
// writable at the observed tail position.
bool tryPush(const T &value);
31+
// Non-blocking push by move; same return convention as the copy overload.
bool tryPush(T &&value);
2732
template <typename... Args>
28-
bool tryEmplace(Args&&... args);
29-
bool tryPop(T& value);
33+
// Non-blocking push that constructs T from `args` directly in the slot.
bool tryEmplace(Args &&...args);
34+
// Non-blocking pop into `value`; returns false if the slot at the observed
// head position has not been published yet.
bool tryPop(T &value);
3035
};
3136

3237
} // namespace RingBuffer
@@ -35,56 +40,102 @@ namespace RingBuffer { // implementation
3540

3641
// Constructs a ring buffer with `cap` slots.
//
// @param cap    number of slots; must be greater than 0
// @param align  alignment passed to the aligned operator new[] and to the
//               AlignedDeleter
// @throws std::invalid_argument if `cap` is 0
template <typename T>
3742
MPMCRingBuffer<T>::MPMCRingBuffer(size_t cap, std::size_t align)
38-
: capacity(cap + 1), buffer(nullptr, Common::AlignedDeleter{align}) {
39-
if (cap == 0)
40-
throw std::invalid_argument(
41-
"capacity of ring buffer must be greater than 0");
42-
43-
T* ptr = static_cast<T*>(
44-
::operator new[](capacity * sizeof(T), std::align_val_t(align)));
45-
buffer.reset(ptr);
43+
: capacity(cap),
44+
alignment(align),
45+
buffer(nullptr, Common::AlignedDeleter{align}) {
46+
if (cap == 0) throw std::invalid_argument("Capacity must be greater than 0");
47+
48+
// Raw, uninitialised storage for `capacity` slots.
// NOTE(review): Slot's members are alignas(Common::CACHELINE_SIZE), so an
// `align` smaller than that would under-align the array -- confirm callers
// never pass less than CACHELINE_SIZE.
Slot *raw = static_cast<Slot *>(
49+
::operator new[](capacity * sizeof(Slot), std::align_val_t(align)));
50+
// Slot i starts with seq == i, i.e. writable by the producer that claims
// position i. `data` is not named in the initialiser, so it is
// value-initialised: T must be default-constructible and every slot starts
// out holding a live T.
// NOTE(review): if constructing a Slot throws, `raw` is not yet owned by
// `buffer` and would leak -- confirm whether T's default constructor can throw.
for (size_t i = 0; i < capacity; ++i) {
51+
new (&raw[i]) Slot{.seq = i}; // placement new
52+
}
53+
// Hand ownership to the unique_ptr only after all slots are constructed.
buffer.reset(raw);
4654
}
4755

4856
template <typename T>
49-
bool MPMCRingBuffer<T>::tryPush(const T& value) {
50-
std::lock_guard<std::mutex> lock(mtx);
51-
if (((tail + 1) % capacity) == head) return false;
52-
buffer[tail] = value;
53-
tail = (tail + 1) % capacity;
54-
return true;
57+
// Attempts to copy `value` into the slot at the current tail position without
// blocking.
//
// @param value  element to copy-assign into the slot
// @return true if the element was stored; false if the slot's sequence number
//         did not match the observed tail position
bool MPMCRingBuffer<T>::tryPush(const T &value) {
58+
size_t pos = tail.load(std::memory_order_relaxed);
59+
60+
while (true) {
61+
Slot &slot = buffer[pos % capacity];
62+
size_t expected = pos;
63+
64+
// The slot is writable for position `pos` only when seq == pos. The acquire
// load pairs with the release store tryPop performs when it frees a slot.
// NOTE(review): seq != pos covers both "not yet popped from the previous
// lap" (full) and "another producer already filled this position" (stale
// tail read); both return false -- confirm the spurious failure is intended.
if (slot.seq.load(std::memory_order_acquire) != expected) {
65+
return false;
66+
}
67+
68+
// Claim position `pos`. On failure compare_exchange_weak writes the current
// tail into `pos`, and the loop retries against that slot.
if (tail.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
69+
// NOTE(review): this assigns into the existing `data` object, but tryPop
// calls elem->~T() on it before releasing the slot -- confirm this is only
// used with types for which assigning after destruction is harmless.
slot.data = value;
70+
// Publish: seq == pos + 1 is what a consumer at position `pos` waits for.
slot.seq.store(expected + 1, std::memory_order_release);
71+
return true;
72+
}
73+
}
5574
}
5675

5776
template <typename T>
58-
bool MPMCRingBuffer<T>::tryPush(T&& value) {
59-
std::lock_guard<std::mutex> lock(mtx);
60-
if (((tail + 1) % capacity) == head) return false;
61-
buffer[tail] = std::move(value);
62-
tail = (tail + 1) % capacity;
63-
return true;
77+
// Attempts to move `value` into the slot at the current tail position without
// blocking.
//
// @param value  element to move-assign into the slot; it is only moved from
//               when the function returns true
// @return true if the element was stored; false if the slot's sequence number
//         did not match the observed tail position
bool MPMCRingBuffer<T>::tryPush(T &&value) {
78+
size_t pos = tail.load(std::memory_order_relaxed);
79+
80+
while (true) {
81+
Slot &slot = buffer[pos % capacity];
82+
size_t expected = pos;
83+
84+
// Writable for position `pos` only when seq == pos; any other value makes
// the call fail immediately instead of retrying.
if (slot.seq.load(std::memory_order_acquire) != expected) {
85+
return false;
86+
}
87+
88+
// Claim position `pos`; a failed CAS refreshes `pos` with the current tail
// and the loop re-examines the corresponding slot.
if (tail.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
89+
// NOTE(review): move-assigns into `data`, whose lifetime tryPop ends with
// elem->~T() -- confirm this is acceptable for the element types in use.
slot.data = std::move(value);
90+
// Release store makes the written element visible to the consumer whose
// acquire load observes seq == pos + 1.
slot.seq.store(expected + 1, std::memory_order_release);
91+
return true;
92+
}
93+
}
6494
}
6595

6696
template <typename T>
6797
template <typename... Args>
68-
bool MPMCRingBuffer<T>::tryEmplace(Args&&... args) {
69-
std::lock_guard<std::mutex> lock(mtx);
70-
if (((tail + 1) % capacity) == head) return false;
71-
new (&buffer[tail]) T(std::forward<Args>(args)...);
72-
tail = (tail + 1) % capacity;
73-
return true;
98+
// Attempts to construct a T from `args` directly inside the slot at the
// current tail position without blocking.
//
// @param args  arguments perfectly forwarded to T's constructor
// @return true if the element was constructed; false if the slot's sequence
//         number did not match the observed tail position
bool MPMCRingBuffer<T>::tryEmplace(Args &&...args) {
99+
size_t pos = tail.load(std::memory_order_relaxed);
100+
101+
while (true) {
102+
Slot &slot = buffer[pos % capacity];
103+
size_t expected = pos;
104+
105+
// Writable for position `pos` only when seq == pos.
if (slot.seq.load(std::memory_order_acquire) != expected) {
106+
return false;
107+
}
108+
109+
// Claim position `pos`; a failed CAS refreshes `pos` and the loop retries.
if (tail.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
110+
// Placement-new over `data` without destroying what is there.
// NOTE(review): on a slot's first use `data` still holds the T that the
// constructor value-initialised, so that object's destructor never runs
// (the CounterLifecycle test's `constructed - CAPACITY` appears to account
// for this) -- confirm this is acceptable for resource-owning T.
new (&slot.data) T(std::forward<Args>(args)...);
111+
// Publish the element to the consumer waiting for seq == pos + 1.
slot.seq.store(expected + 1, std::memory_order_release);
112+
return true;
113+
}
114+
}
74115
}
75116

76117
template <typename T>
77-
bool MPMCRingBuffer<T>::tryPop(T& value) {
78-
std::lock_guard<std::mutex> lock(mtx);
79-
if (head == tail) return false;
80-
81-
T* elem = &buffer[head];
82-
value.~T();
83-
new (&value) T(std::move(*elem));
84-
elem->~T();
85-
86-
head = (head + 1) % capacity;
87-
return true;
118+
// Attempts to move the element at the current head position into `value`
// without blocking.
//
// @param value  receives the popped element; its previous contents are
//               destroyed and it is re-constructed from the slot's element
// @return true if an element was popped; false if the slot at the observed
//         head position has not been published (seq != pos + 1)
bool MPMCRingBuffer<T>::tryPop(T &value) {
119+
size_t pos = head.load(std::memory_order_relaxed);
120+
121+
while (true) {
122+
Slot &slot = buffer[pos % capacity];
123+
// A producer that filled position `pos` stored seq = pos + 1.
size_t expected = pos + 1;
124+
125+
// Acquire load pairs with the producer's release store so the element
// written before it is visible here.
if (slot.seq.load(std::memory_order_acquire) != expected) {
126+
return false;
127+
}
128+
129+
// Claim position `pos`; on failure `pos` is refreshed with the current head
// and the loop retries. On success `pos` keeps the claimed position.
if (head.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
130+
T *elem = &(slot.data);
131+
// Replace `value` by destroying it and move-constructing in place.
// NOTE(review): if T's move constructor throws, `value` is left destroyed
// and the slot's seq is never advanced -- confirm T's move is noexcept.
value.~T();
132+
new (&value) T(std::move(*elem));
133+
// Ends the lifetime of the slot's element; the push overloads later
// assign into (or placement-new over) this storage.
elem->~T();
134+
135+
// Mark the slot writable for the producer one lap ahead, which will claim
// position pos + capacity and expects seq to equal exactly that.
slot.seq.store(pos + capacity, std::memory_order_release);
136+
return true;
137+
}
138+
}
88139
}
89140

90141
} // namespace RingBuffer

tests/mpmc_ring_buffer_test.cc

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,57 @@ TEST(MPMCRingBufferTest, SPSC) {
119119
}
120120
}
121121

122+
TEST(MPMCRingBufferTest, MPSC) {
// Four producers push disjoint integer ranges into an MPMCRingBuffer while a
// single consumer drains it; afterwards every value 0..TOTAL_ITEMS-1 must
// have been received exactly once. The suite is MPMCRingBufferTest because
// this file tests MPMCRingBuffer; using MPSCRingBufferTest here would clash
// with the MPSC test of the same name in tests/mpsc_ring_buffer_test.cc when
// both files are linked into one test binary.
123+
constexpr int PRODUCERS = 4;
124+
constexpr int ITEMS_PER_PRODUCER = 500;
125+
constexpr int TOTAL_ITEMS = PRODUCERS * ITEMS_PER_PRODUCER;
126+
constexpr int CAPACITY = 128;
127+
128+
RingBuffer::MPMCRingBuffer<int> buffer(CAPACITY);
129+
std::atomic<int> produced{0};
130+
std::atomic<int> consumed{0};
131+
132+
std::vector<std::thread> producer_threads;
133+
for (int p = 0; p < PRODUCERS; ++p) {
134+
producer_threads.emplace_back([p, &buffer, &produced]() {
135+
for (int i = 0; i < ITEMS_PER_PRODUCER; ++i) {
136+
int value = p * ITEMS_PER_PRODUCER + i;
137+
// Spin (yielding) until the push succeeds; tryPush never blocks.
while (true) {
138+
if (buffer.tryPush(value)) {
139+
produced.fetch_add(1, std::memory_order_relaxed);
140+
break;
141+
} else {
142+
std::this_thread::yield();
143+
}
144+
}
145+
}
146+
});
147+
}
148+
149+
std::vector<int> results;
150+
results.reserve(TOTAL_ITEMS);
151+
// `results` is written only by the consumer thread and read only after it
// has been joined, so it needs no extra synchronisation.
std::thread consumer_thread([&]() {
152+
while (consumed.load(std::memory_order_relaxed) < TOTAL_ITEMS) {
153+
int value;
154+
if (buffer.tryPop(value)) {
155+
results.push_back(value);
156+
consumed.fetch_add(1, std::memory_order_relaxed);
157+
} else {
158+
std::this_thread::yield();
159+
}
160+
}
161+
});
162+
163+
for (auto& t : producer_threads) t.join();
164+
consumer_thread.join();
165+
166+
EXPECT_EQ(results.size(), static_cast<size_t>(TOTAL_ITEMS));
167+
// Producers interleave arbitrarily, so compare as a sorted multiset.
std::sort(results.begin(), results.end());
168+
for (int i = 0; i < TOTAL_ITEMS; ++i) {
169+
EXPECT_EQ(results[i], i);
170+
}
171+
}
172+
122173
TEST(MPMCRingBufferTest, MPMC) {
123174
constexpr int PRODUCERS = 4;
124175
constexpr int ITEMS_PER_PRODUCER = 250;
@@ -209,9 +260,11 @@ TEST(MPMCRingBufferTest, EmplaceString) {
209260
}
210261

211262
TEST(MPMCRingBufferTest, CounterLifecycle) {
263+
const int CAPACITY = 2;
212264
Counter::reset();
265+
213266
{
214-
RingBuffer::MPMCRingBuffer<Counter> buffer(2);
267+
RingBuffer::MPMCRingBuffer<Counter> buffer(CAPACITY);
215268

216269
EXPECT_TRUE(buffer.tryEmplace(1));
217270
EXPECT_TRUE(buffer.tryEmplace(2));
@@ -223,5 +276,6 @@ TEST(MPMCRingBufferTest, CounterLifecycle) {
223276
EXPECT_FALSE(buffer.tryPop(tmp));
224277
}
225278

226-
EXPECT_EQ(Counter::constructed, Counter::destructed);
279+
// the init phase should be cleaned up
280+
EXPECT_EQ(Counter::constructed - CAPACITY, Counter::destructed);
227281
}

tests/mpsc_ring_buffer_test.cc

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,41 @@ TEST(MPSCRingBufferTest, WrapAround) {
8282
EXPECT_FALSE(buffer.tryPop(value));
8383
}
8484

85+
TEST(MPSCRingBufferTest, SPSC) {
// One producer and one consumer exchange ITERATIONS integers through an
// MPSCRingBuffer; with a single producer the values must arrive in order.
// The suite is MPSCRingBufferTest, matching the other tests in this file,
// because the class under test is MPSCRingBuffer rather than the SPSC one.
86+
constexpr int ITERATIONS = 10000;
87+
constexpr int CAPACITY = 128;
88+
RingBuffer::MPSCRingBuffer<int> buffer(CAPACITY);
89+
90+
std::thread producer([&]() {
91+
for (int i = 0; i < ITERATIONS; ++i) {
92+
// Retry (yielding) until the non-blocking push succeeds.
while (true) {
93+
if (buffer.tryPush(i)) break;
94+
std::this_thread::yield();
95+
}
96+
}
97+
});
98+
99+
std::vector<int> results;
100+
results.reserve(ITERATIONS);
101+
// `results` is touched only by the consumer until both threads are joined.
std::thread consumer([&]() {
102+
int value;
103+
for (int i = 0; i < ITERATIONS; ++i) {
104+
while (true) {
105+
if (buffer.tryPop(value)) break;
106+
std::this_thread::yield();
107+
}
108+
results.push_back(value);
109+
}
110+
});
111+
112+
producer.join();
113+
consumer.join();
114+
115+
// FIFO order check: element i must be the i-th value pushed.
for (int i = 0; i < ITERATIONS; ++i) {
116+
EXPECT_EQ(results[i], i);
117+
}
118+
}
119+
85120
TEST(MPSCRingBufferTest, MPSC) {
86121
constexpr int PRODUCERS = 4;
87122
constexpr int ITEMS_PER_PRODUCER = 500;

0 commit comments

Comments
 (0)