Skip to content

Commit fe49dc8

Browse files
committed
Fix queue stopped reading if the reader is too slow and the writer catches the reader
1 parent 33e0c10 commit fe49dc8

File tree

1 file changed

+21
-4
lines changed

1 file changed

+21
-4
lines changed

include/slick_queue.h

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ class SlickQueue {
6161

6262
if (own_) {
6363
// invalidate first slot
64-
control_[0].data_index.store(1, std::memory_order_relaxed);
64+
control_[0].data_index.store(-1, std::memory_order_relaxed);
6565
}
6666
}
6767

@@ -127,19 +127,36 @@ class SlickQueue {
127127
read_index = 0;
128128
}
129129

130-
if (index != read_index) {
130+
if (index == -1 || index < read_index) {
131131
// data not ready yet
132132
return std::make_pair(nullptr, 0);
133133
}
134134

135135
auto& data = data_[read_index & mask_];
136-
read_index += slot.size;
136+
read_index = slot.data_index + slot.size;
137137
return std::make_pair(&data, slot.size);
138138
}
139139

140+
/**
141+
* Read the last data in the queue
142+
*
143+
* This function is safe only for fixed sized data and alwasy reserve, publish one data at a time
144+
*/
145+
T* read_last() noexcept {
146+
auto reserved = reserved_->load(std::memory_order_relaxed);
147+
auto index = reserved - 1;
148+
if (reserved == 0) {
149+
return nullptr;
150+
}
151+
152+
// wait for the data published
153+
while (control_[index & mask_].data_index.load(std::memory_order_relaxed) != index);
154+
return &data_[index & mask_];
155+
}
156+
140157
void reset() noexcept {
141158
// invalidate first slot
142-
control_[0].data_index.store(1, std::memory_order_release);
159+
control_[0].data_index.store(-1, std::memory_order_release);
143160
reserved_->store(0, std::memory_order_release);
144161
}
145162

0 commit comments

Comments
 (0)