Skip to content

Commit da5515c

Browse files
committed
Ensure that non_blocking_push never blocks, even when a blocking_queue is full.
1 parent e12141a commit da5515c

File tree

2 files changed

+21
-5
lines changed

2 files changed

+21
-5
lines changed

source/queue.cpp

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -244,21 +244,25 @@ struct basic_queue_impl {
244244
}
245245

246246

247-
auto generic_add(auto && add) -> void {
248-
generic_add_impl(lock_type(m_mutex), add);
247+
auto generic_add(auto const add) -> void {
248+
auto lock = lock_type(m_mutex);
249+
derived().handle_add(m_container, lock);
250+
generic_add_impl(std::move(lock), add);
249251
}
250252

251-
auto generic_non_blocking_add(auto && add) -> bool {
253+
auto generic_non_blocking_add(auto const add) -> bool {
252254
auto lock = lock_type(m_mutex, std::try_to_lock);
253255
if (!lock.owns_lock()) {
254256
return false;
255257
}
258+
if (!derived().handle_non_blocking_add(m_container, lock)) {
259+
return false;
260+
}
256261
generic_add_impl(std::move(lock), add);
257262
return true;
258263
}
259264

260-
auto generic_add_impl(lock_type lock, auto && add) -> void {
261-
derived().handle_add(m_container, lock);
265+
auto generic_add_impl(lock_type lock, auto const add) -> void {
262266
auto const was_empty = containers::is_empty(m_container);
263267
add();
264268
lock.unlock();
@@ -362,6 +366,9 @@ struct basic_unbounded_queue : private basic_queue_impl<Container, Mutex, basic_
362366

363367
auto handle_add(Container &, std::unique_lock<Mutex> &) -> void {
364368
}
369+
auto handle_non_blocking_add(Container &, std::unique_lock<Mutex> &) -> bool {
370+
return true;
371+
}
365372
auto handle_remove_all(containers::range_size_t<Container>) -> void {
366373
}
367374
auto handle_remove_one(containers::range_size_t<Container>) -> void {
@@ -426,6 +433,9 @@ struct basic_blocking_queue : private basic_queue_impl<Container, Mutex, basic_b
426433
[&]{ return containers::size(queue) < m_max_size; }
427434
);
428435
}
436+
auto handle_non_blocking_add(Container & queue, std::unique_lock<Mutex> &) -> bool {
437+
return containers::size(queue) < m_max_size;
438+
}
429439
auto handle_remove_all(containers::range_size_t<Container> const previous_size) -> void {
430440
if (previous_size >= max_size()) {
431441
m_notify_removal.notify_all();

test/queue.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,4 +174,10 @@ TEST_CASE("blocking", "concurrent_queue") {
174174
CHECK(result.front() == value);
175175
}
176176

177+
TEST_CASE("non_blocking_push never blocks", "concurrent_queue") {
178+
auto queue = concurrent::blocking_queue<int>(0);
179+
auto const added = queue.non_blocking_push(6);
180+
CHECK(!added);
181+
}
182+
177183
} // namespace

0 commit comments

Comments
 (0)