Skip to content

Commit f9cf9b0

Browse files
committed
Add stoppable_emplace and overloads of push that accept a std::stop_token.
1 parent ed75287 commit f9cf9b0

File tree

2 files changed

+40
-2
lines changed

2 files changed

+40
-2
lines changed

source/queue.cpp

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,24 @@ struct basic_queue_impl {
5151
[&]{ containers::emplace_back(m_container, OPERATORS_FORWARD(args)...); }
5252
);
5353
}
54+
auto stoppable_emplace(std::stop_token token, auto && ... args) -> bool {
55+
return generic_add(
56+
std::move(token),
57+
[&]{ containers::emplace_back(m_container, OPERATORS_FORWARD(args)...); }
58+
);
59+
}
5460
auto push(value_type && value) -> void {
5561
emplace(std::move(value));
5662
}
5763
auto push(value_type const & value) -> void {
5864
emplace(value);
5965
}
66+
auto push(std::stop_token token, value_type && value) -> bool {
67+
return stoppable_emplace(std::move(token), std::move(value));
68+
}
69+
auto push(std::stop_token token, value_type const & value) -> bool {
70+
return stoppable_emplace(std::move(token), value);
71+
}
6072

6173
auto non_blocking_emplace(auto && ... args) -> bool {
6274
return generic_non_blocking_add(
@@ -249,6 +261,14 @@ struct basic_queue_impl {
249261
derived().handle_add(m_container, lock);
250262
generic_add_impl(std::move(lock), add);
251263
}
264+
auto generic_add(std::stop_token token, auto const add) -> bool {
265+
auto lock = lock_type(m_mutex);
266+
auto const should_add = derived().handle_add(m_container, std::move(token), lock);
267+
if (should_add) {
268+
generic_add_impl(std::move(lock), add);
269+
}
270+
return should_add;
271+
}
252272

253273
auto generic_non_blocking_add(auto const add) -> bool {
254274
auto lock = lock_type(m_mutex, std::try_to_lock);
@@ -366,6 +386,9 @@ struct basic_unbounded_queue : private basic_queue_impl<Container, Mutex, basic_
366386

367387
auto handle_add(Container &, std::unique_lock<Mutex> &) -> void {
368388
}
389+
auto handle_add(Container &, std::stop_token const &, std::unique_lock<Mutex> &) -> bool {
390+
return true;
391+
}
369392
auto handle_non_blocking_add(Container &, std::unique_lock<Mutex> &) -> bool {
370393
return true;
371394
}
@@ -403,6 +426,7 @@ struct basic_blocking_queue : private basic_queue_impl<Container, Mutex, basic_b
403426
using base::append;
404427
using base::non_blocking_append;
405428
using base::emplace;
429+
using base::stoppable_emplace;
406430
using base::non_blocking_emplace;
407431
using base::push;
408432
using base::non_blocking_push;
@@ -426,8 +450,8 @@ struct basic_blocking_queue : private basic_queue_impl<Container, Mutex, basic_b
426450
[&]{ return containers::size(queue) < m_max_size; }
427451
);
428452
}
429-
auto handle_add(Container & queue, std::stop_token token, std::unique_lock<Mutex> & lock) -> void {
430-
m_notify_removal.wait(
453+
auto handle_add(Container & queue, std::stop_token token, std::unique_lock<Mutex> & lock) -> bool {
454+
return m_notify_removal.wait(
431455
lock,
432456
std::move(token),
433457
[&]{ return containers::size(queue) < m_max_size; }

test/queue.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,4 +180,18 @@ TEST_CASE("non_blocking_push never blocks", "concurrent_queue") {
180180
CHECK(!added);
181181
}
182182

183+
TEST_CASE("push unblocks when stop requested", "concurrent_queue") {
184+
auto queue = concurrent::blocking_queue<int>(0);
185+
auto source = std::stop_source();
186+
auto const time_to_wake_up = now() + duration;
187+
auto thread = std::jthread([&]{
188+
std::this_thread::sleep_until(time_to_wake_up);
189+
source.request_stop();
190+
});
191+
auto const added = queue.push(source.get_token(), 6);
192+
CHECK(now() >= time_to_wake_up);
193+
CHECK(!added);
194+
CHECK(queue.size() == 0);
195+
}
196+
183197
} // namespace

0 commit comments

Comments
 (0)