Skip to content

Commit b84a48e

Browse files
authored
Add Linux shared memory support
* Add Linux shared memory support
1 parent b8df5c1 commit b84a48e

File tree

5 files changed

+110
-20
lines changed

5 files changed

+110
-20
lines changed

CMakeLists.txt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,8 @@ set(CMAKE_CXX_STANDARD 17)
99
add_library(slick_queue INTERFACE)
1010
target_include_directories(slick_queue INTERFACE include)
1111

12-
add_subdirectory(tests EXCLUDE_FROM_ALL)
12+
if(UNIX AND NOT APPLE)
13+
target_link_libraries(slick_queue INTERFACE rt)
14+
endif()
15+
16+
add_subdirectory(tests EXCLUDE_FROM_ALL)

include/slick_queue.h

Lines changed: 89 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@
2020
#if defined(_MSC_VER)
2121
#include <windows.h>
2222
#include <tchar.h>
23+
#else
24+
#include <sys/mman.h>
25+
#include <sys/stat.h>
26+
#include <fcntl.h>
27+
#include <unistd.h>
28+
#include <cerrno>
2329
#endif
2430

2531
namespace slick {
@@ -42,6 +48,10 @@ class SlickQueue {
4248
#if defined(_MSC_VER)
4349
HANDLE hMapFile_ = nullptr;
4450
LPVOID lpvMem_ = nullptr;
51+
#else
52+
int shm_fd_ = -1;
53+
void* lpvMem_ = nullptr;
54+
std::string shm_name_;
4555
#endif
4656

4757
public:
@@ -83,8 +93,21 @@ class SlickQueue {
8393
CloseHandle(hMapFile_);
8494
hMapFile_ = nullptr;
8595
}
96+
#else
97+
if (lpvMem_) {
98+
auto BF_SZ = static_cast<size_t>(64 + sizeof(slot) * size_ + sizeof(T) * size_);
99+
munmap(lpvMem_, BF_SZ);
100+
lpvMem_ = nullptr;
101+
}
102+
if (shm_fd_ != -1) {
103+
close(shm_fd_);
104+
shm_fd_ = -1;
105+
}
106+
if (own_ && !shm_name_.empty()) {
107+
shm_unlink(shm_name_.c_str());
108+
}
86109
#endif
87-
110+
88111
if (!use_shm_) {
89112
delete[] data_;
90113
data_ = nullptr;
@@ -171,7 +194,7 @@ class SlickQueue {
171194

172195
#if defined(_MSC_VER)
173196
void allocate_shm_data(const char* const shm_name, bool open_only) {
174-
SIZE_T BF_SZ;
197+
SIZE_T BF_SZ = 64 + sizeof(slot) * size_ + sizeof(T) * size_;
175198
hMapFile_ = NULL;
176199
if (open_only) {
177200
hMapFile_ = OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, (LPCWSTR)shm_name);
@@ -188,13 +211,10 @@ class SlickQueue {
188211
}
189212
mask_ = *reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(lpvMem_) + sizeof(std::atomic_uint_fast64_t)) - 1;
190213
size_ = mask_ + 1025;
191-
BF_SZ = 64 + sizeof(slot) * size_ + sizeof(T) * size_;
192214
UnmapViewOfFile(lpvMem_);
193215
lpvMem_ = nullptr;
194216
}
195217
else {
196-
BF_SZ = 64 + sizeof(slot) * size_ + sizeof(T) * size_;
197-
198218
hMapFile_ = CreateFileMapping(
199219
INVALID_HANDLE_VALUE, // use paging file
200220
NULL, // default security
@@ -207,7 +227,7 @@ class SlickQueue {
207227
own_ = false;
208228
auto err = GetLastError();
209229
if (hMapFile_ == NULL) {
210-
throw std::runtime_error("Failed to create shm. err=" + std::to_string(err));
230+
throw std::runtime_error("Failed to create shm. err=" + std::to_string(err));
211231
}
212232

213233
if (err != ERROR_ALREADY_EXISTS) {
@@ -234,7 +254,69 @@ class SlickQueue {
234254
}
235255
}
236256
#else
237-
void allocateShmData(const char* const shm_name) noexcept {
257+
void allocate_shm_data(const char* const shm_name, bool open_only) {
258+
size_t BF_SZ = 64 + sizeof(slot) * size_ + sizeof(T) * size_;
259+
shm_name_ = shm_name;
260+
if (open_only) {
261+
shm_fd_ = shm_open(shm_name, O_RDWR, 0666);
262+
if (shm_fd_ == -1) {
263+
throw std::runtime_error("Failed to open shm. err=" + std::to_string(errno));
264+
}
265+
266+
void* tmp = mmap(nullptr, 64, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd_, 0);
267+
if (tmp == MAP_FAILED) {
268+
throw std::runtime_error("Failed to map shm header. err=" + std::to_string(errno));
269+
}
270+
mask_ = *reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(tmp) + sizeof(std::atomic_uint_fast64_t)) - 1;
271+
size_ = mask_ + 1025;
272+
munmap(tmp, 64);
273+
274+
lpvMem_ = mmap(nullptr, BF_SZ, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd_, 0);
275+
if (lpvMem_ == MAP_FAILED) {
276+
throw std::runtime_error("Failed to map shm. err=" + std::to_string(errno));
277+
}
278+
279+
reserved_ = reinterpret_cast<std::atomic_uint_fast64_t*>(lpvMem_);
280+
control_ = reinterpret_cast<slot*>((uint8_t*)lpvMem_ + 64);
281+
data_ = reinterpret_cast<T*>((uint8_t*)lpvMem_ + 64 + sizeof(slot) * size_);
282+
own_ = false;
283+
} else {
284+
shm_fd_ = shm_open(shm_name, O_RDWR | O_CREAT | O_EXCL, 0666);
285+
if (shm_fd_ == -1) {
286+
if (errno != EEXIST) {
287+
throw std::runtime_error("Failed to create shm. err=" + std::to_string(errno));
288+
}
289+
shm_fd_ = shm_open(shm_name, O_RDWR, 0666);
290+
if (shm_fd_ == -1) {
291+
throw std::runtime_error("Failed to open existing shm. err=" + std::to_string(errno));
292+
}
293+
own_ = false;
294+
} else {
295+
own_ = true;
296+
}
297+
298+
if (own_) {
299+
if (ftruncate(shm_fd_, BF_SZ) == -1) {
300+
throw std::runtime_error("Failed to size shm. err=" + std::to_string(errno));
301+
}
302+
}
303+
304+
lpvMem_ = mmap(nullptr, BF_SZ, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd_, 0);
305+
if (lpvMem_ == MAP_FAILED) {
306+
throw std::runtime_error("Failed to map shm. err=" + std::to_string(errno));
307+
}
308+
309+
if (own_) {
310+
reserved_ = new (lpvMem_) std::atomic_uint_fast64_t{ 0 };
311+
*reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(lpvMem_) + sizeof(std::atomic_uint_fast64_t)) = mask_ + 1;
312+
control_ = new ((uint8_t*)lpvMem_ + 64) slot[size_];
313+
data_ = new ((uint8_t*)lpvMem_ + 64 + sizeof(slot) * size_) T[size_];
314+
} else {
315+
reserved_ = reinterpret_cast<std::atomic_uint_fast64_t*>(lpvMem_);
316+
control_ = reinterpret_cast<slot*>((uint8_t*)lpvMem_ + 64);
317+
data_ = reinterpret_cast<T*>((uint8_t*)lpvMem_ + 64 + sizeof(slot) * size_);
318+
}
319+
}
238320
}
239321
#endif
240322

tests/CMakeLists.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,6 @@ project(slick_queue_tests LANGUAGES CXX)
22

33
set(CMAKE_CXX_STANDARD 17)
44

5-
add_executable(slick_queue_tests tests.cpp shm_tests.cpp)
5+
add_executable(slick_queue_tests tests.cpp shm_tests.cpp)
6+
7+
target_link_libraries(slick_queue_tests PRIVATE slick_queue)

tests/shm_tests.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,26 @@
1+
#define CATCH_CONFIG_NO_POSIX_SIGNALS
12
#include "catch.hh"
23
#include "../include/slick_queue.h"
34

45
using namespace slick;
56

67
TEST_CASE("Read empty queue - shm") {
7-
SlickQueue<int, 2> queue;
8+
SlickQueue<int> queue(2, "sq_read_empty");
89
uint64_t read_cursor = 0;
910
auto read = queue.read(read_cursor);
1011
REQUIRE(read.first == nullptr);
1112
}
1213

1314
TEST_CASE( "Reserve - shm") {
14-
SlickQueue<int, 2> queue;
15+
SlickQueue<int> queue(2, "sq_reserve");
1516
auto reserved = queue.reserve();
1617
REQUIRE( reserved == 0 );
1718
REQUIRE( queue.reserve() == 1);
1819
REQUIRE( queue.reserve() == 2);
1920
}
2021

2122
TEST_CASE( "Read should fail w/o publish - shm") {
22-
SlickQueue<int, 2> queue;
23+
SlickQueue<int> queue(2, "sq_read_fail");
2324
uint64_t read_cursor = 0;
2425
auto reserved = queue.reserve();
2526
auto read = queue.read(read_cursor);
@@ -28,7 +29,7 @@ TEST_CASE( "Read should fail w/o publish - shm") {
2829
}
2930

3031
TEST_CASE( "Publish and read - shm" ) {
31-
SlickQueue<int, 2> queue;
32+
SlickQueue<int> queue(2, "sq_publish_read");
3233
uint64_t read_cursor = 0;
3334
auto reserved = queue.reserve();
3435
*queue[reserved] = 5;
@@ -40,7 +41,7 @@ TEST_CASE( "Publish and read - shm" ) {
4041
}
4142

4243
TEST_CASE( "Publish and read multiple - shm" ) {
43-
SlickQueue<int, 4> queue;
44+
SlickQueue<int> queue(4, "sq_publish_read_multiple");
4445
uint64_t read_cursor = 0;
4546
auto reserved = queue.reserve();
4647
*queue[reserved] = 5;
@@ -72,7 +73,7 @@ TEST_CASE( "Publish and read multiple - shm" ) {
7273
}
7374

7475
TEST_CASE("SHM test - shm") {
75-
SlickQueue<int, 2> queue("test");
76+
SlickQueue<int> queue(2, "sq_shm_test");
7677
uint64_t read_cursor = 0;
7778
auto reserved = queue.reserve();
7879
*queue[reserved] = 5;

tests/tests.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
2+
#define CATCH_CONFIG_NO_POSIX_SIGNALS
23
#include "catch.hh"
34
#include "../include/slick_queue.h"
45

56
using namespace slick;
67

78
TEST_CASE("Read empty queue") {
8-
SlickQueue<int, 2> queue("test");
9+
SlickQueue<int> queue(2);
910
uint64_t read_cursor = 0;
1011
auto read = queue.read(read_cursor);
1112
REQUIRE(read.first == nullptr);
@@ -20,15 +21,15 @@ TEST_CASE("Read empty queue") {
2021
//}
2122

2223
TEST_CASE( "Reserve") {
23-
SlickQueue<int, 2> queue("test");
24+
SlickQueue<int> queue(2);
2425
auto reserved = queue.reserve();
2526
REQUIRE( reserved == 0 );
2627
REQUIRE( queue.reserve() == 1);
2728
REQUIRE( queue.reserve() == 2);
2829
}
2930

3031
TEST_CASE( "Read should fail w/o publish") {
31-
SlickQueue<int, 2> queue("test");
32+
SlickQueue<int> queue(2);
3233
uint64_t read_cursor = 0;
3334
auto reserved = queue.reserve();
3435
auto read = queue.read(read_cursor);
@@ -37,7 +38,7 @@ TEST_CASE( "Read should fail w/o publish") {
3738
}
3839

3940
TEST_CASE( "Publish and read" ) {
40-
SlickQueue<int, 2> queue("test");
41+
SlickQueue<int> queue(2);
4142
uint64_t read_cursor = 0;
4243
auto reserved = queue.reserve();
4344
*queue[reserved] = 5;
@@ -49,7 +50,7 @@ TEST_CASE( "Publish and read" ) {
4950
}
5051

5152
TEST_CASE( "Publish and read multiple" ) {
52-
SlickQueue<int, 4> queue("test");
53+
SlickQueue<int> queue(4);
5354
uint64_t read_cursor = 0;
5455
auto reserved = queue.reserve();
5556
*queue[reserved] = 5;

0 commit comments

Comments
 (0)