Skip to content

Commit 66e3684

Browse files
committed
Remove debug logging
1 parent 3fa6d17 commit 66e3684

File tree

1 file changed

+174
-148
lines changed

1 file changed

+174
-148
lines changed

include/slick_queue.h

Lines changed: 174 additions & 148 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <atomic>
1616
#include <stdexcept>
1717
#include <string>
18+
#include <cassert>
1819

1920
#if defined(_MSC_VER)
2021
#include <windows.h>
@@ -23,179 +24,204 @@
2324

2425
namespace slick {
2526

26-
template<typename T, size_t SIZE>
27+
template<typename T>
2728
class SlickQueue {
28-
static_assert(SIZE && !(SIZE & (SIZE - 1)), "Size must power of 2");
29-
30-
struct slot {
31-
std::atomic_uint_fast64_t data_index { 0 };
32-
uint32_t size = 1;
33-
};
34-
35-
static constexpr uint32_t mask_ = SIZE - 1;
36-
T* data_;
37-
slot* control_;
38-
std::atomic_uint_fast64_t* reserved_;
39-
std::atomic_uint_fast64_t reserved_local_{ 0 };
40-
bool own_;
41-
bool use_shm_;
29+
struct slot {
30+
std::atomic_uint_fast64_t data_index{ 0 };
31+
uint32_t size = 1;
32+
};
33+
34+
uint32_t size_;
35+
uint32_t mask_;
36+
T* data_ = nullptr;
37+
slot* control_ = nullptr;
38+
std::atomic_uint_fast64_t* reserved_ = nullptr;
39+
std::atomic_uint_fast64_t reserved_local_{ 0 };
40+
bool own_ = false;
41+
bool use_shm_ = false;
4242
#if defined(_MSC_VER)
43-
HANDLE hMapFile_ = nullptr;
44-
LPVOID lpvMem_ = nullptr;
43+
HANDLE hMapFile_ = nullptr;
44+
LPVOID lpvMem_ = nullptr;
4545
#endif
4646

47-
public:
48-
SlickQueue(const char* const shm_name = nullptr, bool open_only = false)
49-
: data_(shm_name ? nullptr : new T[SIZE + 1024]) // add some buffer at the end
50-
, control_(shm_name ? nullptr : new slot[SIZE + 1024])
51-
, reserved_(shm_name ? nullptr : &reserved_local_)
52-
, own_(shm_name == nullptr)
53-
, use_shm_(shm_name != nullptr)
54-
{
55-
if (shm_name) {
56-
allocate_shm_data(shm_name, open_only);
47+
public:
48+
SlickQueue(uint32_t size, const char* const shm_name = nullptr)
49+
: size_(size + 1024) // add some buffer at the end
50+
, mask_(size - 1)
51+
, data_(shm_name ? nullptr : new T[size_])
52+
, control_(shm_name ? nullptr : new slot[size_])
53+
, reserved_(shm_name ? nullptr : &reserved_local_)
54+
, own_(shm_name == nullptr)
55+
, use_shm_(shm_name != nullptr)
56+
{
57+
assert((size && !(size & (size - 1))) && "size must power of 2");
58+
if (shm_name) {
59+
allocate_shm_data(shm_name, false);
60+
}
61+
62+
if (own_) {
63+
// invalidate first slot
64+
control_[0].data_index.store(1, std::memory_order_relaxed);
65+
}
5766
}
5867

59-
if (own_) {
60-
// invalidate first slot
61-
control_[0].data_index.store(1, std::memory_order_relaxed);
68+
SlickQueue(const char* const shm_name)
69+
: own_(false)
70+
, use_shm_(true)
71+
{
72+
allocate_shm_data(shm_name, true);
6273
}
63-
}
64-
virtual ~SlickQueue() noexcept {
74+
75+
virtual ~SlickQueue() noexcept {
6576
#if defined(_MSC_VER)
66-
if (lpvMem_) {
67-
UnmapViewOfFile(lpvMem_);
68-
lpvMem_ = nullptr;
77+
if (lpvMem_) {
78+
UnmapViewOfFile(lpvMem_);
79+
lpvMem_ = nullptr;
80+
}
81+
82+
if (hMapFile_) {
83+
printf("Destroy MapFile %p\n", hMapFile_);
84+
CloseHandle(hMapFile_);
85+
hMapFile_ = nullptr;
86+
}
87+
88+
if (!use_shm_) {
89+
delete[] data_;
90+
data_ = nullptr;
91+
92+
delete[] control_;
93+
control_ = nullptr;
94+
}
95+
#endif
6996
}
7097

71-
if (hMapFile_) {
72-
printf("Destroy MapFile %p\n", hMapFile_);
73-
CloseHandle(hMapFile_);
74-
hMapFile_ = nullptr;
75-
}
76-
77-
if (!use_shm_) {
78-
delete [] data_;
79-
data_ = nullptr;
98+
bool own_buffer() const noexcept { return own_; }
99+
bool use_shm() const noexcept { return use_shm_; }
80100

81-
delete [] control_;
82-
control_ = nullptr;
83-
}
84-
#endif
85-
}
86-
87-
bool own_buffer() const noexcept { return own_; }
88-
bool use_shm() const noexcept { return use_shm_; }
89-
90-
uint64_t initial_reading_index() const noexcept {
91-
return reserved_->load(std::memory_order_relaxed);
92-
}
93-
94-
uint64_t reserve(uint32_t n = 1) noexcept {
95-
return reserved_->fetch_add(n, std::memory_order_acq_rel);
96-
}
97-
98-
T* operator[] (uint64_t index) noexcept {
99-
return &data_[index & mask_];
100-
}
101-
102-
const T* operator[] (uint64_t index) const noexcept {
103-
return &data_[index & mask_];
104-
}
105-
106-
void publish(uint64_t index, uint32_t n = 1) noexcept {
107-
auto& slot = control_[index & mask_];
108-
slot.size = n;
109-
slot.data_index.store(index, std::memory_order_release);
110-
}
111-
112-
std::pair<T*, uint32_t> read(uint64_t& read_index) noexcept {
113-
auto& slot = control_[read_index & mask_];
114-
auto index = slot.data_index.load(std::memory_order_relaxed);
115-
if (reserved_->load(std::memory_order_relaxed) < index) {
116-
// queue has been reset
117-
read_index = 0;
101+
uint64_t initial_reading_index() const noexcept {
102+
return reserved_->load(std::memory_order_relaxed);
118103
}
119104

120-
if (index != read_index) {
121-
// data not ready yet
122-
return std::make_pair(nullptr, 0);
105+
uint64_t reserve(uint32_t n = 1) noexcept {
106+
return reserved_->fetch_add(n, std::memory_order_acq_rel);
123107
}
124108

125-
auto& data = data_[read_index & mask_];
126-
read_index += slot.size;
127-
return std::make_pair(&data, slot.size);
128-
}
129-
130-
void reset() noexcept {
131-
auto next = reserved_->load(std::memory_order_relaxed);
132-
if (next <= mask_) {
133-
// data hasn't wrapped yet, need to invalidate
134-
memset(control_, 0, sizeof(slot)* (SIZE + 1024));
135-
memset(data_, 0, sizeof(T) * (SIZE + 1024));
109+
T* operator[] (uint64_t index) noexcept {
110+
return &data_[index & mask_];
136111
}
137-
// invalidate first slot
138-
control_[0].data_index.store(1, std::memory_order_release);
139-
reserved_->store(0, std::memory_order_release);
140-
}
141-
142-
private:
143112

144-
#if defined(_MSC_VER)
145-
void allocate_shm_data(const char* const shm_name, bool open_only) {
146-
auto BF_SZ = 64 + sizeof(slot) * (SIZE + 1024) + sizeof(T) * (SIZE + 1024);
147-
HANDLE hMapFile = NULL;
148-
if (open_only) {
149-
hMapFile = OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, (LPCWSTR)shm_name);
150-
own_ = false;
151-
auto err = GetLastError();
152-
if (hMapFile == NULL) {
153-
throw std::runtime_error("Failed to open shm. err=" + std::to_string(err));
154-
}
113+
const T* operator[] (uint64_t index) const noexcept {
114+
return &data_[index & mask_];
155115
}
156-
else {
157-
hMapFile = CreateFileMapping(
158-
INVALID_HANDLE_VALUE, // use paging file
159-
NULL, // default security
160-
PAGE_READWRITE, // read/write access
161-
0, // maximum object size (high-order DWORD)
162-
BF_SZ, // maximum object size (low-order DWORD)
163-
(LPCWSTR)shm_name // name of mapping object
164-
);
165-
166-
own_ = false;
167-
auto err = GetLastError();
168-
if (hMapFile == NULL) {
169-
throw std::runtime_error("Failed to create shm. err=" + std::to_string(err));
170-
}
171-
172-
if (err != ERROR_ALREADY_EXISTS) {
173-
own_ = true;
174-
}
175-
176-
printf("%s MapFile created %p\n", shm_name, hMapFile);
116+
117+
void publish(uint64_t index, uint32_t n = 1) noexcept {
118+
auto& slot = control_[index & mask_];
119+
slot.size = n;
120+
slot.data_index.store(index, std::memory_order_release);
177121
}
178122

179-
void* lpvMem = MapViewOfFile(hMapFile, FILE_MAP_ALL_ACCESS, 0, 0, BF_SZ);
180-
if (!lpvMem) {
181-
auto err = GetLastError();
182-
throw std::runtime_error("Failed to map shm. err=" + std::to_string(err));
123+
std::pair<T*, uint32_t> read(uint64_t& read_index) noexcept {
124+
auto& slot = control_[read_index & mask_];
125+
auto index = slot.data_index.load(std::memory_order_relaxed);
126+
if (reserved_->load(std::memory_order_relaxed) < index) {
127+
// queue has been reset
128+
read_index = 0;
129+
}
130+
131+
if (index != read_index) {
132+
// data not ready yet
133+
return std::make_pair(nullptr, 0);
134+
}
135+
136+
auto& data = data_[read_index & mask_];
137+
read_index += slot.size;
138+
return std::make_pair(&data, slot.size);
183139
}
184140

185-
if (own_) {
186-
reserved_ = new (lpvMem) std::atomic_uint_fast64_t{ 0 };
187-
control_ = new ((uint8_t*)lpvMem + 64) slot[SIZE + 1024];
188-
data_ = new ((uint8_t*)lpvMem + 64 + sizeof(slot) * (SIZE + 1024)) T[SIZE + 1024];
141+
void reset() noexcept {
142+
auto next = reserved_->load(std::memory_order_relaxed);
143+
if (next <= mask_) {
144+
// data hasn't wrapped yet, need to invalidate
145+
memset(control_, 0, sizeof(slot) * size_);
146+
memset(data_, 0, sizeof(T) * size_);
147+
}
148+
// invalidate first slot
149+
control_[0].data_index.store(1, std::memory_order_release);
150+
reserved_->store(0, std::memory_order_release);
189151
}
190-
else {
191-
reserved_ = reinterpret_cast<std::atomic_uint_fast64_t*>(lpvMem);
192-
control_ = reinterpret_cast<slot*>((uint8_t*)lpvMem + 64);
193-
data_ = reinterpret_cast<T*>((uint8_t*)lpvMem + 64 + sizeof(slot) * (SIZE + 1024));
152+
153+
private:
154+
155+
#if defined(_MSC_VER)
156+
void allocate_shm_data(const char* const shm_name, bool open_only) {
157+
SIZE_T BF_SZ;
158+
HANDLE hMapFile = NULL;
159+
if (open_only) {
160+
hMapFile = OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, (LPCWSTR)shm_name);
161+
own_ = false;
162+
auto err = GetLastError();
163+
if (hMapFile == NULL) {
164+
throw std::runtime_error("Failed to open shm. err=" + std::to_string(err));
165+
}
166+
167+
void* lpvMem = MapViewOfFile(hMapFile, FILE_MAP_ALL_ACCESS, 0, 0, 64);
168+
if (!lpvMem) {
169+
auto err = GetLastError();
170+
throw std::runtime_error("Failed to map shm. err=" + std::to_string(err));
171+
}
172+
mask_ = *reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(lpvMem) + sizeof(std::atomic_uint_fast64_t)) - 1;
173+
size_ = mask_ + 1025;
174+
BF_SZ = 64 + sizeof(slot) * size_ + sizeof(T) * size_;
175+
UnmapViewOfFile(lpvMem_);
176+
lpvMem_ = nullptr;
177+
}
178+
else {
179+
BF_SZ = 64 + sizeof(slot) * size_ + sizeof(T) * size_;
180+
181+
hMapFile = CreateFileMapping(
182+
INVALID_HANDLE_VALUE, // use paging file
183+
NULL, // default security
184+
PAGE_READWRITE, // read/write access
185+
0, // maximum object size (high-order DWORD)
186+
BF_SZ, // maximum object size (low-order DWORD)
187+
(LPCWSTR)shm_name // name of mapping object
188+
);
189+
190+
own_ = false;
191+
auto err = GetLastError();
192+
if (hMapFile == NULL) {
193+
throw std::runtime_error("Failed to create shm. err=" + std::to_string(err));
194+
}
195+
196+
if (err != ERROR_ALREADY_EXISTS) {
197+
own_ = true;
198+
}
199+
200+
printf("%s MapFile created %p\n", shm_name, hMapFile);
201+
}
202+
203+
void* lpvMem = MapViewOfFile(hMapFile, FILE_MAP_ALL_ACCESS, 0, 0, BF_SZ);
204+
if (!lpvMem) {
205+
auto err = GetLastError();
206+
throw std::runtime_error("Failed to map shm. err=" + std::to_string(err));
207+
}
208+
209+
if (own_) {
210+
reserved_ = new (lpvMem) std::atomic_uint_fast64_t{ 0 };
211+
*reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(lpvMem) + sizeof(std::atomic_uint_fast64_t)) = mask_ + 1;
212+
control_ = new ((uint8_t*)lpvMem + 64) slot[size_];
213+
data_ = new ((uint8_t*)lpvMem + 64 + sizeof(slot) * size_) T[size_];
214+
printf("%s shm memory created %p size: %d(%d)\n", shm_name, lpvMem, *reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(lpvMem) + sizeof(std::atomic_uint_fast64_t)), mask_);
215+
}
216+
else {
217+
reserved_ = reinterpret_cast<std::atomic_uint_fast64_t*>(lpvMem);
218+
control_ = reinterpret_cast<slot*>((uint8_t*)lpvMem + 64);
219+
data_ = reinterpret_cast<T*>((uint8_t*)lpvMem + 64 + sizeof(slot) * size_);
220+
}
194221
}
195-
}
196222
#else
197-
void allocateShmData(const char* const shm_name) noexcept {
198-
}
223+
void allocateShmData(const char* const shm_name) noexcept {
224+
}
199225
#endif
200226

201227
};

0 commit comments

Comments
 (0)