Skip to content

Commit b9aab8c

Browse files
committed
Improve README
1 parent 897919f commit b9aab8c

File tree

6 files changed

+340
-14
lines changed

6 files changed

+340
-14
lines changed

CHANGELOG

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1-
# v1.0.2.2 - 2025-10-04
1+
# v1.1.0.0 - 2025-10-04
22
- Fixed size calculation in shared memory mapping to use reserved_info struct
33
- Added server-client shared memory test case
44
- Added GitHub CI workflow for automated testing across Ubuntu, Windows, and macOS
55
- Added CI status badge to README
66
- Added CTest integration for automated test discovery
7+
- Added read() overload with atomic cursor for work-stealing/load-balancing patterns
8+
- Added work-stealing test cases for both in-process and shared memory modes
9+
- Enhanced README with work-stealing example and updated API documentation
10+
- Fixed MSBuild MSB8028 warnings for custom targets
711

812
# v1.0.2.1 - 2025-10-01
913
- Fixed buffer wrap read logic to properly retry from wrapped position

CMakeLists.txt

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
cmake_minimum_required(VERSION 3.10)
22

3-
set(BUILD_VERSION 1.0.2.2)
3+
set(BUILD_VERSION 1.1.0.0)
44

55
project(slick_queue
66
VERSION ${BUILD_VERSION}
@@ -48,13 +48,25 @@ if(CMAKE_BUILD_TYPE STREQUAL "Release")
4848
VERBATIM
4949
)
5050

51+
if (MSVC)
52+
set_target_properties(dist_slick_queue PROPERTIES
53+
VS_GLOBAL_IntDir "$(Platform)\\$(Configuration)\\dist_slick_queue\\"
54+
)
55+
endif()
56+
5157
if (PROJECT_IS_TOP_LEVEL)
5258
add_custom_target(package_slick_queue ALL
5359
COMMAND ${CMAKE_COMMAND} -E tar "cfv" "${CMAKE_BINARY_DIR}/dist/slick_queue_${BUILD_VERSION}.zip" --format=zip "${CMAKE_CURRENT_SOURCE_DIR}/include"
5460
WORKING_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}"
5561
COMMENT "Creating zip archive"
5662
)
5763
add_dependencies(package_slick_queue dist_slick_queue)
64+
65+
if (MSVC)
66+
set_target_properties(package_slick_queue PROPERTIES
67+
VS_GLOBAL_IntDir "$(Platform)\\$(Configuration)\\package_slick_queue\\"
68+
)
69+
endif()
5870
endif()
5971
endif()
6072

README.md

Lines changed: 196 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,39 +9,223 @@ over shared memory for inter-process communication.
99

1010
## Features
1111

12-
- Lock-free operations for multiple producers and consumers
13-
- Header-only implementation
14-
- Optional shared-memory mode
15-
- No dynamic allocation on the hot path
12+
- **Lock-free operations** for multiple producers and consumers
13+
- **Header-only implementation** - just include and go
14+
- **Zero dynamic allocation** on the hot path for predictable performance
15+
- **Shared memory support** for inter-process communication
16+
- **Cross-platform** - supports Windows, Linux, and macOS
17+
- **Modern C++20** implementation
1618

17-
## Getting Started
19+
## Requirements
1820

19-
Simply add the `include` directory to your project's include path and include
20-
the header:
21+
- C++20 compatible compiler
22+
- CMake 3.10+ (for building tests; the `FetchContent_MakeAvailable` snippet below requires CMake 3.14+)
23+
24+
## Installation
25+
26+
SlickQueue is header-only. Simply add the `include` directory to your project's include path:
2127

2228
```cpp
23-
#include "slick_queue.h"
29+
#include "slick_queue/slick_queue.h"
30+
```
31+
32+
### Using CMake
2433

34+
```cmake
35+
include(FetchContent)
36+
37+
# Disable tests for slick_queue
38+
set(BUILD_SLICK_QUEUE_TESTS OFF CACHE BOOL "" FORCE)
39+
FetchContent_Declare(
40+
slick_queue
41+
GIT_REPOSITORY https://github.com/SlickQuant/slick_queue.git
42+
GIT_TAG v1.1.0.0
43+
)
44+
FetchContent_MakeAvailable(slick_queue)
45+
46+
target_link_libraries(your_target PRIVATE slick_queue)
47+
```
48+
49+
## Usage
50+
51+
### Basic Example
52+
53+
```cpp
54+
#include "slick_queue/slick_queue.h"
55+
56+
// Create a queue with 1024 slots (must be power of 2)
2557
slick::SlickQueue<int> queue(1024);
58+
59+
// Producer: reserve a slot, write data, and publish
2660
auto slot = queue.reserve();
2761
*queue[slot] = 42;
2862
queue.publish(slot);
2963

64+
// Consumer: read from queue using a cursor
3065
uint64_t cursor = 0;
3166
auto result = queue.read(cursor);
67+
if (result.first != nullptr) {
68+
int value = *result.first; // value == 42
69+
}
70+
```
71+
72+
### Shared Memory Example (IPC)
73+
74+
```cpp
75+
#include "slick_queue/slick_queue.h"
76+
77+
// Process 1 (Server/Writer)
78+
slick::SlickQueue<int> server(1024, "my_queue");
79+
auto slot = server.reserve();
80+
*server[slot] = 100;
81+
server.publish(slot);
82+
83+
// Process 2 (Client/Reader)
84+
slick::SlickQueue<int> client("my_queue");
85+
uint64_t cursor = 0;
86+
auto result = client.read(cursor);
87+
if (result.first != nullptr) {
88+
int value = *result.first; // value == 100
89+
}
90+
```
91+
92+
### Multi-Producer Multi-Consumer
93+
94+
```cpp
95+
#include "slick_queue/slick_queue.h"
96+
#include <thread>
97+
98+
slick::SlickQueue<int> queue(1024);
99+
100+
// Multiple producers
101+
auto producer = [&](int id) {
102+
for (int i = 0; i < 100; ++i) {
103+
auto slot = queue.reserve();
104+
*queue[slot] = id * 1000 + i;
105+
queue.publish(slot);
106+
}
107+
};
108+
109+
// Multiple consumers (each maintains independent cursor)
110+
// Note: Each consumer will see ALL published items (broadcast pattern)
111+
auto consumer = [&](int id) {
112+
uint64_t cursor = 0;
113+
int count = 0;
114+
while (count < 200) { // 2 producers × 100 items each
115+
auto result = queue.read(cursor);
116+
if (result.first != nullptr) {
117+
int value = *result.first;
118+
++count;
119+
}
120+
}
121+
};
122+
123+
std::thread p1(producer, 1);
124+
std::thread p2(producer, 2);
125+
std::thread c1(consumer, 1);
126+
std::thread c2(consumer, 2);
127+
128+
p1.join(); p2.join();
129+
c1.join(); c2.join();
32130
```
33131
34-
## Building Tests
132+
### Work-Stealing with Shared Atomic Cursor
133+
134+
```cpp
135+
#include "slick_queue/slick_queue.h"
136+
#include <thread>
137+
#include <atomic>
138+
139+
slick::SlickQueue<int> queue(1024);
140+
std::atomic<uint64_t> shared_cursor{0};
141+
142+
// Multiple producers
143+
auto producer = [&](int id) {
144+
for (int i = 0; i < 100; ++i) {
145+
auto slot = queue.reserve();
146+
*queue[slot] = id * 1000 + i;
147+
queue.publish(slot);
148+
}
149+
};
150+
151+
// Multiple consumers sharing atomic cursor (work-stealing/load-balancing)
152+
// Each item is consumed by exactly ONE consumer
153+
auto consumer = [&]() {
154+
int count = 0;
155+
for (int i = 0; i < 100; ++i) {
156+
auto result = queue.read(shared_cursor);
157+
if (result.first != nullptr) {
158+
int value = *result.first;
159+
++count;
160+
}
161+
}
162+
return count;
163+
};
164+
165+
std::thread p1(producer, 1);
166+
std::thread p2(producer, 2);
167+
std::thread c1(consumer);
168+
std::thread c2(consumer);
169+
170+
p1.join(); p2.join();
171+
c1.join(); c2.join();
172+
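// Items consumed: up to 200 in total (each item is claimed by at most one consumer; a read that finds no data ready returns nullptr and still uses up one of a consumer's 100 attempts)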
173+
```
174+
175+
## API Overview
176+
177+
### Constructor
178+
179+
```cpp
180+
// In-process queue
181+
SlickQueue(uint32_t size);
182+
183+
// Shared memory queue
184+
SlickQueue(uint32_t size, const char* shm_name); // Writer/Creator
185+
SlickQueue(const char* shm_name); // Reader/Attacher
186+
```
35187
36-
A small test suite is provided using CMake and Catch. Build and run the tests
37-
with:
188+
### Core Methods
189+
190+
- `uint64_t reserve()` - Reserve a slot for writing (blocks if queue is full)
191+
- `T* operator[](uint64_t slot)` - Access reserved slot
192+
- `void publish(uint64_t slot)` - Publish written data to consumers
193+
- `std::pair<T*, uint32_t> read(uint64_t& cursor)` - Read next available item (independent cursor)
194+
- `std::pair<T*, uint32_t> read(std::atomic<uint64_t>& cursor)` - Read next available item (shared atomic cursor for work-stealing)
195+
- `uint32_t size()` - Get queue capacity
196+
197+
## Performance Characteristics
198+
199+
- **Lock-free**: No mutex contention between producers/consumers
200+
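- **Lock-free reads**: Consumers never block each other (the shared atomic cursor overload retries its compare-exchange under contention, so it is lock-free rather than strictly wait-free)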
201+
- **Cache-friendly**: Ring buffer design with power-of-2 sizing
202+
- **Predictable**: No allocations or system calls on hot path (except for initial reserve when full)
203+
204+
## Building and Testing
205+
206+
### Build Tests
38207
39208
```bash
40209
cmake -S . -B build
41-
cmake --build build --target slick_queue_tests
210+
cmake --build build
211+
```
212+
213+
### Run Tests
214+
215+
```bash
216+
# Using CTest
217+
cd build
218+
ctest --output-on-failure
219+
220+
# Or run directly
42221
./build/tests/slick_queue_tests
43222
```
44223

224+
### Build Options
225+
226+
- `BUILD_SLICK_QUEUE_TESTS` - Enable/disable test building (default: ON)
227+
- `CMAKE_BUILD_TYPE` - Set to `Release` or `Debug`
228+
45229
## License
46230

47231
SlickQueue is released under the [MIT License](LICENSE).

include/slick_queue/slick_queue.h

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,47 @@ class SlickQueue {
276276
return std::make_pair(&data, current_slot->size);
277277
}
278278

279+
/**
280+
* @brief Read data from the queue using a shared atomic cursor
281+
* @param read_index Reference to the atomic reading index, will be atomically updated after reading
282+
* @return Pair of pointer to the data and the size of the data, or nullptr and 0 if no data is available
283+
*
284+
* This overload allows multiple consumers to share a single atomic cursor for load-balancing/work-stealing patterns.
285+
* Each consumer atomically claims the next item to process.
286+
*/
287+
std::pair<T*, uint32_t> read(std::atomic<uint64_t>& read_index) noexcept {
288+
while (true) {
289+
uint64_t current_index = read_index.load(std::memory_order_acquire);
290+
auto idx = current_index & mask_;
291+
slot* current_slot = &control_[idx];
292+
uint64_t index = current_slot->data_index.load(std::memory_order_acquire);
293+
294+
if (index != std::numeric_limits<uint64_t>::max() && reserved_->load(std::memory_order_relaxed).index_ < index) [[unlikely]] {
295+
// queue has been reset
296+
read_index.store(0, std::memory_order_release);
297+
continue;
298+
}
299+
300+
if (index == std::numeric_limits<uint64_t>::max() || index < current_index) {
301+
// data not ready yet
302+
return std::make_pair(nullptr, 0);
303+
}
304+
else if (index > current_index && ((index & mask_) != idx)) {
305+
// queue wrapped, skip the unused slots
306+
read_index.compare_exchange_weak(current_index, index, std::memory_order_release, std::memory_order_relaxed);
307+
continue;
308+
}
309+
310+
// Try to atomically claim this item
311+
uint64_t next_index = index + current_slot->size;
312+
if (read_index.compare_exchange_weak(current_index, next_index, std::memory_order_release, std::memory_order_relaxed)) {
313+
// Successfully claimed the item
314+
return std::make_pair(&data_[current_index & mask_], current_slot->size);
315+
}
316+
// CAS failed (another consumer claimed the item, or compare_exchange_weak failed spuriously); retry
317+
}
318+
}
319+
279320
/**
280321
* @brief Read the last published data in the queue
281322
* @return Pointer to the last published data, or nullptr if no data is available

tests/shm_tests.cpp

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#define CATCH_CONFIG_NO_POSIX_SIGNALS
22
#include "catch.hh"
33
#include <slick_queue/slick_queue.h>
4+
#include <thread>
45

56
using namespace slick;
67

@@ -107,3 +108,45 @@ TEST_CASE( "Server Client - shm" ) {
107108
REQUIRE(read_cursor == 3);
108109
REQUIRE(*read.first == 23);
109110
}
111+
112+
TEST_CASE( "Atomic cursor - shared memory work-stealing" ) {
113+
SlickQueue<int> server(1024, "sq_atomic_cursor_shm");
114+
SlickQueue<int> client1("sq_atomic_cursor_shm");
115+
SlickQueue<int> client2("sq_atomic_cursor_shm");
116+
117+
std::atomic<uint64_t> shared_cursor{0};
118+
std::atomic<int> total_consumed{0};
119+
120+
// Server: publish 100 items
121+
std::thread producer([&]() {
122+
for (int i = 0; i < 100; ++i) {
123+
auto slot = server.reserve();
124+
*server[slot] = i;
125+
server.publish(slot);
126+
}
127+
});
128+
129+
// Multiple clients attached to the same shared-memory queue, sharing one in-process atomic cursor
130+
auto consumer = [&](SlickQueue<int>& client) {
131+
int local_count = 0;
132+
while (total_consumed.load() < 100) {
133+
auto result = client.read(shared_cursor);
134+
if (result.first != nullptr) {
135+
local_count++;
136+
total_consumed.fetch_add(1);
137+
}
138+
}
139+
return local_count;
140+
};
141+
142+
std::thread c1([&]() { consumer(client1); });
143+
std::thread c2([&]() { consumer(client2); });
144+
145+
producer.join();
146+
c1.join();
147+
c2.join();
148+
149+
// Verify exactly 100 items were consumed in total and the shared cursor advanced past all of them
150+
REQUIRE(total_consumed.load() == 100);
151+
REQUIRE(shared_cursor.load() == 100);
152+
}

0 commit comments

Comments
 (0)