Commit 8be2890

Release v1.2.0: Refactor reserved_info to packed uint64_t for lock-free atomics, add HEADER_SIZE constant, and enhance README with vcpkg installation and implementation details.
1 parent 1bffcaa commit 8be2890

4 files changed: +105 −46 lines changed


CHANGELOG

Lines changed: 10 additions & 0 deletions

@@ -1,3 +1,13 @@
+# v1.2.0 - 2025-12-19
+- **BREAKING CHANGE**: Refactored `reserved_info` from struct to packed uint64_t for guaranteed lock-free atomics
+  - Uses 48-bit index (supports 2^48 = 281 trillion iterations) and 16-bit size
+  - `std::atomic<reserved_info>` is now always lock-free on all platforms
+  - Reserve size limit of 65,535 (2^16 - 1) when using `read_last()` - see documentation
+- Added `HEADER_SIZE` constant to replace magic number 64 in shared memory layout
+- Added element size validation in shared memory to prevent type mismatches between processes
+- Updated README with vcpkg installation instructions
+- Updated README with lock-free implementation details and constraints documentation
+
 # v1.1.2 - 2025-11-14
 - Add alias target to provide the same slick::slick_queue name during build time
 
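The lock-free claim in this entry can be checked at compile time: a single 64-bit word fits the native CAS width on mainstream 64-bit platforms, while the old two-field struct could exceed it and fall back to a lock-based atomic. A minimal standalone sketch of that check (illustrative, not part of this commit):

```cpp
#include <atomic>
#include <cstdint>

// The packed representation introduced in v1.2.0: one 64-bit word
// holding a 48-bit index and a 16-bit size.
using reserved_info = uint64_t;

// A single uint64_t atomic is always lock-free on mainstream 64-bit
// targets; the former { uint_fast64_t; uint_fast32_t; } struct was wider
// than 8 bytes and std::atomic could implement it with a lock.
static_assert(std::atomic<reserved_info>::is_always_lock_free,
              "packed reserved_info must be lock-free");

int main() { return 0; }
```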

CMakeLists.txt

Lines changed: 1 addition & 1 deletion

@@ -1,7 +1,7 @@
 cmake_minimum_required(VERSION 3.10)
 
 project(slick_queue
-        VERSION 1.1.2
+        VERSION 1.2.0
         DESCRIPTION "A C++ Lock-Free MPMC queue"
         LANGUAGES CXX)
 

README.md

Lines changed: 30 additions & 3 deletions

@@ -34,7 +34,22 @@ SlickQueue is header-only. Simply add the `include` directory to your project's
 #include "slick/queue.h"
 ```
 
-### Using CMake
+### Using vcpkg
+
+SlickQueue is available in the [vcpkg](https://github.com/microsoft/vcpkg) package manager:
+
+```bash
+vcpkg install slick-queue
+```
+
+Then in your CMakeLists.txt:
+
+```cmake
+find_package(slick_queue CONFIG REQUIRED)
+target_link_libraries(your_target PRIVATE slick_queue::slick_queue)
+```
+
+### Using CMake FetchContent
 
 ```cmake
 include(FetchContent)
@@ -192,12 +207,24 @@ SlickQueue(const char* shm_name); // Reader/Attacher
 
 ### Core Methods
 
-- `uint64_t reserve()` - Reserve a slot for writing (blocks if queue is full)
+- `uint64_t reserve(uint32_t n = 1)` - Reserve `n` slots for writing (blocks if queue is full)
 - `T* operator[](uint64_t slot)` - Access reserved slot
-- `void publish(uint64_t slot)` - Publish written data to consumers
+- `void publish(uint64_t slot, uint32_t n = 1)` - Publish `n` written items to consumers
 - `std::pair<T*, uint32_t> read(uint64_t& cursor)` - Read next available item (independent cursor)
 - `std::pair<T*, uint32_t> read(std::atomic<uint64_t>& cursor)` - Read next available item (shared atomic cursor for work-stealing)
+- `T* read_last()` - Read the most recently published item without a cursor
 - `uint32_t size()` - Get queue capacity
+- `void reset()` - Reset the queue, invalidating all existing data
+
+### Important Constraints
+
+**Lock-Free Atomics Implementation**: SlickQueue uses a packed 64-bit atomic internally to guarantee lock-free operations on all platforms. This packs both the write index (48 bits) and the reservation size (16 bits) into a single atomic value.
+
+**⚠️ Reserve Size Limitation**: When using `read_last()`, the number of slots in any `reserve(n)` call **must not exceed 65,535** (2^16 - 1), because the size is stored in 16 bits within the packed atomic.
+
+- For typical use cases with `reserve()` or `reserve(1)`, this limit is not a concern
+- If you need to reserve more than 65,535 slots at once, do not use `read_last()`
+- The 48-bit index supports up to 2^48 (281 trillion) iterations, sufficient for any practical application
 
 ## Performance Characteristics
 
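To make the documented API surface concrete, here is a minimal single-process sketch against the methods listed above. The `slick` namespace and the capacity constructor are assumptions inferred from the header path and the CMake alias, not confirmed by this diff:

```cpp
#include "slick/queue.h"

#include <cstdint>
#include <iostream>

int main() {
    // Capacity is an illustrative power of two; constructor signature assumed.
    slick::SlickQueue<int> queue(1024);

    // Producer: reserve a slot, write through operator[], publish.
    uint64_t slot = queue.reserve();   // keep n <= 65,535 if read_last() is used
    *queue[slot] = 42;
    queue.publish(slot);

    // Consumer: an independent cursor advanced by read().
    uint64_t cursor = queue.initial_reading_index();
    auto [data, n] = queue.read(cursor);
    if (data) {
        std::cout << "read " << *data << " (" << n << " item(s))\n";
    }

    // Peek at the most recently published item without a cursor.
    if (int* last = queue.read_last()) {
        std::cout << "last = " << *last << '\n';
    }
    return 0;
}
```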

include/slick/queue.h

Lines changed: 64 additions & 42 deletions

@@ -51,10 +51,7 @@ class SlickQueue {
         uint32_t size = 1;
     };
 
-    struct reserved_info {
-        uint_fast64_t index_ = 0;
-        uint_fast32_t size_ = 0;
-    };
+    using reserved_info = uint64_t;
 
     uint32_t size_;
     uint32_t mask_;
@@ -73,6 +70,8 @@ class SlickQueue {
     std::string shm_name_;
 #endif
 
+    static constexpr uint32_t HEADER_SIZE = 64;
+
 public:
     /**
      * @brief Construct a new SlickQueue object
@@ -125,7 +124,7 @@ class SlickQueue {
         }
 #else
         if (lpvMem_) {
-            auto BF_SZ = static_cast<size_t>(64 + sizeof(slot) * size_ + sizeof(T) * size_);
+            auto BF_SZ = static_cast<size_t>(HEADER_SIZE + sizeof(slot) * size_ + sizeof(T) * size_);
             munmap(lpvMem_, BF_SZ);
             lpvMem_ = nullptr;
         }
@@ -170,7 +169,7 @@ class SlickQueue {
      * @return Initial reading index
      */
     uint64_t initial_reading_index() const noexcept {
-        return reserved_->load(std::memory_order_relaxed).index_;
+        return get_index(reserved_->load(std::memory_order_relaxed));
     }
 
     /**
@@ -183,30 +182,27 @@ class SlickQueue {
            throw std::runtime_error("required size " + std::to_string(n) + " > queue size " + std::to_string(size_));
        }
        auto reserved = reserved_->load(std::memory_order_relaxed);
-       reserved_info next;
+       uint64_t next;
        uint64_t index;
        bool buffer_wrapped = false;
        do {
            buffer_wrapped = false;
-           next = reserved;
-           index = reserved.index_;
+           index = get_index(reserved);
            auto idx = index & mask_;
            if ((idx + n) > size_) {
                // if there is no enough buffer left, start from the beginning
                index += size_ - idx;
-               next.index_ = index + n;
-               next.size_ = n;
+               next = make_reserved_info(index + n, n);
                buffer_wrapped = true;
            }
            else {
-               next.index_ += n;
-               next.size_ = n;
+               next = make_reserved_info(index + n, n);
            }
        } while(!reserved_->compare_exchange_weak(reserved, next, std::memory_order_release, std::memory_order_relaxed));
        if (buffer_wrapped) {
            // queue wrapped, set current slock.data_index to the reserved index to let the reader
            // know the next available data is in different slot.
-           auto& slot = control_[reserved.index_ & mask_];
+           auto& slot = control_[get_index(reserved) & mask_];
            slot.size = n;
            slot.data_index.store(index, std::memory_order_release);
        }
@@ -254,7 +250,7 @@ class SlickQueue {
            auto idx = read_index & mask_;
            current_slot = &control_[idx];
            index = current_slot->data_index.load(std::memory_order_acquire);
-           if (index != std::numeric_limits<uint64_t>::max() && reserved_->load(std::memory_order_relaxed).index_ < index) [[unlikely]] {
+           if (index != std::numeric_limits<uint64_t>::max() && get_index(reserved_->load(std::memory_order_relaxed)) < index) [[unlikely]] {
                // queue has been reset
                read_index = 0;
            }
@@ -291,7 +287,7 @@ class SlickQueue {
            slot* current_slot = &control_[idx];
            uint64_t index = current_slot->data_index.load(std::memory_order_acquire);
 
-           if (index != std::numeric_limits<uint64_t>::max() && reserved_->load(std::memory_order_relaxed).index_ < index) [[unlikely]] {
+           if (index != std::numeric_limits<uint64_t>::max() && get_index(reserved_->load(std::memory_order_relaxed)) < index) [[unlikely]] {
                // queue has been reset
                read_index.store(0, std::memory_order_release);
                continue;
@@ -323,11 +319,12 @@ class SlickQueue {
      */
     T* read_last() noexcept {
        auto reserved = reserved_->load(std::memory_order_relaxed);
-       if (reserved.index_ == 0) {
+       auto index = get_index(reserved);
+       if (index == 0) {
            return nullptr;
        }
-       auto index = reserved.index_ - reserved.size_;
-       return &data_[index & mask_];
+       auto last_index = index - get_size(reserved);
+       return &data_[last_index & mask_];
     }
 
     /**
@@ -337,15 +334,27 @@ class SlickQueue {
      */
     void reset() noexcept {
        if (use_shm_) {
-           control_ = new ((uint8_t*)lpvMem_ + 64) slot[size_];
+           control_ = new ((uint8_t*)lpvMem_ + HEADER_SIZE) slot[size_];
        } else {
            delete [] control_;
            control_ = new slot[size_];
        }
-       reserved_->store(reserved_info(), std::memory_order_release);
+       reserved_->store(0, std::memory_order_release);
     }
 
 private:
+    // Helper functions for packing/unpacking reserved_info (16-bit size, 48-bit index)
+    static constexpr uint64_t make_reserved_info(uint64_t index, uint32_t size) noexcept {
+        return ((index & 0xFFFFFFFFFFFFULL) << 16) | (size & 0xFFFF);
+    }
+
+    static constexpr uint64_t get_index(uint64_t reserved) noexcept {
+        return reserved >> 16;
+    }
+
+    static constexpr uint32_t get_size(uint64_t reserved) noexcept {
+        return static_cast<uint32_t>(reserved & 0xFFFF);
+    }
 
 #if defined(_MSC_VER)
     void allocate_shm_data(const char* const shm_name, bool open_only) {
@@ -372,18 +381,22 @@ class SlickQueue {
                throw std::runtime_error("Failed to open shm. err=" + std::to_string(err));
            }
 
-           auto lpvMem = MapViewOfFile(hMapFile_, FILE_MAP_ALL_ACCESS, 0, 0, 64);
+           auto lpvMem = MapViewOfFile(hMapFile_, FILE_MAP_ALL_ACCESS, 0, 0, HEADER_SIZE);
            if (!lpvMem) {
                auto err = GetLastError();
                throw std::runtime_error("Failed to map shm. err=" + std::to_string(err));
            }
            size_ = *reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(lpvMem) + sizeof(std::atomic<reserved_info>));
+           auto element_size = *reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(lpvMem) + sizeof(std::atomic<reserved_info>) + sizeof(size_));
+           if (element_size != sizeof(T)) {
+               throw std::runtime_error("Shared memory element size mismatch. Expected " + std::to_string(sizeof(T)) + " but got " + std::to_string(element_size));
+           }
            mask_ = size_ - 1;
-           BF_SZ = 64 + sizeof(slot) * size_ + sizeof(T) * size_;
+           BF_SZ = HEADER_SIZE + sizeof(slot) * size_ + sizeof(T) * size_;
            UnmapViewOfFile(lpvMem);
        }
        else {
-           BF_SZ = 64 + sizeof(slot) * size_ + sizeof(T) * size_;
+           BF_SZ = HEADER_SIZE + sizeof(slot) * size_ + sizeof(T) * size_;
 
            SECURITY_ATTRIBUTES sa;
            sa.nLength = sizeof(SECURITY_ATTRIBUTES);
@@ -413,16 +426,21 @@ class SlickQueue {
                own_ = true;
            } else {
                // Shared memory already exists, need to read and validate size
-               auto lpvMem = MapViewOfFile(hMapFile_, FILE_MAP_ALL_ACCESS, 0, 0, 64);
+               auto lpvMem = MapViewOfFile(hMapFile_, FILE_MAP_ALL_ACCESS, 0, 0, HEADER_SIZE);
                if (!lpvMem) {
                    auto err = GetLastError();
                    throw std::runtime_error("Failed to map shm for size read. err=" + std::to_string(err));
                }
-               uint32_t shm_size = *reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(lpvMem) + sizeof(std::atomic<reserved_info>));
+               uint32_t queue_size = *reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(lpvMem) + sizeof(std::atomic<reserved_info>));
+               uint32_t element_size = *reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(lpvMem) + sizeof(std::atomic<reserved_info>) + sizeof(size_));
                UnmapViewOfFile(lpvMem);
 
-               if (shm_size != size_) {
-                   throw std::runtime_error("Shared memory size mismatch. Expected " + std::to_string(size_) + " but got " + std::to_string(shm_size));
+               if (queue_size != size_) {
+                   throw std::runtime_error("Shared memory size mismatch. Expected " + std::to_string(size_) + " but got " + std::to_string(queue_size));
+               }
+
+               if (element_size != sizeof(T)) {
+                   throw std::runtime_error("Shared memory element size mismatch. Expected " + std::to_string(sizeof(T)) + " but got " + std::to_string(element_size));
                }
            }
        }
@@ -435,14 +453,17 @@ class SlickQueue {
 
        if (own_) {
            reserved_ = new (lpvMem_) std::atomic<reserved_info>();
+           // save queue size
            *reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(lpvMem_) + sizeof(std::atomic<reserved_info>)) = size_;
-           control_ = new ((uint8_t*)lpvMem_ + 64) slot[size_];
-           data_ = new ((uint8_t*)lpvMem_ + 64 + sizeof(slot) * size_) T[size_];
+           // save element size
+           *reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(lpvMem_) + sizeof(std::atomic<reserved_info>) + sizeof(size_)) = sizeof(T);
+           control_ = new ((uint8_t*)lpvMem_ + HEADER_SIZE) slot[size_];
+           data_ = new ((uint8_t*)lpvMem_ + HEADER_SIZE + sizeof(slot) * size_) T[size_];
        }
        else {
            reserved_ = reinterpret_cast<std::atomic<reserved_info>*>(lpvMem_);
-           control_ = reinterpret_cast<slot*>((uint8_t*)lpvMem_ + 64);
-           data_ = reinterpret_cast<T*>((uint8_t*)lpvMem_ + 64 + sizeof(slot) * size_);
+           control_ = reinterpret_cast<slot*>((uint8_t*)lpvMem_ + HEADER_SIZE);
+           data_ = reinterpret_cast<T*>((uint8_t*)lpvMem_ + HEADER_SIZE + sizeof(slot) * size_);
        }
     }
 #else
@@ -461,12 +482,12 @@ class SlickQueue {
            own_ = false;
 
            // Read size from shared memory and verify it matches
-           void* temp_map = mmap(nullptr, 64, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd_, 0);
+           void* temp_map = mmap(nullptr, HEADER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd_, 0);
            if (temp_map == MAP_FAILED) {
                throw std::runtime_error("Failed to map shm for size read. err=" + std::to_string(errno));
            }
            uint32_t shm_size = *reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(temp_map) + sizeof(std::atomic<reserved_info>));
-           munmap(temp_map, 64);
+           munmap(temp_map, HEADER_SIZE);
 
            if (shm_size != size_) {
                throw std::runtime_error("Shared memory size mismatch. Expected " + std::to_string(size_) + " but got " + std::to_string(shm_size));
@@ -480,16 +501,16 @@ class SlickQueue {
 
        if (open_only) {
            // Map first 64 bytes to read the size
-           void* temp_map = mmap(nullptr, 64, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd_, 0);
+           void* temp_map = mmap(nullptr, HEADER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd_, 0);
            if (temp_map == MAP_FAILED) {
                throw std::runtime_error("Failed to map shm for size read. err=" + std::to_string(errno));
            }
            size_ = *reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(temp_map) + sizeof(std::atomic<reserved_info>));
            mask_ = size_ - 1;
-           munmap(temp_map, 64);
+           munmap(temp_map, HEADER_SIZE);
        }
 
-       BF_SZ = 64 + sizeof(slot) * size_ + sizeof(T) * size_;
+       BF_SZ = HEADER_SIZE + sizeof(slot) * size_ + sizeof(T) * size_;
 
        if (own_) {
            if (ftruncate(shm_fd_, BF_SZ) == -1) {
@@ -504,13 +525,14 @@ class SlickQueue {
 
        if (own_) {
            reserved_ = new (lpvMem_) std::atomic<reserved_info>();
-           *reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(lpvMem_) + sizeof(std::atomic<reserved_info>)) = mask_ + 1;
-           control_ = new ((uint8_t*)lpvMem_ + 64) slot[size_];
-           data_ = new ((uint8_t*)lpvMem_ + 64 + sizeof(slot) * size_) T[size_];
+           *reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(lpvMem_) + sizeof(std::atomic<reserved_info>)) = size_;
+           *reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(lpvMem_) + sizeof(std::atomic<reserved_info>) + sizeof(size_)) = sizeof(T);
+           control_ = new ((uint8_t*)lpvMem_ + HEADER_SIZE) slot[size_];
+           data_ = new ((uint8_t*)lpvMem_ + HEADER_SIZE + sizeof(slot) * size_) T[size_];
        } else {
            reserved_ = reinterpret_cast<std::atomic<reserved_info>*>(lpvMem_);
-           control_ = reinterpret_cast<slot*>((uint8_t*)lpvMem_ + 64);
-           data_ = reinterpret_cast<T*>((uint8_t*)lpvMem_ + 64 + sizeof(slot) * size_);
+           control_ = reinterpret_cast<slot*>((uint8_t*)lpvMem_ + HEADER_SIZE);
+           data_ = reinterpret_cast<T*>((uint8_t*)lpvMem_ + HEADER_SIZE + sizeof(slot) * size_);
        }
     }
 #endif
