Skip to content

Commit 2f305ea

Browse files
committed
Fix buffer wrap read logic
1 parent e069512 commit 2f305ea

File tree

4 files changed

+45
-23
lines changed

4 files changed

+45
-23
lines changed

CHANGELOG

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
1+
# v1.0.2.1 - 2025-10-01
2+
- Fixed buffer wrap read logic to properly retry from wrapped position
3+
- Improved reader behavior when encountering wrap jump markers
4+
- Changed to C++20
5+
16
# v1.0.2.0 - 2025-09-29
27
- Handles buffer wrap properly
3-
- Refactor to use reserved_info struct for better buffer management
8+
- Refactor to use reserved_info struct for better buffer management
49
- Add buffer wrap test case
510

611
# v1.0.1.2 - 2025-09-23

CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
cmake_minimum_required(VERSION 3.10)
22

3-
set(BUILD_VERSION 1.0.2.0)
3+
set(BUILD_VERSION 1.0.2.1)
44

55
project(slick_queue
66
VERSION ${BUILD_VERSION}
77
DESCRIPTION "A C++ Lock-Free MPMC queue"
88
LANGUAGES CXX)
99

10-
set(CMAKE_CXX_STANDARD 17)
10+
set(CMAKE_CXX_STANDARD 20)
1111

1212
add_library(slick_queue INTERFACE)
1313
target_include_directories(slick_queue INTERFACE include)

include/slick_queue/slick_queue.h

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ class SlickQueue {
5858

5959
uint32_t size_;
6060
uint32_t mask_;
61-
uint32_t last_data_size_;
6261
T* data_ = nullptr;
6362
slot* control_ = nullptr;
6463
std::atomic<reserved_info>* reserved_ = nullptr;
@@ -179,12 +178,16 @@ class SlickQueue {
179178
* @param n Number of slots to reserve, default is 1
180179
* @return The starting index of the reserved space
181180
*/
182-
uint64_t reserve(uint32_t n = 1) noexcept {
181+
uint64_t reserve(uint32_t n = 1) {
182+
if (n > size_) [[unlikely]] {
183+
throw std::runtime_error(std::format("required size {} > queue size {}", n, size_));
184+
}
183185
auto reserved = reserved_->load(std::memory_order_relaxed);
184186
reserved_info next;
185187
uint64_t index;
186188
bool buffer_wrapped = false;
187189
do {
190+
buffer_wrapped = false;
188191
next = reserved;
189192
index = reserved.index_;
190193
auto idx = index & mask_;
@@ -201,7 +204,8 @@ class SlickQueue {
201204
}
202205
} while(!reserved_->compare_exchange_weak(reserved, next, std::memory_order_release, std::memory_order_relaxed));
203206
if (buffer_wrapped) {
204-
//
207+
// queue wrapped, set current slock.data_index to the reserved index to let the reader
208+
// know the next available data is in different slot.
205209
auto& slot = control_[reserved.index_ & mask_];
206210
slot.size = n;
207211
slot.data_index.store(index, std::memory_order_release);
@@ -235,7 +239,6 @@ class SlickQueue {
235239
void publish(uint64_t index, uint32_t n = 1) noexcept {
236240
auto& slot = control_[index & mask_];
237241
slot.size = n;
238-
last_data_size_ = n;
239242
slot.data_index.store(index, std::memory_order_release);
240243
}
241244

@@ -245,25 +248,32 @@ class SlickQueue {
245248
* @return Pair of pointer to the data and the size of the data, or nullptr and 0 if no data is available
246249
*/
247250
std::pair<T*, uint32_t> read(uint64_t& read_index) noexcept {
248-
auto idx = read_index & mask_;
249-
auto& slot = control_[idx];
250-
auto index = slot.data_index.load(std::memory_order_relaxed);
251-
if (index != std::numeric_limits<uint64_t>::max() && reserved_->load(std::memory_order_relaxed).index_ < index) {
252-
// queue has been reset
253-
read_index = 0;
254-
}
255-
256-
if (index == std::numeric_limits<uint64_t>::max() || index < read_index) {
257-
// data not ready yet
258-
return std::make_pair(nullptr, 0);
259-
}
260-
else if (index > read_index && ((index & mask_) != idx)) {
261-
read_index = index;
251+
uint64_t index;
252+
slot* current_slot;
253+
while (true) {
254+
auto idx = read_index & mask_;
255+
current_slot = &control_[idx];
256+
index = current_slot->data_index.load(std::memory_order_acquire);
257+
if (index != std::numeric_limits<uint64_t>::max() && reserved_->load(std::memory_order_relaxed).index_ < index) [[unlikely]] {
258+
// queue has been reset
259+
read_index = 0;
260+
}
261+
262+
if (index == std::numeric_limits<uint64_t>::max() || index < read_index) {
263+
// data not ready yet
264+
return std::make_pair(nullptr, 0);
265+
}
266+
else if (index > read_index && ((index & mask_) != idx)) {
267+
// queue wrapped, skip the unused slots
268+
read_index = index;
269+
continue;
270+
}
271+
break;
262272
}
263273

264274
auto& data = data_[read_index & mask_];
265-
read_index = index + slot.size;
266-
return std::make_pair(&data, slot.size);
275+
read_index = index + current_slot->size;
276+
return std::make_pair(&data, current_slot->size);
267277
}
268278

269279
/**

tests/tests.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,13 @@ TEST_CASE( "buffer wrap" ) {
9898
reserved = queue.reserve(3);
9999
REQUIRE( reserved == 8 );
100100
memcpy(queue[reserved], "789", 3);
101+
102+
// read before publish, the read_cursor should changed to new location
103+
read = queue.read(read_cursor);
104+
REQUIRE( read_cursor == 8 );
105+
REQUIRE( read.first == nullptr );
106+
REQUIRE( read.second == 0 );
107+
101108
queue.publish(reserved, 3);
102109
read = queue.read(read_cursor);
103110
REQUIRE( read.first != nullptr );

0 commit comments

Comments
 (0)