Skip to content

Commit 2485e63

Browse files
committed
Handle buffer wrap properly in SlickQueue
Refactor reserved_info struct for better buffer management and add buffer wrap test case. Bump version to 1.0.2.0.
1 parent e8e6e81 commit 2485e63

File tree

4 files changed

+105
-54
lines changed

4 files changed

+105
-54
lines changed

CHANGELOG

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
# v1.0.2.0 - 2025-09-29
2+
- Handles buffer wrap properly
3+
- Refactor to use reserved_info struct for better buffer management
4+
- Add buffer wrap test case
5+
16
# v1.0.1.2 - 2025-09-23
27
- Fixed dist_slick_queue might cause third-party dependency installation error in release build
38

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
cmake_minimum_required(VERSION 3.10)
22

3-
set(BUILD_VERSION 1.0.1.2)
3+
set(BUILD_VERSION 1.0.2.0)
44

55
project(slick_queue
66
VERSION ${BUILD_VERSION}

include/slick_queue/slick_queue.h

Lines changed: 67 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,18 @@ class SlickQueue {
5151
uint32_t size = 1;
5252
};
5353

54+
struct reserved_info {
55+
uint_fast64_t index_ = 0;
56+
uint_fast32_t size_ = 0;
57+
};
58+
5459
uint32_t size_;
55-
uint32_t buffered_size_;
5660
uint32_t mask_;
61+
uint32_t last_data_size_;
5762
T* data_ = nullptr;
5863
slot* control_ = nullptr;
59-
std::atomic_uint_fast64_t* reserved_ = nullptr;
60-
std::atomic_uint_fast64_t reserved_local_{ 0 };
64+
std::atomic<reserved_info>* reserved_ = nullptr;
65+
std::atomic<reserved_info> reserved_local_;
6166
bool own_ = false;
6267
bool use_shm_ = false;
6368
#if defined(_MSC_VER)
@@ -81,10 +86,9 @@ class SlickQueue {
8186
*/
8287
SlickQueue(uint32_t size, const char* const shm_name = nullptr)
8388
: size_(size)
84-
, buffered_size_(size + 1024) // add some buffer at the end
8589
, mask_(size - 1)
86-
, data_(shm_name ? nullptr : new T[buffered_size_])
87-
, control_(shm_name ? nullptr : new slot[buffered_size_])
90+
, data_(shm_name ? nullptr : new T[size_])
91+
, control_(shm_name ? nullptr : new slot[size_])
8892
, reserved_(shm_name ? nullptr : &reserved_local_)
8993
, own_(shm_name == nullptr)
9094
, use_shm_(shm_name != nullptr)
@@ -122,7 +126,7 @@ class SlickQueue {
122126
}
123127
#else
124128
if (lpvMem_) {
125-
auto BF_SZ = static_cast<size_t>(64 + sizeof(slot) * buffered_size_ + sizeof(T) * buffered_size_);
129+
auto BF_SZ = static_cast<size_t>(64 + sizeof(slot) * size_ + sizeof(T) * size_);
126130
munmap(lpvMem_, BF_SZ);
127131
lpvMem_ = nullptr;
128132
}
@@ -167,7 +171,7 @@ class SlickQueue {
167171
* @return Initial reading index
168172
*/
169173
uint64_t initial_reading_index() const noexcept {
170-
return reserved_->load(std::memory_order_relaxed);
174+
return reserved_->load(std::memory_order_relaxed).index_;
171175
}
172176

173177
/**
@@ -176,7 +180,33 @@ class SlickQueue {
176180
* @return The starting index of the reserved space
177181
*/
178182
uint64_t reserve(uint32_t n = 1) noexcept {
179-
return reserved_->fetch_add(n, std::memory_order_acq_rel);
183+
auto reserved = reserved_->load(std::memory_order_relaxed);
184+
reserved_info next;
185+
uint64_t index;
186+
bool buffer_wrapped = false;
187+
do {
188+
next = reserved;
189+
index = reserved.index_;
190+
auto idx = index & mask_;
191+
if ((idx + n) > size_) {
192+
// if there is no enough buffer left, start from the beginning
193+
index += size_ - idx;
194+
next.index_ = index + n;
195+
next.size_ = n;
196+
buffer_wrapped = true;
197+
}
198+
else {
199+
next.index_ += n;
200+
next.size_ = n;
201+
}
202+
} while(!reserved_->compare_exchange_weak(reserved, next, std::memory_order_release, std::memory_order_relaxed));
203+
if (buffer_wrapped) {
204+
//
205+
auto& slot = control_[reserved.index_ & mask_];
206+
slot.size = n;
207+
slot.data_index.store(index, std::memory_order_release);
208+
}
209+
return index;
180210
}
181211

182212
/**
@@ -205,6 +235,7 @@ class SlickQueue {
205235
void publish(uint64_t index, uint32_t n = 1) noexcept {
206236
auto& slot = control_[index & mask_];
207237
slot.size = n;
238+
last_data_size_ = n;
208239
slot.data_index.store(index, std::memory_order_release);
209240
}
210241

@@ -214,9 +245,10 @@ class SlickQueue {
214245
* @return Pair of pointer to the data and the size of the data, or nullptr and 0 if no data is available
215246
*/
216247
std::pair<T*, uint32_t> read(uint64_t& read_index) noexcept {
217-
auto& slot = control_[read_index & mask_];
248+
auto idx = read_index & mask_;
249+
auto& slot = control_[idx];
218250
auto index = slot.data_index.load(std::memory_order_relaxed);
219-
if (index != std::numeric_limits<uint64_t>::max() && reserved_->load(std::memory_order_relaxed) < index) {
251+
if (index != std::numeric_limits<uint64_t>::max() && reserved_->load(std::memory_order_relaxed).index_ < index) {
220252
// queue has been reset
221253
read_index = 0;
222254
}
@@ -225,9 +257,12 @@ class SlickQueue {
225257
// data not ready yet
226258
return std::make_pair(nullptr, 0);
227259
}
260+
else if (index > read_index && ((index & mask_) != idx)) {
261+
read_index = index;
262+
}
228263

229264
auto& data = data_[read_index & mask_];
230-
read_index = slot.data_index + slot.size;
265+
read_index = index + slot.size;
231266
return std::make_pair(&data, slot.size);
232267
}
233268

@@ -237,22 +272,10 @@ class SlickQueue {
237272
*/
238273
T* read_last() noexcept {
239274
auto reserved = reserved_->load(std::memory_order_relaxed);
240-
if (reserved == 0) {
275+
if (reserved.index_ == 0) {
241276
return nullptr;
242277
}
243-
auto index = reserved - 1;
244-
245-
// find last published data
246-
auto begin = index & mask_;
247-
while (control_[index & mask_].data_index.load(std::memory_order_relaxed) != index)
248-
{
249-
--index;
250-
if ((index & mask_) == begin // looped entire queue
251-
|| index >= reserved) // passed 0
252-
{
253-
return nullptr;
254-
}
255-
}
278+
auto index = reserved.index_ - reserved.size_;
256279
return &data_[index & mask_];
257280
}
258281

@@ -263,12 +286,12 @@ class SlickQueue {
263286
*/
264287
void reset() noexcept {
265288
if (use_shm_) {
266-
control_ = new ((uint8_t*)lpvMem_ + 64) slot[buffered_size_];
289+
control_ = new ((uint8_t*)lpvMem_ + 64) slot[size_];
267290
} else {
268291
delete [] control_;
269-
control_ = new slot[buffered_size_];
292+
control_ = new slot[size_];
270293
}
271-
reserved_->store(0, std::memory_order_release);
294+
reserved_->store(reserved_info(), std::memory_order_release);
272295
}
273296

274297
private:
@@ -305,12 +328,11 @@ class SlickQueue {
305328
}
306329
size_ = *reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(lpvMem) + sizeof(std::atomic_uint_fast64_t));
307330
mask_ = size_ - 1;
308-
buffered_size_ = size_ + 1024;
309-
BF_SZ = 64 + sizeof(slot) * buffered_size_ + sizeof(T) * buffered_size_;
331+
BF_SZ = 64 + sizeof(slot) * size_ + sizeof(T) * size_;
310332
UnmapViewOfFile(lpvMem);
311333
}
312334
else {
313-
BF_SZ = 64 + sizeof(slot) * buffered_size_ + sizeof(T) * buffered_size_;
335+
BF_SZ = 64 + sizeof(slot) * size_ + sizeof(T) * size_;
314336

315337
SECURITY_ATTRIBUTES sa;
316338
sa.nLength = sizeof(SECURITY_ATTRIBUTES);
@@ -348,20 +370,20 @@ class SlickQueue {
348370
}
349371

350372
if (own_) {
351-
reserved_ = new (lpvMem_) std::atomic_uint_fast64_t{ 0 };
352-
*reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(lpvMem_) + sizeof(std::atomic_uint_fast64_t)) = size_;
353-
control_ = new ((uint8_t*)lpvMem_ + 64) slot[buffered_size_];
354-
data_ = new ((uint8_t*)lpvMem_ + 64 + sizeof(slot) * buffered_size_) T[buffered_size_];
373+
reserved_ = new (lpvMem_) std::atomic<reserved_info>();
374+
*reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(lpvMem_) + sizeof(std::atomic<reserved_info>)) = size_;
375+
control_ = new ((uint8_t*)lpvMem_ + 64) slot[size_];
376+
data_ = new ((uint8_t*)lpvMem_ + 64 + sizeof(slot) * size_) T[size_];
355377
}
356378
else {
357-
reserved_ = reinterpret_cast<std::atomic_uint_fast64_t*>(lpvMem_);
379+
reserved_ = reinterpret_cast<std::atomic<reserved_info>*>(lpvMem_);
358380
control_ = reinterpret_cast<slot*>((uint8_t*)lpvMem_ + 64);
359-
data_ = reinterpret_cast<T*>((uint8_t*)lpvMem_ + 64 + sizeof(slot) * buffered_size_);
381+
data_ = reinterpret_cast<T*>((uint8_t*)lpvMem_ + 64 + sizeof(slot) * size_);
360382
}
361383
}
362384
#else
363385
void allocate_shm_data(const char* const shm_name, bool open_only) {
364-
size_t BF_SZ = 64 + sizeof(slot) * buffered_size_ + sizeof(T) * buffered_size_;
386+
size_t BF_SZ = 64 + sizeof(slot) * size_ + sizeof(T) * size_;
365387
shm_name_ = shm_name;
366388
int flags = open_only ? O_RDWR : (O_RDWR | O_CREAT | O_EXCL);
367389
shm_fd_ = shm_open(shm_name, flags, 0666);
@@ -392,14 +414,14 @@ class SlickQueue {
392414
}
393415

394416
if (own_) {
395-
reserved_ = new (lpvMem_) std::atomic_uint_fast64_t{ 0 };
396-
*reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(lpvMem_) + sizeof(std::atomic_uint_fast64_t)) = mask_ + 1;
397-
control_ = new ((uint8_t*)lpvMem_ + 64) slot[buffered_size_];
398-
data_ = new ((uint8_t*)lpvMem_ + 64 + sizeof(slot) * buffered_size_) T[buffered_size_];
417+
reserved_ = new (lpvMem_) std::atomic<reserved_info>{ 0, 0 };
418+
*reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(lpvMem_) + sizeof(std::atomic<reserved_info>)) = mask_ + 1;
419+
control_ = new ((uint8_t*)lpvMem_ + 64) slot[size_];
420+
data_ = new ((uint8_t*)lpvMem_ + 64 + sizeof(slot) * size_) T[size_];
399421
} else {
400-
reserved_ = reinterpret_cast<std::atomic_uint_fast64_t*>(lpvMem_);
422+
reserved_ = reinterpret_cast<std::atomic<reserved_info>*>(lpvMem_);
401423
control_ = reinterpret_cast<slot*>((uint8_t*)lpvMem_ + 64);
402-
data_ = reinterpret_cast<T*>((uint8_t*)lpvMem_ + 64 + sizeof(slot) * buffered_size_);
424+
data_ = reinterpret_cast<T*>((uint8_t*)lpvMem_ + 64 + sizeof(slot) * size_);
403425
}
404426
}
405427
#endif

tests/tests.cpp

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,6 @@ TEST_CASE("Read empty queue") {
1212
REQUIRE(read.first == nullptr);
1313
}
1414

15-
//TEST_CASE("RESET read cursor") {
16-
// SlickQueue<int> queue(2);
17-
// uint64_t read_cursor = std::numeric_limits<uint64_t>::max();
18-
// auto read = queue.read(read_cursor);
19-
// REQUIRE(read.first == nullptr);
20-
// REQUIRE(read_cursor == 0);
21-
//}
22-
2315
TEST_CASE( "Reserve") {
2416
SlickQueue<int> queue(2);
2517
auto reserved = queue.reserve();
@@ -81,3 +73,35 @@ TEST_CASE( "Publish and read multiple" ) {
8173
REQUIRE(*read.first == 23);
8274
}
8375

76+
TEST_CASE( "buffer wrap" ) {
77+
SlickQueue<char> queue(8);
78+
uint64_t read_cursor = 0;
79+
80+
auto reserved = queue.reserve(3);
81+
REQUIRE( reserved == 0 );
82+
memcpy(queue[reserved], "123", 3);
83+
queue.publish(reserved, 3);
84+
auto read = queue.read(read_cursor);
85+
REQUIRE( read.first != nullptr );
86+
REQUIRE( read_cursor == 3);
87+
REQUIRE( strncmp(read.first, "123", 3) == 0);
88+
89+
reserved = queue.reserve(3);
90+
REQUIRE( reserved == 3 );
91+
memcpy(queue[reserved], "456", 3);
92+
queue.publish(reserved, 3);
93+
read = queue.read(read_cursor);
94+
REQUIRE( read.first != nullptr );
95+
REQUIRE( read_cursor == 6);
96+
REQUIRE( strncmp(read.first, "456", 3) == 0);
97+
98+
reserved = queue.reserve(3);
99+
REQUIRE( reserved == 8 );
100+
memcpy(queue[reserved], "789", 3);
101+
queue.publish(reserved, 3);
102+
read = queue.read(read_cursor);
103+
REQUIRE( read.first != nullptr );
104+
REQUIRE( read_cursor == 11);
105+
REQUIRE( strncmp(read.first, "789", 3) == 0);
106+
}
107+

0 commit comments

Comments
 (0)