Skip to content

Commit ffd3cfa

Browse files
committed
Enhance queue performance with CPU relax backoff and fast path for reserve(1); Increase version number
1 parent f31f751 commit ffd3cfa

File tree

5 files changed

+78
-8
lines changed

5 files changed

+78
-8
lines changed

CHANGELOG

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1-
# v1.2.3 - unreleased
1+
# v1.2.3 - 2026-01-29
22
- Initialize reserved counters for local and shared-memory queues
33
- Validate shared-memory size is power-of-two and element size matches when attaching
44
- Add lossy semantics documentation and debug loss detection counter
55
- Add lossy overwrite tests for in-process and shared-memory queues
6+
- Align hot atomics to cache lines and add light spin backoff under contention
7+
- Add reserve(1) fast path using fetch_add
8+
- Add SLICK_QUEUE_ENABLE_CPU_RELAX to control pause/yield backoff
69
- Guard against zero-length reservations
710
- Add limits header
811

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
cmake_minimum_required(VERSION 3.10)
22

33
project(slick-queue
4-
VERSION 1.2.2
4+
VERSION 1.2.3
55
DESCRIPTION "A C++ Lock-Free MPMC queue"
66
LANGUAGES CXX)
77

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,8 @@ SlickQueue(const char* shm_name); // Reader/Attacher
225225
226226
**Debug Loss Detection**: Define `SLICK_QUEUE_ENABLE_LOSS_DETECTION=1` to enable a per-instance skipped-item counter (enabled by default in Debug builds). Use `loss_count()` to inspect how many items were skipped.
227227
228+
**CPU Relax Backoff**: Define `SLICK_QUEUE_ENABLE_CPU_RELAX=0` to disable the pause/yield backoff used on contended CAS loops (default is enabled). Disabling may reduce latency in very short contention bursts but can increase CPU usage under load.
229+
228230
**⚠️ Reserve Size Limitation**: When using `read_last()`, the number of slots in any `reserve(n)` call **must not exceed 65,535** (2^16 - 1). This is because the size is stored in 16 bits within the packed atomic.
229231
230232
- For typical use cases with `reserve()` or `reserve(1)`, this limit is not a concern

include/slick/queue.h

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@
1212
#pragma once
1313

1414
#include <cstdint>
15+
#include <cstddef>
1516
#include <atomic>
1617
#include <stdexcept>
1718
#include <string>
1819
#include <cassert>
1920
#include <thread>
2021
#include <chrono>
2122
#include <limits>
23+
#include <new>
2224

2325
#if defined(_MSC_VER)
2426
#ifndef NOMINMAX
@@ -37,6 +39,9 @@
3739
#include <cerrno>
3840
#endif
3941

42+
#if defined(_MSC_VER) && (defined(_M_IX86) || defined(_M_X64))
43+
#include <immintrin.h>
44+
#endif
4045

4146
#ifndef SLICK_QUEUE_ENABLE_LOSS_DETECTION
4247
#if !defined(NDEBUG)
@@ -46,6 +51,10 @@
4651
#endif
4752
#endif
4853

54+
#ifndef SLICK_QUEUE_ENABLE_CPU_RELAX
55+
#define SLICK_QUEUE_ENABLE_CPU_RELAX 1
56+
#endif
57+
4958
namespace slick {
5059

5160
/**
@@ -66,14 +75,20 @@ class SlickQueue {
6675

6776
using reserved_info = uint64_t;
6877

78+
#if defined(__cpp_lib_hardware_interference_size)
79+
static constexpr std::size_t cacheline_size = std::hardware_destructive_interference_size;
80+
#else
81+
static constexpr std::size_t cacheline_size = 64;
82+
#endif
83+
6984
uint32_t size_;
7085
uint32_t mask_;
7186
T* data_ = nullptr;
7287
slot* control_ = nullptr;
7388
std::atomic<reserved_info>* reserved_ = nullptr;
74-
std::atomic<reserved_info> reserved_local_{0};
89+
alignas(cacheline_size) std::atomic<reserved_info> reserved_local_{0};
7590
#if SLICK_QUEUE_ENABLE_LOSS_DETECTION
76-
std::atomic<uint64_t> loss_count_{0};
91+
alignas(cacheline_size) std::atomic<uint64_t> loss_count_{0};
7792
#endif
7893
bool own_ = false;
7994
bool use_shm_ = false;
@@ -223,11 +238,23 @@ class SlickQueue {
223238
if (n > size_) [[unlikely]] {
224239
throw std::runtime_error("required size " + std::to_string(n) + " > queue size " + std::to_string(size_));
225240
}
241+
if (n == 1) {
242+
constexpr reserved_info step = (1ULL << 16);
243+
auto prev = reserved_->fetch_add(step, std::memory_order_release);
244+
auto index = get_index(prev);
245+
auto prev_size = get_size(prev);
246+
if (prev_size != 1) {
247+
auto expected = make_reserved_info(index + 1, prev_size);
248+
reserved_->compare_exchange_strong(expected, make_reserved_info(index + 1, 1),
249+
std::memory_order_release, std::memory_order_relaxed);
250+
}
251+
return index;
252+
}
226253
auto reserved = reserved_->load(std::memory_order_relaxed);
227-
uint64_t next;
228-
uint64_t index;
254+
uint64_t next = 0;
255+
uint64_t index = 0;
229256
bool buffer_wrapped = false;
230-
do {
257+
for (;;) {
231258
buffer_wrapped = false;
232259
index = get_index(reserved);
233260
auto idx = index & mask_;
@@ -240,7 +267,11 @@ class SlickQueue {
240267
else {
241268
next = make_reserved_info(index + n, n);
242269
}
243-
} while(!reserved_->compare_exchange_weak(reserved, next, std::memory_order_release, std::memory_order_relaxed));
270+
if (reserved_->compare_exchange_weak(reserved, next, std::memory_order_release, std::memory_order_relaxed)) {
271+
break;
272+
}
273+
cpu_relax();
274+
}
244275
if (buffer_wrapped) {
245276
// queue wrapped, set current slot.data_index to the reserved index to let the reader
246277
// know the next available data is in different slot.
@@ -370,6 +401,7 @@ class SlickQueue {
370401
// Successfully claimed the item
371402
return std::make_pair(&data_[current_index & mask_], current_slot->size);
372403
}
404+
cpu_relax();
373405
// CAS failed, another consumer claimed it, retry
374406
}
375407
}
@@ -420,6 +452,22 @@ class SlickQueue {
420452
return static_cast<uint32_t>(reserved & 0xFFFF);
421453
}
422454

455+
static inline void cpu_relax() noexcept {
    // Hint the CPU that we are spinning in a contention loop so the core can
    // back off (reduces power draw and pipeline flushes, and frees resources
    // for a sibling hyper-thread). Compiles to a no-op when the user builds
    // with SLICK_QUEUE_ENABLE_CPU_RELAX=0.
#if SLICK_QUEUE_ENABLE_CPU_RELAX
#if defined(_MSC_VER) && (defined(_M_IX86) || defined(_M_X64))
    _mm_pause();                                 // x86 PAUSE (from immintrin.h)
#elif defined(_MSC_VER) && (defined(_M_ARM64) || defined(_M_ARM))
    YieldProcessor();                            // MSVC/ARM: expands to a YIELD hint (windows.h)
#elif defined(__i386__) || defined(__x86_64__)
    __builtin_ia32_pause();                      // x86 PAUSE (GCC/Clang builtin)
#elif defined(__aarch64__) || defined(__arm__)
    __asm__ __volatile__("yield" ::: "memory");  // ARM YIELD hint
#else
    // Unknown architecture: fall back to yielding the time slice. Heavier
    // than a pause instruction, but keeps the spin loop well-behaved.
    std::this_thread::yield();
#endif
#else
    (void)0;  // backoff disabled at compile time
#endif
}
470+
423471
#if defined(_MSC_VER)
424472
void allocate_shm_data(const char* const shm_name, bool open_only) {
425473
DWORD BF_SZ;

tests/tests.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,23 @@ TEST(SlickQueueTests, BufferWrap) {
125125
EXPECT_EQ(strncmp(read.first, "789", 3), 0);
126126
}
127127

128+
TEST(SlickQueueTests, ReadLastUsesLatestReserveSize) {
    // read_last() must reflect the most recent publish, honouring the size
    // recorded by the latest reserve() call (a 2-slot batch followed by a
    // single-slot reservation).
    SlickQueue<int> queue(8);

    // Publish a two-slot batch first.
    const auto batch = queue.reserve(2);
    *queue[batch] = 1;
    *queue[batch + 1] = 2;
    queue.publish(batch, 2);

    // Then publish one item through the reserve(1) fast path.
    const auto single = queue.reserve(1);
    *queue[single] = 3;
    queue.publish(single, 1);

    // The freshest value — not one from the earlier batch — must come back.
    auto latest = queue.read_last();
    ASSERT_NE(latest, nullptr);
    EXPECT_EQ(*latest, 3);
}
144+
128145
TEST(SlickQueueTests, LossyOverwriteSkipsOldData) {
129146
SlickQueue<int> queue(2);
130147
uint64_t read_cursor = 0;

0 commit comments

Comments
 (0)